This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c0041a5  [GOBBLIN-1178] Propagation of exception from task creation to helix and request for new container
c0041a5 is described below

commit c0041a56238695da8411033bd43b0389b18e4a89
Author: Lei Sun <[email protected]>
AuthorDate: Thu Jun 4 12:05:11 2020 -0700

    [GOBBLIN-1178] Propagation of exception from task creation to helix and request for new container
    
    Closes #3024 from autumnust/HelixTaskRetry
---
 .../apache/gobblin/cluster/GobblinHelixTask.java   | 29 ++++++++
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 65 +++++++++++-------
 .../gobblin/runtime/TaskCreationException.java     | 31 +++++++++
 .../runtime/GobblinMultiTaskAttemptTest.java       | 79 ++++++++++++++++++++++
 .../apache/gobblin/TaskErrorIntegrationTest.java   | 20 ++++--
 .../gobblin/broker/SharedResourcesBrokerImpl.java  |  2 +-
 .../event/ContainerHealthCheckFailureEvent.java    |  1 +
 7 files changed, 197 insertions(+), 30 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 50dc5b3..f53282f 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -17,15 +17,20 @@
 
 package org.apache.gobblin.cluster;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.TaskCreationException;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
 import org.apache.gobblin.util.retry.RetryerFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,8 +44,10 @@ import org.slf4j.MDC;
 
 import com.github.rholder.retry.Retryer;
 import com.google.common.base.Throwables;
