This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c5e2f82752d Minor cleanup of SeekableStreamSupervisor classes (#17733)
c5e2f82752d is described below
commit c5e2f82752db0b11fd1bbe4fd35e5c1f0efdb980
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Feb 18 17:39:51 2025 +0530
Minor cleanup of SeekableStreamSupervisor classes (#17733)
Changes:
- Change fields minimumMessageTime and maximumMessageTime from Optional<DateTime> to
plain nullable DateTime fields
- Clean up other logs and comments
---
.../RabbitStreamIndexTaskIOConfigTest.java | 30 ++--
.../indexing/kafka/supervisor/KafkaSupervisor.java | 21 +--
.../druid/indexing/kafka/KafkaIOConfigTest.java | 15 +-
.../supervisor/KafkaSupervisorIOConfigTest.java | 3 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 92 ++++++-------
.../indexing/kinesis/KinesisRecordSupplier.java | 17 +--
.../indexing/kinesis/KinesisIOConfigTest.java | 24 ++--
.../supervisor/KinesisSupervisorIOConfigTest.java | 3 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 46 +++----
.../SeekableStreamIndexTaskIOConfig.java | 31 ++---
.../SeekableStreamIndexTaskRunner.java | 5 +-
.../supervisor/SeekableStreamSupervisor.java | 152 +++++++++++----------
.../SeekableStreamIndexTaskRunnerTest.java | 9 +-
.../SeekableStreamSupervisorStateTest.java | 96 ++++++-------
14 files changed, 260 insertions(+), 284 deletions(-)
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java
b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java
index ef77c58163c..7783e148777 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java
@@ -19,15 +19,12 @@
package org.apache.druid.indexing.rabbitstream;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.indexing.IOConfig;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import java.util.Collections;
@@ -38,12 +35,9 @@ public class RabbitStreamIndexTaskIOConfigTest
public RabbitStreamIndexTaskIOConfigTest()
{
mapper = new DefaultObjectMapper();
- mapper.registerModules((Iterable<Module>) new
RabbitStreamIndexTaskModule().getJacksonModules());
+ mapper.registerModules(new
RabbitStreamIndexTaskModule().getJacksonModules());
}
- @Rule
- public final ExpectedException exception = ExpectedException.none();
-
@Test
public void testSerdeWithDefaults() throws Exception
{
@@ -56,31 +50,29 @@ public class RabbitStreamIndexTaskIOConfigTest
+ "}";
RabbitStreamIndexTaskIOConfig config = (RabbitStreamIndexTaskIOConfig)
mapper.readValue(
- mapper.writeValueAsString(
- mapper.readValue(
- jsonStr,
- IOConfig.class)),
- IOConfig.class);
+ mapper.writeValueAsString(mapper.readValue(jsonStr, IOConfig.class)),
+ IOConfig.class
+ );
Assert.assertNull(config.getTaskGroupId());
Assert.assertEquals("my-sequence-name", config.getBaseSequenceName());
Assert.assertEquals("mystream",
config.getStartSequenceNumbers().getStream());
- Assert.assertEquals(Long.class,
-
config.getStartSequenceNumbers().getPartitionSequenceNumberMap().get("stream-1").getClass());
Assert.assertEquals(
- ImmutableMap.of("stream-0", Long.valueOf(1), "stream-1",
Long.valueOf(10)),
- config.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+ ImmutableMap.of("stream-0", 1L, "stream-1", 10L),
+ config.getStartSequenceNumbers().getPartitionSequenceNumberMap()
+ );
Assert.assertEquals("mystream",
config.getEndSequenceNumbers().getStream());
Assert.assertEquals(
- ImmutableMap.of("stream-0", Long.valueOf(15L), "stream-1",
Long.valueOf(200L)),
- config.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+ ImmutableMap.of("stream-0", 15L, "stream-1", 200L),
+ config.getEndSequenceNumbers().getPartitionSequenceNumberMap()
+ );
Assert.assertTrue(config.isUseTransaction());
- Assert.assertFalse("minimumMessageTime",
config.getMinimumMessageTime().isPresent());
+ Assert.assertNull("minimumMessageTime", config.getMinimumMessageTime());
Assert.assertEquals(config.getUri(), "rabbitmq-stream://localhost:5552");
Assert.assertEquals(Collections.emptySet(),
config.getStartSequenceNumbers().getExclusivePartitions());
}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index f449c158789..a0c7494e3bb 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -55,8 +55,6 @@ import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.joda.time.DateTime;
@@ -91,12 +89,9 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
private static final Long NOT_SET = -1L;
private static final Long END_OF_PARTITION = Long.MAX_VALUE;
- private final ServiceEmitter emitter;
- private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
private final Pattern pattern;
private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
-
private final KafkaSupervisorSpec spec;
public KafkaSupervisor(
@@ -122,8 +117,6 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
);
this.spec = spec;
- this.emitter = spec.getEmitter();
- this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
this.pattern = getIoConfig().isMultiTopic() ?
Pattern.compile(getIoConfig().getStream()) : null;
}
@@ -267,7 +260,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
if
(!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
log.warn(
- "Lag metric: Kafka partitions %s do not match task partitions %s",
+ "Kafka partitions[%s] do not match task partitions[%s]",
latestSequenceFromStream.keySet(),
highestCurrentOffsets.keySet()
);
@@ -307,9 +300,6 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
}
@Override
- // suppress use of CollectionUtils.mapValues() since the valueMapper
function is dependent on map key here
- @SuppressWarnings("SSBasedInspection")
- // Used while generating Supervisor lag reports per task
protected Map<KafkaTopicPartition, Long>
getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
{
if (latestSequenceFromStream == null || currentOffsets == null) {
@@ -333,6 +323,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
@Override
protected Map<KafkaTopicPartition, Long>
getTimeLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
{
+ // Currently not supported
return null;
}
@@ -389,6 +380,11 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
return computeLags(partitionRecordLag);
}
+ /**
+ * Fetches the latest offsets from the Kafka stream and updates the map
+ * {@link #latestSequenceFromStream}. The actual lag is computed lazily in
+ * {@link #getPartitionRecordLag}.
+ */
@Override
protected void updatePartitionLagFromStream()
{
@@ -410,9 +406,6 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
recordSupplier.seekToLatest(partitions);
- // this method isn't actually computing the lag, just fetching the
latests offsets from the stream. This is
- // because we currently only have record lag for kafka, which can be
lazily computed by subtracting the highest
- // task offsets from the latest offsets from the stream when it is needed
latestSequenceFromStream =
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId,
recordSupplier::getPosition));
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
index d7e50fe13c0..187c13df9ff 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
@@ -42,7 +41,7 @@ public class KafkaIOConfigTest
public KafkaIOConfigTest()
{
mapper = new DefaultObjectMapper();
- mapper.registerModules((Iterable<Module>) new
KafkaIndexTaskModule().getJacksonModules());
+ mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
}
@Rule
@@ -78,8 +77,8 @@ public class KafkaIOConfigTest
200L),
config.getEndSequenceNumbers().getPartitionSequenceNumberMap());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers",
"localhost:9092"), config.getConsumerProperties());
Assert.assertTrue(config.isUseTransaction());
- Assert.assertFalse("minimumMessageTime",
config.getMinimumMessageTime().isPresent());
- Assert.assertFalse("maximumMessageTime",
config.getMaximumMessageTime().isPresent());
+ Assert.assertNull("minimumMessageTime", config.getMinimumMessageTime());
+ Assert.assertNull("maximumMessageTime", config.getMaximumMessageTime());
Assert.assertEquals(Collections.emptySet(),
config.getStartSequenceNumbers().getExclusivePartitions());
}
@@ -113,8 +112,8 @@ public class KafkaIOConfigTest
200L),
config.getEndSequenceNumbers().getPartitionSequenceNumberMap());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers",
"localhost:9092"), config.getConsumerProperties());
Assert.assertTrue(config.isUseTransaction());
- Assert.assertFalse("minimumMessageTime",
config.getMinimumMessageTime().isPresent());
- Assert.assertFalse("maximumMessageTime",
config.getMaximumMessageTime().isPresent());
+ Assert.assertNull("minimumMessageTime", config.getMinimumMessageTime());
+ Assert.assertNull("maximumMessageTime", config.getMaximumMessageTime());
Assert.assertEquals(Collections.emptySet(),
config.getStartSequenceNumbers().getExclusivePartitions());
}
@@ -151,8 +150,8 @@ public class KafkaIOConfigTest
200L),
config.getEndSequenceNumbers().getPartitionSequenceNumberMap());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers",
"localhost:9092"), config.getConsumerProperties());
Assert.assertFalse(config.isUseTransaction());
- Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"),
config.getMinimumMessageTime().get());
- Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"),
config.getMaximumMessageTime().get());
+ Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"),
config.getMinimumMessageTime());
+ Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"),
config.getMaximumMessageTime());
Assert.assertEquals(Collections.emptySet(),
config.getStartSequenceNumbers().getExclusivePartitions());
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index c2aee78b3cb..64ac05865d6 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
@@ -51,7 +50,7 @@ public class KafkaSupervisorIOConfigTest
public KafkaSupervisorIOConfigTest()
{
mapper = new DefaultObjectMapper();
- mapper.registerModules((Iterable<Module>) new
KafkaIndexTaskModule().getJacksonModules());
+ mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
}
@Rule
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c24ba243d6f..1151bec626c 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -423,8 +423,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("myCustomValue",
taskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
- Assert.assertFalse("minimumMessageTime",
taskConfig.getMinimumMessageTime().isPresent());
- Assert.assertFalse("maximumMessageTime",
taskConfig.getMaximumMessageTime().isPresent());
+ Assert.assertNull("minimumMessageTime",
taskConfig.getMinimumMessageTime());
+ Assert.assertNull("maximumMessageTime",
taskConfig.getMaximumMessageTime());
Assert.assertEquals(topic,
taskConfig.getStartSequenceNumbers().getStream());
Assert.assertEquals(
@@ -563,8 +563,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("myCustomValue",
taskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
- Assert.assertFalse("minimumMessageTime",
taskConfig.getMinimumMessageTime().isPresent());
- Assert.assertFalse("maximumMessageTime",
taskConfig.getMaximumMessageTime().isPresent());
+ Assert.assertNull("minimumMessageTime",
taskConfig.getMinimumMessageTime());
+ Assert.assertNull("maximumMessageTime",
taskConfig.getMaximumMessageTime());
Assert.assertEquals(topic,
taskConfig.getStartSequenceNumbers().getStream());
Assert.assertEquals(
@@ -816,15 +816,15 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertTrue(
"minimumMessageTime",
-
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(59).isBeforeNow()
+
task1.getIOConfig().getMinimumMessageTime().plusMinutes(59).isBeforeNow()
);
Assert.assertTrue(
"minimumMessageTime",
-
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(61).isAfterNow()
+
task1.getIOConfig().getMinimumMessageTime().plusMinutes(61).isAfterNow()
);
Assert.assertEquals(
- task1.getIOConfig().getMinimumMessageTime().get(),
- task2.getIOConfig().getMinimumMessageTime().get()
+ task1.getIOConfig().getMinimumMessageTime(),
+ task2.getIOConfig().getMinimumMessageTime()
);
}
@@ -855,15 +855,15 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertTrue(
"maximumMessageTime",
- task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(59 +
60).isAfterNow()
+ task1.getIOConfig().getMaximumMessageTime().minusMinutes(59 +
60).isAfterNow()
);
Assert.assertTrue(
"maximumMessageTime",
- task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(61 +
60).isBeforeNow()
+ task1.getIOConfig().getMaximumMessageTime().minusMinutes(61 +
60).isBeforeNow()
);
Assert.assertEquals(
- task1.getIOConfig().getMaximumMessageTime().get(),
- task2.getIOConfig().getMaximumMessageTime().get()
+ task1.getIOConfig().getMaximumMessageTime(),
+ task2.getIOConfig().getMaximumMessageTime()
);
}
@@ -1527,8 +1527,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
.times(1);
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
- taskQueue.shutdown("id4", "Task [%s] failed to stop in a timely manner,
killing task", "id4");
- taskQueue.shutdown("id5", "Task [%s] failed to stop in a timely manner,
killing task", "id5");
+ taskQueue.shutdown("id4", "Failed to stop in a timely manner");
+ taskQueue.shutdown("id5", "Failed to stop in a timely manner");
replayAll();
supervisor.start();
@@ -1682,7 +1682,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
// check that replica tasks are created with the same minimumMessageTime
as tasks inherited from another supervisor
- Assert.assertEquals(now, ((KafkaIndexTask)
captured.getValue()).getIOConfig().getMinimumMessageTime().get());
+ Assert.assertEquals(now, ((KafkaIndexTask)
captured.getValue()).getIOConfig().getMinimumMessageTime());
// test that a task failing causes a new task to be re-queued with the
same parameters
String runningTaskId = captured.getValue().getId();
@@ -1728,10 +1728,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
// check that failed tasks are recreated with the same minimumMessageTime
as the task it replaced, even if that
// task came from another supervisor
- Assert.assertEquals(now, ((KafkaIndexTask)
aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime().get());
+ Assert.assertEquals(now, ((KafkaIndexTask)
aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime());
Assert.assertEquals(
maxi,
- ((KafkaIndexTask)
aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime().get()
+ ((KafkaIndexTask)
aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime()
);
}
@@ -2427,16 +2427,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
0,
singlePartitionMap(topic, 0, 0L, 2, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("id1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
singlePartitionMap(topic, 1, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("id2"),
ImmutableSet.of()
);
@@ -2581,16 +2581,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
0,
singlePartitionMap(topic, 0, 0L, 2, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("id1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
singlePartitionMap(topic, 1, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("id2"),
ImmutableSet.of()
);
@@ -2917,16 +2917,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
0,
singlePartitionMap(topic, 0, 0L, 2, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("id1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
singlePartitionMap(topic, 1, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("id2"),
ImmutableSet.of()
);
@@ -4310,8 +4310,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("myCustomValue",
taskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
- Assert.assertFalse("minimumMessageTime",
taskConfig.getMinimumMessageTime().isPresent());
- Assert.assertFalse("maximumMessageTime",
taskConfig.getMaximumMessageTime().isPresent());
+ Assert.assertNull("minimumMessageTime",
taskConfig.getMinimumMessageTime());
+ Assert.assertNull("maximumMessageTime",
taskConfig.getMaximumMessageTime());
Assert.assertEquals(topic,
taskConfig.getStartSequenceNumbers().getStream());
Assert.assertEquals(
@@ -4367,8 +4367,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false,
topic, 0)),
singlePartitionMap(topic, 0, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -4376,8 +4376,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false,
topic, 1)),
singlePartitionMap(topic, 0, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -4409,8 +4409,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false,
topic, 0)),
singlePartitionMap(topic, 0, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -4418,8 +4418,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false,
topic, 1)),
singlePartitionMap(topic, 0, 0L),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -4657,8 +4657,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
42,
singlePartitionMap(topic, 0, 0L, 2, 0L),
- Optional.of(minMessageTime),
- Optional.of(maxMessageTime),
+ minMessageTime,
+ maxMessageTime,
ImmutableSet.of("id1", "id2", "id3", "id4"),
ImmutableSet.of()
);
@@ -4873,8 +4873,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
0,
task1.getIOConfig().getStartSequenceNumbers(),
task1.getIOConfig().getEndSequenceNumbers(),
- task1.getIOConfig().getMinimumMessageTime().get(),
- task1.getIOConfig().getMaximumMessageTime().get(),
+ task1.getIOConfig().getMinimumMessageTime(),
+ task1.getIOConfig().getMaximumMessageTime(),
dataSchema,
supervisor.getTuningConfig()
);
@@ -5075,7 +5075,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
- // Only the active i.e non-publishing tasks are resumed
+ // Only the active i.e. non-publishing tasks are resumed
EasyMock.expect(taskClient.getStartTimeAsync(readingTask.getId())).andReturn(Futures.immediateFuture(startTime));
EasyMock.expect(taskClient.resumeAsync(readingTask.getId())).andReturn(Futures.immediateFuture(true));
@@ -5804,8 +5804,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Override
public String generateSequenceName(
Map<KafkaTopicPartition, Long> startPartitions,
- Optional<DateTime> minimumMessageTime,
- Optional<DateTime> maximumMessageTime,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
DataSchema dataSchema,
SeekableStreamIndexTaskTuningConfig tuningConfig
)
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index 07a0da32a95..b649927c276 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -225,8 +225,6 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Kin
null
);
- recordsResult = null;
-
recordBufferOfferWaitMillis = recordBufferOfferTimeout;
while (!records.offer(
new
MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, 0),
@@ -412,7 +410,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Kin
private final boolean backgroundFetchEnabled;
private volatile boolean closed = false;
- private AtomicBoolean partitionsFetchStarted = new AtomicBoolean();
+ private final AtomicBoolean partitionsFetchStarted = new AtomicBoolean();
public KinesisRecordSupplier(
AmazonKinesis amazonKinesis,
@@ -873,7 +871,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Kin
* {@link #GET_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first
sequence number from the result set.
* This method is thread safe as it does not depend on the internal state of
the supplier (it doesn't use the
* {@link PartitionResource} which have been assigned to the supplier), and
the Kinesis client is thread safe.
- *
+ * <p>
* When there are no records at the offset corresponding to the
ShardIteratorType,
* If shard is closed, return custom EOS sequence marker
* While getting the earliest sequence number, return a custom marker
corresponding to TRIM_HORIZON
@@ -973,7 +971,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Kin
// If no more new data after offsetToUse, it means there is no lag for
now.
// So report lag points as 0L.
- if (recordsResult.getRecords().size() == 0) {
+ if (recordsResult.getRecords().isEmpty()) {
return 0L;
} else {
recordsResult = getRecordsForLag(iteratorType, offsetToUse, partition);
@@ -992,10 +990,9 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Kin
offsetToUse
).getShardIterator();
- GetRecordsResult recordsResult = kinesis.getRecords(
- new
GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
+ return kinesis.getRecords(
+ new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
);
- return recordsResult;
}
/**
@@ -1011,7 +1008,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Kin
/**
* This method must be called before a seek operation ({@link #seek}, {@link
#seekToLatest}, or
* {@link #seekToEarliest}).
- *
+ * <p>
* When called, it will nuke the {@link #scheduledExec} that is shared by
all {@link PartitionResource}, filters
* records from the buffer for partitions which will have a seek operation
performed, and stops background fetch for
* each {@link PartitionResource} to prepare for the seek. If background
fetch is not currently running, the
@@ -1060,6 +1057,6 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Kin
records = newQ;
// restart fetching threads
- partitionResources.values().forEach(x -> x.stopBackgroundFetch());
+
partitionResources.values().forEach(PartitionResource::stopBackgroundFetch);
}
}
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
index 000be2830d6..f093f1c2ec2 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
@@ -22,10 +22,8 @@ package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -53,7 +51,7 @@ public class KinesisIOConfigTest
public KinesisIOConfigTest()
{
mapper = new DefaultObjectMapper();
- mapper.registerModules((Iterable<Module>) new
KinesisIndexingServiceModule().getJacksonModules());
+ mapper.registerModules(new
KinesisIndexingServiceModule().getJacksonModules());
}
@Rule
@@ -92,7 +90,7 @@ public class KinesisIOConfigTest
config.getEndSequenceNumbers().getPartitionSequenceNumberMap()
);
Assert.assertTrue(config.isUseTransaction());
- Assert.assertFalse("minimumMessageTime",
config.getMinimumMessageTime().isPresent());
+ Assert.assertNull("minimumMessageTime", config.getMinimumMessageTime());
Assert.assertEquals(config.getEndpoint(),
"kinesis.us-east-1.amazonaws.com");
Assert.assertEquals(config.getFetchDelayMillis(), 0);
Assert.assertEquals(Collections.emptySet(),
config.getStartSequenceNumbers().getExclusivePartitions());
@@ -140,10 +138,8 @@ public class KinesisIOConfigTest
config.getEndSequenceNumbers().getPartitionSequenceNumberMap()
);
Assert.assertFalse(config.isUseTransaction());
- Assert.assertTrue("maximumMessageTime",
config.getMaximumMessageTime().isPresent());
- Assert.assertTrue("minimumMessageTime",
config.getMinimumMessageTime().isPresent());
- Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"),
config.getMinimumMessageTime().get());
- Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"),
config.getMaximumMessageTime().get());
+ Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"),
config.getMinimumMessageTime());
+ Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"),
config.getMaximumMessageTime());
Assert.assertEquals(config.getEndpoint(),
"kinesis.us-east-2.amazonaws.com");
Assert.assertEquals(config.getStartSequenceNumbers().getExclusivePartitions(),
ImmutableSet.of("0"));
Assert.assertEquals(1000, config.getFetchDelayMillis());
@@ -350,8 +346,8 @@ public class KinesisIOConfigTest
private final SeekableStreamEndSequenceNumbers<String, String>
endPartitions;
private final Set<String> exclusiveStartSequenceNumberPartitions;
private final boolean useTransaction;
- private final Optional<DateTime> minimumMessageTime;
- private final Optional<DateTime> maximumMessageTime;
+ private final DateTime minimumMessageTime;
+ private final DateTime maximumMessageTime;
private final String endpoint;
private final Integer fetchDelayMillis;
private final String awsAssumedRoleArn;
@@ -377,8 +373,8 @@ public class KinesisIOConfigTest
this.endPartitions = endPartitions;
this.exclusiveStartSequenceNumberPartitions =
exclusiveStartSequenceNumberPartitions;
this.useTransaction = useTransaction;
- this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
- this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
+ this.minimumMessageTime = minimumMessageTime;
+ this.maximumMessageTime = maximumMessageTime;
this.endpoint = endpoint;
this.fetchDelayMillis = fetchDelayMillis;
this.awsAssumedRoleArn = awsAssumedRoleArn;
@@ -416,13 +412,13 @@ public class KinesisIOConfigTest
}
@JsonProperty
- public Optional<DateTime> getMinimumMessageTime()
+ public DateTime getMinimumMessageTime()
{
return minimumMessageTime;
}
@JsonProperty
- public Optional<DateTime> getMaximumMessageTime()
+ public DateTime getMaximumMessageTime()
{
return maximumMessageTime;
}
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
index 9f5c0bf7504..aa922b008a5 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
import org.apache.druid.indexing.kinesis.KinesisRegion;
@@ -39,7 +38,7 @@ public class KinesisSupervisorIOConfigTest
public KinesisSupervisorIOConfigTest()
{
mapper = new DefaultObjectMapper();
- mapper.registerModules((Iterable<Module>) new
KinesisIndexingServiceModule().getJacksonModules());
+ mapper.registerModules(new
KinesisIndexingServiceModule().getJacksonModules());
}
@Rule
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index f4913d7a8b7..4f8b3720a24 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -265,8 +265,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
KinesisIndexTaskIOConfig taskConfig = task.getIOConfig();
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
- Assert.assertFalse("minimumMessageTime",
taskConfig.getMinimumMessageTime().isPresent());
- Assert.assertFalse("maximumMessageTime",
taskConfig.getMaximumMessageTime().isPresent());
+ Assert.assertNull("minimumMessageTime",
taskConfig.getMinimumMessageTime());
+ Assert.assertNull("maximumMessageTime",
taskConfig.getMaximumMessageTime());
Assert.assertEquals(STREAM,
taskConfig.getStartSequenceNumbers().getStream());
Assert.assertEquals(
@@ -741,15 +741,15 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertTrue(
"minimumMessageTime",
-
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(59).isBeforeNow()
+
task1.getIOConfig().getMinimumMessageTime().plusMinutes(59).isBeforeNow()
);
Assert.assertTrue(
"minimumMessageTime",
-
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(61).isAfterNow()
+
task1.getIOConfig().getMinimumMessageTime().plusMinutes(61).isAfterNow()
);
Assert.assertEquals(
- task1.getIOConfig().getMinimumMessageTime().get(),
- task2.getIOConfig().getMinimumMessageTime().get()
+ task1.getIOConfig().getMinimumMessageTime(),
+ task2.getIOConfig().getMinimumMessageTime()
);
}
@@ -793,15 +793,15 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertTrue(
"maximumMessageTime",
- task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(59 +
60).isAfterNow()
+ task1.getIOConfig().getMaximumMessageTime().minusMinutes(59 +
60).isAfterNow()
);
Assert.assertTrue(
"maximumMessageTime",
- task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(61 +
60).isBeforeNow()
+ task1.getIOConfig().getMaximumMessageTime().minusMinutes(61 +
60).isBeforeNow()
);
Assert.assertEquals(
- task1.getIOConfig().getMaximumMessageTime().get(),
- task2.getIOConfig().getMaximumMessageTime().get()
+ task1.getIOConfig().getMaximumMessageTime(),
+ task2.getIOConfig().getMaximumMessageTime()
);
}
@@ -1292,7 +1292,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
verifyAll();
// check that replica tasks are created with the same minimumMessageTime
as tasks inherited from another supervisor
- Assert.assertEquals(now, ((KinesisIndexTask)
captured.getValue()).getIOConfig().getMinimumMessageTime().get());
+ Assert.assertEquals(now, ((KinesisIndexTask)
captured.getValue()).getIOConfig().getMinimumMessageTime());
// test that a task failing causes a new task to be re-queued with the
same parameters
String runningTaskId = captured.getValue().getId();
@@ -1340,11 +1340,11 @@ public class KinesisSupervisorTest extends
EasyMockSupport
// task came from another supervisor
Assert.assertEquals(
now,
- ((KinesisIndexTask)
aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime().get()
+ ((KinesisIndexTask)
aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime()
);
Assert.assertEquals(
maxi,
- ((KinesisIndexTask)
aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime().get()
+ ((KinesisIndexTask)
aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime()
);
}
@@ -3708,8 +3708,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition(SHARD_ID0),
ImmutableMap.of("0", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -3717,8 +3717,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition(SHARD_ID1),
ImmutableMap.of("0", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -3947,8 +3947,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
42,
ImmutableMap.of(SHARD_ID1, "3"),
- Optional.of(minMessageTime),
- Optional.of(maxMessageTime),
+ minMessageTime,
+ maxMessageTime,
ImmutableSet.of("id1", "id2", "id3", "id4"),
ImmutableSet.of()
);
@@ -4116,8 +4116,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
0,
task1.getIOConfig().getStartSequenceNumbers(),
task1.getIOConfig().getEndSequenceNumbers(),
- task1.getIOConfig().getMinimumMessageTime().get(),
- task1.getIOConfig().getMaximumMessageTime().get(),
+ task1.getIOConfig().getMinimumMessageTime(),
+ task1.getIOConfig().getMaximumMessageTime(),
dataSchema
);
@@ -5659,8 +5659,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Override
public String generateSequenceName(
Map<String, String> startPartitions,
- Optional<DateTime> minimumMessageTime,
- Optional<DateTime> maximumMessageTime,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
DataSchema dataSchema,
SeekableStreamIndexTaskTuningConfig tuningConfig
)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
index 928149cdee4..8f92f76e836 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
@@ -21,8 +21,8 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.segment.indexing.IOConfig;
import org.joda.time.DateTime;
@@ -33,14 +33,13 @@ public abstract class
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
- @Nullable
private final Integer taskGroupId;
private final String baseSequenceName;
private final SeekableStreamStartSequenceNumbers<PartitionIdType,
SequenceOffsetType> startSequenceNumbers;
private final SeekableStreamEndSequenceNumbers<PartitionIdType,
SequenceOffsetType> endSequenceNumbers;
private final boolean useTransaction;
- private final Optional<DateTime> minimumMessageTime;
- private final Optional<DateTime> maximumMessageTime;
+ private final DateTime minimumMessageTime;
+ private final DateTime maximumMessageTime;
private final InputFormat inputFormat;
private final Long refreshRejectionPeriodsInMinutes;
@@ -49,9 +48,9 @@ public abstract class
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
final String baseSequenceName,
final SeekableStreamStartSequenceNumbers<PartitionIdType,
SequenceOffsetType> startSequenceNumbers,
final SeekableStreamEndSequenceNumbers<PartitionIdType,
SequenceOffsetType> endSequenceNumbers,
- final Boolean useTransaction,
- final DateTime minimumMessageTime,
- final DateTime maximumMessageTime,
+ @Nullable final Boolean useTransaction,
+ @Nullable final DateTime minimumMessageTime,
+ @Nullable final DateTime maximumMessageTime,
@Nullable final InputFormat inputFormat,
@Nullable final Long refreshRejectionPeriodsInMinutes // can be null for
backward compabitility
)
@@ -60,9 +59,9 @@ public abstract class
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName,
"baseSequenceName");
this.startSequenceNumbers =
Preconditions.checkNotNull(startSequenceNumbers, "startSequenceNumbers");
this.endSequenceNumbers = Preconditions.checkNotNull(endSequenceNumbers,
"endSequenceNumbers");
- this.useTransaction = useTransaction != null ? useTransaction :
DEFAULT_USE_TRANSACTION;
- this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
- this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
+ this.useTransaction = Configs.valueOrDefault(useTransaction,
DEFAULT_USE_TRANSACTION);
+ this.minimumMessageTime = minimumMessageTime;
+ this.maximumMessageTime = maximumMessageTime;
this.inputFormat = inputFormat;
this.refreshRejectionPeriodsInMinutes = refreshRejectionPeriodsInMinutes;
@@ -73,8 +72,8 @@ public abstract class
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
Preconditions.checkArgument(
startSequenceNumbers.getPartitionSequenceNumberMap()
- .keySet()
-
.equals(endSequenceNumbers.getPartitionSequenceNumberMap().keySet()),
+ .keySet()
+
.equals(endSequenceNumbers.getPartitionSequenceNumberMap().keySet()),
"start partition set and end partition set must match"
);
}
@@ -111,15 +110,15 @@ public abstract class
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
}
@JsonProperty
- @JsonInclude(JsonInclude.Include.NON_ABSENT)
- public Optional<DateTime> getMaximumMessageTime()
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public DateTime getMaximumMessageTime()
{
return maximumMessageTime;
}
@JsonProperty
- @JsonInclude(JsonInclude.Include.NON_ABSENT)
- public Optional<DateTime> getMinimumMessageTime()
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public DateTime getMinimumMessageTime()
{
return minimumMessageTime;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 4156b0dba90..a30d5f2368e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
@@ -274,8 +275,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
this.ingestionState = IngestionState.NOT_STARTED;
this.lockGranularityToUse = lockGranularityToUse;
- minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
- maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
+ minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(),
DateTimes.MIN);
+ maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(),
DateTimes.MAX);
rejectionPeriodUpdaterExec =
Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d");
if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 1948e22e2aa..faa68f3d8ea 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -159,7 +159,6 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
- private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
private static final int MAX_INITIALIZATION_RETRIES = 20;
private static final EmittingLogger log = new
EmittingLogger(SeekableStreamSupervisor.class);
@@ -198,8 +197,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final ImmutableMap<PartitionIdType, SequenceOffsetType>
unfilteredStartingSequencesForSequenceName;
final ConcurrentHashMap<String, TaskData> tasks = new
ConcurrentHashMap<>();
- final Optional<DateTime> minimumMessageTime;
- final Optional<DateTime> maximumMessageTime;
+ final DateTime minimumMessageTime;
+ final DateTime maximumMessageTime;
final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions;
final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>
checkpointSequences = new TreeMap<>();
final String baseSequenceName;
@@ -211,8 +210,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
int groupId,
ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
@Nullable ImmutableMap<PartitionIdType, SequenceOffsetType>
unfilteredStartingSequencesForSequenceName,
- Optional<DateTime> minimumMessageTime,
- Optional<DateTime> maximumMessageTime,
+ @Nullable DateTime minimumMessageTime,
+ @Nullable DateTime maximumMessageTime,
@Nullable Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
)
{
@@ -239,8 +238,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
int groupId,
ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
@Nullable ImmutableMap<PartitionIdType, SequenceOffsetType>
unfilteredStartingSequencesForSequenceName,
- Optional<DateTime> minimumMessageTime,
- Optional<DateTime> maximumMessageTime,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
Set<PartitionIdType> exclusiveStartSequenceNumberPartitions,
String baseSequenceName
)
@@ -259,10 +258,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.baseSequenceName = baseSequenceName;
}
- int addNewCheckpoint(Map<PartitionIdType, SequenceOffsetType> checkpoint)
+ void addNewCheckpoint(Map<PartitionIdType, SequenceOffsetType> checkpoint)
{
checkpointSequences.put(checkpointSequences.lastKey() + 1, checkpoint);
- return checkpointSequences.lastKey();
}
Set<String> taskIds()
@@ -369,16 +367,6 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.errors = errors;
}
- public String getGroupId()
- {
- return groupId;
- }
-
- public String getTaskId()
- {
- return taskId;
- }
-
public List<ParseExceptionReport> getErrors()
{
return errors;
@@ -1609,8 +1597,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
public TaskGroup addTaskGroupToActivelyReadingTaskGroup(
int taskGroupId,
ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets,
- Optional<DateTime> minMsgTime,
- Optional<DateTime> maxMsgTime,
+ @Nullable DateTime minMsgTime,
+ @Nullable DateTime maxMsgTime,
Set<String> tasks,
Set<PartitionIdType> exclusiveStartingSequencePartitions
)
@@ -1637,8 +1625,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
public TaskGroup addTaskGroupToPendingCompletionTaskGroup(
int taskGroupId,
ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets,
- Optional<DateTime> minMsgTime,
- Optional<DateTime> maxMsgTime,
+ @Nullable DateTime minMsgTime,
+ @Nullable DateTime maxMsgTime,
Set<String> tasks,
Set<PartitionIdType> exclusiveStartingSequencePartitions
)
@@ -2190,8 +2178,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final TaskData prevTaskData =
taskGroup.tasks.putIfAbsent(taskId, new TaskData());
if (prevTaskData != null) {
throw new ISE(
- "taskGroup[%s] already exists for new
task[%s]",
- prevTaskData, taskId
+ "Task[%s] already exists in taskGroup[%d]
with data[%s]",
+ taskId, taskGroup.groupId, prevTaskData
);
}
verifySameSequenceNameForAllTasksInGroup(taskGroupId);
@@ -2459,7 +2447,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return sequence.compareTo(latestOffset) == 0;
}
) && earliestConsistentSequenceId.compareAndSet(-1,
sequenceCheckpoint.getKey())) || (
- pendingCompletionTaskGroups.getOrDefault(groupId, new
CopyOnWriteArrayList<>()).size() > 0
+ !pendingCompletionTaskGroups.getOrDefault(groupId, new
CopyOnWriteArrayList<>()).isEmpty()
&& earliestConsistentSequenceId.compareAndSet(-1,
taskCheckpoints.firstKey()))) {
final SortedMap<Integer, Map<PartitionIdType, SequenceOffsetType>>
latestCheckpoints = new TreeMap<>(
taskCheckpoints.tailMap(earliestConsistentSequenceId.get())
@@ -2470,7 +2458,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskGroup.checkpointSequences.putAll(latestCheckpoints);
} else {
log.debug(
- "Adding task [%s] to kill list, checkpoints[%s], latestoffsets
from DB [%s]",
+ "Adding task[%s] to kill list, checkpoints[%s], latestoffsets
from DB [%s]",
taskId,
taskCheckpoints,
latestOffsetsFromDb
@@ -2485,7 +2473,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
||
taskCheckpoints.tailMap(taskGroup.checkpointSequences.firstKey()).size()
!= taskGroup.checkpointSequences.size()) {
log.debug(
- "Adding task [%s] to kill list, checkpoints[%s], taskgroup
checkpoints [%s]",
+ "Adding task[%s] to kill list, checkpoints[%s], taskgroup
checkpoints [%s]",
taskId,
taskCheckpoints,
taskGroup.checkpointSequences
@@ -2496,12 +2484,12 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskIndex++;
}
- if ((tasksToKill.size() > 0 && tasksToKill.size() ==
taskGroup.tasks.size()) ||
- (taskGroup.tasks.size() == 0
- && pendingCompletionTaskGroups.getOrDefault(groupId, new
CopyOnWriteArrayList<>()).size() == 0)) {
+ if ((!tasksToKill.isEmpty() && tasksToKill.size() ==
taskGroup.tasks.size()) ||
+ (taskGroup.tasks.isEmpty()
+ && pendingCompletionTaskGroups.getOrDefault(groupId, new
CopyOnWriteArrayList<>()).isEmpty())) {
// killing all tasks or no task left in the group ?
// clear state about the taskgroup so that get latest sequence
information is fetched from metadata store
- log.warn("Clearing task group [%d] information as no valid tasks left
the group", groupId);
+ log.warn("Clearing task group[%d] information as no valid tasks left in
the group", groupId);
activelyReadingTaskGroups.remove(groupId);
for (PartitionIdType partitionId : taskGroup.startingSequences.keySet())
{
partitionOffsets.put(partitionId, getNotSetMarker());
@@ -2512,8 +2500,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
sequenceCheckpoint -> {
killTask(
sequenceCheckpoint.lhs,
- "Killing task [%s], as its checkpoints [%s] are not consistent
with group checkpoints[%s] or latest "
- + "persisted sequences in metadata store [%s]",
+ "Killing task[%s], as its checkpoints[%s] are not consistent
with group checkpoints[%s]"
+ + " or latest persisted sequences in metadata store[%s].",
sequenceCheckpoint.lhs,
sequenceCheckpoint.rhs,
taskGroup.checkpointSequences,
@@ -2548,7 +2536,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
if (!isTaskGroupPresent) {
- log.info("Creating new pending completion task group [%s] for
discovered task [%s].", groupId, taskId);
+ log.info("Creating new pending completion task group[%s] for
discovered task[%s].", groupId, taskId);
// reading the minimumMessageTime & maximumMessageTime from the
publishing task and setting it here is not necessary as this task cannot
// change to a state where it will read any more events.
@@ -2557,8 +2545,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
groupId,
ImmutableMap.copyOf(startingPartitions),
null,
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
null
);
@@ -2574,7 +2562,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
for (TaskGroup taskGroup : taskGroupList) {
if (taskGroup.startingSequences.equals(startingPartitions)) {
if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
- log.info("Added discovered task [%s] to existing pending completion
task group [%s]. PendingCompletionTaskGroup: %s", taskId, groupId,
taskGroup.taskIds());
+ log.info(
+ "Added discovered task[%s] to taskGroup[%s] with pending
completion tasks[%s].",
+ taskId, groupId, taskGroup.taskIds()
+ );
}
return;
}
@@ -2625,8 +2616,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
public Void apply(@Nullable Boolean result)
{
if (result == null || !result) {
- log.info("Task [%s] failed to stop in a timely manner, killing
task", id);
- killTask(id, "Task [%s] failed to stop in a timely manner,
killing task", id);
+ log.info("Killing task[%s] as it failed to stop in a timely
manner.", id);
+ killTask(id, "Failed to stop in a timely manner");
}
return null;
}
@@ -2701,8 +2692,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@VisibleForTesting
public String generateSequenceName(
Map<PartitionIdType, SequenceOffsetType> startPartitions,
- Optional<DateTime> minimumMessageTime,
- Optional<DateTime> maximumMessageTime,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
DataSchema dataSchema,
SeekableStreamIndexTaskTuningConfig tuningConfig
)
@@ -2712,10 +2703,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
for (Entry<PartitionIdType, SequenceOffsetType> entry :
startPartitions.entrySet()) {
sb.append(StringUtils.format("+%s(%s)", entry.getKey().toString(),
entry.getValue().toString()));
}
- String partitionOffsetStr = startPartitions.size() == 0 ? "" :
sb.toString().substring(1);
+ String partitionOffsetStr = startPartitions.isEmpty() ? "" :
sb.substring(1);
- String minMsgTimeStr = (minimumMessageTime.isPresent() ?
String.valueOf(minimumMessageTime.get().getMillis()) : "");
- String maxMsgTimeStr = (maximumMessageTime.isPresent() ?
String.valueOf(maximumMessageTime.get().getMillis()) : "");
+ String minMsgTimeStr = (minimumMessageTime == null ? "" :
String.valueOf(minimumMessageTime.getMillis()));
+ String maxMsgTimeStr = (maximumMessageTime == null ? "" :
String.valueOf(maximumMessageTime.getMillis()));
String dataSchemaStr, tuningConfigStr;
try {
@@ -2773,11 +2764,14 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return false;
}
- log.debug("Found [%d] partitions for stream [%s]",
partitionIdsFromSupplier.size(), ioConfig.getStream());
+ log.debug("Found [%d] partitions for stream[%s]",
partitionIdsFromSupplier.size(), ioConfig.getStream());
final int configuredTaskCount = spec.getIoConfig().getTaskCount();
if (configuredTaskCount > partitionIdsFromSupplier.size()) {
- log.warn("Configured task count[%s] for supervisor[%s] is greater than
the number of partitions[%d].", configuredTaskCount, supervisorId,
partitionIdsFromSupplier.size());
+ log.warn(
+ "Configured task count[%s] for supervisor[%s] is greater than the
number of partitions[%d].",
+ configuredTaskCount, supervisorId, partitionIdsFromSupplier.size()
+ );
}
Map<PartitionIdType, SequenceOffsetType> storedMetadata =
getOffsetsFromMetadataStorage();
Set<PartitionIdType> storedPartitions = storedMetadata.keySet();
@@ -2809,7 +2803,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
new HashSet<>(previousPartitionIds)
);
- log.debug("active partitions from supplier: " +
activePartitionsIdsFromSupplier);
+ log.debug("active partitions from supplier: %s",
activePartitionsIdsFromSupplier);
if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() !=
partitionIdsFromSupplier.size()) {
// this should never happen, but we check for it and exclude the expired
partitions if they somehow reappear
@@ -2819,9 +2813,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
partitionIdsFromSupplier
);
}
- if (activePartitionsIdsFromSupplier.size() == 0) {
+ if (activePartitionsIdsFromSupplier.isEmpty()) {
String errMsg = StringUtils.format(
- "No active partitions found for stream [%s] after removing closed
and previously expired partitions",
+ "No active partitions found for stream[%s] after removing closed and
previously expired partitions",
ioConfig.getStream()
);
stateManager.recordThrowableEvent(new StreamException(new ISE(errMsg)));
@@ -2832,7 +2826,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
boolean initialPartitionDiscovery = this.partitionIds.isEmpty();
for (PartitionIdType partitionId :
partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions) {
if (closedPartitions.contains(partitionId)) {
- log.info("partition [%s] is closed and has no more data, skipping.",
partitionId);
+ log.info("Skipping partition[%s] as it is closed and has no more
data.", partitionId);
continue;
}
@@ -2869,7 +2863,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (partitionOffsets.putIfAbsent(partitionId, getNotSetMarker()) ==
null) {
log.debug(
- "New partition [%s] discovered for stream [%s], added to task
group [%d]",
+ "Discovered new partition[%s] for stream [%s], added to
taskGroup[%d]",
partitionId,
ioConfig.getStream(),
taskGroupId
@@ -2879,10 +2873,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
- if (newlyDiscovered.size() > 0) {
+ if (!newlyDiscovered.isEmpty()) {
for (Int2ObjectMap.Entry<List<PartitionIdType>> taskGroupPartitions :
newlyDiscovered.int2ObjectEntrySet()) {
log.info(
- "New partitions %s discovered for stream [%s], added to task group
[%s]",
+ "Discovered new partitions[%s] for stream[%s], added to
taskGroup[%s].",
taskGroupPartitions.getValue(),
ioConfig.getStream(),
taskGroupPartitions.getIntKey()
@@ -3501,7 +3495,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
log.info(
- "Stopping all tasks in taskGroup[%s] because: [%s]",
+ "Stopping all tasks in taskGroup[%s] because [%s].",
taskGroup.groupId,
StringUtils.format(stopReasonFormat, args)
);
@@ -3554,7 +3548,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();
- Preconditions.checkNotNull(taskData.status, "task[%s] has null
status", taskId);
+ Preconditions.checkNotNull(taskData.status, "Task[%s] has null
status", taskId);
if (taskData.status.isFailure()) {
stateManager.recordCompletedTaskState(TaskState.FAILED);
@@ -3569,7 +3563,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (taskData.status.isSuccess()) {
// If one of the pending completion tasks was successful, stop the
rest of the tasks in the group as
// we no longer need them to publish their segment.
- log.info("Task [%s] completed successfully, stopping tasks %s",
taskId, group.taskIds());
+ log.info("Task[%s] completed successfully, stopping tasks[%s]",
taskId, group.taskIds());
stateManager.recordCompletedTaskState(TaskState.SUCCESS);
futures.add(
stopTasksInGroup(group, "Task [%s] completed successfully,
stopping tasks %s", taskId, group.taskIds())
@@ -3582,10 +3576,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if ((!foundSuccess && group.completionTimeout.isBeforeNow()) ||
entireTaskGroupFailed) {
if (entireTaskGroupFailed) {
- log.warn("All tasks in group [%d] failed to publish, killing all
tasks for these partitions", groupId);
+ log.warn("All tasks in taskGroup[%d] failed to publish, killing
all tasks for these partitions", groupId);
} else {
log.makeAlert(
- "No task in [%s] for taskGroup [%d] succeeded before the
completion timeout elapsed [%s]! "
+ "No task in [%s] for taskGroup[%d] succeeded before the
completion timeout elapsed [%s]! "
+ "Check metrics and logs to see if the creation, publish or
handoff"
+ " of any segment is taking longer than usual.",
group.taskIds(),
@@ -3764,18 +3758,20 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
for (Integer groupId : partitionGroups.keySet()) {
if (!activelyReadingTaskGroups.containsKey(groupId)) {
log.info("Creating new taskGroup[%d] for partitions[%s].", groupId,
partitionGroups.get(groupId));
- Optional<DateTime> minimumMessageTime;
+ final DateTime minimumMessageTime;
if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) {
- minimumMessageTime =
Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
+ minimumMessageTime =
ioConfig.getLateMessageRejectionStartDateTime().get();
} else {
- minimumMessageTime =
(ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
-
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
- ) : Optional.absent());
+ minimumMessageTime =
ioConfig.getLateMessageRejectionPeriod().isPresent()
+ ?
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
+ : null;
}
- Optional<DateTime> maximumMessageTime =
(ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of(
-
DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())
- ) : Optional.absent());
+ final DateTime maximumMessageTime =
ioConfig.getEarlyMessageRejectionPeriod().isPresent()
+ ? DateTimes.nowUtc()
+
.plus(ioConfig.getTaskDuration())
+
.plus(ioConfig.getEarlyMessageRejectionPeriod().get())
+ : null;
final Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>>
unfilteredStartingOffsets =
generateStartingSequencesForPartitionGroup(groupId);
@@ -3848,7 +3844,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Integer groupId = entry.getKey();
if (taskGroup.startingSequences == null ||
- taskGroup.startingSequences.size() == 0 ||
+ taskGroup.startingSequences.isEmpty() ||
taskGroup.startingSequences.values().stream().allMatch(x -> x ==
null || isEndOfShard(x))) {
log.debug("Nothing to read in any partition for taskGroup[%d],
skipping task creation.", groupId);
continue;
@@ -4047,8 +4043,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.get(groupId)
.exclusiveStartSequenceNumberPartitions;
- DateTime minimumMessageTime = group.minimumMessageTime.orNull();
- DateTime maximumMessageTime = group.maximumMessageTime.orNull();
+ DateTime minimumMessageTime = group.minimumMessageTime;
+ DateTime maximumMessageTime = group.maximumMessageTime;
SeekableStreamIndexTaskIOConfig newIoConfig = createTaskIoConfig(
groupId,
@@ -4159,7 +4155,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
Map<PartitionIdType, SequenceOffsetType> offsetsFromMetadataStorage =
getOffsetsFromMetadataStorage();
if (!spec.isSuspended()) {
- if (activelyReadingTaskGroups.size() > 0 ||
pendingCompletionTaskGroups.size() > 0) {
+ if (!activelyReadingTaskGroups.isEmpty() ||
!pendingCompletionTaskGroups.isEmpty()) {
Map<PartitionIdType, SequenceOffsetType> currentOffsets =
Stream.concat(
activelyReadingTaskGroups
@@ -4392,7 +4388,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
* default implementation, schedules periodic fetch of latest offsets and
{@link #emitLag} reporting for Kafka and Kinesis
- * and periodic reporting of {@Link #emitNoticesQueueSize} for various data sources.
+ * and periodic reporting of {@link #emitNoticesQueueSize} for various data sources.
*/
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
@@ -4424,15 +4420,21 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
/**
- * calculate lag per partition for kafka as a measure of message count, kinesis implementation returns an empty
- * map
+ * Calculates record lag per partition using the current offsets from tasks.
*
- * @return map of partition id -> lag
+ * @return Map from partition ID to record lag.
+ * @see #getPartitionRecordLag
*/
protected abstract Map<PartitionIdType, Long> getRecordLagPerPartition(
Map<PartitionIdType, SequenceOffsetType> currentOffsets
);
+ /**
+ * Calculates time lag per partition using the current offsets from tasks.
+ *
+ * @return Map from partition ID to time lag.
+ * @see #getPartitionTimeLag
+ */
protected abstract Map<PartitionIdType, Long> getTimeLagPerPartition(
Map<PartitionIdType, SequenceOffsetType> currentOffsets
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index f78fd680e36..db724931b39 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -90,8 +89,8 @@ public class SeekableStreamIndexTaskRunnerTest
DateTime now = DateTimes.nowUtc();
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(120L);
- Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().plusHours(2)));
- Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().minusHours(2)));
+ Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(DateTimes.nowUtc().plusHours(2));
+ Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(DateTimes.nowUtc().minusHours(2));
Mockito.when(ioConfig.getInputFormat()).thenReturn(new
JsonInputFormat(null, null, null, null, null));
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
@@ -143,8 +142,8 @@ public class SeekableStreamIndexTaskRunnerTest
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
// min max time not populated.
- Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.absent());
- Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.absent());
+ Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(null);
+ Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(null);
Mockito.when(ioConfig.getInputFormat()).thenReturn(new
JsonInputFormat(null, null, null, null, null));
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index fe03a957ad2..58c838615ac 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1850,8 +1850,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -1859,8 +1859,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -1896,8 +1896,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -1905,8 +1905,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -1956,8 +1956,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
checkpointOffsets,
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -2003,24 +2003,24 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
final SeekableStreamSupervisor.TaskGroup taskGroup0 =
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task0"),
ImmutableSet.of()
);
final SeekableStreamSupervisor.TaskGroup taskGroup1 =
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
final SeekableStreamSupervisor.TaskGroup taskGroup2 =
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -2097,8 +2097,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -2106,8 +2106,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -2115,8 +2115,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task3"),
ImmutableSet.of()
);
@@ -2168,8 +2168,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -2177,8 +2177,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -2186,8 +2186,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task3"),
ImmutableSet.of()
);
@@ -2243,8 +2243,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -2252,8 +2252,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -2310,8 +2310,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -2319,8 +2319,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -2353,8 +2353,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -2362,8 +2362,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -2392,8 +2392,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -2401,8 +2401,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
@@ -2442,8 +2442,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task1"),
ImmutableSet.of()
);
@@ -2451,8 +2451,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "0"),
- Optional.absent(),
- Optional.absent(),
+ null,
+ null,
ImmutableSet.of("task2"),
ImmutableSet.of()
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]