This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0c014ee89 [GOBBLIN-1828] Implement Timeout for Creating Writer
Functionality (#3690)
0c014ee89 is described below
commit 0c014ee8938710f1b980d274fd07bdb098587f93
Author: Zihan Li <[email protected]>
AuthorDate: Mon May 1 16:00:18 2023 -0700
[GOBBLIN-1828] Implement Timeout for Creating Writer Functionality (#3690)
* address comments
* use connectionmanager when httpclient is not cloesable
* [GOBBLIN-1828] Implement Timeout for Creating Writer Functionality
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../apache/gobblin/writer/PartitionedDataWriter.java | 13 +++++++++++--
.../apache/gobblin/writer/PartitionedWriterTest.java | 18 ++++++++++++++++--
.../writer/test/TestPartitionAwareWriterBuilder.java | 14 ++++++++++++++
3 files changed, 41 insertions(+), 4 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 800676da6..d5f0e8177 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -22,9 +22,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.avro.SchemaBuilder;
@@ -115,6 +118,7 @@ public class PartitionedDataWriter<S, D> extends
WriterWrapper<D> implements Fin
@Getter
@VisibleForTesting
private long totalBytesFromEvictedWriters;
+ private ExecutorService createWriterPool;
public PartitionedDataWriter(DataWriterBuilder<S, D> builder, final State
state)
@@ -125,6 +129,7 @@ public class PartitionedDataWriter<S, D> extends
WriterWrapper<D> implements Fin
this.isSpeculativeAttemptSafe = true;
this.isWatermarkCapable = true;
this.baseWriterId = builder.getWriterId();
+ this.createWriterPool = Executors.newSingleThreadExecutor();
this.closer = Closer.create();
this.writerBuilder = builder;
this.controlMessageHandler = new PartitionDataWriterMessageHandler();
@@ -170,9 +175,12 @@ public class PartitionedDataWriter<S, D> extends
WriterWrapper<D> implements Fin
try {
log.info(String.format("Adding one more writer to
loading cache of existing writer "
+ "with size = %d", partitionWriters.size()));
- return createPartitionWriter(key);
- } catch (IOException e) {
+ Future<DataWriter<D>> future =
createWriterPool.submit(() -> createPartitionWriter(key));
+ return future.get(writeTimeoutInterval,
TimeUnit.SECONDS);
+ } catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Error creating writer", e);
+ } catch (TimeoutException e) {
+ throw new RuntimeException(String.format("Failed to
create writer due to timeout. The operation timed out after %s seconds.",
writeTimeoutInterval), e);
}
}
}, state), state, key);
@@ -326,6 +334,7 @@ public class PartitionedDataWriter<S, D> extends
WriterWrapper<D> implements Fin
serializePartitionInfoToState();
} finally {
closeWritersInCache();
+ this.createWriterPool.shutdown();
this.closer.close();
}
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
index 68f3343f8..3148e5e5b 100644
---
a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.writer;
+import com.google.common.util.concurrent.UncheckedExecutionException;
import java.io.IOException;
import java.util.List;
@@ -166,12 +167,25 @@ public class PartitionedWriterTest {
writer.close();
}
+ @Test
+ public void testTimeoutWhenCreatingWriter() throws IOException {
+ State state = new State();
+ state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS,
TestPartitioner.class.getCanonicalName());
+ state.setProp(PartitionedDataWriter.PARTITIONED_WRITER_CACHE_TTL_SECONDS,
6);
+ TestPartitionAwareWriterBuilder builder = new
TestPartitionAwareWriterBuilder(true);
+
+ PartitionedDataWriter writer = new PartitionedDataWriter<String,
String>(builder, state);
+
+ String record1 = "abc";
+ Assert.expectThrows(UncheckedExecutionException.class, () ->
writer.writeEnvelope(new RecordEnvelope(record1)));
+ }
+
@Test
public void testPartitionWriterCacheRemovalListener()
throws IOException, InterruptedException {
State state = new State();
state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS,
TestPartitioner.class.getCanonicalName());
- state.setProp(PartitionedDataWriter.PARTITIONED_WRITER_CACHE_TTL_SECONDS,
1);
+ state.setProp(PartitionedDataWriter.PARTITIONED_WRITER_CACHE_TTL_SECONDS,
3);
TestPartitionAwareWriterBuilder builder = new
TestPartitionAwareWriterBuilder();
PartitionedDataWriter writer = new PartitionedDataWriter<String,
String>(builder, state);
@@ -183,7 +197,7 @@ public class PartitionedWriterTest {
writer.writeEnvelope(new RecordEnvelope(record2));
//Sleep for more than cache expiration interval
- Thread.sleep(1500);
+ Thread.sleep(3500);
//Call cache clean up to ensure removal of expired entries.
writer.getPartitionWriters().cleanUp();
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
index 8da42d728..75eafcf51 100644
---
a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
@@ -37,10 +37,17 @@ import lombok.Data;
public class TestPartitionAwareWriterBuilder extends
PartitionAwareDataWriterBuilder<String, String> {
public final Queue<Action> actions = Queues.newArrayDeque();
+ private boolean testTimeout;
public enum Actions {
BUILD, WRITE, COMMIT, CLEANUP, CLOSE
}
+ public TestPartitionAwareWriterBuilder() {
+ this(false);
+ }
+ public TestPartitionAwareWriterBuilder(boolean testTimeout) {
+ this.testTimeout = testTimeout;
+ }
@Override
public boolean validatePartitionSchema(Schema partitionSchema) {
@@ -50,6 +57,13 @@ public class TestPartitionAwareWriterBuilder extends
PartitionAwareDataWriterBui
@Override
public DataWriter build()
throws IOException {
+ if (testTimeout) {
+ try {
+ Thread.sleep(10*1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
String partition =
this.partition.get().get(TestPartitioner.PARTITION).toString();
this.actions.add(new Action(Actions.BUILD, partition, null));
if (partition.matches(".*\\d+.*")) {