This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4610f895f GOBBLIN-2033 set unique writer output dir by appending 
taskAttemptId to writer output path (#3910)
4610f895f is described below

commit 4610f895f8aa9bda51259a48025d7a039f09a25e
Author: rongshen <[email protected]>
AuthorDate: Wed Apr 10 17:09:11 2024 -0700

    GOBBLIN-2033 set unique writer output dir by appending taskAttemptId to 
writer output path (#3910)
    
    * ETL-15191 update writer output dir to include taskAttemptId
    
    * [GOBBLIN-2033] enable unique writer output dir with task attempt id
    
    * append timestamp to taskoutput path
    
    ---------
    
    Co-authored-by: Rong Shen <[email protected]>
---
 .../extractor/extract/FlushingExtractor.java       | 30 ++++++++++++++++++++++
 .../extract/kafka/KafkaStreamingExtractorTest.java | 16 ++++++++++++
 2 files changed, 46 insertions(+)

diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
index 1d7f80c7e..1b1f56f56 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.ack.Ackable;
 import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metrics.MetricContextUtils;
 import org.apache.gobblin.publisher.DataPublisher;
@@ -52,6 +53,7 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.writer.LastWatermarkTracker;
 import org.apache.gobblin.writer.WatermarkStorage;
 import org.apache.gobblin.writer.WatermarkTracker;
+import org.apache.hadoop.fs.Path;
 
 
 /**
@@ -87,6 +89,16 @@ public abstract class FlushingExtractor<S, D> extends 
EventBasedExtractor<S, D>
 
   public static final String WATERMARK_COMMIT_TIME_METRIC = 
"state.store.metrics.watermarkCommitTime";
   public static final String COMMIT_STEP_METRIC_PREFIX = "commit.step.";
+  /**
+   * This property is used to append the task attempt id and a timestamp to the 
writer output directory on startup.
+   * The purpose of this change is to make sure each task writes to a unique 
directory. If a previous run left corrupted files
+   * behind, the next run will start from a new path and will not 
pick up those corrupted files. <br><br>
+   *
+   * NOTE: This feature must be executed before the publisher is initialized 
for the publisher to use the same path.
+   * If this assumption is violated, then this feature will have 
nondeterministic behavior w.r.t. data loss.
+   */
+  public static final String 
ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID =
+      "flush.extractor.enableUniqueWriterOutputDirWithTaskAttemptId";
 
   @Getter
   protected Map<String, CheckpointableWatermark> lastCommittedWatermarks;
@@ -126,11 +138,29 @@ public abstract class FlushingExtractor<S, D> extends 
EventBasedExtractor<S, D>
     preCommitSteps.stream().map(commitStep -> new 
AtomicLong(0L)).forEach(this.preCommitStepTimes::add);
     postCommitSteps.stream().map(commitStep -> new 
AtomicLong(0L)).forEach(this.postCommitStepTimes::add);
 
+    if (ConfigUtils.getBoolean(config, 
ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID, false)) {
+      setUniqueTaskOutputPathWithTaskAttemptId(this.workUnitState);
+    }
+
     initFlushPublisher();
     MetricContextUtils.registerGauge(this.getMetricContext(), 
WATERMARK_COMMIT_TIME_METRIC, this.watermarkCommitTime);
     initCommitStepMetrics(this.preCommitSteps, this.postCommitSteps);
   }
 
+  private void setUniqueTaskOutputPathWithTaskAttemptId(WorkUnitState 
workUnitState) {
+    
Preconditions.checkArgument(this.workUnitState.contains(ConfigurationKeys.TASK_ATTEMPT_ID_KEY),
+        String.format("Unable to updateTaskOutputPath, work unit state must 
contain %s because %s is enabled",
+            ConfigurationKeys.TASK_ATTEMPT_ID_KEY, 
ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID));
+    
Preconditions.checkArgument(this.workUnitState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR),
+        String.format("Unable to updateTaskOutputPath, work unit state must 
contain %s because %s is enabled",
+            ConfigurationKeys.WRITER_OUTPUT_DIR, 
ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID));
+    String taskAttemptId = 
workUnitState.getProp(ConfigurationKeys.TASK_ATTEMPT_ID_KEY);
+    String writerOutputDirWithTaskAttemptId =
+            new 
Path(this.workUnitState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
+                taskAttemptId + "_" + System.currentTimeMillis()).toString();
+    this.workUnitState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, 
writerOutputDirWithTaskAttemptId);
+  }
+
   private void initCommitStepMetrics(List<String>... commitStepLists) {
     for (List<String> commitSteps : commitStepLists) {
       for (String commitStepAlias : commitSteps) {
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
index 922d1303e..2b3d0901f 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -130,6 +131,21 @@ public class KafkaStreamingExtractorTest {
     }
   }
 
+  @Test
+  public void testWriteOutputDirUpdate() {
+    WorkUnitState state = KafkaExtractorUtils.getWorkUnitState("testTopic", 
numPartitions);
+    state.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS, 
TestDataPublisher.class.getName());
+    state.setProp(ConfigurationKeys.TASK_ATTEMPT_ID_KEY, 
"GobblinYarnTaskRunner_1");
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, "/tmp/test");
+    
state.setProp(FlushingExtractor.ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID,
 false);
+    new KafkaStreamingExtractor(state);
+    Assert.assertEquals(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), 
"/tmp/test");
+    
state.setProp(FlushingExtractor.ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID,
 true);
+    new KafkaStreamingExtractor(state);
+    Assert.assertTrue(
+        
state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR).contains("/tmp/test/GobblinYarnTaskRunner_1_"));
+  }
+
   static class TestDataPublisher extends DataPublisher {
     public TestDataPublisher(WorkUnitState state) {
       super(state);

Reply via email to