This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ff1f66c  [GOBBLIN-680] Enhance error handling on task creation
ff1f66c is described below

commit ff1f66c033e6a5c7a8f75551e51633aadc387db9
Author: Lei Sun <[email protected]>
AuthorDate: Fri Feb 8 15:43:22 2019 -0800

    [GOBBLIN-680] Enhance error handling on task creation
    
    Closes #2551 from autumnust/MemoryStats
---
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 29 ++++++++-
 .../org/apache/gobblin/runtime/task/TaskUtils.java |  1 +
 .../apache/gobblin/TaskErrorIntegrationTest.java   | 68 +++++++++++++++++++++-
 3 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 24ad335..e800107 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -18,6 +18,10 @@
 package org.apache.gobblin.runtime;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -359,7 +363,12 @@ public class GobblinMultiTaskAttempt {
         this.taskStateTracker.registerNewTask(task);
         task.setTaskFuture(this.taskExecutor.submit(task));
         tasks.add(task);
-      } catch (Exception e) {
+      } catch (Throwable e) {
+        if (e instanceof OutOfMemoryError) {
+          log.error("Encountering memory error in task creation/execution stage, please investigate memory usage:", e);
+          printMemoryUsage();
+        }
+
         if (task == null) {
           // task could not be created, so directly count down
           countDownLatch.countDown();
@@ -383,6 +392,24 @@ public class GobblinMultiTaskAttempt {
     return tasks;
   }
 
+  private void printMemoryUsage() {
+    MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage heapMemory = memoryBean.getHeapMemoryUsage();
+    MemoryUsage nonHeapMemory = memoryBean.getNonHeapMemoryUsage();
+    String format = "%-15s%-15s%-15s%-15s";
+
+    this.log.info("Heap Memory");
+    this.log.info(String.format(format, "init", "used", "Committed", "max"));
+    this.log.info(String.format(format, heapMemory.getInit(), heapMemory.getUsed(),
+        heapMemory.getCommitted(), heapMemory.getMax()));
+
+    this.log.info("Non-heap Memory");
+    this.log.info(String.format(format, "init", "used", "Committed", "max"));
+    this.log.info(String.format(format, nonHeapMemory.getInit(), nonHeapMemory.getUsed(),
+        nonHeapMemory.getCommitted(), nonHeapMemory.getMax()));
+  }
+
+
   private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
     Optional<TaskFactory> taskFactoryOpt = TaskUtils.getTaskFactory(workUnitState);
     if (taskFactoryOpt.isPresent()) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
index f5ebd0d..6dfc5df 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.runtime.task;
 
 import com.google.common.base.Optional;
+import java.util.Properties;
 import org.apache.gobblin.configuration.State;
 
 
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
index d99a3a1..9110d20 100644
--- a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
@@ -24,6 +24,14 @@ import java.util.Properties;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.NoopPublisher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.BaseAbstractTask;
+import org.apache.gobblin.runtime.task.TaskFactory;
+import org.apache.gobblin.runtime.task.TaskIFace;
+import org.apache.gobblin.runtime.task.TaskUtils;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -72,7 +80,7 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
     Properties jobProperties =
         GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
 
-    jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, TestSource.class.getName());
+    jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, BaseTestSource.class.getName());
     jobProperties.setProperty(TestExtractor.RAISE_ERROR, "true");
 
     GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
@@ -101,7 +109,7 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
     Properties jobProperties =
         GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
 
-    jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, TestSource.class.getName());
+    jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, BaseTestSource.class.getName());
     jobProperties.setProperty(TestExtractor.RAISE_ERROR, "false");
 
     GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
@@ -112,6 +120,24 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
     logger.removeAppender(testAppender);
   }
 
+  @Test
+  public void testCustomizedTaskFrameworkFailureInTaskCreation() throws Exception {
+    TestAppender testAppender = new TestAppender();
+    Logger logger = LogManager.getLogger(GobblinMultiTaskAttempt.class.getName() + "-noattempt");
+    logger.addAppender(testAppender);
+
+    Properties jobProperties =
+        GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
+    jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, CustomizedTaskTestSource.class.getName());
+
+    GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+    Assert.assertTrue(testAppender.events.stream().anyMatch(e -> e.getRenderedMessage()
+        .startsWith("Encountering memory error")));
+
+    logger.removeAppender(testAppender);
+  }
+
+
   /**
    * Test extractor that can be configured to raise an exception on construction
    */
@@ -148,9 +174,45 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
   }
 
   /**
+   * Testing task and factory implementation for Customized Task implementation.
+   */
+  public static class TestCustomizedTask extends BaseAbstractTask {
+    public TestCustomizedTask(TaskContext taskContext) {
+      super(taskContext);
+
+      // trigger OutOfMemoryError on purpose during creation phase.
+      throw new OutOfMemoryError();
+    }
+  }
+
+  public static class TestTaskFactory implements TaskFactory {
+
+    @Override
+    public TaskIFace createTask(TaskContext taskContext) {
+      return new TestCustomizedTask(taskContext);
+    }
+
+    @Override
+    public DataPublisher createDataPublisher(JobState.DatasetState datasetState) {
+      return new NoopPublisher(datasetState);
+    }
+  }
+
+  public static class CustomizedTaskTestSource extends BaseTestSource {
+    @Override
+    public List<WorkUnit> getWorkunits(SourceState state) {
+      WorkUnit workUnit = new WorkUnit();
+      TaskUtils.setTaskFactoryClass(workUnit, TestTaskFactory.class);
+      workUnit.addAll(state);
+      return Collections.singletonList(workUnit);
+    }
+  }
+
+
+  /**
    * Test source that creates a {@link TestExtractor}
    */
-  public static class TestSource implements Source<Schema, GenericRecord> {
+  public static class BaseTestSource implements Source<Schema, GenericRecord> {
 
     @Override
     public List<WorkUnit> getWorkunits(SourceState state) {

Reply via email to