This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 166eb72  [GOBBLIN-1042] Fix ForkMetric incorrect return type of parent 
metric object and relevant unit tests
166eb72 is described below

commit 166eb725c01c279068aaefa2c9af93c907bd9eff
Author: autumnust <[email protected]>
AuthorDate: Thu Feb 13 15:54:38 2020 -0800

    [GOBBLIN-1042] Fix ForkMetric incorrect return type of parent metric object 
and relevant unit tests
    
    Closes #2882 from autumnust/metricClassCast
---
 .../apache/gobblin/cluster/ContainerMetrics.java   |   2 +-
 .../gobblin/cluster/InMemorySingleTaskRunner.java  |  41 ++++++++
 .../cluster/InMemoryWuFailedSingleTask.java        |  61 ++++++++++++
 .../gobblin/cluster/InMemoryWuSingleTask.java      | 106 +++++++++++++++++++++
 .../org/apache/gobblin/cluster/SingleTask.java     |  11 ++-
 .../apache/gobblin/cluster/SingleTaskRunner.java   |  29 ++++--
 .../org/apache/gobblin/cluster/DummySource.java    | 102 ++++++++++++++++++++
 .../SingleTaskRunnerMainArgumentsDataProvider.java |   2 +-
 .../gobblin/cluster/TestSingleTaskRerun.java       |  56 +++++++++++
 .../test/resources/_workunits/store/workunit.wu    |   0
 gobblin-cluster/src/test/resources/clusterConf     |   1 +
 .../org/apache/gobblin/metrics/GobblinMetrics.java |   5 +-
 .../gobblin/metrics/GobblinMetricsRegistry.java    |   2 +-
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   |  13 ++-
 .../apache/gobblin/runtime/util/ForkMetrics.java   |   8 +-
 .../apache/gobblin/runtime/util/JobMetrics.java    |   6 +-
 .../apache/gobblin/runtime/util/TaskMetrics.java   |   4 +-
 17 files changed, 419 insertions(+), 30 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerMetrics.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerMetrics.java
