This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 45014bd5b4 Handle all types of exceptions when initializing input
source in sampler API (#14355)
45014bd5b4 is described below
commit 45014bd5b458851e2a8601101a4fa818cebb3004
Author: Andreas Maechler <[email protected]>
AuthorDate: Fri Jun 2 08:13:53 2023 -0600
Handle all types of exceptions when initializing input source in sampler
API (#14355)
The sampler API returns a `400 Bad Request` response if it encounters a
`SamplerException`. Otherwise, it returns a generic `500 Internal Server Error`
response with the message "The RuntimeException could not be mapped to a
response, re-throwing to the HTTP container", which hides the underlying error
from the user.
This commit updates `RecordSupplierInputSource` to catch all types of
exceptions raised while assigning and seeking to stream partitions, instead of
just `InterruptedException`, and to wrap them in a `SamplerException` so that
the actual error is propagated back to the user.
---
.../indexing/overlord/sampler/SamplerConfig.java | 4 +-
.../seekablestream/RecordSupplierInputSource.java | 45 +++++++++++++---------
.../RecordSupplierInputSourceTest.java | 22 ++++++++++-
3 files changed, 49 insertions(+), 22 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
index 8824decc05..b81d6964e4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
@@ -55,7 +55,7 @@ public class SamplerConfig
/**
* The maximum number of rows to return in a response. The actual number of
returned rows may be less if:
- * - The sampled source contains less data.
+ * - The sampled source contains fewer rows
* - {@link SamplerConfig#timeoutMs} elapses before this value is reached
* - {@link
org.apache.druid.segment.indexing.granularity.GranularitySpec#isRollup()} is
true and input rows get
* rolled-up into fewer indexed rows.
@@ -85,7 +85,7 @@ public class SamplerConfig
/**
* Maximum number of bytes in memory that the {@link
org.apache.druid.segment.incremental.IncrementalIndex} used by
- * {@link InputSourceSampler#sample(org.apache.druid.data.input.InputSource,
org.apache.druid.data.input.InputFormat,
org.apache.druid.segment.indexing.DataSchema, SamplerConfig})
+ * {@link InputSourceSampler#sample(org.apache.druid.data.input.InputSource,
org.apache.druid.data.input.InputFormat,
org.apache.druid.segment.indexing.DataSchema, SamplerConfig)}
* will be allowed to accumulate before aborting sampling. Particularly
useful for limiting footprint of sample
* operations as well as overall response size from sample requests.
However, it is not directly correlated to
* response size since it also contains the "raw" input data, so actual
responses will likely be at least twice the
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
index ee54f2ac22..e594c45739 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.seekablestream;
+import com.google.common.base.Throwables;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
@@ -68,29 +69,35 @@ public class RecordSupplierInputSource<PartitionIdType,
SequenceOffsetType, Reco
this.recordSupplier = recordSupplier;
this.useEarliestOffset = useEarliestOffset;
this.iteratorTimeoutMs = iteratorTimeoutMs;
- try {
- assignAndSeek(recordSupplier);
- }
- catch (InterruptedException e) {
- throw new SamplerException(e, "Exception while seeking to partitions");
- }
+
+ assignAndSeek(recordSupplier);
}
private void assignAndSeek(RecordSupplier<PartitionIdType,
SequenceOffsetType, RecordType> recordSupplier)
- throws InterruptedException
{
- final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
- .getPartitionIds(topic)
- .stream()
- .map(partitionId -> StreamPartition.of(topic, partitionId))
- .collect(Collectors.toSet());
-
- recordSupplier.assign(partitions);
-
- if (useEarliestOffset) {
- recordSupplier.seekToEarliest(partitions);
- } else {
- recordSupplier.seekToLatest(partitions);
+ try {
+ final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
+ .getPartitionIds(topic)
+ .stream()
+ .map(partitionId -> StreamPartition.of(topic, partitionId))
+ .collect(Collectors.toSet());
+
+ recordSupplier.assign(partitions);
+
+ if (useEarliestOffset) {
+ recordSupplier.seekToEarliest(partitions);
+ } else {
+ recordSupplier.seekToLatest(partitions);
+ }
+ }
+ catch (Exception e) {
+ throw new SamplerException(
+ e,
+ "Exception while seeking to the [%s] offset of partitions in topic
[%s]: %s",
+ useEarliestOffset ? "earliest" : "latest",
+ topic,
+ Throwables.getRootCause(e).getMessage()
+ );
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
index cd714969e8..75a9bd0831 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
@@ -33,9 +33,11 @@ import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
@@ -45,6 +47,7 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
@@ -62,7 +65,6 @@ import java.util.stream.IntStream;
public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
{
-
private static final int NUM_COLS = 16;
private static final int NUM_ROWS = 128;
private static final String TIMESTAMP_STRING = "2019-01-01";
@@ -135,6 +137,24 @@ public class RecordSupplierInputSourceTest extends
InitializedNullHandlingTest
Assert.assertTrue(supplier.isClosed());
}
+ @Test
+ public void
testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+ {
+ final RecordSupplier<?, ?, ?> supplier =
Mockito.mock(RecordSupplier.class);
+ Mockito.when(supplier.getPartitionIds("test-stream"))
+ .thenThrow(new StreamException(new Exception("Something bad
happened")));
+
+ //noinspection ResultOfObjectAllocationIgnored
+ final SamplerException exception = Assert.assertThrows(
+ SamplerException.class,
+ () -> new RecordSupplierInputSource<>("test-stream", supplier, false,
null)
+ );
+ Assert.assertEquals(
+ "Exception while seeking to the [latest] offset of partitions in topic
[test-stream]: Something bad happened",
+ exception.getMessage()
+ );
+ }
+
private static class RandomCsvSupplier implements RecordSupplier<Integer,
Integer, ByteEntity>
{
private static final int STR_LEN = 8;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]