This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ff1f66c [GOBBLIN-680] Enhance error handling on task creation
ff1f66c is described below
commit ff1f66c033e6a5c7a8f75551e51633aadc387db9
Author: Lei Sun <[email protected]>
AuthorDate: Fri Feb 8 15:43:22 2019 -0800
[GOBBLIN-680] Enhance error handling on task creation
Closes #2551 from autumnust/MemoryStats
---
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 29 ++++++++-
.../org/apache/gobblin/runtime/task/TaskUtils.java | 1 +
.../apache/gobblin/TaskErrorIntegrationTest.java | 68 +++++++++++++++++++++-
3 files changed, 94 insertions(+), 4 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 24ad335..e800107 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -18,6 +18,10 @@
package org.apache.gobblin.runtime;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -359,7 +363,12 @@ public class GobblinMultiTaskAttempt {
this.taskStateTracker.registerNewTask(task);
task.setTaskFuture(this.taskExecutor.submit(task));
tasks.add(task);
- } catch (Exception e) {
+ } catch (Throwable e) {
+ if (e instanceof OutOfMemoryError) {
+ log.error("Encountering memory error in task creation/execution stage, please investigate memory usage:", e);
+ printMemoryUsage();
+ }
+
if (task == null) {
// task could not be created, so directly count down
countDownLatch.countDown();
@@ -383,6 +392,24 @@ public class GobblinMultiTaskAttempt {
return tasks;
}
+ private void printMemoryUsage() {
+ MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage heapMemory = memoryBean.getHeapMemoryUsage();
+ MemoryUsage nonHeapMemory = memoryBean.getNonHeapMemoryUsage();
+ String format = "%-15s%-15s%-15s%-15s";
+
+ this.log.info("Heap Memory");
+ this.log.info(String.format(format, "init", "used", "Committed", "max"));
+ this.log.info(String.format(format, heapMemory.getInit(),
heapMemory.getUsed(),
+ heapMemory.getCommitted(), heapMemory.getMax()));
+
+ this.log.info("Non-heap Memory");
+ this.log.info(String.format(format, "init", "used", "Committed", "max"));
+ this.log.info(String.format(format, nonHeapMemory.getInit(),
nonHeapMemory.getUsed(),
+ nonHeapMemory.getCommitted(), nonHeapMemory.getMax()));
+ }
+
+
private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch
countDownLatch) {
Optional<TaskFactory> taskFactoryOpt =
TaskUtils.getTaskFactory(workUnitState);
if (taskFactoryOpt.isPresent()) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
index f5ebd0d..6dfc5df 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.runtime.task;
import com.google.common.base.Optional;
+import java.util.Properties;
import org.apache.gobblin.configuration.State;
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
index d99a3a1..9110d20 100644
--- a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
@@ -24,6 +24,14 @@ import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.NoopPublisher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.BaseAbstractTask;
+import org.apache.gobblin.runtime.task.TaskFactory;
+import org.apache.gobblin.runtime.task.TaskIFace;
+import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -72,7 +80,7 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
Properties jobProperties =
GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
- jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY,
TestSource.class.getName());
+ jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY,
BaseTestSource.class.getName());
jobProperties.setProperty(TestExtractor.RAISE_ERROR, "true");
GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
@@ -101,7 +109,7 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
Properties jobProperties =
GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
- jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY,
TestSource.class.getName());
+ jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY,
BaseTestSource.class.getName());
jobProperties.setProperty(TestExtractor.RAISE_ERROR, "false");
GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
@@ -112,6 +120,24 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
logger.removeAppender(testAppender);
}
+ @Test
+ public void testCustomizedTaskFrameworkFailureInTaskCreation() throws
Exception {
+ TestAppender testAppender = new TestAppender();
+ Logger logger =
LogManager.getLogger(GobblinMultiTaskAttempt.class.getName() + "-noattempt");
+ logger.addAppender(testAppender);
+
+ Properties jobProperties =
+
GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
+ jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY,
CustomizedTaskTestSource.class.getName());
+
+ GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+ Assert.assertTrue(testAppender.events.stream().anyMatch(e ->
e.getRenderedMessage()
+ .startsWith("Encountering memory error")));
+
+ logger.removeAppender(testAppender);
+ }
+
+
/**
* Test extractor that can be configured to raise an exception on construction
*/
@@ -148,9 +174,45 @@ public class TaskErrorIntegrationTest extends BMNGRunner {
}
/**
+ * Testing task and factory implementation for Customized Task implementation.
+ */
+ public static class TestCustomizedTask extends BaseAbstractTask {
+ public TestCustomizedTask(TaskContext taskContext) {
+ super(taskContext);
+
+ // trigger OutOfMemoryError on purpose during creation phase.
+ throw new OutOfMemoryError();
+ }
+ }
+
+ public static class TestTaskFactory implements TaskFactory {
+
+ @Override
+ public TaskIFace createTask(TaskContext taskContext) {
+ return new TestCustomizedTask(taskContext);
+ }
+
+ @Override
+ public DataPublisher createDataPublisher(JobState.DatasetState
datasetState) {
+ return new NoopPublisher(datasetState);
+ }
+ }
+
+ public static class CustomizedTaskTestSource extends BaseTestSource {
+ @Override
+ public List<WorkUnit> getWorkunits(SourceState state) {
+ WorkUnit workUnit = new WorkUnit();
+ TaskUtils.setTaskFactoryClass(workUnit, TestTaskFactory.class);
+ workUnit.addAll(state);
+ return Collections.singletonList(workUnit);
+ }
+ }
+
+
+ /**
* Test source that creates a {@link TestExtractor}
*/
- public static class TestSource implements Source<Schema, GenericRecord> {
+ public static class BaseTestSource implements Source<Schema, GenericRecord> {
@Override
public List<WorkUnit> getWorkunits(SourceState state) {