This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c014ee89 [GOBBLIN-1828] Implement Timeout for Creating Writer Functionality (#3690)
0c014ee89 is described below

commit 0c014ee8938710f1b980d274fd07bdb098587f93
Author: Zihan Li <[email protected]>
AuthorDate: Mon May 1 16:00:18 2023 -0700

    [GOBBLIN-1828] Implement Timeout for Creating Writer Functionality (#3690)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1828] Implement Timeout for Creating Writer Functionality
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../apache/gobblin/writer/PartitionedDataWriter.java   | 13 +++++++++++--
 .../apache/gobblin/writer/PartitionedWriterTest.java   | 18 ++++++++++++++++--
 .../writer/test/TestPartitionAwareWriterBuilder.java   | 14 ++++++++++++++
 3 files changed, 41 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 800676da6..d5f0e8177 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -22,9 +22,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import org.apache.avro.SchemaBuilder;
@@ -115,6 +118,7 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
   @Getter
   @VisibleForTesting
   private long totalBytesFromEvictedWriters;
+  private ExecutorService createWriterPool;
 
 
   public PartitionedDataWriter(DataWriterBuilder<S, D> builder, final State 
state)
@@ -125,6 +129,7 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
     this.isSpeculativeAttemptSafe = true;
     this.isWatermarkCapable = true;
     this.baseWriterId = builder.getWriterId();
+    this.createWriterPool = Executors.newSingleThreadExecutor();
     this.closer = Closer.create();
     this.writerBuilder = builder;
     this.controlMessageHandler = new PartitionDataWriterMessageHandler();
@@ -170,9 +175,12 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
                     try {
                       log.info(String.format("Adding one more writer to 
loading cache of existing writer "
                           + "with size = %d", partitionWriters.size()));
-                      return createPartitionWriter(key);
-                    } catch (IOException e) {
+                      Future<DataWriter<D>> future = 
createWriterPool.submit(() -> createPartitionWriter(key));
+                      return future.get(writeTimeoutInterval, 
TimeUnit.SECONDS);
+                    } catch (ExecutionException | InterruptedException e) {
                       throw new RuntimeException("Error creating writer", e);
+                    } catch (TimeoutException e) {
+                      throw new RuntimeException(String.format("Failed to 
create writer due to timeout. The operation timed out after %s seconds.", 
writeTimeoutInterval), e);
                     }
                   }
                 }, state), state, key);
@@ -326,6 +334,7 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
       serializePartitionInfoToState();
     } finally {
       closeWritersInCache();
+      this.createWriterPool.shutdown();
       this.closer.close();
     }
   }
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
index 68f3343f8..3148e5e5b 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.writer;
 
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.io.IOException;
 import java.util.List;
 
@@ -166,12 +167,25 @@ public class PartitionedWriterTest {
     writer.close();
   }
 
+  @Test
+  public void testTimeoutWhenCreatingWriter() throws IOException {
+    State state = new State();
+    state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, 
TestPartitioner.class.getCanonicalName());
+    state.setProp(PartitionedDataWriter.PARTITIONED_WRITER_CACHE_TTL_SECONDS, 
6);
+    TestPartitionAwareWriterBuilder builder = new 
TestPartitionAwareWriterBuilder(true);
+
+    PartitionedDataWriter writer = new PartitionedDataWriter<String, 
String>(builder, state);
+
+    String record1 = "abc";
+    Assert.expectThrows(UncheckedExecutionException.class, () -> 
writer.writeEnvelope(new RecordEnvelope(record1)));
+  }
+
   @Test
   public void testPartitionWriterCacheRemovalListener()
       throws IOException, InterruptedException {
     State state = new State();
     state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, 
TestPartitioner.class.getCanonicalName());
-    state.setProp(PartitionedDataWriter.PARTITIONED_WRITER_CACHE_TTL_SECONDS, 
1);
+    state.setProp(PartitionedDataWriter.PARTITIONED_WRITER_CACHE_TTL_SECONDS, 
3);
     TestPartitionAwareWriterBuilder builder = new 
TestPartitionAwareWriterBuilder();
 
     PartitionedDataWriter writer = new PartitionedDataWriter<String, 
String>(builder, state);
@@ -183,7 +197,7 @@ public class PartitionedWriterTest {
     writer.writeEnvelope(new RecordEnvelope(record2));
 
     //Sleep for more than cache expiration interval
-    Thread.sleep(1500);
+    Thread.sleep(3500);
 
     //Call cache clean up to ensure removal of expired entries.
     writer.getPartitionWriters().cleanUp();
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
index 8da42d728..75eafcf51 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
@@ -37,10 +37,17 @@ import lombok.Data;
 public class TestPartitionAwareWriterBuilder extends 
PartitionAwareDataWriterBuilder<String, String> {
 
   public final Queue<Action> actions = Queues.newArrayDeque();
+  private boolean testTimeout;
 
   public enum Actions {
     BUILD, WRITE, COMMIT, CLEANUP, CLOSE
   }
+  public TestPartitionAwareWriterBuilder() {
+    this(false);
+  }
+  public TestPartitionAwareWriterBuilder(boolean testTimeout) {
+    this.testTimeout = testTimeout;
+  }
 
   @Override
   public boolean validatePartitionSchema(Schema partitionSchema) {
@@ -50,6 +57,13 @@ public class TestPartitionAwareWriterBuilder extends 
PartitionAwareDataWriterBui
   @Override
   public DataWriter build()
       throws IOException {
+    if (testTimeout) {
+      try {
+        Thread.sleep(10*1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
     String partition = 
this.partition.get().get(TestPartitioner.PARTITION).toString();
     this.actions.add(new Action(Actions.BUILD, partition, null));
     if (partition.matches(".*\\d+.*")) {

Reply via email to