This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e60f22  [GOBBLIN-1343] Fix the data loss issue caused by the cache 
expiration in PartitionedDataWriter
2e60f22 is described below

commit 2e60f22784843b413dc08ab715d3cde886f12773
Author: Zihan Li <[email protected]>
AuthorDate: Tue Jan 5 15:24:36 2021 -0800

    [GOBBLIN-1343] Fix the data loss issue caused by the cache expiration in 
PartitionedDataWriter
    
    Fix the data loss issue caused by the cache
    expiration in PartitionedDataWriter
    
    address comments
    
    address comments and avoid overflow
    
    Closes #3180 from ZihanLi58/GOBBLIN-1343
---
 .../apache/gobblin/writer/PartitionedDataWriter.java | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 7a14ecb..0af84cf 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -101,6 +101,7 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
   private final ControlMessageHandler controlMessageHandler;
   private boolean isSpeculativeAttemptSafe;
   private boolean isWatermarkCapable;
+  private long writeTimeoutInterval;
 
   private ScheduledExecutorService cacheCleanUpExecutor;
 
@@ -128,6 +129,7 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
       this.state.setProp(WRITER_LATEST_SCHEMA, builder.getSchema());
     }
     long cacheExpiryInterval = 
this.state.getPropAsLong(PARTITIONED_WRITER_CACHE_TTL_SECONDS, 
DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS);
+    this.writeTimeoutInterval = cacheExpiryInterval / 3;
     log.debug("PartitionedDataWriter: Setting cache expiry interval to {} 
seconds", cacheExpiryInterval);
 
     this.partitionWriters = CacheBuilder.newBuilder()
@@ -235,18 +237,24 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
   @Override
   public void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws 
IOException {
     try {
-      DataWriter<D> writer = 
getDataWriterForRecord(recordEnvelope.getRecord());
+      GenericRecord partition = 
getPartitionForRecord(recordEnvelope.getRecord());
+      DataWriter<D> writer = this.partitionWriters.get(partition);
+      long startTime = System.currentTimeMillis();
       writer.writeEnvelope(recordEnvelope);
+      long timeForWriting = System.currentTimeMillis() - startTime;
+      // If the write takes a long time (more than 1/3 of the cache expiration 
time), we fail the writer to avoid data loss
+      // and further slowness on the same HDFS block
+      if (timeForWriting / 1000 > this.writeTimeoutInterval ) {
+        throw new IOException(String.format("Write record took %s s, but 
threshold is %s s",
+            timeForWriting / 1000, writeTimeoutInterval));
+      }
     } catch (ExecutionException ee) {
       throw new IOException(ee);
     }
   }
 
-  private DataWriter<D> getDataWriterForRecord(D record)
-      throws ExecutionException {
-    GenericRecord partition =
-        this.shouldPartition ? 
this.partitioner.get().partitionForRecord(record) : NON_PARTITIONED_WRITER_KEY;
-    return this.partitionWriters.get(partition);
+  private GenericRecord getPartitionForRecord(D record) {
+     return this.shouldPartition ? 
this.partitioner.get().partitionForRecord(record) : NON_PARTITIONED_WRITER_KEY;
   }
 
   @Override

Reply via email to