This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 08986ce  [GOBBLIN-787] Add an option to include the task start time in the out…
08986ce is described below

commit 08986ce3844cb613e0f000a692252448a33fd3ae
Author: Hung Tran <[email protected]>
AuthorDate: Wed May 29 14:47:09 2019 -0700

    [GOBBLIN-787] Add an option to include the task start time in the out…
    
    Closes #2653 from htran1/unique_file_names
---
 .../gobblin/configuration/ConfigurationKeys.java   |  2 +
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   |  2 +
 .../java/org/apache/gobblin/runtime/fork/Fork.java | 17 ++++-
 .../gobblin/runtime/JobLauncherTestHelper.java     | 22 ++++++
 .../java/org/apache/gobblin/runtime/TaskTest.java  | 79 ++++++++++++++++++++++
 .../runtime/local/LocalJobLauncherTest.java        | 14 ++++
 6 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 09c4c0e..58e4255 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -239,6 +239,7 @@ public class ConfigurationKeys {
   public static final String JOB_KEY_KEY = "job.key";
   public static final String TASK_ID_KEY = "task.id";
   public static final String TASK_KEY_KEY = "task.key";
+  public static final String TASK_START_TIME_MILLIS_KEY = 
"task.startTimeMillis";
   public static final String TASK_ATTEMPT_ID_KEY = "task.AttemptId";
   public static final String JOB_CONFIG_FILE_PATH_KEY = "job.config.path";
   public static final String TASK_FAILURE_EXCEPTION_KEY = 
"task.failure.exception";
@@ -380,6 +381,7 @@ public class ConfigurationKeys {
   public static final String DEFAULT_WRITER_FILE_PATH_TYPE = "default";
   public static final String SIMPLE_WRITER_DELIMITER = 
"simple.writer.delimiter";
   public static final String SIMPLE_WRITER_PREPEND_SIZE = 
"simple.writer.prepend.size";
+  public static final String WRITER_ADD_TASK_TIMESTAMP = WRITER_PREFIX + 
".addTaskTimestamp";
 
 
   // Internal use only - used to send metadata to publisher
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 533ba12..f327d8d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -375,6 +375,8 @@ public class GobblinMultiTaskAttempt {
       workUnitState.setId(taskId);
       workUnitState.setProp(ConfigurationKeys.JOB_ID_KEY, this.jobId);
       workUnitState.setProp(ConfigurationKeys.TASK_ID_KEY, taskId);
+      workUnitState.setProp(ConfigurationKeys.TASK_START_TIME_MILLIS_KEY, 
Long.toString(System.currentTimeMillis()));
+
       if (this.containerIdOptional.isPresent()) {
         workUnitState.setProp(ConfigurationKeys.TASK_ATTEMPT_ID_KEY, 
this.containerIdOptional.get());
       }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index bbe26dd..bb5183f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -529,9 +529,24 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
    */
   private DataWriter<Object> buildWriter()
       throws IOException {
+    String writerId = this.taskId;
+
+    // Add the task starting time if configured.
+    // This is used to reduce file name collisions which can happen due to the 
execution of a workunit across multiple
+    // task instances.
+    // File names are generated from the writerId which is based on the 
taskId. Different instances of
+    // the task have the same taskId, so file name collisions can occur.
+    // Adding the task start time to the taskId gives a writerId that should 
be different across task instances.
+    if 
(this.taskState.getPropAsBoolean(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP, 
false)) {
+      String taskStartTime = 
this.taskState.getProp(ConfigurationKeys.TASK_START_TIME_MILLIS_KEY);
+      Preconditions.checkArgument(taskStartTime != null, 
ConfigurationKeys.TASK_START_TIME_MILLIS_KEY + " has not been set");
+
+      writerId = this.taskId + "_" + taskStartTime;
+    }
+
     DataWriterBuilder<Object, Object> builder = 
this.taskContext.getDataWriterBuilder(this.branches, this.index)
         
.writeTo(Destination.of(this.taskContext.getDestinationType(this.branches, 
this.index), this.taskState))
-        .writeInFormat(this.taskContext.getWriterOutputFormat(this.branches, 
this.index)).withWriterId(this.taskId)
+        .writeInFormat(this.taskContext.getWriterOutputFormat(this.branches, 
this.index)).withWriterId(writerId)
         
.withSchema(this.convertedSchema.orNull()).withBranches(this.branches).forBranch(this.index);
     if (this.taskAttemptId.isPresent()) {
       builder.withAttemptId(this.taskAttemptId.get());
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index c947dfc..e608e44 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -92,6 +94,26 @@ public class JobLauncherTestHelper {
       Assert.assertEquals(taskState.getWorkingState(), 
WorkUnitState.WorkingState.COMMITTED);
       
Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
           TestExtractor.TOTAL_RECORDS);
+
+      // if the addition of the task timestamp is configured then validate 
that the file name has the expected format
+      if 
(Boolean.valueOf(taskState.getProp(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP, 
"false"))) {
+        String pattern = ".*part.task_.*_(\\d+)_\\d+_(\\d+)_\\d+.avro";
+        String value = 
taskState.getProp(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS);
+        Pattern r = Pattern.compile(pattern);
+        Matcher m = 
r.matcher(taskState.getProp(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS));
+        long timeBuffer = 5 * 60 * 1000;
+        if (!m.matches()) {
+          Assert.fail("no matches for " + value);
+        }
+
+        Long currentTime = System.currentTimeMillis();
+        Assert.assertTrue(Long.valueOf(m.group(1)) > currentTime - timeBuffer);
+        Assert.assertTrue(Long.valueOf(m.group(1)) < currentTime);
+        // the task time should be after the job time
+        Assert.assertTrue(Long.valueOf(m.group(1)) < Long.valueOf(m.group(2)));
+        Assert.assertTrue(Long.valueOf(m.group(2)) > currentTime - timeBuffer);
+        Assert.assertTrue(Long.valueOf(m.group(2)) < currentTime);
+      }
     }
   }
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
index 84c4f0a..60f368c 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.gobblin.runtime.util.TaskMetrics;
@@ -320,6 +321,84 @@ public class TaskTest {
   }
 
   /**
+   * Verifies that, when {@link ConfigurationKeys#WRITER_ADD_TASK_TIMESTAMP} is enabled, the task start time stored
+   * under {@link ConfigurationKeys#TASK_START_TIME_MILLIS_KEY} is appended to the writer id (from which output file
+   * names are derived).
+   */
+  @Test
+  public void testTimestampInFilename()
+      throws Exception {
+    TaskState taskState = getEmptyTestTaskState("testTimestampInFilename");
+    taskState.setProp(ConfigurationKeys.TASK_START_TIME_MILLIS_KEY, "12345");
+    taskState.setProp(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP, "true");
+
+    int numForks = 1;
+    TaskContext taskContext = createTimestampTestTaskContext(taskState, numForks);
+    runAndCommitTask(taskContext);
+
+    DataWriterBuilder<?, ?> writerBuilder = taskContext.getDataWriterBuilder(numForks, 0);
+
+    // The writer id should be "<taskId>_<taskStartTime>" followed by whatever suffix the test writer builder adds
+    // (assumed to be the fork index -- TODO confirm against getMockTaskContext).
+    Assert.assertEquals(writerBuilder.getWriterId(), "testTimestampInFilename_12345_0");
+  }
+
+  /**
+   * Verifies that enabling {@link ConfigurationKeys#WRITER_ADD_TASK_TIMESTAMP} without setting
+   * {@link ConfigurationKeys#TASK_START_TIME_MILLIS_KEY} makes the task fail.
+   */
+  @Test(expectedExceptions = {ExecutionException.class, NullPointerException.class})
+  public void testTimestampInFilenameError()
+      throws Exception {
+    TaskState taskState = getEmptyTestTaskState("testTimestampInFilenameError");
+    // TASK_START_TIME_MILLIS_KEY is deliberately left unset.
+    taskState.setProp(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP, "true");
+
+    TaskContext taskContext = createTimestampTestTaskContext(taskState, 1);
+
+    // The fork's writer construction rejects the missing start time with a precondition check; the failure is
+    // expected to surface from run()/commit() as one of the exception types declared on this test.
+    runAndCommitTask(taskContext);
+  }
+
+  /**
+   * Builds a {@link TaskContext} for the timestamp tests: a single-record {@code StringExtractor}, a
+   * {@code RoundRobinForkOperator} over {@code numForks} forks, and one empty record collector per fork.
+   */
+  private TaskContext createTimestampTestTaskContext(TaskState taskState, int numForks)
+      throws Exception {
+    ForkOperator forkOperator = new RoundRobinForkOperator(numForks);
+
+    ArrayList<ArrayList<Object>> recordCollectors = new ArrayList<>(numForks);
+    for (int i = 0; i < numForks; ++i) {
+      recordCollectors.add(new ArrayList<>());
+    }
+
+    return getMockTaskContext(taskState, new StringExtractor(1), recordCollectors, forkOperator);
+  }
+
+  /**
+   * Creates a {@code Task} for the given context with a mocked state tracker, then runs and commits it.
+   * A real {@code TaskExecutor} is required so that each fork runs in a separate thread.
+   */
+  private void runAndCommitTask(TaskContext taskContext)
+      throws Exception {
+    TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
+    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    Task task = new Task(taskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());
+
+    task.run();
+    task.commit();
+  }
+
+  /**
    * An implementation of {@link Extractor} that throws an {@link IOException} 
during the invocation of
    * {@link #readRecord(Object)}.
    */
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
index 8b0c481..a81aec6 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/local/LocalJobLauncherTest.java
@@ -110,6 +110,20 @@ public class LocalJobLauncherTest {
     }
   }
 
+  /**
+   * Launches the local test job with {@link ConfigurationKeys#WRITER_ADD_TASK_TIMESTAMP} enabled; the test helper
+   * presumably then also checks that output file names carry the task start time (TODO confirm runTest path).
+   */
+  @Test
+  public void testLaunchJobWithTaskTimestamps() throws Exception {
+    Properties jobProps = loadJobProps();
+    // A suffix matching this test's name keeps its job name -- and thus the state store deleted below -- distinct
+    // from the job names used by other tests in this class.
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithTaskTimestamps");
+    jobProps.setProperty(ConfigurationKeys.WRITER_ADD_TASK_TIMESTAMP, "true");
+
+    try {
+      this.jobLauncherTestHelper.runTest(jobProps);
+    } finally {
+      this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+  }
+
   @Test(groups = { "ignore" })
   public void testCancelJob() throws Exception {
     this.jobLauncherTestHelper.runTestWithCancellation(loadJobProps());

Reply via email to