This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 08986ce [GOBBLIN-787] Add an option to include the task start time in
the output file name
08986ce is described below
commit 08986ce3844cb613e0f000a692252448a33fd3ae
Author: Hung Tran <[email protected]>
AuthorDate: Wed May 29 14:47:09 2019 -0700
[GOBBLIN-787] Add an option to include the task start time in the output file name
Closes #2653 from htran1/unique_file_names
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 2 +
.../java/org/apache/gobblin/runtime/fork/Fork.java | 17 ++++-
.../gobblin/runtime/JobLauncherTestHelper.java | 22 ++++++
.../java/org/apache/gobblin/runtime/TaskTest.java | 79 ++++++++++++++++++++++
.../runtime/local/LocalJobLauncherTest.java | 14 ++++
6 files changed, 135 insertions(+), 1 deletion(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 09c4c0e..58e4255 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -239,6 +239,7 @@ public class ConfigurationKeys {
public static final String JOB_KEY_KEY = "job.key";
public static final String TASK_ID_KEY = "task.id";
public static final String TASK_KEY_KEY = "task.key";
+ public static final String TASK_START_TIME_MILLIS_KEY =
"task.startTimeMillis";
public static final String TASK_ATTEMPT_ID_KEY = "task.AttemptId";
public static final String JOB_CONFIG_FILE_PATH_KEY = "job.config.path";
public static final String TASK_FAILURE_EXCEPTION_KEY =
"task.failure.exception";
@@ -380,6 +381,7 @@ public class ConfigurationKeys {
public static final String DEFAULT_WRITER_FILE_PATH_TYPE = "default";
public static final String SIMPLE_WRITER_DELIMITER =
"simple.writer.delimiter";
public static final String SIMPLE_WRITER_PREPEND_SIZE =
"simple.writer.prepend.size";
+ public static final String WRITER_ADD_TASK_TIMESTAMP = WRITER_PREFIX +
".addTaskTimestamp";
// Internal use only - used to send metadata to publisher
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 533ba12..f327d8d 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -375,6 +375,8 @@ public class GobblinMultiTaskAttempt {
workUnitState.setId(taskId);
workUnitState.setProp(ConfigurationKeys.JOB_ID_KEY, this.jobId);
workUnitState.setProp(ConfigurationKeys.TASK_ID_KEY, taskId);
+ workUnitState.setProp(ConfigurationKeys.TASK_START_TIME_MILLIS_KEY,
Long.toString(System.currentTimeMillis()));
+
if (this.containerIdOptional.isPresent()) {
workUnitState.setProp(ConfigurationKeys.TASK_ATTEMPT_ID_KEY,
this.containerIdOptional.get());
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index bbe26dd..bb5183f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -529,9 +529,24 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
*/
private DataWriter<Object> buildWriter()
throws IOException {
+ String writerId = this.taskId;
+
+ // Add the task starting time if configured.
+ // This is used to reduce file name collisions which can happen due to the
execution of a workunit across multiple
+ // task instances.
+ // File names are generated from the writerId which is based on the
taskId. Different instances of
+ // the task have the same taskId, so file name collisions can occur.
+ // Adding the task start time to the taskId gives a writerId that should
be different across task instances.
+ if
(this.taskState.getPropAsBoolean(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP,
false)) {
+ String taskStartTime =
this.taskState.getProp(ConfigurationKeys.TASK_START_TIME_MILLIS_KEY);
+ Preconditions.checkArgument(taskStartTime != null,
ConfigurationKeys.TASK_START_TIME_MILLIS_KEY + " has not been set");
+
+ writerId = this.taskId + "_" + taskStartTime;
+ }
+
DataWriterBuilder<Object, Object> builder =
this.taskContext.getDataWriterBuilder(this.branches, this.index)
.writeTo(Destination.of(this.taskContext.getDestinationType(this.branches,
this.index), this.taskState))
- .writeInFormat(this.taskContext.getWriterOutputFormat(this.branches,
this.index)).withWriterId(this.taskId)
+ .writeInFormat(this.taskContext.getWriterOutputFormat(this.branches,
this.index)).withWriterId(writerId)
.withSchema(this.convertedSchema.orNull()).withBranches(this.branches).forBranch(this.index);
if (this.taskAttemptId.isPresent()) {
builder.withAttemptId(this.taskAttemptId.get());
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index c947dfc..e608e44 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -92,6 +94,26 @@ public class JobLauncherTestHelper {
Assert.assertEquals(taskState.getWorkingState(),
WorkUnitState.WorkingState.COMMITTED);
Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
TestExtractor.TOTAL_RECORDS);
+
+ // if the addition of the task timestamp is configured then validate
that the file name has the expected format
+ if
(Boolean.valueOf(taskState.getProp(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP,
"false"))) {
+ String pattern = ".*part.task_.*_(\\d+)_\\d+_(\\d+)_\\d+.avro";
+ String value =
taskState.getProp(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS);
+ Pattern r = Pattern.compile(pattern);
+ Matcher m =
r.matcher(taskState.getProp(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS));
+ long timeBuffer = 5 * 60 * 1000;
+ if (!m.matches()) {
+ Assert.fail("no matches for " + value);
+ }
+
+ Long currentTime = System.currentTimeMillis();
+ Assert.assertTrue(Long.valueOf(m.group(1)) > currentTime - timeBuffer);
+ Assert.assertTrue(Long.valueOf(m.group(1)) < currentTime);
+ // the task time should be after the job time
+ Assert.assertTrue(Long.valueOf(m.group(1)) < Long.valueOf(m.group(2)));
+ Assert.assertTrue(Long.valueOf(m.group(2)) > currentTime - timeBuffer);
+ Assert.assertTrue(Long.valueOf(m.group(2)) < currentTime);
+ }
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
index 84c4f0a..60f368c 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gobblin.runtime.util.TaskMetrics;
@@ -320,6 +321,84 @@ public class TaskTest {
}
/**
+ * Test the addition of a task timestamp to the file name
+ */
+ @Test
+ public void testTimestampInFilename()
+ throws Exception {
+ // Create a TaskState
+ TaskState taskState = getEmptyTestTaskState("testTimestampInFilename");
+ taskState.setProp(ConfigurationKeys.TASK_START_TIME_MILLIS_KEY, "12345");
+ taskState.setProp(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP, "true");
+
+ int numRecords = 1;
+ int numForks = 1;
+ ForkOperator mockForkOperator = new RoundRobinForkOperator(numForks);
+
+ ArrayList<ArrayList<Object>> recordCollectors = new ArrayList<>(numForks);
+ for (int i=0; i < numForks; ++i) {
+ recordCollectors.add(new ArrayList<>());
+ }
+
+ TaskContext mockTaskContext = getMockTaskContext(taskState,
+ new StringExtractor(numRecords), recordCollectors, mockForkOperator);
+
+ // Create a mock TaskStateTracker
+ TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
+
+ // Create a TaskExecutor - a real TaskExecutor must be created so a Fork
is run in a separate thread
+ TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+
+ // Create the Task
+ Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor,
Optional.<CountDownLatch>absent());
+
+ // Run and commit
+ task.run();
+ task.commit();
+
+ DataWriterBuilder writerBuilder =
mockTaskContext.getDataWriterBuilder(numForks, 0);
+
+ // writer id should have the expected name with the timestamp
+ Assert.assertEquals(writerBuilder.getWriterId(),
"testTimestampInFilename_12345_0");
+ }
+
+ /**
+ * Test the addition of a task timestamp to the file name fails if the task
start time is not present
+ */
+ @Test(expectedExceptions = {ExecutionException.class,
NullPointerException.class})
+ public void testTimestampInFilenameError()
+ throws Exception {
+ // Create a TaskState
+ TaskState taskState =
getEmptyTestTaskState("testTimestampInFilenameError");
+ taskState.setProp(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP, "true");
+
+ int numRecords = 1;
+ int numForks = 1;
+ ForkOperator mockForkOperator = new RoundRobinForkOperator(numForks);
+
+ ArrayList<ArrayList<Object>> recordCollectors = new ArrayList<>(numForks);
+ for (int i=0; i < numForks; ++i) {
+ recordCollectors.add(new ArrayList<>());
+ }
+
+ TaskContext mockTaskContext = getMockTaskContext(taskState,
+ new StringExtractor(numRecords), recordCollectors, mockForkOperator);
+
+ // Create a mock TaskStateTracker
+ TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
+
+ // Create a TaskExecutor - a real TaskExecutor must be created so a Fork
is run in a separate thread
+ TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+
+ // Create the Task
+ Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor,
Optional.<CountDownLatch>absent());
+
+ // Run and commit
+ task.run();
+ task.commit();
+ }
+
+ /**
* An implementation of {@link Extractor} that throws an {@link IOException}
during the invocation of
* {@link #readRecord(Object)}.
*/
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
index 8b0c481..a81aec6 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
@@ -110,6 +110,20 @@ public class LocalJobLauncherTest {
}
}
+ /**
+ * Launches a local job with {@link ConfigurationKeys#WRITER_ADD_TASK_TIMESTAMP} enabled, so the
+ * task start time is added to writer ids and hence to output file names.
+ */
+ @Test
+ public void testLaunchJobWithTaskTimestamps() throws Exception {
+ Properties jobProps = loadJobProps();
+ // Use a job-name suffix unique to this test; the state store is deleted by job name below,
+ // so a shared suffix (e.g. "-testLaunchJob") could interfere with another test's state store.
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithTaskTimestamps");
+ jobProps.setProperty(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP, "true");
+
+ try {
+ this.jobLauncherTestHelper.runTest(jobProps);
+ } finally {
+ this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+ }
+
@Test(groups = { "ignore" })
public void testCancelJob() throws Exception {
this.jobLauncherTestHelper.runTestWithCancellation(loadJobProps());