This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 45014bd5b4 Handle all types of exceptions when initializing input 
source in sampler API (#14355)
45014bd5b4 is described below

commit 45014bd5b458851e2a8601101a4fa818cebb3004
Author: Andreas Maechler <[email protected]>
AuthorDate: Fri Jun 2 08:13:53 2023 -0600

    Handle all types of exceptions when initializing input source in sampler 
API (#14355)
    
    The sampler API returns a `400 bad request` response if it encounters a 
`SamplerException`.
    Otherwise, it returns a generic `500 Internal server error` response, with 
the message
    "The RuntimeException could not be mapped to a response, re-throwing to the 
HTTP container".
    
    This commit updates `RecordSupplierInputSource` to handle all types of 
exceptions instead of just
    `InterruptedException` and wrap them in a `SamplerException` so that the 
actual error is
    propagated back to the user.
---
 .../indexing/overlord/sampler/SamplerConfig.java   |  4 +-
 .../seekablestream/RecordSupplierInputSource.java  | 45 +++++++++++++---------
 .../RecordSupplierInputSourceTest.java             | 22 ++++++++++-
 3 files changed, 49 insertions(+), 22 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
index 8824decc05..b81d6964e4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
@@ -55,7 +55,7 @@ public class SamplerConfig
 
   /**
    * The maximum number of rows to return in a response. The actual number of 
returned rows may be less if:
-   *   - The sampled source contains less data.
+   *   - The sampled source contains fewer rows
    *   - {@link SamplerConfig#timeoutMs} elapses before this value is reached
    *   - {@link 
org.apache.druid.segment.indexing.granularity.GranularitySpec#isRollup()} is 
true and input rows get
    *     rolled-up into fewer indexed rows.
@@ -85,7 +85,7 @@ public class SamplerConfig
 
   /**
    * Maximum number of bytes in memory that the {@link 
org.apache.druid.segment.incremental.IncrementalIndex} used by
-   * {@link InputSourceSampler#sample(org.apache.druid.data.input.InputSource, 
org.apache.druid.data.input.InputFormat, 
org.apache.druid.segment.indexing.DataSchema, SamplerConfig})
+   * {@link InputSourceSampler#sample(org.apache.druid.data.input.InputSource, 
org.apache.druid.data.input.InputFormat, 
org.apache.druid.segment.indexing.DataSchema, SamplerConfig)}
    * will be allowed to accumulate before aborting sampling. Particularly 
useful for limiting footprint of sample
    * operations as well as overall response size from sample requests. 
However, it is not directly correlated to
    * response size since it also contains the "raw" input data, so actual 
responses will likely be at least twice the
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
index ee54f2ac22..e594c45739 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.seekablestream;
 
+import com.google.common.base.Throwables;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputFormat;
@@ -68,29 +69,35 @@ public class RecordSupplierInputSource<PartitionIdType, 
SequenceOffsetType, Reco
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
-    try {
-      assignAndSeek(recordSupplier);
-    }
-    catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
-    }
+
+    assignAndSeek(recordSupplier);
   }
 
   private void assignAndSeek(RecordSupplier<PartitionIdType, 
SequenceOffsetType, RecordType> recordSupplier)
-      throws InterruptedException
   {
-    final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
-        .getPartitionIds(topic)
-        .stream()
-        .map(partitionId -> StreamPartition.of(topic, partitionId))
-        .collect(Collectors.toSet());
-
-    recordSupplier.assign(partitions);
-
-    if (useEarliestOffset) {
-      recordSupplier.seekToEarliest(partitions);
-    } else {
-      recordSupplier.seekToLatest(partitions);
+    try {
+      final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
+          .getPartitionIds(topic)
+          .stream()
+          .map(partitionId -> StreamPartition.of(topic, partitionId))
+          .collect(Collectors.toSet());
+
+      recordSupplier.assign(partitions);
+
+      if (useEarliestOffset) {
+        recordSupplier.seekToEarliest(partitions);
+      } else {
+        recordSupplier.seekToLatest(partitions);
+      }
+    }
+    catch (Exception e) {
+      throw new SamplerException(
+          e,
+          "Exception while seeking to the [%s] offset of partitions in topic 
[%s]: %s",
+          useEarliestOffset ? "earliest" : "latest",
+          topic,
+          Throwables.getRootCause(e).getMessage()
+      );
     }
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
index cd714969e8..75a9bd0831 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
@@ -33,9 +33,11 @@ import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputStatsImpl;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
 import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
@@ -45,6 +47,7 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
@@ -62,7 +65,6 @@ import java.util.stream.IntStream;
 
 public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
 {
-
   private static final int NUM_COLS = 16;
   private static final int NUM_ROWS = 128;
   private static final String TIMESTAMP_STRING = "2019-01-01";
@@ -135,6 +137,24 @@ public class RecordSupplierInputSourceTest extends 
InitializedNullHandlingTest
     Assert.assertTrue(supplier.isClosed());
   }
 
+  @Test
+  public void 
testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+  {
+    final RecordSupplier<?, ?, ?> supplier = 
Mockito.mock(RecordSupplier.class);
+    Mockito.when(supplier.getPartitionIds("test-stream"))
+           .thenThrow(new StreamException(new Exception("Something bad 
happened")));
+
+    //noinspection ResultOfObjectAllocationIgnored
+    final SamplerException exception = Assert.assertThrows(
+        SamplerException.class,
+        () -> new RecordSupplierInputSource<>("test-stream", supplier, false, 
null)
+    );
+    Assert.assertEquals(
+        "Exception while seeking to the [latest] offset of partitions in topic 
[test-stream]: Something bad happened",
+        exception.getMessage()
+    );
+  }
+
   private static class RandomCsvSupplier implements RecordSupplier<Integer, 
Integer, ByteEntity>
   {
     private static final int STR_LEN = 8;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to