This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fdaa2fcab Make RecordSupplierInputSource respect sampler timeout when stream is empty (#13296)
2fdaa2fcab is described below

commit 2fdaa2fcabc7ceb91568ce1e6b1fcede2da7602c
Author: Jonathan Wei <[email protected]>
AuthorDate: Thu Nov 3 17:45:35 2022 -0500

    Make RecordSupplierInputSource respect sampler timeout when stream is empty (#13296)
    
    * Make RecordSupplierInputSource respect sampler timeout when stream is empty
    
    * Rename timeout param, make it nullable, add timeout test
---
 .../seekablestream/RecordSupplierInputSource.java  | 24 ++++++++++++++++-
 .../seekablestream/SeekableStreamSamplerSpec.java  |  6 +++--
 .../overlord/sampler/InputSourceSamplerTest.java   |  2 +-
 .../RecordSupplierInputSourceTest.java             | 31 +++++++++++++++++++++-
 4 files changed, 58 insertions(+), 5 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
index c387571507..ee54f2ac22 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
@@ -31,6 +31,7 @@ import 
org.apache.druid.indexing.overlord.sampler.SamplerException;
 import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 
 import javax.annotation.Nullable;
@@ -45,19 +46,28 @@ import java.util.stream.Collectors;
  */
 public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, 
RecordType extends ByteEntity> extends AbstractInputSource
 {
+  private static final Logger LOG = new 
Logger(RecordSupplierInputSource.class);
+
   private final String topic;
   private final RecordSupplier<PartitionIdType, SequenceOffsetType, 
RecordType> recordSupplier;
   private final boolean useEarliestOffset;
 
+  /**
+   * Maximum amount of time in which the entity iterator will return results. 
If null, no timeout is applied.
+   */
+  private final Integer iteratorTimeoutMs;
+
   public RecordSupplierInputSource(
       String topic,
       RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> 
recordSupplier,
-      boolean useEarliestOffset
+      boolean useEarliestOffset,
+      Integer iteratorTimeoutMs
   )
   {
     this.topic = topic;
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
+    this.iteratorTimeoutMs = iteratorTimeoutMs;
     try {
       assignAndSeek(recordSupplier);
     }
@@ -123,13 +133,24 @@ public class RecordSupplierInputSource<PartitionIdType, 
SequenceOffsetType, Reco
       private Iterator<OrderedPartitionableRecord<PartitionIdType, 
SequenceOffsetType, RecordType>> recordIterator;
       private Iterator<? extends ByteEntity> bytesIterator;
       private volatile boolean closed;
+      private final long createTime = System.currentTimeMillis();
+      private final Long terminationTime = iteratorTimeoutMs != null ? 
createTime + iteratorTimeoutMs : null;
 
       private void waitNextIteratorIfNecessary()
       {
         while (!closed && (bytesIterator == null || !bytesIterator.hasNext())) 
{
           while (!closed && (recordIterator == null || 
!recordIterator.hasNext())) {
+            if (terminationTime != null && System.currentTimeMillis() > 
terminationTime) {
+              LOG.info(
+                  "Configured sampler timeout [%s] has been exceeded, 
returning without a bytesIterator.",
+                  iteratorTimeoutMs
+              );
+              bytesIterator = null;
+              return;
+            }
             recordIterator = 
recordSupplier.poll(SeekableStreamSamplerSpec.POLL_TIMEOUT_MS).iterator();
           }
+
           if (!closed) {
             bytesIterator = recordIterator.next().getData().iterator();
           }
@@ -152,6 +173,7 @@ public class RecordSupplierInputSource<PartitionIdType, 
SequenceOffsetType, Reco
       @Override
       public void close()
       {
+        LOG.info("Closing entity iterator.");
         closed = true;
         recordSupplier.close();
       }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index 3be7a4feb1..f27cdb50fc 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -106,7 +106,8 @@ public abstract class 
SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
       inputSource = new RecordSupplierInputSource<>(
           ioConfig.getStream(),
           recordSupplier,
-          ioConfig.isUseEarliestSequenceNumber()
+          ioConfig.isUseEarliestSequenceNumber(),
+          samplerConfig.getTimeoutMs() <= 0 ? null : 
samplerConfig.getTimeoutMs()
       );
       inputFormat = Preconditions.checkNotNull(
           ioConfig.getInputFormat(),
@@ -173,7 +174,8 @@ public abstract class 
SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
       RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, 
RecordType> inputSource = new RecordSupplierInputSource<>(
           ioConfig.getStream(),
           createRecordSupplier(),
-          ioConfig.isUseEarliestSequenceNumber()
+          ioConfig.isUseEarliestSequenceNumber(),
+          samplerConfig.getTimeoutMs() <= 0 ? null : 
samplerConfig.getTimeoutMs()
       );
       this.entityIterator = inputSource.createEntityIterator();
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index af612572b5..cbd58d1adf 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -1230,7 +1230,7 @@ public class InputSourceSamplerTest extends 
InitializedNullHandlingTest
     );
 
     SamplerResponse response = inputSourceSampler.sample(
-        new RecordSupplierInputSource("topicName", new 
TestRecordSupplier(jsonBlockList), true),
+        new RecordSupplierInputSource("topicName", new 
TestRecordSupplier(jsonBlockList), true, 3000),
         createInputFormat(),
         dataSchema,
         new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to 
speed up*/, null, null)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
index eb3a083527..3446403072 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
@@ -72,7 +72,7 @@ public class RecordSupplierInputSourceTest extends 
InitializedNullHandlingTest
   public void testRead() throws IOException
   {
     final RandomCsvSupplier supplier = new RandomCsvSupplier();
-    final InputSource inputSource = new RecordSupplierInputSource<>("topic", 
supplier, false);
+    final InputSource inputSource = new RecordSupplierInputSource<>("topic", 
supplier, false, null);
     final List<String> colNames = IntStream.range(0, NUM_COLS)
                                            .mapToObj(i -> 
StringUtils.format("col_%d", i))
                                            .collect(Collectors.toList());
@@ -100,6 +100,35 @@ public class RecordSupplierInputSourceTest extends 
InitializedNullHandlingTest
     Assert.assertTrue(supplier.isClosed());
   }
 
+  @Test
+  public void testReadTimeout() throws IOException
+  {
+    final RandomCsvSupplier supplier = new RandomCsvSupplier();
+    final InputSource inputSource = new RecordSupplierInputSource<>("topic", 
supplier, false, -1000);
+    final List<String> colNames = IntStream.range(0, NUM_COLS)
+                                           .mapToObj(i -> 
StringUtils.format("col_%d", i))
+                                           .collect(Collectors.toList());
+    final InputFormat inputFormat = new CsvInputFormat(colNames, null, null, 
false, 0);
+    final InputSourceReader reader = inputSource.reader(
+        new InputRowSchema(
+            new TimestampSpec("col_0", "auto", null),
+            new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(colNames.subList(1, 
colNames.size()))),
+            ColumnsFilter.all()
+        ),
+        inputFormat,
+        temporaryFolder.newFolder()
+    );
+
+    int read = 0;
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      for (; read < NUM_ROWS && iterator.hasNext(); read++) {
+        iterator.next();
+      }
+    }
+    Assert.assertEquals(0, read);
+    Assert.assertTrue(supplier.isClosed());
+  }
+
   private static class RandomCsvSupplier implements RecordSupplier<Integer, 
Integer, ByteEntity>
   {
     private static final int STR_LEN = 8;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to