This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 166eb72 [GOBBLIN-1042] Fix ForkMetric incorrect return type of parent
metric object and relevant unit tests
166eb72 is described below
commit 166eb725c01c279068aaefa2c9af93c907bd9eff
Author: autumnust <[email protected]>
AuthorDate: Thu Feb 13 15:54:38 2020 -0800
[GOBBLIN-1042] Fix ForkMetric incorrect return type of parent metric object
and relevant unit tests
Closes #2882 from autumnust/metricClassCast
---
.../apache/gobblin/cluster/ContainerMetrics.java | 2 +-
.../gobblin/cluster/InMemorySingleTaskRunner.java | 41 ++++++++
.../cluster/InMemoryWuFailedSingleTask.java | 61 ++++++++++++
.../gobblin/cluster/InMemoryWuSingleTask.java | 106 +++++++++++++++++++++
.../org/apache/gobblin/cluster/SingleTask.java | 11 ++-
.../apache/gobblin/cluster/SingleTaskRunner.java | 29 ++++--
.../org/apache/gobblin/cluster/DummySource.java | 102 ++++++++++++++++++++
.../SingleTaskRunnerMainArgumentsDataProvider.java | 2 +-
.../gobblin/cluster/TestSingleTaskRerun.java | 56 +++++++++++
.../test/resources/_workunits/store/workunit.wu | 0
gobblin-cluster/src/test/resources/clusterConf | 1 +
.../org/apache/gobblin/metrics/GobblinMetrics.java | 5 +-
.../gobblin/metrics/GobblinMetricsRegistry.java | 2 +-
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 13 ++-
.../apache/gobblin/runtime/util/ForkMetrics.java | 8 +-
.../apache/gobblin/runtime/util/JobMetrics.java | 6 +-
.../apache/gobblin/runtime/util/TaskMetrics.java | 4 +-
17 files changed, 419 insertions(+), 30 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerMetrics.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerMetrics.java
index 62f3652..252d197 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerMetrics.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerMetrics.java
@@ -48,7 +48,7 @@ public class ContainerMetrics extends GobblinMetrics {
*/
public static ContainerMetrics get(final State containerState, final String
applicationName,
final String workerId) {
- return (ContainerMetrics)
GOBBLIN_METRICS_REGISTRY.getOrDefault(name(workerId), new
Callable<GobblinMetrics>() {
+ return (ContainerMetrics)
GOBBLIN_METRICS_REGISTRY.getOrCreate(name(workerId), new
Callable<GobblinMetrics>() {
@Override public GobblinMetrics call() throws Exception {
return new ContainerMetrics(containerState, applicationName, workerId);
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
new file mode 100644
index 0000000..7af586e
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemorySingleTaskRunner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * An taskRunner for in-memory {@link SingleTask} that can be switched to run
a meant-to-failed task.
+ */
+public class InMemorySingleTaskRunner extends SingleTaskRunner {
+ public InMemorySingleTaskRunner(String clusterConfigFilePath, String jobId,
String workUnitFilePath) {
+ super(clusterConfigFilePath, jobId, workUnitFilePath);
+ }
+
+ @Override
+ protected SingleTask createTaskAttempt(TaskAttemptBuilder
taskAttemptBuilder, FileSystem fs, StateStores stateStores,
+ Path jobStateFilePath, boolean fail) {
+ return !fail ? new InMemoryWuSingleTask(this.jobId, new
Path(this.workUnitFilePath), jobStateFilePath, fs,
+ taskAttemptBuilder, stateStores,
GobblinClusterUtils.getDynamicConfig(this.clusterConfig))
+ : new InMemoryWuFailedSingleTask(this.jobId, new
Path(this.workUnitFilePath), jobStateFilePath, fs,
+ taskAttemptBuilder, stateStores,
GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
+ }
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
new file mode 100644
index 0000000..75cbf61
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+
+/**
+ * Instead of deserializing {@link JobState} and {@link WorkUnit} from
filesystem, create them in memory.
+ * This derived class will be failing due to missing the declaration of
writerBuilder class thereby failing a Precondition
+ * check in AvroWriterBuilder which is used by default.
+ */
+public class InMemoryWuFailedSingleTask extends SingleTask {
+ public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path
jobStateFilePath, FileSystem fs,
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) {
+ super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder,
stateStores, dynamicConfig);
+ }
+
+ @Override
+ protected List<WorkUnit> getWorkUnits()
+ throws IOException {
+ // Create WorkUnit in memory.
+ WorkUnit workUnit = new WorkUnit();
+ workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "randomTask");
+ workUnit.setProp("source.class", "org.apache.gobblin.cluster.DummySource");
+ return Lists.newArrayList(workUnit);
+ }
+
+ @Override
+ protected JobState getJobState()
+ throws IOException {
+ JobState jobState = new JobState("randomJobName", "randomJobId");
+ return jobState;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
new file mode 100644
index 0000000..f60ba9a
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+/**
+ * Instead of deserializing {@link JobState} and {@link WorkUnit} from
filesystem, create them in memory.
+ * Uses {@link DummyDataWriter} so that the execution of a Task goes through.
+ *
+ * This extension class added a declared dummyWriterBuilder so that the task
execution will go through.
+ * This class is primarily designed for testing purpose.
+ */
+public class InMemoryWuSingleTask extends SingleTask {
+ public InMemoryWuSingleTask(String jobId, Path workUnitFilePath, Path
jobStateFilePath, FileSystem fs,
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) {
+ super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder,
stateStores, dynamicConfig);
+ }
+
+ @Override
+ protected List<WorkUnit> getWorkUnits()
+ throws IOException {
+ WorkUnit workUnit = new WorkUnit();
+ workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "randomTask");
+ workUnit.setProp("source.class", "org.apache.gobblin.cluster.DummySource");
+ // Missing this line leads to failure in precondition check of avro writer.
+ workUnit.setProp(ConfigurationKeys.WRITER_BUILDER_CLASS,
DummyDataWriterBuilder.class.getName());
+ return Lists.newArrayList(workUnit);
+ }
+
+ @Override
+ protected JobState getJobState()
+ throws IOException {
+ JobState jobState = new JobState("randomJobName", "randomJobId");
+ return jobState;
+ }
+
+ public static class DummyDataWriterBuilder extends DataWriterBuilder<String,
Integer> {
+
+ @Override
+ public DataWriter<Integer> build() throws IOException {
+ return new DummyDataWriter();
+ }
+ }
+
+ private static class DummyDataWriter implements DataWriter<Integer> {
+
+ @Override
+ public void write(Integer record) throws IOException {
+ // Nothing to do
+ }
+
+ @Override
+ public void commit() throws IOException {
+ // Nothing to do
+ }
+
+ @Override
+ public void cleanup() throws IOException {
+ // Nothing to do
+ }
+
+ @Override
+ public long recordsWritten() {
+ return 0;
+ }
+
+ @Override
+ public long bytesWritten() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Nothing to do
+ }
+ }
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 6d5d8e4..16a64fa 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -44,6 +44,9 @@ import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.SerializationUtils;
+/**
+ * A standalone unit to initialize and execute {@link GobblinMultiTaskAttempt}
through deserialized {@link WorkUnit}
+ */
public class SingleTask {
private static final Logger _logger =
LoggerFactory.getLogger(SingleTask.class);
@@ -88,6 +91,7 @@ public class SingleTask {
_taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId,
jobState, jobBroker);
_taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
} finally {
+ _logger.info("Clearing all metrics object in cache.");
_taskattempt.cleanMetrics();
}
}
@@ -102,7 +106,7 @@ public class SingleTask {
return ConfigFactory.parseProperties(jobProperties);
}
- private JobState getJobState()
+ protected JobState getJobState()
throws java.io.IOException {
JobState jobState;
@@ -119,7 +123,10 @@ public class SingleTask {
return jobState;
}
- private List<WorkUnit> getWorkUnits()
+ /**
+ * Deserialize {@link WorkUnit}s from a path.
+ */
+ protected List<WorkUnit> getWorkUnits()
throws IOException {
String fileName = _workUnitFilePath.getName();
String storeName = _workUnitFilePath.getParent().getName();
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index 04cd252..73d535f 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -49,9 +49,9 @@ import static
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
class SingleTaskRunner {
private static final Logger logger =
LoggerFactory.getLogger(SingleTaskRunner.class);
- private final String jobId;
- private final String workUnitFilePath;
- private final Config clusterConfig;
+ protected final String jobId;
+ protected final String workUnitFilePath;
+ protected final Config clusterConfig;
private final Path appWorkPath;
private SingleTask task;
private TaskExecutor taskExecutor;
@@ -69,9 +69,17 @@ class SingleTaskRunner {
void run()
throws IOException, InterruptedException {
+ this.run(false);
+ }
+
+ /**
+ *
+ * @param fail set to false in normal cases, when set to true, the
underlying task will fail.
+ */
+ void run(boolean fail) throws IOException, InterruptedException{
logger.info("SingleTaskRunner running.");
startServices();
- runTask();
+ runTask(fail);
shutdownServices();
}
@@ -96,14 +104,14 @@ class SingleTaskRunner {
}
}
- private void runTask()
+ private void runTask(boolean fail)
throws IOException, InterruptedException {
logger.info("SingleTaskRunner running task.");
- getSingleHelixTask();
+ getClusterSingleTask(fail);
this.task.run();
}
- private void getSingleHelixTask()
+ private void getClusterSingleTask(boolean fail)
throws IOException {
final FileSystem fs = getFileSystem();
final StateStores stateStores = new StateStores(this.clusterConfig,
this.appWorkPath,
@@ -115,7 +123,12 @@ class SingleTaskRunner {
final TaskAttemptBuilder taskAttemptBuilder =
getTaskAttemptBuilder(stateStores);
- this.task = new SingleTask(this.jobId, new Path(this.workUnitFilePath),
jobStateFilePath, fs,
+ this.task = createTaskAttempt(taskAttemptBuilder, fs, stateStores,
jobStateFilePath, fail);
+ }
+
+ protected SingleTask createTaskAttempt(TaskAttemptBuilder
taskAttemptBuilder, FileSystem fs,
+ StateStores stateStores, Path jobStateFilePath, boolean fail) {
+ return new SingleTask(this.jobId, new Path(this.workUnitFilePath),
jobStateFilePath, fs,
taskAttemptBuilder, stateStores,
GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
}
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
new file mode 100644
index 0000000..f78628a
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * A source implementation that does nothing.
+ */
+public class DummySource extends AbstractSource<String, Integer> {
+
+ private static final int NUM_RECORDS_TO_EXTRACT_PER_EXTRACTOR = 10;
+ private static final int NUM_WORK_UNITS = 1;
+
+ @Override
+ public List<WorkUnit> getWorkunits(SourceState sourceState) {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public Extractor<String, Integer> getExtractor(WorkUnitState state)
+ throws IOException {
+ return new DummyExtractor(state);
+ }
+
+ @Override
+ public void shutdown(SourceState state) {
+ // Nothing to do
+ }
+
+ /**
+ * A dummy implementation of {@link Extractor}.
+ */
+ private static class DummyExtractor implements Extractor<String, Integer> {
+
+ private final WorkUnitState workUnitState;
+ private int current;
+
+ DummyExtractor(WorkUnitState workUnitState) {
+ this.workUnitState = workUnitState;
+ workUnitState.setProp("FOO", "BAR");
+ this.current = 0;
+ }
+
+ @Override
+ public String getSchema() {
+ return "";
+ }
+
+ @Override
+ public Integer readRecord(Integer reuse)
+ throws DataRecordException, IOException {
+ // Simply just get some records and stopped
+ if (this.current > 10) {
+ return null;
+ }
+ return this.current++;
+ }
+
+ @Override
+ public long getExpectedRecordCount() {
+ return DummySource.NUM_RECORDS_TO_EXTRACT_PER_EXTRACTOR;
+ }
+
+ @Override
+ public long getHighWatermark() {
+ return this.workUnitState.getHighWaterMark();
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ // Nothing to do
+ }
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainArgumentsDataProvider.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainArgumentsDataProvider.java
index a535151..ba438b6 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainArgumentsDataProvider.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskRunnerMainArgumentsDataProvider.java
@@ -19,7 +19,7 @@ package org.apache.gobblin.cluster;
class SingleTaskRunnerMainArgumentsDataProvider {
static final String TEST_JOB_ID = "1";
- static final String TEST_WORKUNIT = "/workunit.wu";
+ static final String TEST_WORKUNIT = "/_workunits/store/workunit.wu";
static final String TEST_CLUSTER_CONF = "/cluster.conf";
static String[] getArgs() {
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
new file mode 100644
index 0000000..4fa3d74
--- /dev/null
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTaskRerun.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Notes & Usage:
+ * 0. This test could be used to reproduce task-execution issue in
Gobblin-Cluster, within each container.
+ * 1. The workunit is being created in {@link InMemoryWuFailedSingleTask}.
+ * 2. When needed to reproduce certain errors, replace
org.apache.gobblin.cluster.DummySource.DummyExtractor or
+ * {@link DummySource} to plug in required logic.
+ */
+public class TestSingleTaskRerun {
+
+ /**
+ * An in-memory {@link SingleTask} runner that could be used to simulate how
it works in Gobblin-Cluster.
+ * For this example method, it fail the execution by missing certain
configuration on purpose, catch the exception and
+ * re-run it again.
+ */
+ @Test
+ public void testMetricObjectCasting()
+ throws Exception {
+ final String clusterConfigPath = "clusterConf";
+ final String wuPath = "_workunits/store/workunit.wu";
+ String clusterConfPath =
this.getClass().getClassLoader().getResource(clusterConfigPath).getPath();
+
+ InMemorySingleTaskRunner inMemorySingleTaskRunner =
+ new InMemorySingleTaskRunner(clusterConfPath, "testJob",
+ this.getClass().getClassLoader().getResource(wuPath).getPath());
+ try {
+ inMemorySingleTaskRunner.run(true);
+ } catch (Exception e) {
+ inMemorySingleTaskRunner.run();
+ }
+
+ Assert.assertTrue(true);
+ }
+}
diff --git a/gobblin-cluster/src/test/resources/_workunits/store/workunit.wu
b/gobblin-cluster/src/test/resources/_workunits/store/workunit.wu
new file mode 100644
index 0000000..e69de29
diff --git a/gobblin-cluster/src/test/resources/clusterConf
b/gobblin-cluster/src/test/resources/clusterConf
new file mode 100644
index 0000000..468d172
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/clusterConf
@@ -0,0 +1 @@
+gobblin.cluster.workDir = <FILL IN THE PATH TO THE OWNING DIRECTORY>
\ No newline at end of file
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index 5dc6bb2..f75da78 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -62,6 +62,8 @@ import org.apache.gobblin.metrics.reporter.ScheduledReporter;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.PropertiesUtils;
+import lombok.Getter;
+
/**
* A class that represents a set of metrics associated with a given name.
@@ -73,6 +75,7 @@ public class GobblinMetrics {
public static final String METRICS_ID_PREFIX = "gobblin.metrics.";
public static final String METRICS_STATE_CUSTOM_TAGS =
"metrics.state.custom.tags";
+ @Getter
protected static final GobblinMetricsRegistry GOBBLIN_METRICS_REGISTRY =
GobblinMetricsRegistry.getInstance();
/**
@@ -148,7 +151,7 @@ public class GobblinMetrics {
* @return a {@link GobblinMetrics} instance
*/
public static GobblinMetrics get(final String id, final MetricContext
parentContext, final List<Tag<?>> tags) {
- return GOBBLIN_METRICS_REGISTRY.getOrDefault(id, new
Callable<GobblinMetrics>() {
+ return GOBBLIN_METRICS_REGISTRY.getOrCreate(id, new
Callable<GobblinMetrics>() {
@Override
public GobblinMetrics call()
throws Exception {
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
index f64093e..60bdfa7 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
@@ -91,7 +91,7 @@ public class GobblinMetricsRegistry {
*
* @return a {@link GobblinMetrics} instance associated with the id
*/
- public GobblinMetrics getOrDefault(String id, Callable<? extends
GobblinMetrics> valueLoader) {
+ public GobblinMetrics getOrCreate(String id, Callable<? extends
GobblinMetrics> valueLoader) {
try {
return this.metricsCache.get(id, valueLoader);
} catch (ExecutionException ee) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 016998d..402a3ff 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import org.apache.gobblin.metrics.GobblinMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -278,10 +279,11 @@ public class GobblinMultiTaskAttempt {
}
if (hasTaskFailure) {
+ String errorMsg ="";
for (Task task : tasks) {
if
(task.getTaskState().contains(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)) {
- log.error(String.format("Task %s failed due to exception: %s",
task.getTaskId(),
-
task.getTaskState().getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)));
+ errorMsg = String.format("Task %s failed due to exception: %s",
task.getTaskId(),
+
task.getTaskState().getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY));
}
// If there are task failures then the tasks may be reattempted. Save
a copy of the task state that is used
@@ -294,7 +296,8 @@ public class GobblinMultiTaskAttempt {
}
throw new IOException(
- String.format("Not all tasks running in container %s completed
successfully", containerIdOptional.or("")));
+ String.format("Not all tasks running in container %s completed
successfully, last recorded exception[%s]",
+ containerIdOptional.or(""), errorMsg));
}
}
@@ -323,7 +326,7 @@ public class GobblinMultiTaskAttempt {
}
/**
- * Determine if the task executed successfully in a prior attempt by
checkitn the task state store for the success
+ * Determine if the task executed successfully in a prior attempt by
checking the task state store for the success
* marker.
* @param taskId task id to check
* @return whether the task was processed successfully in a prior attempt
@@ -367,7 +370,7 @@ public class GobblinMultiTaskAttempt {
WorkUnit workUnit = this.workUnits.next();
String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY);
- // skip tasks that executed successsfully in a prior attempt
+ // skip tasks that executed successfully in a prior attempt
if (taskSuccessfulInPriorAttempt(taskId)) {
continue;
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
index e38c567..235ca0b 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
@@ -38,15 +38,11 @@ public class ForkMetrics extends GobblinMetrics {
private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";
protected ForkMetrics(TaskState taskState, int index) {
- super(name(taskState, index), parentContextForFork(taskState),
getForkMetricsTags(taskState, index));
- }
-
- private static MetricContext parentContextForFork(TaskState taskState) {
- return TaskMetrics.get(METRICS_ID_PREFIX + taskState.getJobId() + "." +
taskState.getTaskId()).getMetricContext();
+ super(name(taskState, index),
TaskMetrics.get(taskState).getMetricContext(), getForkMetricsTags(taskState,
index));
}
public static ForkMetrics get(final TaskState taskState, int index) {
- return (ForkMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(taskState,
index), new Callable<GobblinMetrics>() {
+ return (ForkMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(taskState,
index), new Callable<GobblinMetrics>() {
@Override
public GobblinMetrics call() throws Exception {
return new ForkMetrics(taskState, index);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
index 82dccdb..03146e1 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
@@ -109,7 +109,7 @@ public class JobMetrics extends GobblinMetrics {
* @return a {@link JobMetrics} instance
*/
public static JobMetrics get(final JobState jobState, final MetricContext
parentContext, CreatorTag creatorTag) {
- return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState),
new Callable<GobblinMetrics>() {
+ return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(jobState),
new Callable<GobblinMetrics>() {
@Override
public GobblinMetrics call() throws Exception {
return new JobMetrics(jobState, parentContext, creatorTag);
@@ -126,7 +126,7 @@ public class JobMetrics extends GobblinMetrics {
*/
@Deprecated
public static JobMetrics get(final JobState jobState) {
- return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState),
new Callable<GobblinMetrics>() {
+ return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(jobState),
new Callable<GobblinMetrics>() {
@Override
public GobblinMetrics call() throws Exception {
return new JobMetrics(jobState, DEFAULT_CREATOR_TAG);
@@ -142,7 +142,7 @@ public class JobMetrics extends GobblinMetrics {
* @return a {@link JobMetrics} instance
*/
public static JobMetrics get(final JobState jobState, CreatorTag creatorTag)
{
- return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState),
new Callable<GobblinMetrics>() {
+ return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(jobState),
new Callable<GobblinMetrics>() {
@Override
public GobblinMetrics call() throws Exception {
return new JobMetrics(jobState, creatorTag);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
index 0ef2934..904c015 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
@@ -52,7 +52,7 @@ public class TaskMetrics extends GobblinMetrics {
* @return a {@link TaskMetrics} instance
*/
public static TaskMetrics get(final TaskState taskState) {
- return (TaskMetrics)
GOBBLIN_METRICS_REGISTRY.getOrDefault(name(taskState), new
Callable<GobblinMetrics>() {
+ return (TaskMetrics) GOBBLIN_METRICS_REGISTRY.getOrCreate(name(taskState),
new Callable<GobblinMetrics>() {
@Override
public GobblinMetrics call() throws Exception {
return new TaskMetrics(taskState);
@@ -85,7 +85,7 @@ public class TaskMetrics extends GobblinMetrics {
remove(name(task));
}
- private static String name(TaskState taskState) {
+ public static String name(TaskState taskState) {
return METRICS_ID_PREFIX + taskState.getJobId() + "." +
taskState.getTaskId();
}