kfaraz commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213162306
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -130,7 +130,7 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
*
* @return set of partitions
*/
- Set<PartitionIdType> getPartitionIds(String stream);
Review Comment:
Do we need to add `StreamException` to the method declaration? It is an
unchecked exception anyway. Plus, we might have to declare it for most of the
other methods of this class too.
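
To illustrate the point about unchecked exceptions, here is a minimal sketch. It assumes `StreamException` extends `RuntimeException`, as the comment implies; the classes and the `PartitionLister` interface below are hypothetical stand-ins, not the Druid types. The method documents the exception in Javadoc without declaring it, and callers can still catch it.

```java
import java.util.Set;

public class UncheckedThrowsSketch
{
  // Hypothetical stand-in for Druid's StreamException; assumed to be unchecked.
  static class StreamException extends RuntimeException
  {
    StreamException(Throwable cause)
    {
      super(cause);
    }
  }

  // Hypothetical, trimmed-down supplier interface (not the real RecordSupplier).
  interface PartitionLister
  {
    /**
     * @return set of partitions
     * @throws StreamException if the stream cannot be read (documented, not declared)
     */
    Set<String> getPartitionIds(String stream);
  }

  // Callers can catch the unchecked exception even though no throws clause exists.
  static String describe(PartitionLister lister, String stream)
  {
    try {
      return "partitions=" + lister.getPartitionIds(stream);
    }
    catch (StreamException e) {
      return "failed: " + e.getCause().getMessage();
    }
  }

  public static void main(String[] args)
  {
    PartitionLister failing = stream -> {
      throw new StreamException(new Exception("broker unreachable"));
    };
    System.out.println(describe(failing, "test-stream"));
  }
}
```

Adding `throws StreamException` to the signature would compile, but it would not force callers to handle anything, so a Javadoc `@throws` tag carries the same information with less noise.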
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
this.recordSupplier = recordSupplier;
this.useEarliestOffset = useEarliestOffset;
this.iteratorTimeoutMs = iteratorTimeoutMs;
+
try {
assignAndSeek(recordSupplier);
}
catch (InterruptedException e) {
- throw new SamplerException(e, "Exception while seeking to partitions");
+ throw new SamplerException(e, "Thread interrupted while seeking to partitions");
+ }
+ catch (StreamException e) {
+ throw new SamplerException(e, "Exception creating RecordSupplierInputSource while seeking to partitions: %s", Throwables.getRootCause(e).getMessage());
Review Comment:
Since this would be a user-facing message, I am not sure we need to include
the part about `creating RecordSupplierInputSource`.
We could have something similar to what we had earlier, with a bit more info:
```
Exception while seeking to <earliest/latest> offset of partitions in the stream [<topic>]
```
I would even suggest moving the try-catch to the `assignAndSeek()` method
itself and then using an appropriate message based on the stage the exception
occurred in: get partitions, assign partitions, seek to offset.
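
One way this suggestion might look is sketched below. Everything here is an assumption made for illustration: `StreamClient` is a simplified, hypothetical view of a record supplier, `SamplerException` is a local stand-in, and the real `assignAndSeek()` may be structured differently. The idea is to track the current stage in a local variable so the catch blocks can name the step that failed.

```java
import java.util.Collections;
import java.util.Set;

public class StageAwareSeekSketch
{
  // Hypothetical stand-in for Druid's SamplerException.
  static class SamplerException extends RuntimeException
  {
    SamplerException(Throwable cause, String format, Object... args)
    {
      super(String.format(format, args), cause);
    }
  }

  // Hypothetical, simplified view of a record supplier.
  interface StreamClient
  {
    Set<Integer> getPartitionIds(String stream);

    void assign(Set<Integer> partitions);

    void seekToEarliest(Set<Integer> partitions) throws InterruptedException;

    void seekToLatest(Set<Integer> partitions) throws InterruptedException;
  }

  static void assignAndSeek(StreamClient client, String stream, boolean useEarliestOffset)
  {
    // Updated before each step so the catch blocks know where the failure happened.
    String stage = "getting partitions of";
    try {
      final Set<Integer> partitions = client.getPartitionIds(stream);

      stage = "assigning partitions of";
      client.assign(partitions);

      stage = "seeking to " + (useEarliestOffset ? "earliest" : "latest") + " offset of partitions in";
      if (useEarliestOffset) {
        client.seekToEarliest(partitions);
      } else {
        client.seekToLatest(partitions);
      }
    }
    catch (InterruptedException e) {
      // Restore the interrupt flag before translating the exception.
      Thread.currentThread().interrupt();
      throw new SamplerException(e, "Thread interrupted while %s the stream [%s]", stage, stream);
    }
    catch (RuntimeException e) {
      throw new SamplerException(e, "Exception while %s the stream [%s]", stage, stream);
    }
  }

  // Demo client whose assign step always fails.
  static class FailingAssignClient implements StreamClient
  {
    public Set<Integer> getPartitionIds(String stream) { return Collections.singleton(0); }
    public void assign(Set<Integer> partitions) { throw new IllegalStateException("assign failed"); }
    public void seekToEarliest(Set<Integer> partitions) { }
    public void seekToLatest(Set<Integer> partitions) { }
  }

  public static void main(String[] args)
  {
    try {
      assignAndSeek(new FailingAssignClient(), "test-stream", true);
    }
    catch (SamplerException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With the try-catch inside the method, the constructor no longer needs its own error handling, and the message names both the failing step and the stream.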
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java:
##########
@@ -135,6 +137,20 @@ public void testReadTimeout() throws IOException
Assert.assertTrue(supplier.isClosed());
}
+ @Test
+ public void testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+ {
+ final RecordSupplier<?, ?, ?> supplier = Mockito.mock(RecordSupplier.class);
+ Mockito.when(supplier.getPartitionIds("test-stream"))
+ .thenThrow(new StreamException(new Exception("Something bad happened")));
+
+ //noinspection ResultOfObjectAllocationIgnored
+ Assert.assertThrows(
Review Comment:
Maybe also verify (part of) the exception message.
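
A rough sketch of what checking the message could look like follows. JUnit 4.13+ `Assert.assertThrows` returns the thrown exception, so the test can assert on its message. To keep this example dependency-free, it uses a small hand-rolled `expectThrows` helper that mimics that behavior; the helper, the local `SamplerException`, and `createInputSource()` are all hypothetical stand-ins.

```java
public class ExceptionMessageCheckSketch
{
  // Hypothetical stand-in for Druid's SamplerException.
  static class SamplerException extends RuntimeException
  {
    SamplerException(Throwable cause, String format, Object... args)
    {
      super(String.format(format, args), cause);
    }
  }

  @FunctionalInterface
  interface ThrowingRunnable
  {
    void run() throws Throwable;
  }

  // Mimics the "return the caught exception" behavior of JUnit's assertThrows.
  static <T extends Throwable> T expectThrows(Class<T> type, ThrowingRunnable runnable)
  {
    try {
      runnable.run();
    }
    catch (Throwable t) {
      if (type.isInstance(t)) {
        return type.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + type.getName() + " but nothing was thrown");
  }

  // Hypothetical code under test: always fails the way the mocked supplier would.
  static void createInputSource()
  {
    final Exception cause = new Exception("Something bad happened");
    throw new SamplerException(cause, "Exception while seeking to partitions: %s", cause.getMessage());
  }

  public static void main(String[] args)
  {
    final SamplerException e = expectThrows(
        SamplerException.class,
        ExceptionMessageCheckSketch::createInputSource
    );
    // Checking a fragment keeps the test robust to small wording changes.
    if (!e.getMessage().contains("Something bad happened")) {
      throw new AssertionError("Unexpected message: " + e.getMessage());
    }
    System.out.println("message verified");
  }
}
```

In the actual test, the same check would be an assertion on the value returned by `Assert.assertThrows`.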
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
this.recordSupplier = recordSupplier;
this.useEarliestOffset = useEarliestOffset;
this.iteratorTimeoutMs = iteratorTimeoutMs;
+
try {
assignAndSeek(recordSupplier);
}
catch (InterruptedException e) {
- throw new SamplerException(e, "Exception while seeking to partitions");
Review Comment:
It seems weird to throw a `SamplerException` from an `InputSource`
implementation but apparently this impl is used only with the sampler. So I
guess that is okay.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
this.recordSupplier = recordSupplier;
this.useEarliestOffset = useEarliestOffset;
this.iteratorTimeoutMs = iteratorTimeoutMs;
+
try {
assignAndSeek(recordSupplier);
}
catch (InterruptedException e) {
- throw new SamplerException(e, "Exception while seeking to partitions");
+ throw new SamplerException(e, "Thread interrupted while seeking to partitions");
+ }
+ catch (StreamException e) {
Review Comment:
I think I might prefer your alternative approach of catching all
`Exception`s here and wrapping them up in a `SamplerException`, as we are not
doing anything special with a `StreamException`.
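
A minimal sketch of the catch-all variant is below, under stated assumptions: `SamplerException` is a local stand-in, `rootCause` is a small hand-written helper playing the role of Guava's `Throwables.getRootCause`, and `wrapFailures` is a hypothetical name. `InterruptedException` is still caught first so the interrupt flag can be restored.

```java
import java.util.concurrent.Callable;

public class CatchAllWrapSketch
{
  // Hypothetical stand-in for Druid's SamplerException.
  static class SamplerException extends RuntimeException
  {
    SamplerException(Throwable cause, String format, Object... args)
    {
      super(String.format(format, args), cause);
    }
  }

  // Walks the cause chain to its end; a plain-Java substitute for Guava's helper.
  static Throwable rootCause(Throwable t)
  {
    Throwable root = t;
    while (root.getCause() != null && root.getCause() != root) {
      root = root.getCause();
    }
    return root;
  }

  // Runs the action and converts any failure into a SamplerException.
  static <T> T wrapFailures(Callable<T> action)
  {
    try {
      return action.call();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new SamplerException(e, "Thread interrupted while seeking to partitions");
    }
    catch (Exception e) {
      throw new SamplerException(e, "Exception while seeking to partitions: %s", rootCause(e).getMessage());
    }
  }

  public static void main(String[] args)
  {
    try {
      wrapFailures(() -> {
        throw new IllegalStateException(new Exception("Something bad happened"));
      });
    }
    catch (SamplerException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Because no exception type gets special treatment beyond the interrupt case, a single `catch (Exception e)` covers `StreamException` and anything else the supplier might throw.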