This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2e60f22 [GOBBLIN-1343] Fix the data loss issue caused by the cache
expiration in PartitionedDataWriter
2e60f22 is described below
commit 2e60f22784843b413dc08ab715d3cde886f12773
Author: Zihan Li <[email protected]>
AuthorDate: Tue Jan 5 15:24:36 2021 -0800
[GOBBLIN-1343] Fix the data loss issue caused by the cache expiration in
PartitionedDataWriter
Fix the data loss issue caused by the cache
expiration in PartitionedDataWriter
Address review comments
Address review comments and avoid overflow
Closes #3180 from ZihanLi58/GOBBLIN-1343
---
.../apache/gobblin/writer/PartitionedDataWriter.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 7a14ecb..0af84cf 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -101,6 +101,7 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
private final ControlMessageHandler controlMessageHandler;
private boolean isSpeculativeAttemptSafe;
private boolean isWatermarkCapable;
+ private long writeTimeoutInterval;
private ScheduledExecutorService cacheCleanUpExecutor;
@@ -128,6 +129,7 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
this.state.setProp(WRITER_LATEST_SCHEMA, builder.getSchema());
}
long cacheExpiryInterval =
this.state.getPropAsLong(PARTITIONED_WRITER_CACHE_TTL_SECONDS,
DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS);
+ this.writeTimeoutInterval = cacheExpiryInterval / 3;
log.debug("PartitionedDataWriter: Setting cache expiry interval to {}
seconds", cacheExpiryInterval);
this.partitionWriters = CacheBuilder.newBuilder()
@@ -235,18 +237,24 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
@Override
public void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws
IOException {
try {
- DataWriter<D> writer =
getDataWriterForRecord(recordEnvelope.getRecord());
+ GenericRecord partition =
getPartitionForRecord(recordEnvelope.getRecord());
+ DataWriter<D> writer = this.partitionWriters.get(partition);
+ long startTime = System.currentTimeMillis();
writer.writeEnvelope(recordEnvelope);
+ long timeForWriting = System.currentTimeMillis() - startTime;
+ // If the write take a long time, which is 1/3 of cache expiration time,
we fail the writer to avoid data loss
+ // and further slowness on the same HDFS block
+ if (timeForWriting / 1000 > this.writeTimeoutInterval ) {
+ throw new IOException(String.format("Write record took %s s, but
threshold is %s s",
+ timeForWriting / 1000, writeTimeoutInterval));
+ }
} catch (ExecutionException ee) {
throw new IOException(ee);
}
}
- private DataWriter<D> getDataWriterForRecord(D record)
- throws ExecutionException {
- GenericRecord partition =
- this.shouldPartition ?
this.partitioner.get().partitionForRecord(record) : NON_PARTITIONED_WRITER_KEY;
- return this.partitionWriters.get(partition);
+ private GenericRecord getPartitionForRecord(D record) {
+ return this.shouldPartition ?
this.partitioner.get().partitionForRecord(record) : NON_PARTITIONED_WRITER_KEY;
}
@Override