This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new dde8d1d [GOBBLIN-1386] Increase the timeout setting for Gobblin
Partitioned DataWriter to write a record
dde8d1d is described below
commit dde8d1dfa31a6aa2049f3d4223d3d7e694353d88
Author: Zihan Li <[email protected]>
AuthorDate: Thu Feb 11 09:16:00 2021 -0800
[GOBBLIN-1386] Increase the timeout setting for Gobblin Partitioned
DataWriter to write a record
Closes #3226 from ZihanLi58/GOBBLIN-1386
---
.../main/java/org/apache/gobblin/writer/PartitionedDataWriter.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 3154c85..3c95a6b 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -82,6 +82,8 @@ public class PartitionedDataWriter<S, D> extends
WriterWrapper<D> implements Fin
// in incorrect behavior.
public static final String PARTITIONED_WRITER_CACHE_TTL_SECONDS =
"partitionedDataWriter.cache.ttl.seconds";
public static final Long DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS =
Long.MAX_VALUE;
+ public static final String PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS =
"partitionedDataWriter.write.timeout.seconds";
+ public static final Long DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS =
480L;
private static final GenericRecord NON_PARTITIONED_WRITER_KEY =
new
GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord());
@@ -130,7 +132,10 @@ public class PartitionedDataWriter<S, D> extends
WriterWrapper<D> implements Fin
this.state.setProp(WRITER_LATEST_SCHEMA, builder.getSchema());
}
long cacheExpiryInterval =
this.state.getPropAsLong(PARTITIONED_WRITER_CACHE_TTL_SECONDS,
DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS);
- this.writeTimeoutInterval = cacheExpiryInterval / 3;
+ this.writeTimeoutInterval =
this.state.getPropAsLong(PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS,
+ DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS);
+ // Bound the timeout value to avoid data loss when slow write happening
+ this.writeTimeoutInterval = Math.min(this.writeTimeoutInterval,
cacheExpiryInterval / 3 * 2);
log.debug("PartitionedDataWriter: Setting cache expiry interval to {}
seconds", cacheExpiryInterval);
this.partitionWriters = CacheBuilder.newBuilder()