jihoonson commented on a change in pull request #10761:
URL: https://github.com/apache/druid/pull/10761#discussion_r565672493
##########
File path: docs/development/extensions-core/kinesis-ingestion.md
##########
@@ -193,6 +193,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
| `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 |
| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 |
| `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600. | no, (default == PT2M) |
+| `offsetFetchPeriod` | ISO8601 Period | How often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. | no (default == PT30S, min == PT5S) |
Review comment:
It would be better to clarify what happens when `offsetFetchPeriod` is smaller than `PT5S`, because people can still set a value below the minimum.
Maybe we can add something like: "The minimum period is `PT5S`. If `offsetFetchPeriod` is set below the minimum, the minimum period is used instead."
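The suggested wording describes a clamp rather than a validation error. A minimal sketch of that behavior follows; it uses `java.time.Duration` as a stand-in for the ISO8601 period type the supervisor config actually uses, and the class, constant, and method names are hypothetical. Only the PT30S default and the PT5S minimum come from the doc table.

```java
import java.time.Duration;

final class OffsetFetchPeriodExample
{
  // Hypothetical constants mirroring the documented default (PT30S) and minimum (PT5S).
  static final Duration DEFAULT_OFFSET_FETCH_PERIOD = Duration.ofSeconds(30);
  static final Duration MIN_OFFSET_FETCH_PERIOD = Duration.ofSeconds(5);

  // Hypothetical helper: the default applies when the property is unset, and any
  // value below the minimum is replaced by the minimum instead of being rejected.
  static Duration effectiveOffsetFetchPeriod(Duration configured)
  {
    if (configured == null) {
      return DEFAULT_OFFSET_FETCH_PERIOD;
    }
    return configured.compareTo(MIN_OFFSET_FETCH_PERIOD) < 0 ? MIN_OFFSET_FETCH_PERIOD : configured;
  }

  public static void main(String[] args)
  {
    System.out.println(effectiveOffsetFetchPeriod(Duration.parse("PT1S")));  // PT5S
    System.out.println(effectiveOffsetFetchPeriod(Duration.parse("PT1M")));  // PT1M
    System.out.println(effectiveOffsetFetchPeriod(null));                    // PT30S
  }
}
```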
##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -106,12 +107,35 @@
private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
- private static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
+ /**
+ * Checks whether an exception can be retried or not. Implementation is copied
+ * from {@link com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition} except deprecated methods
+ * have been replaced with their recent versions.
+ */
+ @VisibleForTesting
+ static boolean isClientExceptionRecoverable(AmazonClientException exception)
Review comment:
The original method was copied from `S3Utils.isServiceExceptionRecoverable()`. It would be better to consolidate these methods into one instead of fixing both separately. The problem is that `S3Utils` is in `s3-extensions`, and Druid extensions cannot depend on each other (I think this is why the method was originally copied). I suggest creating a new util class in `aws-common`, which is in Druid core, and moving this logic there.
##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -106,12 +107,35 @@
private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
- private static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
+ /**
+ * Checks whether an exception can be retried or not. Implementation is copied
+ * from {@link com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition} except deprecated methods
Review comment:
nit: out of curiosity, why not use `SDKDefaultRetryCondition` directly? Is it to avoid handling the extra parameters of its method (`originalRequest` and `retriesAttempted`)? If so, copying it seems reasonable to me, since those parameters are not used anyway.
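The alternative the question raises, calling the three-argument condition directly, can be sketched as a one-argument adapter. The interface below is a hypothetical stand-in for the SDK's retry-condition type (plain `Object` and `RuntimeException` replace the AWS request and exception classes so the sketch compiles without the SDK), and the condition body is a simplified placeholder, not the SDK's actual logic.

```java
import java.io.IOException;

final class RetryConditionAdapterExample
{
  // Hypothetical stand-in for a three-argument retry condition such as the SDK's;
  // Object and RuntimeException replace the AWS request and exception types.
  interface ThreeArgRetryCondition
  {
    boolean shouldRetry(Object originalRequest, RuntimeException exception, int retriesAttempted);
  }

  // Placeholder condition that never reads the request or the retry count;
  // it only inspects the direct cause of the exception.
  static final ThreeArgRetryCondition CONDITION =
      (originalRequest, exception, retriesAttempted) -> exception.getCause() instanceof IOException;

  // One-argument adapter: passing null and 0 is safe only because CONDITION ignores them.
  static boolean isRecoverable(RuntimeException exception)
  {
    return CONDITION.shouldRetry(null, exception, 0);
  }

  public static void main(String[] args)
  {
    System.out.println(isRecoverable(new RuntimeException(new IOException("connection reset"))));  // true
    System.out.println(isRecoverable(new IllegalStateException("bad state")));                     // false
  }
}
```

The adapter stays correct only while the wrapped condition ignores the placeholder arguments; copying the logic avoids depending on that assumption.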
##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
##########
@@ -578,6 +578,10 @@ public void run()
log.debug("All partitions have been fully read.");
publishOnStop.set(true);
stopRequested.set(true);
+
+ // We let the fireDepartmentMetrics know that all messages have been read. This way, some metrics such as
+ // high message gap need not be reported
+ fireDepartmentMetrics.markProcessingDone();
Review comment:
The `messageGap` metric is used by both push- and pull-based streaming ingestion. However, since push-based streaming ingestion (Tranquility) is deprecated, it looks OK to me not to fix it there.
I'm more concerned about the test coverage of this change. Can you add a unit test in `KafkaIndexTaskTest` and `KinesisIndexTaskTest`? It seems possible to test this with the following changes:
- Expose the `FireDepartmentMetrics` created in the task runner.
- Detect when the task enters the `PUBLISHING` status. One way to do this is to modify `SegmentHandoffNotifier.registerSegmentHandoffCallback()` to notify the test that the task is doing handoff. For example, `registerSegmentHandoffCallback` can count down a latch, and the test can do something like this:
```java
latch.await();
Assert.assertNotEquals(
FireDepartmentMetrics.DEFAULT_COMPLETION_TIME,
task.getRunner().getFireDepartmentMetrics().completionTime()
);
```
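The latch pattern suggested above can be sketched end to end without the Druid test harness. Everything here is hypothetical: `Metrics` stands in for `FireDepartmentMetrics`, the value `0` for the default completion time is an assumption, and a plain thread plays the task whose handoff callback counts down the latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class HandoffLatchExample
{
  // Hypothetical stand-in for FireDepartmentMetrics; DEFAULT_COMPLETION_TIME = 0 is an assumption.
  static final class Metrics
  {
    static final long DEFAULT_COMPLETION_TIME = 0L;
    private final AtomicLong completionTime = new AtomicLong(DEFAULT_COMPLETION_TIME);

    // Records the first time processing is marked done; later calls are no-ops.
    void markProcessingDone()
    {
      completionTime.compareAndSet(DEFAULT_COMPLETION_TIME, System.currentTimeMillis());
    }

    long completionTime()
    {
      return completionTime.get();
    }
  }

  // Simulates a task thread that reads everything, marks processing done, and then
  // fires a handoff callback that counts down the latch the test waits on.
  static long runTaskAndAwaitHandoff()
  {
    Metrics metrics = new Metrics();
    CountDownLatch handoffLatch = new CountDownLatch(1);
    Thread task = new Thread(() -> {
      metrics.markProcessingDone();  // all partitions fully read
      handoffLatch.countDown();      // task reached PUBLISHING / handoff
    });
    task.start();
    try {
      if (!handoffLatch.await(10, TimeUnit.SECONDS)) {
        throw new AssertionError("task never reached handoff");
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return metrics.completionTime();
  }

  public static void main(String[] args)
  {
    long completionTime = runTaskAndAwaitHandoff();
    System.out.println(completionTime != Metrics.DEFAULT_COMPLETION_TIME);  // true
  }
}
```

Because `countDown()` happens-before `await()` returns, the test thread is guaranteed to see the completion time set before handoff.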
##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -366,8 +390,8 @@ private Runnable fetchRecords()
log.error(e, "encounted AWS error while attempting to fetch records, will not retry");
throw e;
}
- catch (AmazonServiceException e) {
Review comment:
Good catch. I think the intention of `S3Utils.S3RETRY` is the same as that of `SDKDefaultRetryCondition`, i.e., retrying all `IOException`s (including ones wrapped inside another exception) and certain `AmazonServiceException`s that are transient. The missing retries for `IOException`s are probably a mistake.
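A minimal sketch of the `IOException` half of that intention, walking the whole cause chain rather than only the direct cause. The class and method names are hypothetical, and the check for transient `AmazonServiceException`s is deliberately left out so the sketch compiles without the AWS SDK.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

final class IOExceptionCauseExample
{
  // Returns true if the exception itself or anything in its cause chain is an
  // IOException. The identity set guards against cyclic cause chains.
  static boolean hasIOExceptionInChain(Throwable t)
  {
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
      if (cur instanceof IOException) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    Throwable wrapped = new RuntimeException(new UncheckedIOException(new IOException("connection reset")));
    System.out.println(hasIOExceptionInChain(wrapped));                          // true
    System.out.println(hasIOExceptionInChain(new RuntimeException("not I/O")));  // false
  }
}
```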
##########
File path: server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
##########
@@ -45,6 +47,7 @@
private final AtomicLong sinkCount = new AtomicLong(0);
private final AtomicLong messageMaxTimestamp = new AtomicLong(0);
private final AtomicLong messageGap = new AtomicLong(0);
+ private final AtomicLong completionTime = new AtomicLong(DEFAULT_COMPLETION_TIME);
Review comment:
`completionTime` seems too broad. I suggest `messageProcessingCompletionTime`.
##########
File path: services/src/main/java/org/apache/druid/cli/PullDependencies.java
##########
@@ -83,6 +83,9 @@
.put("commons-beanutils", "commons-beanutils")
.put("org.apache.commons", "commons-compress")
.put("org.apache.zookeeper", "zookeeper")
+ .put("com.fasterxml.jackson.core", "jackson-databind")
+ .put("com.fasterxml.jackson.core", "jackson-core")
+ .put("com.fasterxml.jackson.core", "jackson-annotations")
Review comment:
@abhishekagarwal87 thanks for the context. This seems to work, but I'm worried about the test coverage. Did you do any testing to verify this change, especially with Hadoop?