+import com.google.common.eventbus.EventBus;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
@@ -77,6 +84,7 @@ public class GobblinHelixTask implements Task {
   private GobblinHelixTaskMetrics taskMetrics;
   private SingleTask task;
   private String helixTaskId;
+  private EventBus eventBus;
 
   public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
                           TaskCallbackContext taskCallbackContext,
@@ -116,6 +124,9 @@ public class GobblinHelixTask implements Task {
     Retryer<SingleTask> retryer = 
RetryerFactory.newInstance(builder.getConfig());
 
     try {
+      eventBus = 
EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+          SharedResourcesBrokerFactory.getImplicitBroker());
+
       this.task = retryer.call(new Callable<SingleTask>() {
         @Override
         public SingleTask call()
@@ -158,6 +169,12 @@ public class GobblinHelixTask implements Task {
       log.error("Actual task {} interrupted.", this.taskId);
       this.taskMetrics.helixTaskTotalFailed.incrementAndGet();
       return new TaskResult(TaskResult.Status.CANCELED, "");
+    } catch (TaskCreationException te) {
+      eventBus.post(createTaskCreationEvent());
+      log.error("Actual task {} failed in creation due to {}, will request new 
container to schedule it",
+          this.taskId, te.getMessage());
+      this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
+      return new TaskResult(TaskResult.Status.FAILED, 
Throwables.getStackTraceAsString(te));
     } catch (Throwable t) {
       log.error("Actual task {} failed due to {}", this.taskId, 
t.getMessage());
       this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
@@ -168,6 +185,18 @@ public class GobblinHelixTask implements Task {
     }
   }
 
+  private ContainerHealthCheckFailureEvent createTaskCreationEvent() {
+    ContainerHealthCheckFailureEvent event = new 
ContainerHealthCheckFailureEvent(
+        ConfigFactory.parseMap(this.taskConfig.getConfigMap()) , 
getClass().getName());
+    event.addMetadata("jobName", this.jobName);
+    event.addMetadata("AppName", this.applicationName);
+    event.addMetadata("InstanceName", this.instanceName);
+    event.addMetadata("helixJobId", this.helixJobId);
+    event.addMetadata("helixTaskId", this.helixTaskId);
+    event.addMetadata("WUPath", this.workUnitFilePath.toString());
+    return event;
+  }
+
   private Integer getPartitionForHelixTask(TaskDriver taskDriver) {
     //Get Helix partition id for this task
     JobContext jobContext = taskDriver.getJobContext(this.helixJobId);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index eded48c..284413b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -24,6 +24,7 @@ import java.lang.management.MemoryUsage;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -31,6 +32,22 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
+import org.apache.commons.math3.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import lombok.Setter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
@@ -56,21 +73,6 @@ import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.TaskEventMetadataUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 import org.apache.gobblin.util.retry.RetryerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.Retryer;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
-import javax.annotation.Nullable;
-import lombok.Setter;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
@@ -152,7 +154,14 @@ public class GobblinMultiTaskAttempt {
     }
 
     CountUpAndDownLatch countDownLatch = new CountUpAndDownLatch(0);
-    this.tasks = runWorkUnits(countDownLatch);
+    Pair<List<Task>, Boolean> executionResult = runWorkUnits(countDownLatch);
+    this.tasks = executionResult.getFirst();
+
+    // Indicating task creation failure, propagating exception as it should be 
noticeable to job launcher
+    if (!executionResult.getSecond()) {
+      throw new TaskCreationException("Failing in creating task before 
execution.");
+    }
+
     log.info("Waiting for submitted tasks of job {} to complete in container 
{}...", jobId, containerIdOptional.or(""));
     try {
       while (countDownLatch.getCount() > 0) {
@@ -370,11 +379,13 @@ public class GobblinMultiTaskAttempt {
    * </p>
    *
    * @param countDownLatch a {@link java.util.concurrent.CountDownLatch} 
waited on for job completion
-   * @return a list of {@link Task}s from the {@link WorkUnit}s
+   * @return a list of {@link Task}s from the {@link WorkUnit}s, as well as if 
there's a failure in task creation
+   * which should be handled separately to avoid silently starving on certain 
workunit.
    */
-  private List<Task> runWorkUnits(CountUpAndDownLatch countDownLatch) {
+  private Pair<List<Task>, Boolean> runWorkUnits(CountUpAndDownLatch 
countDownLatch) {
 
     List<Task> tasks = Lists.newArrayList();
+    boolean isTaskCreatedSuccessfully = true;
     while (this.workUnits.hasNext()) {
       WorkUnit workUnit = this.workUnits.next();
       String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY);
@@ -413,6 +424,10 @@ public class GobblinMultiTaskAttempt {
         }
 
         if (task == null) {
+          if (e instanceof RetryException) {
+            // Indicating task being null due to failure in creation even 
after retrying.
+            isTaskCreatedSuccessfully = false;
+          }
           // task could not be created, so directly count down
           countDownLatch.countDown();
           log.error("Could not create task for workunit {}", workUnit, e);
@@ -434,7 +449,7 @@ public class GobblinMultiTaskAttempt {
     
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState,
 JobEvent.TASKS_SUBMITTED));
     eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED, 
"tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
 
-    return tasks;
+    return new Pair<>(tasks, isTaskCreatedSuccessfully);
   }
 
   private void printMemoryUsage() {
@@ -469,10 +484,13 @@ public class GobblinMultiTaskAttempt {
    * As the initialization of {@link Task} could have unstable external 
connection which could be healed through
    * retry, adding retry-wrapper here for the sake of fault-tolerance.
    */
-  private Task createTaskWithRetry(WorkUnitState workUnitState, CountDownLatch 
countDownLatch) {
+  @VisibleForTesting
+  Task createTaskWithRetry(WorkUnitState workUnitState, CountDownLatch 
countDownLatch) throws RetryException {
+    Properties defaultRetryConfig = new Properties();
+    defaultRetryConfig.setProperty(RETRY_TIME_OUT_MS, 
TimeUnit.MINUTES.toMillis(1L) + "");
+    defaultRetryConfig.setProperty(RETRY_INTERVAL_MS, 
TimeUnit.SECONDS.toMillis(2L) + "");
     Config config = 
ConfigUtils.propertiesToConfig(this.jobState.getProperties())
-        .withValue(RETRY_TIME_OUT_MS, 
ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.toMillis(1L)))
-        .withValue(RETRY_INTERVAL_MS, 
ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.toMillis(2L)));
+        .withFallback(ConfigUtils.propertiesToConfig(defaultRetryConfig));
     Retryer<Task> retryer = RetryerFactory.newInstance(config);
     // An "effectively final" variable for counting how many retried has been 
done, mostly for logging purpose.
     final AtomicInteger counter = new AtomicInteger(0);
@@ -487,9 +505,6 @@ public class GobblinMultiTaskAttempt {
           return createTaskRunnable(workUnitState, countDownLatch);
         }
       });
-    } catch (RetryException re) {
-      log.error(String.format("Fatal Exception creating Task after %s 
retries", counter));
-      throw 
Throwables.propagate(re.getLastFailedAttempt().getExceptionCause());
     } catch (ExecutionException ee) {
       throw new RuntimeException("Failure in executing retryer due to, ", ee);
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskCreationException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskCreationException.java
new file mode 100644
index 0000000..725b2dd
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskCreationException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.runtime;
+
+import java.io.IOException;
+
+
+/**
+ * Exception thrown while creating task for execution within {@link 
GobblinMultiTaskAttempt}.
+ */
+public class TaskCreationException extends IOException {
+  public TaskCreationException(String message) {
+    super(message);
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
new file mode 100644
index 0000000..e40b23d
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.SharedResourcesBrokerImpl;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
+
+
+public class GobblinMultiTaskAttemptTest {
+  private GobblinMultiTaskAttempt taskAttempt;
+
+  @Test
+  public void testRunWithTaskCreationFailure()
+      throws Exception {
+    // Preparing Instance of TaskAttempt with designed failure on task creation
+    WorkUnit tmpWU = new WorkUnit();
+    // Put necessary attributes in workunit
+    tmpWU.setProp(ConfigurationKeys.TASK_ID_KEY, "task_test");
+    List<WorkUnit> workUnit = ImmutableList.of(tmpWU);
+    JobState jobState = new JobState();
+    // Limit the number of times of retry in task-creation.
+    jobState.setProp(RETRY_TIME_OUT_MS, 1000);
+    TaskStateTracker stateTrackerMock = Mockito.mock(TaskStateTracker.class);
+    TaskExecutor taskExecutorMock = Mockito.mock(TaskExecutor.class);
+    Config config = ConfigFactory.empty();
+    SharedResourcesBrokerImpl<GobblinScopeTypes> topBroker = 
SharedResourcesBrokerFactory.createDefaultTopLevelBroker(config,
+        GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    SharedResourcesBrokerImpl<GobblinScopeTypes> jobBroker =
+        topBroker.newSubscopedBuilder(new JobScopeInstance("testJob", 
"job123")).build();
+    taskAttempt =
+        new GobblinMultiTaskAttempt(workUnit.iterator(), "testJob", jobState, 
stateTrackerMock, taskExecutorMock,
+            Optional.absent(), Optional.absent(), jobBroker);
+
+    try {
+      // This attempt will automatically fail due to missing required config in
+      // org.apache.gobblin.runtime.TaskContext.getSource
+      taskAttempt.run();
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof IOException);
+      return;
+    }
+
+    // Should never reach here.
+    Assert.fail();
+  }
+}
\ No newline at end of file
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
index 473719f..77a3c92 100644
--- a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
@@ -89,7 +89,11 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
     // Disable retry
     jobProperties.setProperty(RETRY_TIMES, "1");
 
-    GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+    try {
+      GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+    } catch (Exception e){
+      // Expecting to get exception, do nothing
+    }
 
     Assert.assertTrue(testAppender.events.stream().anyMatch(e -> 
e.getRenderedMessage()
         .startsWith("Could not create task for workunit")));
@@ -143,7 +147,11 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
     jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, 
BaseTestSource.class.getName());
     jobProperties.setProperty(TestExtractor.RAISE_ERROR, "false");
 
-    GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+    try {
+      GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+    } catch (Exception e){
+      // Expect to get exception, do nothing
+    }
 
     Assert.assertTrue(testAppender.events.stream().anyMatch(e -> 
e.getRenderedMessage()
         .startsWith("Could not submit task for workunit")));
@@ -164,8 +172,12 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
     jobProperties.setProperty(RETRY_TIMES, "1");
     jobProperties.setProperty(RETRY_TYPE, 
RetryerFactory.RetryType.FIXED_ATTEMPT.name());
 
-    GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
-    Assert.assertTrue(testAppender.events.stream().anyMatch(e -> 
e.getRenderedMessage().contains("Encountering memory error")));
+    try {
+      GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+    } catch (Throwable t){
+      // Expect to get exception, do nothing
+    }
+    Assert.assertTrue(testAppender.events.stream().anyMatch(e -> 
e.getRenderedMessage().contains("Could not create task for workunit")));
     logger.removeAppender(testAppender);
   }
 
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
index ff066f4..95502f9 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
@@ -260,7 +260,7 @@ public class SharedResourcesBrokerImpl<S extends ScopeType<S>> implements Shared
             parentScopes.add(createWrappedScope(tpe.defaultScopeInstance(), 
scopeMap, mainScopeType));
           } else {
             throw new IllegalArgumentException(String.format(
-                "Scope %s is an ancestor of %s, however it does not have a 
default id and is not provided as an acestor scope.",
+                "Scope %s is an ancestor of %s, however it does not have a 
default id and is not provided as an ancestor scope.",
                 tpe, mainScopeType));
           }
         }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
index e10e675..8a9dd8b 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java
@@ -32,6 +32,7 @@ import lombok.Getter;
 public class ContainerHealthCheckFailureEvent {
   public static final String CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME = 
"ContainerHealthCheckEventBus";
 
+  // Context of emission of this event, like the task's state.
   @Getter
   private final Config config;
 

Reply via email to