kfaraz commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213162306


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -130,7 +130,7 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
    *
    * @return set of partitions
    */
-  Set<PartitionIdType> getPartitionIds(String stream);

Review Comment:
   Do we need to add `StreamException` to the method declaration? It is an 
unchecked exception anyway. Plus, we might have to declare it for most of the 
other methods of this class too.
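
To illustrate the point about unchecked exceptions, here is a minimal sketch. `StreamException` below is a hypothetical stand-in that only assumes what the comment states (that it is unchecked), and `PartitionLister` and `FailingLister` are made-up names, not Druid types. The interface method has no `throws` clause, yet the implementation compiles and callers can still catch the exception.

```java
import java.util.Set;

// Hypothetical stand-in: assumed to be unchecked, as the review comment says.
class StreamException extends RuntimeException {
  StreamException(Throwable cause) {
    super(cause);
  }
}

// Made-up, simplified interface; note there is no `throws StreamException`.
interface PartitionLister {
  Set<String> getPartitionIds(String stream);
}

// The implementation may still throw the unchecked exception.
class FailingLister implements PartitionLister {
  @Override
  public Set<String> getPartitionIds(String stream) {
    throw new StreamException(new Exception("Something bad happened"));
  }
}
```

Declaring an unchecked exception in a `throws` clause is therefore documentation only; the compiler does not require callers to handle it.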



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
+      throw new SamplerException(e, "Thread interrupted while seeking to partitions");
+    }
+    catch (StreamException e) {
+      throw new SamplerException(e, "Exception creating RecordSupplierInputSource while seeking to partitions: %s", Throwables.getRootCause(e).getMessage());

Review Comment:
   Since this would be a user-facing message, I am not sure we need to include 
the part about `creating RecordSupplierInputSource`.
   
   We could have something similar to what we had earlier with a bit more info.
   ```
   Exception while seeking to <earliest/latest> offset of partitions in the stream [<topic>]
   ```
   
   I would even suggest moving the try-catch to the `assignAndSeek()` method 
itself and then using an appropriate message based on the stage the exception 
occurred in: get partitions, assign partitions, seek to offset.
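
The suggestion above could look roughly like the following sketch. It is not the PR's code: `SimpleRecordSupplier` is a made-up, non-generic simplification of `RecordSupplier`, the `StreamException` and `SamplerException` classes are hypothetical stand-ins with assumed constructors, and the message wording is only one possible choice.

```java
import java.util.Set;

// Hypothetical stand-ins; the real Druid classes may differ.
class StreamException extends RuntimeException {
  StreamException(Throwable cause) {
    super(cause);
  }
}

class SamplerException extends RuntimeException {
  SamplerException(Throwable cause, String format, Object... args) {
    super(String.format(format, args), cause);
  }
}

// Made-up, simplified supplier used only for this sketch.
interface SimpleRecordSupplier {
  Set<String> getPartitionIds(String stream);
  void assign(Set<String> partitions);
  void seekToEarliest(Set<String> partitions) throws InterruptedException;
  void seekToLatest(Set<String> partitions) throws InterruptedException;
}

class AssignAndSeekSketch {
  static void assignAndSeek(SimpleRecordSupplier supplier, String stream, boolean useEarliestOffset) {
    // Stage 1: get partitions.
    final Set<String> partitions;
    try {
      partitions = supplier.getPartitionIds(stream);
    } catch (StreamException e) {
      throw new SamplerException(e, "Exception while getting partitions of the stream [%s]", stream);
    }

    // Stage 2: assign partitions.
    try {
      supplier.assign(partitions);
    } catch (StreamException e) {
      throw new SamplerException(e, "Exception while assigning partitions of the stream [%s]", stream);
    }

    // Stage 3: seek to the requested offset.
    final String position = useEarliestOffset ? "earliest" : "latest";
    try {
      if (useEarliestOffset) {
        supplier.seekToEarliest(partitions);
      } else {
        supplier.seekToLatest(partitions);
      }
    } catch (InterruptedException e) {
      throw new SamplerException(
          e, "Thread interrupted while seeking to %s offset of partitions in the stream [%s]", position, stream);
    } catch (StreamException e) {
      throw new SamplerException(
          e, "Exception while seeking to %s offset of partitions in the stream [%s]", position, stream);
    }
  }
}
```

With this shape the constructor no longer needs its own try-catch, and each message names the stage that failed.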



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java:
##########
@@ -135,6 +137,20 @@ public void testReadTimeout() throws IOException
     Assert.assertTrue(supplier.isClosed());
   }
 
+  @Test
+  public void testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+  {
+    final RecordSupplier<?, ?, ?> supplier = Mockito.mock(RecordSupplier.class);
+    Mockito.when(supplier.getPartitionIds("test-stream"))
+           .thenThrow(new StreamException(new Exception("Something bad happened")));
+
+    //noinspection ResultOfObjectAllocationIgnored
+    Assert.assertThrows(

Review Comment:
   Maybe also verify (part of) the exception message.
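
One way to do this is to capture the thrown exception and assert on its message. The sketch below is plain Java so that it runs without test libraries: the local `assertThrows` helper mimics JUnit 4.13's `Assert.assertThrows`, which also returns the caught exception, while `SamplerException`, `createInputSource` and the message text are hypothetical stand-ins rather than the PR's actual code.

```java
// Hypothetical stand-in for the exception under test.
class SamplerException extends RuntimeException {
  SamplerException(Throwable cause, String format, Object... args) {
    super(String.format(format, args), cause);
  }
}

class MessageCheckSketch {
  // Minimal helper: runs the action and returns the exception if it has the expected type.
  static <T extends Throwable> T assertThrows(Class<T> expected, Runnable action) {
    try {
      action.run();
    } catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
  }

  // Made-up code under test that always fails, standing in for the constructor call.
  static void createInputSource(String stream) {
    throw new SamplerException(
        new RuntimeException("Something bad happened"),
        "Exception while seeking to partitions of the stream [%s]",
        stream
    );
  }

  // Captures the exception and returns its message so a test can assert on it.
  static String thrownMessage() {
    SamplerException e = assertThrows(SamplerException.class, () -> createInputSource("test-stream"));
    return e.getMessage();
  }
}
```

In the actual test, the value returned by `Assert.assertThrows` could be checked the same way, for example with `Assert.assertTrue(e.getMessage().contains(...))`.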



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");

Review Comment:
   It seems weird to throw a `SamplerException` from an `InputSource` 
implementation, but apparently this implementation is used only with the 
sampler, so I guess that is okay.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
+      throw new SamplerException(e, "Thread interrupted while seeking to partitions");
+    }
+    catch (StreamException e) {

Review Comment:
   I think I might prefer your alternative approach of catching all 
`Exception`s here and wrapping them up in a `SamplerException`, as we are not 
doing anything special with a `StreamException`.
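
A rough sketch of that alternative follows, under stated assumptions: `SamplerException` is a hypothetical stand-in with an assumed constructor, `Seeker` is a made-up functional interface replacing the real `assignAndSeek(recordSupplier)` call, and restoring the interrupt flag is an extra step that is not part of the PR.

```java
// Hypothetical stand-in; the real SamplerException may differ.
class SamplerException extends RuntimeException {
  SamplerException(Throwable cause, String format, Object... args) {
    super(String.format(format, args), cause);
  }
}

class CatchAllSketch {
  // Made-up abstraction over the assign-and-seek step.
  interface Seeker {
    void assignAndSeek() throws Exception;
  }

  static void init(Seeker seeker) {
    try {
      seeker.assignAndSeek();
    } catch (InterruptedException e) {
      // Not in the PR: restore the interrupt flag before wrapping.
      Thread.currentThread().interrupt();
      throw new SamplerException(e, "Thread interrupted while seeking to partitions");
    } catch (Exception e) {
      // Every other failure, including a StreamException, is wrapped the same way.
      throw new SamplerException(e, "Exception while seeking to partitions: %s", e.getMessage());
    }
  }
}
```

Because nothing special is done for a `StreamException`, a single catch-all branch keeps the constructor short while still preserving the original cause.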



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]