suneet-s commented on code in PR #17442:
URL: https://github.com/apache/druid/pull/17442#discussion_r1829740240
##########
extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java:
##########
@@ -366,4 +368,47 @@ public void testReportPayload()
Assert.assertEquals(30 * 60, payload.getDurationSeconds());
}
+ @Test
+ public void testCreateTaskIOConfig()
+ {
+ supervisor = getSupervisor(
+ 1,
+ 1,
+ false,
+ "PT30M",
+ null,
+ null,
+ RabbitStreamSupervisorTest.dataSchema,
+ tuningConfig
+ );
+
+ Assert.assertEquals(supervisor.createTaskIoConfig(
Review Comment:
Can you split this into 2 lines, please: one to get the actual task duration and
another to do the assert?
The argument order of the assert should also be reversed, i.e. assertEquals(<expected>, <actual>).
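A minimal sketch of the shape being asked for. Everything here is an assumption for illustration: `java.time.Duration` stands in for the Joda-Time type used in the PR, `createTaskDuration` is a hypothetical stand-in for the `supervisor.createTaskIoConfig(...)` call, and the local `assertEquals` helper only mimics the (expected, actual) argument order of JUnit's `Assert.assertEquals`.

```java
import java.time.Duration;
import java.util.Objects;

class AssertOrderSketch
{
  // Hypothetical stand-in for the supervisor call under test.
  static Duration createTaskDuration(String period)
  {
    return Duration.parse(period);
  }

  // Mimics the (expected, actual) argument order of JUnit's Assert.assertEquals.
  static void assertEquals(Object expected, Object actual)
  {
    if (!Objects.equals(expected, actual)) {
      throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
    }
  }

  public static void main(String[] args)
  {
    // Line 1: compute the actual value.
    final Duration actualTaskDuration = createTaskDuration("PT30M");
    // Line 2: assert, with the expected value first.
    assertEquals(Duration.ofMinutes(30), actualTaskDuration);
  }
}
```

With the expected value first, a failure message reads the right way round ("expected 30 minutes but was ...").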
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java:
##########
@@ -134,4 +138,9 @@ public InputFormat getInputFormat()
{
return inputFormat;
}
+
+ public Duration getTaskDuration()
Review Comment:
Looks like this should be marked as nullable
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java:
##########
@@ -51,7 +53,8 @@ public SeekableStreamIndexTaskIOConfig(
final Boolean useTransaction,
final DateTime minimumMessageTime,
final DateTime maximumMessageTime,
- @Nullable final InputFormat inputFormat
+ @Nullable final InputFormat inputFormat,
+ @Nullable final Duration taskDuration // can be null for backward compabitility
Review Comment:
Is there an advantage to using Duration? Would it be simpler to pass this in
as a long of minutes?
```suggestion
@Nullable final Long refreshRejectionPeriodsInMinutes // can be null for backward compatibility
```
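To make the trade-off concrete, here is a rough sketch of the two parameter shapes. It is not the PR's code: `java.time` types stand in for Joda-Time, and the class and method names are hypothetical. Option A truncates to whole minutes to mirror the `toStandardMinutes().getMinutes()` conversion in the PR; Option B carries the minutes directly, so no conversion is needed at the call site.

```java
import java.time.Duration;
import java.time.Instant;

class TaskDurationParamSketch
{
  // Option A: accept a Duration and convert to whole minutes where it is used.
  static Instant shiftByDuration(Instant messageTime, Duration taskDuration)
  {
    // null means "not configured" (e.g. an older spec), so leave the bound untouched.
    if (taskDuration == null) {
      return messageTime;
    }
    return messageTime.plus(Duration.ofMinutes(taskDuration.toMinutes()));
  }

  // Option B: accept whole minutes directly, as the suggestion proposes.
  static Instant shiftByMinutes(Instant messageTime, Long minutes)
  {
    if (minutes == null) {
      return messageTime;
    }
    return messageTime.plus(Duration.ofMinutes(minutes));
  }
}
```

Both variants need the same null check, so the main difference is where the unit conversion lives.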
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -2092,4 +2109,47 @@ protected abstract void possiblyResetDataSourceMetadata(
protected abstract boolean isEndOffsetExclusive();
protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();
+
+
+ private void addTaskDurationToMinMaxTimes()
+ {
+ if (minMessageTime.isPresent()) {
+ minMessageTime = Optional.of(minMessageTime.get()
+ .plusMinutes(ioConfig.getTaskDuration()
+ .toStandardMinutes()
+ .getMinutes()));
+ }
+
+ if (maxMessageTime.isPresent()) {
+ maxMessageTime = Optional.of(maxMessageTime.get()
+ .plusMinutes(ioConfig.getTaskDuration()
+ .toStandardMinutes()
+ .getMinutes()));
+ }
+ }
+
+ public boolean withinMinMaxRecordTime(final InputRow row)
+ {
+ final boolean beforeMinimumMessageTime = minMessageTime.isPresent() &&
+ minMessageTime.get().isAfter(row.getTimestamp());
+ final boolean afterMaximumMessageTime = maxMessageTime.isPresent() &&
+ maxMessageTime.get().isBefore(row.getTimestamp());
+
+ if (log.isDebugEnabled()) {
+ if (beforeMinimumMessageTime) {
+ log.debug(
+ "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
+ row.getTimestamp(),
+ ioConfig.getMinimumMessageTime().get()
Review Comment:
```suggestion
minMessageTime
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -2092,4 +2109,47 @@ protected abstract void possiblyResetDataSourceMetadata(
protected abstract boolean isEndOffsetExclusive();
protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();
+
+
+ private void addTaskDurationToMinMaxTimes()
+ {
+ if (minMessageTime.isPresent()) {
+ minMessageTime = Optional.of(minMessageTime.get()
+ .plusMinutes(ioConfig.getTaskDuration()
+ .toStandardMinutes()
+ .getMinutes()));
+ }
+
+ if (maxMessageTime.isPresent()) {
+ maxMessageTime = Optional.of(maxMessageTime.get()
+ .plusMinutes(ioConfig.getTaskDuration()
+ .toStandardMinutes()
+ .getMinutes()));
+ }
+ }
+
+ public boolean withinMinMaxRecordTime(final InputRow row)
+ {
+ final boolean beforeMinimumMessageTime = minMessageTime.isPresent() &&
+ minMessageTime.get().isAfter(row.getTimestamp());
+ final boolean afterMaximumMessageTime = maxMessageTime.isPresent() &&
+ maxMessageTime.get().isBefore(row.getTimestamp());
+
+ if (log.isDebugEnabled()) {
+ if (beforeMinimumMessageTime) {
+ log.debug(
+ "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
+ row.getTimestamp(),
+ ioConfig.getMinimumMessageTime().get()
+ );
+ } else if (afterMaximumMessageTime) {
+ log.debug(
+ "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
+ row.getTimestamp(),
+ ioConfig.getMaximumMessageTime().get()
Review Comment:
```suggestion
maxMessageTime
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -2092,4 +2109,47 @@ protected abstract void possiblyResetDataSourceMetadata(
protected abstract boolean isEndOffsetExclusive();
protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();
+
+
+ private void addTaskDurationToMinMaxTimes()
+ {
+ if (minMessageTime.isPresent()) {
+ minMessageTime = Optional.of(minMessageTime.get()
Review Comment:
Is it OK that IntelliJ is warning that this is an unsafe update of a
volatile variable?
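For context, a small sketch of what that kind of warning usually points at, under the assumption that it refers to the read-then-write on the field. `volatile` makes each read and write visible to other threads, but `x = x.plus(...)` is still two separate steps. The class and method names below are hypothetical, and `java.time.Instant` stands in for Joda's `DateTime`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

class VolatileUpdateSketch
{
  // Visible across threads, but compound updates on it are not atomic.
  private volatile Instant minMessageTime;

  // Alternative holder whose read-modify-write is applied atomically.
  private final AtomicReference<Instant> atomicMinMessageTime;

  VolatileUpdateSketch(Instant initial)
  {
    this.minMessageTime = initial;
    this.atomicMinMessageTime = new AtomicReference<>(initial);
  }

  // Two steps (read, then write): a concurrent writer could interleave between them.
  void shiftVolatile(Duration taskDuration)
  {
    minMessageTime = minMessageTime.plus(taskDuration);
  }

  // Single atomic update; the function is retried if another thread wins the race.
  Instant shiftAtomic(Duration taskDuration)
  {
    return atomicMinMessageTime.updateAndGet(t -> t.plus(taskDuration));
  }

  Instant getVolatile()
  {
    return minMessageTime;
  }
}
```

If the update only ever runs on one thread before any reader starts, the warning may be harmless, but that depends on how the runner calls the method.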
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java:
##########
@@ -43,16 +43,20 @@ public class KafkaConsumerMonitor extends AbstractMonitor
ImmutableMap.<String, String>builder()
.put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
.put("records-consumed-total", "kafka/consumer/recordsConsumed")
+ .put("io-wait-time-ns-total", "kafka/consumer/io/time")
Review Comment:
This looks like an unintended change from another patch.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -246,6 +248,9 @@ public enum Status
private final Map<PartitionIdType, Long> partitionsThroughput = new HashMap<>();
+ private volatile Optional<DateTime> minMessageTime;
+ private volatile Optional<DateTime> maxMessageTime;
Review Comment:
nit: Instead of Optional, these could be plain DateTime objects, instantiated as
follows, which would simplify the code slightly:
```
minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
```
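A rough sketch of how the range check could look with sentinel bounds. This is an illustration, not the PR's code: `java.time.Instant` with `Instant.MIN`/`Instant.MAX` stands in for Joda's `DateTime` and `DateTimes.MIN`/`DateTimes.MAX`, `java.util.Optional.orElse` stands in for Guava's `Optional.or`, and the class name is hypothetical.

```java
import java.time.Instant;
import java.util.Optional;

class MessageTimeBoundsSketch
{
  private final Instant minMessageTime;
  private final Instant maxMessageTime;

  MessageTimeBoundsSketch(Optional<Instant> configuredMin, Optional<Instant> configuredMax)
  {
    // A missing bound becomes a sentinel, so later checks need no isPresent() calls.
    this.minMessageTime = configuredMin.orElse(Instant.MIN);
    this.maxMessageTime = configuredMax.orElse(Instant.MAX);
  }

  // True when the row timestamp is neither before the minimum nor after the maximum.
  boolean withinMinMaxRecordTime(Instant rowTimestamp)
  {
    return !minMessageTime.isAfter(rowTimestamp) && !maxMessageTime.isBefore(rowTimestamp);
  }
}
```

One thing to check with this approach: shifting a sentinel bound by the task duration can overflow (in this `java.time` stand-in, `Instant.MAX.plus(...)` throws), so the shift would probably need to skip unset bounds.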
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]