This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4610f895f GOBBLIN-2033 set unique writer output dir by appending
taskAttemptId to writer output path (#3910)
4610f895f is described below
commit 4610f895f8aa9bda51259a48025d7a039f09a25e
Author: rongshen <[email protected]>
AuthorDate: Wed Apr 10 17:09:11 2024 -0700
GOBBLIN-2033 set unique writer output dir by appending taskAttemptId to
writer output path (#3910)
* ETL-15191 update writer output dir to include taskAttemptId
* [GOBBLIN-2033] enable unique writer output dir with task attempt id
* append timestamp to taskoutput path
---------
Co-authored-by: Rong Shen <[email protected]>
---
.../extractor/extract/FlushingExtractor.java | 30 ++++++++++++++++++++++
.../extract/kafka/KafkaStreamingExtractorTest.java | 16 ++++++++++++
2 files changed, 46 insertions(+)
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
index 1d7f80c7e..1b1f56f56 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.ack.Ackable;
import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.MetricContextUtils;
import org.apache.gobblin.publisher.DataPublisher;
@@ -52,6 +53,7 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.LastWatermarkTracker;
import org.apache.gobblin.writer.WatermarkStorage;
import org.apache.gobblin.writer.WatermarkTracker;
+import org.apache.hadoop.fs.Path;
/**
@@ -87,6 +89,16 @@ public abstract class FlushingExtractor<S, D> extends
EventBasedExtractor<S, D>
public static final String WATERMARK_COMMIT_TIME_METRIC =
"state.store.metrics.watermarkCommitTime";
public static final String COMMIT_STEP_METRIC_PREFIX = "commit.step.";
+ /**
+ * When enabled, this property appends the task attempt id and a timestamp to the
writer output directory on startup.
+ * The purpose of this change is to make sure each task writes to a unique
directory. If a previous run left corrupted files
+ * behind, the next run will start from a new path and will not
pick up those corrupted files. <br><br>
+ *
+ * NOTE: This feature must be executed before the publisher is initialized
so that the publisher uses the same path.
+ * If this assumption is violated, then this feature will have
nondeterministic behavior w.r.t. data loss.
+ */
+ public static final String
ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID =
+ "flush.extractor.enableUniqueWriterOutputDirWithTaskAttemptId";
@Getter
protected Map<String, CheckpointableWatermark> lastCommittedWatermarks;
@@ -126,11 +138,29 @@ public abstract class FlushingExtractor<S, D> extends
EventBasedExtractor<S, D>
preCommitSteps.stream().map(commitStep -> new
AtomicLong(0L)).forEach(this.preCommitStepTimes::add);
postCommitSteps.stream().map(commitStep -> new
AtomicLong(0L)).forEach(this.postCommitStepTimes::add);
+ if (ConfigUtils.getBoolean(config,
ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID, false)) {
+ setUniqueTaskOutputPathWithTaskAttemptId(this.workUnitState);
+ }
+
initFlushPublisher();
MetricContextUtils.registerGauge(this.getMetricContext(),
WATERMARK_COMMIT_TIME_METRIC, this.watermarkCommitTime);
initCommitStepMetrics(this.preCommitSteps, this.postCommitSteps);
}
+ private void setUniqueTaskOutputPathWithTaskAttemptId(WorkUnitState
workUnitState) {
+
Preconditions.checkArgument(this.workUnitState.contains(ConfigurationKeys.TASK_ATTEMPT_ID_KEY),
+ String.format("Unable to updateTaskOutputPath, work unit state must
contain %s because %s is enabled",
+ ConfigurationKeys.TASK_ATTEMPT_ID_KEY,
ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID));
+
Preconditions.checkArgument(this.workUnitState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR),
+ String.format("Unable to updateTaskOutputPath, work unit state must
contain %s because %s is enabled",
+ ConfigurationKeys.WRITER_OUTPUT_DIR,
ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID));
+ String taskAttemptId =
workUnitState.getProp(ConfigurationKeys.TASK_ATTEMPT_ID_KEY);
+ String writerOutputDirWithTaskAttemptId =
+ new
Path(this.workUnitState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
+ taskAttemptId + "_" + System.currentTimeMillis()).toString();
+ this.workUnitState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR,
writerOutputDirWithTaskAttemptId);
+ }
+
private void initCommitStepMetrics(List<String>... commitStepLists) {
for (List<String> commitSteps : commitStepLists) {
for (String commitStepAlias : commitSteps) {
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
index 922d1303e..2b3d0901f 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -130,6 +131,21 @@ public class KafkaStreamingExtractorTest {
}
}
+ @Test
+ public void testWriteOutputDirUpdate() {
+ WorkUnitState state = KafkaExtractorUtils.getWorkUnitState("testTopic",
numPartitions);
+ state.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS,
TestDataPublisher.class.getName());
+ state.setProp(ConfigurationKeys.TASK_ATTEMPT_ID_KEY,
"GobblinYarnTaskRunner_1");
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, "/tmp/test");
+
state.setProp(FlushingExtractor.ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID,
false);
+ new KafkaStreamingExtractor(state);
+ Assert.assertEquals(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
"/tmp/test");
+
state.setProp(FlushingExtractor.ENABLE_UNIQUE_WRITER_OUTPUT_DIR_WITH_TASK_ATTEMPT_ID,
true);
+ new KafkaStreamingExtractor(state);
+ Assert.assertTrue(
+
state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR).contains("/tmp/test/GobblinYarnTaskRunner_1_"));
+ }
+
static class TestDataPublisher extends DataPublisher {
public TestDataPublisher(WorkUnitState state) {
super(state);