index 62f3652..252d197 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerMetrics.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerMetrics.java
@@ -48,7 +48,7 @@ public class ContainerMetrics extends GobblinMetrics {
    */
   public static ContainerMetrics get(final State containerState, final String 
applicationName,
       final String workerId) {
-    return (ContainerMetrics) 
GOBBLIN_METRICS_REGISTRY.getOrDefault(name(workerId), new 
Callable<GobblinMetrics>() {
+    return (ContainerMetrics) 
GOBBLIN_METRICS_REGISTRY.getOrCreate(name(workerId), new 
Callable<GobblinMetrics>() {
       @Override public GobblinMetrics call() throws Exception {
         return new ContainerMetrics(containerState, applicationName, workerId);
       }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
new file mode 100644
index 0000000..7af586e
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A task runner for an in-memory {@link SingleTask} that can be switched to run 
a task that is meant to fail.
+ */
+public class InMemorySingleTaskRunner extends SingleTaskRunner {
+  public InMemorySingleTaskRunner(String clusterConfigFilePath, String jobId, 
String workUnitFilePath) {
+    super(clusterConfigFilePath, jobId, workUnitFilePath);
+  }
+
+  @Override
+  protected SingleTask createTaskAttempt(TaskAttemptBuilder 
taskAttemptBuilder, FileSystem fs, StateStores stateStores,
+      Path jobStateFilePath, boolean fail) {
+    return !fail ? new InMemoryWuSingleTask(this.jobId, new 
Path(this.workUnitFilePath), jobStateFilePath, fs,
+        taskAttemptBuilder, stateStores, 
GobblinClusterUtils.getDynamicConfig(this.clusterConfig))
+        : new InMemoryWuFailedSingleTask(this.jobId, new 
Path(this.workUnitFilePath), jobStateFilePath, fs,
+            taskAttemptBuilder, stateStores, 
GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
+  }
+}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
new file mode 100644
index 0000000..75cbf61
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+
+/**
+ * Instead of deserializing {@link JobState} and {@link WorkUnit} from 
filesystem, create them in memory.
+ * This derived class fails on purpose: it does not declare a writerBuilder 
class, so the Precondition
+ * check in AvroWriterBuilder, which is used by default, fails.
+ */
+public class InMemoryWuFailedSingleTask extends SingleTask {
+  public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path 
jobStateFilePath, FileSystem fs,
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) {
+    super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, 
stateStores, dynamicConfig);
+  }
+
+  @Override
+  protected List<WorkUnit> getWorkUnits()
+      throws IOException {
+    // Create WorkUnit in memory.
+    WorkUnit workUnit = new WorkUnit();
+    workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "randomTask");
+    workUnit.setProp("source.class", "org.apache.gobblin.cluster.DummySource");
+    return Lists.newArrayList(workUnit);
+  }
+
+  @Override
+  protected JobState getJobState()
+      throws IOException {
+    JobState jobState = new JobState("randomJobName", "randomJobId");
+    return jobState;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
new file mode 100644
index 0000000..f60ba9a
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+/**
+ * Instead of deserializing {@link JobState} and {@link WorkUnit} from 
filesystem, create them in memory.
+ * Uses {@link DummyDataWriter} so that the execution of a Task goes through.
+ *
+ * This extension class declares a dummy writer builder so that the task 
execution will go through.
+ * This class is primarily designed for testing purposes.
+ */
+public class InMemoryWuSingleTask extends SingleTask {
+  public InMemoryWuSingleTask(String jobId, Path workUnitFilePath, Path 
jobStateFilePath, FileSystem fs,
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) {
+    super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, 
stateStores, dynamicConfig);
+  }
+
+  @Override
+  protected List<WorkUnit> getWorkUnits()
+      throws IOException {
+    WorkUnit workUnit = new WorkUnit();
+    workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "randomTask");
+    workUnit.setProp("source.class", "org.apache.gobblin.cluster.DummySource");
+    // Missing this line leads to failure in precondition check of avro writer.
+    workUnit.setProp(ConfigurationKeys.WRITER_BUILDER_CLASS, 
DummyDataWriterBuilder.class.getName());
+    return Lists.newArrayList(workUnit);
+  }
+
+  @Override
+  protected JobState getJobState()
+      throws IOException {
+    JobState jobState = new JobState("randomJobName", "randomJobId");
+    return jobState;
+  }
+
+  public static class DummyDataWriterBuilder extends DataWriterBuilder<String, 
Integer> {
+
+    @Override
+    public DataWriter<Integer> build() throws IOException {
+      return new DummyDataWriter();
+    }
+  }
+
+  private static class DummyDataWriter implements DataWriter<Integer> {
+
+    @Override
+    public void write(Integer record) throws IOException {
+      // Nothing to do
+    }
+
+    @Override
+    public void commit() throws IOException {
+      // Nothing to do
+    }
+
+    @Override
+    public void cleanup() throws IOException {
+      // Nothing to do
+    }
+
+    @Override
+    public long recordsWritten() {
+      return 0;
+    }
+
+    @Override
+    public long bytesWritten() throws IOException {
+      return  0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to do
+    }
+  }
+}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 6d5d8e4..16a64fa 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -44,6 +44,9 @@ import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.SerializationUtils;
 
 
+/**
+ * A standalone unit to initialize and execute {@link GobblinMultiTaskAttempt} 
through deserialized {@link WorkUnit}
+ */
 public class SingleTask {
 
   private static final Logger _logger = 
LoggerFactory.getLogger(SingleTask.class);
@@ -88,6 +91,7 @@ public class SingleTask {
       _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, 
jobState, jobBroker);
       
_taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
     } finally {
+      _logger.info("Clearing all metrics object in cache.");
       _taskattempt.cleanMetrics();
     }
   }
@@ -102,7 +106,7 @@ public class SingleTask {
     return ConfigFactory.parseProperties(jobProperties);
   }
 
-  private JobState getJobState()
+  protected JobState getJobState()
       throws java.io.IOException {
     JobState jobState;
 
@@ -119,7 +123,10 @@ public class SingleTask {
     return jobState;
   }
 
-  private List<WorkUnit> getWorkUnits()
+  /**
+   * Deserialize {@link WorkUnit}s from a path.
+   */
+  protected List<WorkUnit> getWorkUnits()
       throws IOException {
     String fileName = _workUnitFilePath.getName();
     String storeName = _workUnitFilePath.getParent().getName();
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index 04cd252..73d535f 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -49,9 +49,9 @@ import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
 class SingleTaskRunner {
   private static final Logger logger = 
LoggerFactory.getLogger(SingleTaskRunner.class);
 
-  private final String jobId;
-  private final String workUnitFilePath;
-  private final Config clusterConfig;
+  protected final String jobId;
+  protected final String workUnitFilePath;
+  protected final Config clusterConfig;
   private final Path appWorkPath;
   private SingleTask task;
   private TaskExecutor taskExecutor;
@@ -69,9 +69,17 @@ class SingleTaskRunner {
 
   void run()
       throws IOException, InterruptedException {
+    this.run(false);
+  }
+
+  /**
+   *
+   * @param fail set to false in normal cases; when set to true, the 
underlying task will fail.
+   */
+  void run(boolean fail) throws IOException, InterruptedException{
     logger.info("SingleTaskRunner running.");
     startServices();
-    runTask();
+    runTask(fail);
     shutdownServices();
   }
 
@@ -96,14 +104,14 @@ class SingleTaskRunner {
     }
   }
 
-  private void runTask()
+  private void runTask(boolean fail)
       throws IOException, InterruptedException {
     logger.info("SingleTaskRunner running task.");
-    getSingleHelixTask();
+    getClusterSingleTask(fail);
     this.task.run();
   }
 
-  private void getSingleHelixTask()
+  private void getClusterSingleTask(boolean fail)
       throws IOException {
     final FileSystem fs = getFileSystem();
     final StateStores stateStores = new StateStores(this.clusterConfig, 
this.appWorkPath,
@@ -115,7 +123,12 @@ class SingleTaskRunner {
 
     final TaskAttemptBuilder taskAttemptBuilder = 
getTaskAttemptBuilder(stateStores);
 
-    this.task = new SingleTask(this.jobId, new Path(this.workUnitFilePath), 
jobStateFilePath, fs,
+    this.task = createTaskAttempt(taskAttemptBuilder, fs, stateStores, 
jobStateFilePath, fail);
+  }
+
+  protected SingleTask createTaskAttempt(TaskAttemptBuilder 
taskAttemptBuilder, FileSystem fs,
+      StateStores stateStores, Path jobStateFilePath, boolean fail) {
+    return new SingleTask(this.jobId, new Path(this.workUnitFilePath), 
jobStateFilePath, fs,
         taskAttemptBuilder, stateStores, 
GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
   }
 
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
new file mode 100644
index 0000000..f78628a
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * A source implementation that does nothing.
+ */
+public class DummySource extends AbstractSource<String, Integer> {
+
+  private static final int NUM_RECORDS_TO_EXTRACT_PER_EXTRACTOR = 10;
+  private static final int NUM_WORK_UNITS = 1;
+
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState sourceState) {
+    return Lists.newArrayList();
+  }
+
+  @Override
+  public Extractor<String, Integer> getExtractor(WorkUnitState state)
+      throws IOException {
+    return new DummyExtractor(state);
+  }
+
+  @Override
+  public void shutdown(SourceState state) {
+    // Nothing to do
+  }
+
+  /**
+   * A dummy implementation of {@link Extractor}.
+   */
+  private static class DummyExtractor implements Extractor<String, Integer> {
+
+    private final WorkUnitState workUnitState;
+    private int current;
+
+    DummyExtractor(WorkUnitState workUnitState) {
+      this.workUnitState = workUnitState;
+      workUnitState.setProp("FOO", "BAR");
+      this.current = 0;
+    }
+
+    @Override
+    public String getSchema() {
+      return "";
+    }
+
+    @Override
+    public Integer readRecord(Integer reuse)
+        throws DataRecordException, IOException {
+      // Simply return a few records and then stop
+      if (this.current > 10) {
+        return null;
+      }
+      return this.current++;
+    }
+
+    @Override
+    public long getExpectedRecordCount() {
+      return DummySource.NUM_RECORDS_TO_EXTRACT_PER_EXTRACTOR;
+    }
+
+    @Override
+    public long getHighWatermark() {
+      return this.workUnitState.getHighWaterMark();
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      // Nothing to do
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainArgumentsDataProvider.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainArgumentsDataProvider.java
index a535151..ba438b6 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainArgumentsDataProvider.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainArgumentsDataProvider.java
@@ -19,7 +19,7 @@ package org.apache.gobblin.cluster;
 
 class SingleTaskRunnerMainArgumentsDataProvider {
   static final String TEST_JOB_ID = "1";
-  static final String TEST_WORKUNIT = "/workunit.wu";
+  static final String TEST_WORKUNIT = "/_workunits/store/workunit.wu";
   static final String TEST_CLUSTER_CONF = "/cluster.conf";
 
   static String[] getArgs() {
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
new file mode 100644
index 0000000..4fa3d74
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Notes & Usage:
+ * 0. This test can be used to reproduce task-execution issues in 
Gobblin-Cluster, within each container.
+ * 1. The workunit is being created in {@link InMemoryWuFailedSingleTask}.
+ * 2. To reproduce certain errors, replace 
org.apache.gobblin.cluster.DummySource.DummyExtractor or
+ * {@link DummySource} to plug in the required logic.
+ */
+public class TestSingleTaskRerun {
+
+  /**
+   * An in-memory {@link SingleTask} runner that can be used to simulate how 
it works in Gobblin-Cluster.
+   * This example method fails the execution on purpose by omitting certain 
configuration, catches the exception and
+   * then re-runs the task.
+   */
+  @Test
+  public void testMetricObjectCasting()
+      throws Exception {
+    final String clusterConfigPath = "clusterConf";
+    final String wuPath = "_workunits/store/workunit.wu";
+    String clusterConfPath = 
this.getClass().getClassLoader().getResource(clusterConfigPath).getPath();
+
+    InMemorySingleTaskRunner inMemorySingleTaskRunner =
+        new InMemorySingleTaskRunner(clusterConfPath, "testJob",
+            this.getClass().getClassLoader().getResource(wuPath).getPath());
+    try {
+      inMemorySingleTaskRunner.run(true);
+    } catch (Exception e) {
+      inMemorySingleTaskRunner.run();
+    }
+
+    Assert.assertTrue(true);
+  }
+}
diff --git a/gobblin-cluster/src/test/resources/_workunits/store/workunit.wu 
b/gobblin-cluster/src/test/resources/_workunits/store/workunit.wu
new file mode 100644
index 0000000..e69de29
diff --git a/gobblin-cluster/src/test/resources/clusterConf 
b/gobblin-cluster/src/test/resources/clusterConf
new file mode 100644
index 0000000..468d172
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/clusterConf
@@ -0,0 +1 @@
+gobblin.cluster.workDir = <FILL IN THE PATH TO THE OWNING DIRECTORY>
\ No newline at end of file
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index 5dc6bb2..f75da78 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -62,6 +62,8 @@ import org.apache.gobblin.metrics.reporter.ScheduledReporter;
 import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.util.PropertiesUtils;
 
+import lombok.Getter;
+
 
 /**
  * A class that represents a set of metrics associated with a given name.
@@ -73,6 +75,7 @@ public class GobblinMetrics {
   public static final String METRICS_ID_PREFIX = "gobblin.metrics.";
   public static final String METRICS_STATE_CUSTOM_TAGS = 
"metrics.state.custom.tags";
 
+  @Getter
   protected static final GobblinMetricsRegistry GOBBLIN_METRICS_REGISTRY = 
GobblinMetricsRegistry.getInstance();
 
   /**
@@ -148,7 +151,7 @@ public class GobblinMetrics {
    * @return a {@link GobblinMetrics} instance
    */
   public static GobblinMetrics get(final String id, final MetricContext 
parentContext, final List<Tag<?>> tags) {
-    return GOBBLIN_METRICS_REGISTRY.getOrDefault(id, new 
Callable<GobblinMetrics>() {
+    return GOBBLIN_METRICS_REGISTRY.getOrCreate(id, new 
Callable<GobblinMetrics>() {
       @Override
       public GobblinMetrics call()
           throws Exception {
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
index f64093e..60bdfa7 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
@@ -91,7 +91,7 @@ public class GobblinMetricsRegistry {
    *
    * @return a {@link GobblinMetrics} instance associated with the id
    */
-  public GobblinMetrics getOrDefault(String id, Callable<? extends 
GobblinMetrics> valueLoader) {
+  public GobblinMetrics getOrCreate(String id, Callable<? extends 
GobblinMetrics> valueLoader) {
     try {
       return this.metricsCache.get(id, valueLoader);
     } catch (ExecutionException ee) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 016998d..402a3ff 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
+import org.apache.gobblin.metrics.GobblinMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -278,10 +279,11 @@ public class GobblinMultiTaskAttempt {
     }
 
     if (hasTaskFailure) {
+      String errorMsg ="";
       for (Task task : tasks) {
         if 
(task.getTaskState().contains(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)) {
-          log.error(String.format("Task %s failed due to exception: %s", 
task.getTaskId(),
-              
task.getTaskState().getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)));
+          errorMsg = String.format("Task %s failed due to exception: %s", 
task.getTaskId(),
+              
task.getTaskState().getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY));
         }
 
         // If there are task failures then the tasks may be reattempted. Save 
a copy of the task state that is used
@@ -294,7 +296,8 @@ public class GobblinMultiTaskAttempt {
       }
 
       throw new IOException(
-          String.format("Not all tasks running in container %s completed 
successfully", containerIdOptional.or("")));
+          String.format("Not all tasks running in container %s completed 
successfully, last recorded exception[%s]",
+              containerIdOptional.or(""), errorMsg));
     }
   }
 
@@ -323,7 +326,7 @@ public class GobblinMultiTaskAttempt {
   }
 
   /**
-   * Determine if the task executed successfully in a prior attempt by 
checkitn the task state store for the success
+   * Determine if the task executed successfully in a prior attempt by 
checking the task state store for the success
    * marker.
    * @param taskId task id to check
    * @return whether the task was processed successfully in a prior attempt
@@ -367,7 +370,7 @@ public class GobblinMultiTaskAttempt {
       WorkUnit workUnit = this.workUnits.next();
       String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY);
 
-      // skip tasks that executed successsfully in a prior attempt
+      // skip tasks that executed successfully in a prior attempt
       if (taskSuccessfulInPriorAttempt(taskId)) {
         continue;
       }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
index e38c567..235ca0b 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
@@ -38,15 +38,11 @@ public class ForkMetrics extends GobblinMetrics {
   private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";
 
   protected ForkMetrics(TaskState taskState, int index) {
-    super(name(taskState, index), parentContextForFork(taskState), 
getForkMetricsTags(taskState, index));
-  }
-
-  private static MetricContext parentContextForFork(TaskState taskState) {
-    return TaskMetrics.get(METRICS_ID_PREFIX + taskState.getJobId() + "." + 
taskState.getTaskId()).getMetricContext();
+    super(name(taskState, index), 
TaskMetrics.get(taskState).getMetricContext(), getForkMetricsTags(taskState, 
index));
   }
 
   public static ForkMetrics get(final TaskState taskState, int index) {
-    return (ForkMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(taskState, 
index), new Callable<GobblinMetrics>() {
+    return (ForkMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(taskState, 
index), new Callable<GobblinMetrics>() {
       @Override
       public GobblinMetrics call() throws Exception {
         return new ForkMetrics(taskState, index);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
index 82dccdb..03146e1 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
@@ -109,7 +109,7 @@ public class JobMetrics extends GobblinMetrics {
    * @return a {@link JobMetrics} instance
    */
   public static JobMetrics get(final JobState jobState, final MetricContext 
parentContext, CreatorTag creatorTag) {
-    return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), 
new Callable<GobblinMetrics>() {
+    return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(jobState), 
new Callable<GobblinMetrics>() {
       @Override
       public GobblinMetrics call() throws Exception {
         return new JobMetrics(jobState, parentContext, creatorTag);
@@ -126,7 +126,7 @@ public class JobMetrics extends GobblinMetrics {
    */
   @Deprecated
   public static JobMetrics get(final JobState jobState) {
-    return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), 
new Callable<GobblinMetrics>() {
+    return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(jobState), 
new Callable<GobblinMetrics>() {
       @Override
       public GobblinMetrics call() throws Exception {
         return new JobMetrics(jobState, DEFAULT_CREATOR_TAG);
@@ -142,7 +142,7 @@ public class JobMetrics extends GobblinMetrics {
    * @return a {@link JobMetrics} instance
    */
   public static JobMetrics get(final JobState jobState, CreatorTag creatorTag) 
{
-    return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), 
new Callable<GobblinMetrics>() {
+    return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(jobState), 
new Callable<GobblinMetrics>() {
       @Override
       public GobblinMetrics call() throws Exception {
         return new JobMetrics(jobState, creatorTag);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
index 0ef2934..904c015 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
@@ -52,7 +52,7 @@ public class TaskMetrics extends GobblinMetrics {
    * @return a {@link TaskMetrics} instance
    */
   public static TaskMetrics get(final TaskState taskState) {
-    return (TaskMetrics) 
GOBBLIN_METRICS_REGISTRY.getOrDefault(name(taskState), new 
Callable<GobblinMetrics>() {
+    return (TaskMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(taskState), 
new Callable<GobblinMetrics>() {
       @Override
       public GobblinMetrics call() throws Exception {
         return new TaskMetrics(taskState);
@@ -85,7 +85,7 @@ public class TaskMetrics extends GobblinMetrics {
     remove(name(task));
   }
 
-  private static String name(TaskState taskState) {
+  public static String name(TaskState taskState) {
     return METRICS_ID_PREFIX + taskState.getJobId() + "." + 
taskState.getTaskId();
   }
 

Reply via email to