This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c0041a5 [GOBBLIN-1178] Propagation of exception from task creation to
helix and request for new container
c0041a5 is described below
commit c0041a56238695da8411033bd43b0389b18e4a89
Author: Lei Sun <[email protected]>
AuthorDate: Thu Jun 4 12:05:11 2020 -0700
[GOBBLIN-1178] Propagation of exception from task creation to helix and
request for new container
Closes #3024 from autumnust/HelixTaskRetry
---
.../apache/gobblin/cluster/GobblinHelixTask.java | 29 ++++++++
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 65 +++++++++++-------
.../gobblin/runtime/TaskCreationException.java | 31 +++++++++
.../runtime/GobblinMultiTaskAttemptTest.java | 79 ++++++++++++++++++++++
.../apache/gobblin/TaskErrorIntegrationTest.java | 20 ++++--
.../gobblin/broker/SharedResourcesBrokerImpl.java | 2 +-
.../event/ContainerHealthCheckFailureEvent.java | 1 +
7 files changed, 197 insertions(+), 30 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 50dc5b3..f53282f 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -17,15 +17,20 @@
package org.apache.gobblin.cluster;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.TaskCreationException;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
import org.apache.gobblin.util.retry.RetryerFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,8 +44,10 @@ import org.slf4j.MDC;
import com.github.rholder.retry.Retryer;
import com.google.common.base.Throwables;
+import com.google.common.eventbus.EventBus;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
@@ -77,6 +84,7 @@ public class GobblinHelixTask implements Task {
private GobblinHelixTaskMetrics taskMetrics;
private SingleTask task;
private String helixTaskId;
+ private EventBus eventBus;
public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
TaskCallbackContext taskCallbackContext,
@@ -116,6 +124,9 @@ public class GobblinHelixTask implements Task {
Retryer<SingleTask> retryer =
RetryerFactory.newInstance(builder.getConfig());
try {
+ eventBus =
EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
this.task = retryer.call(new Callable<SingleTask>() {
@Override
public SingleTask call()
@@ -158,6 +169,12 @@ public class GobblinHelixTask implements Task {
log.error("Actual task {} interrupted.", this.taskId);
this.taskMetrics.helixTaskTotalFailed.incrementAndGet();
return new TaskResult(TaskResult.Status.CANCELED, "");
+ } catch (TaskCreationException te) {
+ eventBus.post(createTaskCreationEvent());
+ log.error("Actual task {} failed in creation due to {}, will request new
container to schedule it",
+ this.taskId, te.getMessage());
+ this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
+ return new TaskResult(TaskResult.Status.FAILED,
Throwables.getStackTraceAsString(te));
} catch (Throwable t) {
log.error("Actual task {} failed due to {}", this.taskId,
t.getMessage());
this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
@@ -168,6 +185,18 @@ public class GobblinHelixTask implements Task {
}
}
+ private ContainerHealthCheckFailureEvent createTaskCreationEvent() {
+ ContainerHealthCheckFailureEvent event = new
ContainerHealthCheckFailureEvent(
+ ConfigFactory.parseMap(this.taskConfig.getConfigMap()) ,
getClass().getName());
+ event.addMetadata("jobName", this.jobName);
+ event.addMetadata("AppName", this.applicationName);
+ event.addMetadata("InstanceName", this.instanceName);
+ event.addMetadata("helixJobId", this.helixJobId);
+ event.addMetadata("helixTaskId", this.helixTaskId);
+ event.addMetadata("WUPath", this.workUnitFilePath.toString());
+ return event;
+ }
+
private Integer getPartitionForHelixTask(TaskDriver taskDriver) {
//Get Helix partition id for this task
JobContext jobContext = taskDriver.getJobContext(this.helixJobId);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index eded48c..284413b 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -24,6 +24,7 @@ import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -31,6 +32,22 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
+import org.apache.commons.math3.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import lombok.Setter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
@@ -56,21 +73,6 @@ import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.TaskEventMetadataUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.gobblin.util.retry.RetryerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.Retryer;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
-import javax.annotation.Nullable;
-import lombok.Setter;
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
@@ -152,7 +154,14 @@ public class GobblinMultiTaskAttempt {
}
CountUpAndDownLatch countDownLatch = new CountUpAndDownLatch(0);
- this.tasks = runWorkUnits(countDownLatch);
+ Pair<List<Task>, Boolean> executionResult = runWorkUnits(countDownLatch);
+ this.tasks = executionResult.getFirst();
+
+ // A task could not be created: propagate the failure as an exception so it is
noticeable to the job launcher
+ if (!executionResult.getSecond()) {
+ throw new TaskCreationException("Failing in creating task before
execution.");
+ }
+
log.info("Waiting for submitted tasks of job {} to complete in container
{}...", jobId, containerIdOptional.or(""));
try {
while (countDownLatch.getCount() > 0) {
@@ -370,11 +379,13 @@ public class GobblinMultiTaskAttempt {
* </p>
*
* @param countDownLatch a {@link java.util.concurrent.CountDownLatch}
waited on for job completion
- * @return a list of {@link Task}s from the {@link WorkUnit}s
+ * @return a pair holding the list of {@link Task}s created from the {@link WorkUnit}s and a flag that is
false when task creation failed even after retrying;
+ * the caller should handle that failure separately so that a work unit is not
silently starved.
*/
- private List<Task> runWorkUnits(CountUpAndDownLatch countDownLatch) {
+ private Pair<List<Task>, Boolean> runWorkUnits(CountUpAndDownLatch
countDownLatch) {
List<Task> tasks = Lists.newArrayList();
+ boolean isTaskCreatedSuccessfully = true;
while (this.workUnits.hasNext()) {
WorkUnit workUnit = this.workUnits.next();
String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY);
@@ -413,6 +424,10 @@ public class GobblinMultiTaskAttempt {
}
if (task == null) {
+ if (e instanceof RetryException) {
+ // The task is null because its creation failed even
after retrying.
+ isTaskCreatedSuccessfully = false;
+ }
// task could not be created, so directly count down
countDownLatch.countDown();
log.error("Could not create task for workunit {}", workUnit, e);
@@ -434,7 +449,7 @@ public class GobblinMultiTaskAttempt {
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState,
JobEvent.TASKS_SUBMITTED));
eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED,
"tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
- return tasks;
+ return new Pair<>(tasks, isTaskCreatedSuccessfully);
}
private void printMemoryUsage() {
@@ -469,10 +484,13 @@ public class GobblinMultiTaskAttempt {
* As the initialization of {@link Task} could have unstable external
connection which could be healed through
* retry, adding retry-wrapper here for the sake of fault-tolerance.
*/
- private Task createTaskWithRetry(WorkUnitState workUnitState, CountDownLatch
countDownLatch) {
+ @VisibleForTesting
+ Task createTaskWithRetry(WorkUnitState workUnitState, CountDownLatch
countDownLatch) throws RetryException {
+ Properties defaultRetryConfig = new Properties();
+ defaultRetryConfig.setProperty(RETRY_TIME_OUT_MS,
TimeUnit.MINUTES.toMillis(1L) + "");
+ defaultRetryConfig.setProperty(RETRY_INTERVAL_MS,
TimeUnit.SECONDS.toMillis(2L) + "");
Config config =
ConfigUtils.propertiesToConfig(this.jobState.getProperties())
- .withValue(RETRY_TIME_OUT_MS,
ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.toMillis(1L)))
- .withValue(RETRY_INTERVAL_MS,
ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.toMillis(2L)));
+ .withFallback(ConfigUtils.propertiesToConfig(defaultRetryConfig));
Retryer<Task> retryer = RetryerFactory.newInstance(config);
// An "effectively final" variable for counting how many retried has been
done, mostly for logging purpose.
final AtomicInteger counter = new AtomicInteger(0);
@@ -487,9 +505,6 @@ public class GobblinMultiTaskAttempt {
return createTaskRunnable(workUnitState, countDownLatch);
}
});
- } catch (RetryException re) {
- log.error(String.format("Fatal Exception creating Task after %s
retries", counter));
- throw
Throwables.propagate(re.getLastFailedAttempt().getExceptionCause());
} catch (ExecutionException ee) {
throw new RuntimeException("Failure in executing retryer due to, ", ee);
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskCreationException.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskCreationException.java
new file mode 100644
index 0000000..725b2dd
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskCreationException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.runtime;
+
+import java.io.IOException;
+
+
+/**
+ * Exception thrown while creating task for execution within {@link
GobblinMultiTaskAttempt}.
+ */
+public class TaskCreationException extends IOException {
+ public TaskCreationException(String message) {
+ super(message);
+ }
+}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
new file mode 100644
index 0000000..e40b23d
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.SharedResourcesBrokerImpl;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
+
+
+public class GobblinMultiTaskAttemptTest {
+ private GobblinMultiTaskAttempt taskAttempt;
+
+ @Test
+ public void testRunWithTaskCreationFailure()
+ throws Exception {
+ // Preparing Instance of TaskAttempt with designed failure on task creation
+ WorkUnit tmpWU = new WorkUnit();
+ // Put necessary attributes in workunit
+ tmpWU.setProp(ConfigurationKeys.TASK_ID_KEY, "task_test");
+ List<WorkUnit> workUnit = ImmutableList.of(tmpWU);
+ JobState jobState = new JobState();
+ // Limit the retry time-out (RETRY_TIME_OUT_MS, in milliseconds) for task creation.
+ jobState.setProp(RETRY_TIME_OUT_MS, 1000);
+ TaskStateTracker stateTrackerMock = Mockito.mock(TaskStateTracker.class);
+ TaskExecutor taskExecutorMock = Mockito.mock(TaskExecutor.class);
+ Config config = ConfigFactory.empty();
+ SharedResourcesBrokerImpl<GobblinScopeTypes> topBroker =
SharedResourcesBrokerFactory.createDefaultTopLevelBroker(config,
+ GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+ SharedResourcesBrokerImpl<GobblinScopeTypes> jobBroker =
+ topBroker.newSubscopedBuilder(new JobScopeInstance("testJob",
"job123")).build();
+ taskAttempt =
+ new GobblinMultiTaskAttempt(workUnit.iterator(), "testJob", jobState,
stateTrackerMock, taskExecutorMock,
+ Optional.absent(), Optional.absent(), jobBroker);
+
+ try {
+ // This attempt will automatically fail due to missing required config in
+ // org.apache.gobblin.runtime.TaskContext.getSource
+ taskAttempt.run();
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof IOException);
+ return;
+ }
+
+ // Should never reach here.
+ Assert.fail();
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
index 473719f..77a3c92 100644
---
a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
+++
b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
@@ -89,7 +89,11 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
// Disable retry
jobProperties.setProperty(RETRY_TIMES, "1");
- GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+ try {
+ GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+ } catch (Exception e){
+ // Expecting to get exception, do nothing
+ }
Assert.assertTrue(testAppender.events.stream().anyMatch(e ->
e.getRenderedMessage()
.startsWith("Could not create task for workunit")));
@@ -143,7 +147,11 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY,
BaseTestSource.class.getName());
jobProperties.setProperty(TestExtractor.RAISE_ERROR, "false");
- GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+ try {
+ GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+ } catch (Exception e){
+ // Expect to get exception, do nothing
+ }
Assert.assertTrue(testAppender.events.stream().anyMatch(e ->
e.getRenderedMessage()
.startsWith("Could not submit task for workunit")));
@@ -164,8 +172,12 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
jobProperties.setProperty(RETRY_TIMES, "1");
jobProperties.setProperty(RETRY_TYPE,
RetryerFactory.RetryType.FIXED_ATTEMPT.name());
- GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
- Assert.assertTrue(testAppender.events.stream().anyMatch(e ->
e.getRenderedMessage().contains("Encountering memory error")));
+ try {
+ GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+ } catch (Throwable t){
+ // Expect to get exception, do nothing
+ }
+ Assert.assertTrue(testAppender.events.stream().anyMatch(e ->
e.getRenderedMessage().contains("Could not create task for workunit")));
logger.removeAppender(testAppender);
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
index ff066f4..95502f9 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
@@ -260,7 +260,7 @@ public class SharedResourcesBrokerImpl<S extends
ScopeType<S>> implements Shared
parentScopes.add(createWrappedScope(tpe.defaultScopeInstance(),
scopeMap, mainScopeType));
} else {
throw new IllegalArgumentException(String.format(
- "Scope %s is an ancestor of %s, however it does not have a
default id and is not provided as an acestor scope.",
+ "Scope %s is an ancestor of %s, however it does not have a
default id and is not provided as an ancestor scope.",
tpe, mainScopeType));
}
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
index e10e675..8a9dd8b 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
@@ -32,6 +32,7 @@ import lombok.Getter;
public class ContainerHealthCheckFailureEvent {
public static final String CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME =
"ContainerHealthCheckEventBus";
+ // Context of emission of this event, like the task's state.
@Getter
private final Config config;