This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2fdaa2fcab Make RecordSupplierInputSource respect sampler timeout when stream is empty (#13296)
2fdaa2fcab is described below
commit 2fdaa2fcabc7ceb91568ce1e6b1fcede2da7602c
Author: Jonathan Wei <[email protected]>
AuthorDate: Thu Nov 3 17:45:35 2022 -0500
Make RecordSupplierInputSource respect sampler timeout when stream is empty (#13296)
* Make RecordSupplierInputSource respect sampler timeout when stream is empty
* Rename timeout param, make it nullable, add timeout test
---
.../seekablestream/RecordSupplierInputSource.java | 24 ++++++++++++++++-
.../seekablestream/SeekableStreamSamplerSpec.java | 6 +++--
.../overlord/sampler/InputSourceSamplerTest.java | 2 +-
.../RecordSupplierInputSourceTest.java | 31 +++++++++++++++++++++-
4 files changed, 58 insertions(+), 5 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
index c387571507..ee54f2ac22 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
@@ -31,6 +31,7 @@ import
org.apache.druid.indexing.overlord.sampler.SamplerException;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import javax.annotation.Nullable;
@@ -45,19 +46,28 @@ import java.util.stream.Collectors;
*/
public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType,
RecordType extends ByteEntity> extends AbstractInputSource
{
+ private static final Logger LOG = new
Logger(RecordSupplierInputSource.class);
+
private final String topic;
private final RecordSupplier<PartitionIdType, SequenceOffsetType,
RecordType> recordSupplier;
private final boolean useEarliestOffset;
+ /**
+ * Maximum amount of time in which the entity iterator will return results.
If null, no timeout is applied.
+ */
+ private final Integer iteratorTimeoutMs;
+
public RecordSupplierInputSource(
String topic,
RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType>
recordSupplier,
- boolean useEarliestOffset
+ boolean useEarliestOffset,
+ Integer iteratorTimeoutMs
)
{
this.topic = topic;
this.recordSupplier = recordSupplier;
this.useEarliestOffset = useEarliestOffset;
+ this.iteratorTimeoutMs = iteratorTimeoutMs;
try {
assignAndSeek(recordSupplier);
}
@@ -123,13 +133,24 @@ public class RecordSupplierInputSource<PartitionIdType,
SequenceOffsetType, Reco
private Iterator<OrderedPartitionableRecord<PartitionIdType,
SequenceOffsetType, RecordType>> recordIterator;
private Iterator<? extends ByteEntity> bytesIterator;
private volatile boolean closed;
+ private final long createTime = System.currentTimeMillis();
+ private final Long terminationTime = iteratorTimeoutMs != null ?
createTime + iteratorTimeoutMs : null;
private void waitNextIteratorIfNecessary()
{
while (!closed && (bytesIterator == null || !bytesIterator.hasNext()))
{
while (!closed && (recordIterator == null ||
!recordIterator.hasNext())) {
+ if (terminationTime != null && System.currentTimeMillis() >
terminationTime) {
+ LOG.info(
+ "Configured sampler timeout [%s] has been exceeded,
returning without a bytesIterator.",
+ iteratorTimeoutMs
+ );
+ bytesIterator = null;
+ return;
+ }
recordIterator =
recordSupplier.poll(SeekableStreamSamplerSpec.POLL_TIMEOUT_MS).iterator();
}
+
if (!closed) {
bytesIterator = recordIterator.next().getData().iterator();
}
@@ -152,6 +173,7 @@ public class RecordSupplierInputSource<PartitionIdType,
SequenceOffsetType, Reco
@Override
public void close()
{
+ LOG.info("Closing entity iterator.");
closed = true;
recordSupplier.close();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index 3be7a4feb1..f27cdb50fc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -106,7 +106,8 @@ public abstract class
SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
inputSource = new RecordSupplierInputSource<>(
ioConfig.getStream(),
recordSupplier,
- ioConfig.isUseEarliestSequenceNumber()
+ ioConfig.isUseEarliestSequenceNumber(),
+ samplerConfig.getTimeoutMs() <= 0 ? null :
samplerConfig.getTimeoutMs()
);
inputFormat = Preconditions.checkNotNull(
ioConfig.getInputFormat(),
@@ -173,7 +174,8 @@ public abstract class
SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
RecordSupplierInputSource<PartitionIdType, SequenceOffsetType,
RecordType> inputSource = new RecordSupplierInputSource<>(
ioConfig.getStream(),
createRecordSupplier(),
- ioConfig.isUseEarliestSequenceNumber()
+ ioConfig.isUseEarliestSequenceNumber(),
+ samplerConfig.getTimeoutMs() <= 0 ? null :
samplerConfig.getTimeoutMs()
);
this.entityIterator = inputSource.createEntityIterator();
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index af612572b5..cbd58d1adf 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -1230,7 +1230,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
);
SamplerResponse response = inputSourceSampler.sample(
- new RecordSupplierInputSource("topicName", new
TestRecordSupplier(jsonBlockList), true),
+ new RecordSupplierInputSource("topicName", new
TestRecordSupplier(jsonBlockList), true, 3000),
createInputFormat(),
dataSchema,
new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to
speed up*/, null, null)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
index eb3a083527..3446403072 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
@@ -72,7 +72,7 @@ public class RecordSupplierInputSourceTest extends
InitializedNullHandlingTest
public void testRead() throws IOException
{
final RandomCsvSupplier supplier = new RandomCsvSupplier();
- final InputSource inputSource = new RecordSupplierInputSource<>("topic",
supplier, false);
+ final InputSource inputSource = new RecordSupplierInputSource<>("topic",
supplier, false, null);
final List<String> colNames = IntStream.range(0, NUM_COLS)
.mapToObj(i ->
StringUtils.format("col_%d", i))
.collect(Collectors.toList());
@@ -100,6 +100,35 @@ public class RecordSupplierInputSourceTest extends
InitializedNullHandlingTest
Assert.assertTrue(supplier.isClosed());
}
+ @Test
+ public void testReadTimeout() throws IOException
+ {
+ final RandomCsvSupplier supplier = new RandomCsvSupplier();
+ final InputSource inputSource = new RecordSupplierInputSource<>("topic",
supplier, false, -1000);
+ final List<String> colNames = IntStream.range(0, NUM_COLS)
+ .mapToObj(i ->
StringUtils.format("col_%d", i))
+ .collect(Collectors.toList());
+ final InputFormat inputFormat = new CsvInputFormat(colNames, null, null,
false, 0);
+ final InputSourceReader reader = inputSource.reader(
+ new InputRowSchema(
+ new TimestampSpec("col_0", "auto", null),
+ new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(colNames.subList(1,
colNames.size()))),
+ ColumnsFilter.all()
+ ),
+ inputFormat,
+ temporaryFolder.newFolder()
+ );
+
+ int read = 0;
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ for (; read < NUM_ROWS && iterator.hasNext(); read++) {
+ iterator.next();
+ }
+ }
+ Assert.assertEquals(0, read);
+ Assert.assertTrue(supplier.isClosed());
+ }
+
private static class RandomCsvSupplier implements RecordSupplier<Integer,
Integer, ByteEntity>
{
private static final int STR_LEN = 8;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]