This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dde8d1d  [GOBBLIN-1386] Increase the timeout setting for Gobblin Partitioned DataWriter to write a record
dde8d1d is described below

commit dde8d1dfa31a6aa2049f3d4223d3d7e694353d88
Author: Zihan Li <[email protected]>
AuthorDate: Thu Feb 11 09:16:00 2021 -0800

    [GOBBLIN-1386] Increase the timeout setting for Gobblin Partitioned DataWriter to write a record
    
    Closes #3226 from ZihanLi58/GOBBLIN-1386
---
 .../main/java/org/apache/gobblin/writer/PartitionedDataWriter.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 3154c85..3c95a6b 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -82,6 +82,8 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
   // in incorrect behavior.
   public static final String PARTITIONED_WRITER_CACHE_TTL_SECONDS = 
"partitionedDataWriter.cache.ttl.seconds";
   public static final Long DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS = 
Long.MAX_VALUE;
+  public static final String PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = 
"partitionedDataWriter.write.timeout.seconds";
+  public static final Long DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = 
480L;
 
   private static final GenericRecord NON_PARTITIONED_WRITER_KEY =
       new 
GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord());
@@ -130,7 +132,10 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
       this.state.setProp(WRITER_LATEST_SCHEMA, builder.getSchema());
     }
     long cacheExpiryInterval = 
this.state.getPropAsLong(PARTITIONED_WRITER_CACHE_TTL_SECONDS, 
DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS);
-    this.writeTimeoutInterval = cacheExpiryInterval / 3;
+    this.writeTimeoutInterval = 
this.state.getPropAsLong(PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS,
+        DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS);
+    // Bound the timeout value to avoid data loss when slow write happening
+    this.writeTimeoutInterval = Math.min(this.writeTimeoutInterval, 
cacheExpiryInterval / 3 * 2);
     log.debug("PartitionedDataWriter: Setting cache expiry interval to {} 
seconds", cacheExpiryInterval);
 
     this.partitionWriters = CacheBuilder.newBuilder()

Reply via email to