This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
new 49796a9 fix SequenceMetadata deserialization (#7256) (#7262)
49796a9 is described below
commit 49796a917120256bcf39c6f3d2446c066a1c2e39
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Mar 13 20:54:54 2019 -0700
fix SequenceMetadata deserialization (#7256) (#7262)
* wip
* fix tests, stop reading if we are at end offset
* fix build
* remove restore at end offsets fix in favor of a separate PR
* use typereference from method for serialization too
---
.../IncrementalPublishingKafkaIndexTaskRunner.java | 11 +-
.../indexing/kafka/LegacyKafkaIndexTaskRunner.java | 12 +-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 171 ++++++++--
.../indexing/kinesis/KinesisIndexTaskRunner.java | 11 +-
.../indexing/kinesis/KinesisIndexTaskTest.java | 247 +++++++++++---
.../SeekableStreamIndexTaskRunner.java | 360 ++-------------------
.../indexing/seekablestream/SequenceMetadata.java | 335 +++++++++++++++++++
.../supervisor/SeekableStreamSupervisor.java | 3 +-
8 files changed, 747 insertions(+), 403 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 6424c29..ae092d5 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -28,6 +28,7 @@ import
org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -111,7 +112,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner
extends SeekableStreamInd
}
@Override
- protected SeekableStreamPartitions<Integer, Long>
deserializeSeekableStreamPartitionsFromMetadata(
+ protected SeekableStreamPartitions<Integer, Long>
deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)
@@ -225,6 +226,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner
extends SeekableStreamInd
return false;
}
+ @Override
+ public TypeReference<List<SequenceMetadata<Integer, Long>>>
getSequenceMetadataTypeReference()
+ {
+ return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+ {
+ };
+ }
+
@Nullable
@Override
protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index b065361..61ebf49 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -50,6 +51,7 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -627,6 +629,14 @@ public class LegacyKafkaIndexTaskRunner extends
SeekableStreamIndexTaskRunner<In
return false;
}
+ @Override
+ public TypeReference<List<SequenceMetadata<Integer, Long>>>
getSequenceMetadataTypeReference()
+ {
+ return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+ {
+ };
+ }
+
@Nonnull
@Override
protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
@@ -726,7 +736,7 @@ public class LegacyKafkaIndexTaskRunner extends
SeekableStreamIndexTaskRunner<In
@Override
- protected SeekableStreamPartitions<Integer, Long>
deserializeSeekableStreamPartitionsFromMetadata(
+ protected SeekableStreamPartitions<Integer, Long>
deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 76433c2..a2224e5 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -282,6 +282,26 @@ public class KafkaIndexTaskTest
);
}
+ private static List<ProducerRecord<byte[], byte[]>>
generateSinglePartitionRecords(String topic)
+ {
+ return ImmutableList.of(
+ new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2008", "A", "x", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2009", "B", "x", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2010", "C", "x", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "D", "x", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "d", "x", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2012", "E", "x", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2009", "b", "x", "10",
"20.0", "1.0"))
+ );
+ }
+
private static String getTopicName()
{
return "topic" + topicPostfix++;
@@ -863,23 +883,7 @@ public class KafkaIndexTaskTest
if (!isIncrementalHandoffSupported) {
return;
}
-
- List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
- new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2008", "A", "x", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2009", "B", "x", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2010", "C", "x", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "D", "x", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2011", "d", "x", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2012", "E", "x", "10",
"20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, JB("2009", "b", "x", "10",
"20.0", "1.0"))
- );
+ records = generateSinglePartitionRecords(topic);
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing
should happen
@@ -894,22 +898,14 @@ public class KafkaIndexTaskTest
Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
- final SeekableStreamPartitions<Integer, Long> startPartitions = new
SeekableStreamPartitions<>(
- topic,
- ImmutableMap.of(0, 0L)
- );
- final SeekableStreamPartitions<Integer, Long> checkpoint1 = new
SeekableStreamPartitions<>(
- topic,
- ImmutableMap.of(0, 5L)
- );
- final SeekableStreamPartitions<Integer, Long> checkpoint2 = new
SeekableStreamPartitions<>(
- topic,
- ImmutableMap.of(0, 12L)
- );
- final SeekableStreamPartitions<Integer, Long> endPartitions = new
SeekableStreamPartitions<>(
- topic,
- ImmutableMap.of(0, Long.MAX_VALUE)
- );
+ final SeekableStreamPartitions<Integer, Long> startPartitions =
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L));
+ final SeekableStreamPartitions<Integer, Long> checkpoint1 =
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L));
+ final SeekableStreamPartitions<Integer, Long> checkpoint2 =
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L));
+ final SeekableStreamPartitions<Integer, Long> endPartitions =
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0,
Long.MAX_VALUE));
final KafkaIndexTask task = createTask(
null,
@@ -1924,6 +1920,115 @@ public class KafkaIndexTaskTest
}
@Test(timeout = 60_000L)
+ public void testRestoreAfterPersistingSequences() throws Exception
+ {
+ if (!isIncrementalHandoffSupported) {
+ return;
+ }
+
+ records = generateSinglePartitionRecords(topic);
+ maxRowsPerSegment = 2;
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+ consumerProps.put("max.poll.records", "1");
+
+ final KafkaIndexTask task1 = createTask(
+ null,
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "sequence0",
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L)),
+ consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null,
+ false
+ )
+ );
+
+ final SeekableStreamPartitions<Integer, Long> checkpoint = new
SeekableStreamPartitions<>(
+ topic,
+ ImmutableMap.of(0, 5L)
+ );
+
+ final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+ // Insert some data, but not enough for the task to finish
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
+ for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records,
5)) {
+ kafkaProducer.send(record).get();
+ }
+ }
+
+ while (task1.getRunner().getStatus() != Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ final Map<Integer, Long> currentOffsets =
ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+ Assert.assertEquals(checkpoint.getPartitionSequenceNumberMap(),
currentOffsets);
+ // Set endOffsets to persist sequences
+ task1.getRunner().setEndOffsets(ImmutableMap.of(0, 5L), false);
+
+ // Stop without publishing segment
+ task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+ unlockAppenderatorBasePersistDirForTask(task1);
+
+ Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+ // Start a new task
+ final KafkaIndexTask task2 = createTask(
+ task1.getId(),
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "sequence0",
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L)),
+ consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null,
+ false
+ )
+ );
+
+ final ListenableFuture<TaskStatus> future2 = runTask(task2);
+ // Wait for the task to start reading
+
+ // Insert remaining data
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
+ for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5))
{
+ kafkaProducer.send(record).get();
+ }
+ }
+
+ // Wait for task to exit
+ Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
+
+ // Check metrics
+ Assert.assertEquals(5,
task1.getRunner().getRowIngestionMeters().getProcessed());
+ Assert.assertEquals(0,
task1.getRunner().getRowIngestionMeters().getUnparseable());
+ Assert.assertEquals(0,
task1.getRunner().getRowIngestionMeters().getThrownAway());
+ Assert.assertEquals(4,
task2.getRunner().getRowIngestionMeters().getProcessed());
+ Assert.assertEquals(0,
task2.getRunner().getRowIngestionMeters().getUnparseable());
+ Assert.assertEquals(0,
task2.getRunner().getRowIngestionMeters().getThrownAway());
+
+ // Check published segments & metadata
+ SegmentDescriptor desc1 = SD(task1, "2008/P1D", 0);
+ SegmentDescriptor desc2 = SD(task1, "2008/P1D", 1);
+ SegmentDescriptor desc3 = SD(task1, "2009/P1D", 0);
+ SegmentDescriptor desc4 = SD(task1, "2009/P1D", 1);
+ SegmentDescriptor desc5 = SD(task1, "2010/P1D", 0);
+ SegmentDescriptor desc6 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc7 = SD(task1, "2012/P1D", 0);
+ Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5,
desc6, desc7), publishedDescriptors());
+ Assert.assertEquals(
+ new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic,
ImmutableMap.of(0, 9L))),
+
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+ );
+ }
+
+ @Test(timeout = 60_000L)
public void testRunWithPauseAndResume() throws Exception
{
final KafkaIndexTask task = createTask(
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 3e7e5e7..50326f7 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -28,6 +28,7 @@ import
org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -92,7 +93,7 @@ public class KinesisIndexTaskRunner extends
SeekableStreamIndexTaskRunner<String
}
@Override
- protected SeekableStreamPartitions<String, String>
deserializeSeekableStreamPartitionsFromMetadata(
+ protected SeekableStreamPartitions<String, String>
deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)
@@ -176,6 +177,14 @@ public class KinesisIndexTaskRunner extends
SeekableStreamIndexTaskRunner<String
return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum);
}
+ @Override
+ public TypeReference<List<SequenceMetadata<String, String>>>
getSequenceMetadataTypeReference()
+ {
+ return new TypeReference<List<SequenceMetadata<String, String>>>()
+ {
+ };
+ }
+
@Nullable
@Override
protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 58fa72a..f60fe50 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -199,28 +199,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
private static String shardId1 = "1";
private static String shardId0 = "0";
private static KinesisRecordSupplier recordSupplier;
- private static List<OrderedPartitionableRecord<String, String>> records =
ImmutableList.of(
- new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a", "y",
"10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b", "y",
"10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c", "y",
"10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d", "y",
"10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e", "y",
"10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(
- stream,
- "1",
- "5",
- JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
- ),
- new OrderedPartitionableRecord<>(stream, "1", "6",
Collections.singletonList(StringUtils.toUtf8("unparseable"))),
- new OrderedPartitionableRecord<>(stream, "1", "7",
Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
- new OrderedPartitionableRecord<>(stream, "1", "8",
Collections.singletonList(StringUtils.toUtf8("{}"))),
- new OrderedPartitionableRecord<>(stream, "1", "9", JB("2013", "f", "y",
"10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "10", JB("2049", "f", "y",
"notanumber", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "11", JB("2049", "f", "y",
"10", "notanumber", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "12", JB("2049", "f", "y",
"10", "20.0", "notanumber")),
- new OrderedPartitionableRecord<>(stream, "0", "0", JB("2012", "g", "y",
"10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "0", "1", JB("2011", "h", "y",
"10", "20.0", "1.0"))
- );
+ private static List<OrderedPartitionableRecord<String, String>> records;
private static ServiceEmitter emitter;
private static ListeningExecutorService taskExec;
@@ -316,6 +295,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
maxSavedParseExceptions = null;
skipAvailabilityCheck = false;
doHandoff = true;
+ records = generateRecords(stream);
reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" +
System.currentTimeMillis(), "json");
maxRecordsPerPoll = 1;
@@ -348,6 +328,52 @@ public class KinesisIndexTaskTest extends EasyMockSupport
emitter.close();
}
+ private static List<OrderedPartitionableRecord<String, String>>
generateRecords(String stream)
+ {
+ return ImmutableList.of(
+ new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(
+ stream,
+ "1",
+ "5",
+ JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+ ),
+ new OrderedPartitionableRecord<>(stream, "1", "6",
Collections.singletonList(StringUtils.toUtf8("unparseable"))),
+ new OrderedPartitionableRecord<>(stream, "1", "7",
Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
+ new OrderedPartitionableRecord<>(stream, "1", "8",
Collections.singletonList(StringUtils.toUtf8("{}"))),
+ new OrderedPartitionableRecord<>(stream, "1", "9", JB("2013", "f",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "10", JB("2049", "f",
"y", "notanumber", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "11", JB("2049", "f",
"y", "10", "notanumber", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "12", JB("2049", "f",
"y", "10", "20.0", "notanumber")),
+ new OrderedPartitionableRecord<>(stream, "0", "0", JB("2012", "g",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "0", "1", JB("2011", "h",
"y", "10", "20.0", "1.0"))
+ );
+ }
+
+ private static List<OrderedPartitionableRecord<String, String>>
generateSinglePartitionRecords(String stream)
+ {
+ return ImmutableList.of(
+ new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "5", JB("2012", "a",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "6", JB("2013", "b",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "7", JB("2010", "c",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "8", JB("2011", "d",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "9", JB("2011", "e",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "10", JB("2008", "a",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "11", JB("2009", "b",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "12", JB("2010", "c",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "13", JB("2012", "d",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "14", JB("2013", "e",
"y", "10", "20.0", "1.0"))
+ );
+ }
@Test(timeout = 120_000L)
public void testRunAfterDataInserted() throws Exception
{
@@ -2214,6 +2240,165 @@ public class KinesisIndexTaskTest extends
EasyMockSupport
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1",
desc2));
}
+ @Test(timeout = 120_000L)
+ public void testRestoreAfterPersistingSequences() throws Exception
+ {
+ maxRowsPerSegment = 2;
+ maxRecordsPerPoll = 1;
+ records = generateSinglePartitionRecords(stream);
+
+ recordSupplier.assign(anyObject());
+ expectLastCall().anyTimes();
+
+
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+
+ recordSupplier.seek(anyObject(), anyString());
+ expectLastCall().anyTimes();
+
+ // simulate 1 record at a time
+
expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(0)))
+ .once()
+
.andReturn(Collections.singletonList(records.get(1)))
+ .once()
+
.andReturn(Collections.singletonList(records.get(2)))
+ .once()
+
.andReturn(Collections.singletonList(records.get(3)))
+ .once()
+
.andReturn(Collections.singletonList(records.get(4)))
+ .once()
+ .andReturn(Collections.emptyList())
+ .anyTimes();
+
+ replayAll();
+
+ final KinesisIndexTask task1 = createTask(
+ "task1",
+ new KinesisIndexTaskIOConfig(
+ null,
+ "sequence0",
+ new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+ shardId1,
+ "0"
+ )),
+ new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+ shardId1,
+ "6"
+ )),
+ true,
+ null,
+ null,
+ "awsEndpoint",
+ null,
+ null,
+ null,
+ null,
+ null,
+ false
+ )
+ );
+
+ final SeekableStreamPartitions<String, String> checkpoint1 = new
SeekableStreamPartitions<>(
+ stream,
+ ImmutableMap.of(shardId1, "4")
+ );
+
+ final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+ while (task1.getRunner().getStatus() !=
SeekableStreamIndexTaskRunner.Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ final Map<String, String> currentOffsets =
ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+ Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(),
currentOffsets);
+ task1.getRunner().setEndOffsets(currentOffsets, false);
+
+ // Stop without publishing segment
+ task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+ unlockAppenderatorBasePersistDirForTask(task1);
+
+ Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+ verifyAll();
+ reset(recordSupplier);
+
+ recordSupplier.assign(anyObject());
+ expectLastCall().anyTimes();
+
+
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+
+
+ recordSupplier.seek(anyObject(), anyString());
+ expectLastCall().anyTimes();
+
+
expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(5)))
+ .once()
+
.andReturn(Collections.singletonList(records.get(6)))
+ .once()
+ .andReturn(Collections.emptyList())
+ .anyTimes();
+
+ recordSupplier.close();
+ expectLastCall();
+
+ replayAll();
+
+ // Start a new task
+ final KinesisIndexTask task2 = createTask(
+ task1.getId(),
+ new KinesisIndexTaskIOConfig(
+ null,
+ "sequence0",
+ new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+ shardId1,
+ "0"
+ )),
+ new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+ shardId1,
+ "6"
+ )),
+ true,
+ null,
+ null,
+ "awsEndpoint",
+ null,
+ null,
+ ImmutableSet.of(shardId1),
+ null,
+ null,
+ false
+ )
+ );
+
+ final ListenableFuture<TaskStatus> future2 = runTask(task2);
+
+ // Wait for task to exit
+ Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
+
+ verifyAll();
+
+ // Check metrics
+ Assert.assertEquals(5,
task1.getRunner().getRowIngestionMeters().getProcessed());
+ Assert.assertEquals(0,
task1.getRunner().getRowIngestionMeters().getUnparseable());
+ Assert.assertEquals(0,
task1.getRunner().getRowIngestionMeters().getThrownAway());
+ Assert.assertEquals(1,
task2.getRunner().getRowIngestionMeters().getProcessed());
+ Assert.assertEquals(0,
task2.getRunner().getRowIngestionMeters().getUnparseable());
+ Assert.assertEquals(0,
task2.getRunner().getRowIngestionMeters().getThrownAway());
+
+ // Check published segments & metadata
+ SegmentDescriptor desc1 = SD(task1, "2008/P1D", 0);
+ SegmentDescriptor desc2 = SD(task1, "2009/P1D", 0);
+ SegmentDescriptor desc3 = SD(task1, "2010/P1D", 0);
+ SegmentDescriptor desc4 = SD(task1, "2011/P1D", 0);
+ SegmentDescriptor desc5 = SD(task1, "2013/P1D", 0);
+ Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5),
publishedDescriptors());
+ Assert.assertEquals(
+ new KinesisDataSourceMetadata(
+ new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+ shardId1,
+ "6"
+ ))),
+
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+ );
+ }
@Test(timeout = 120_000L)
public void testRunWithPauseAndResume() throws Exception
@@ -2427,23 +2612,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
@Test(timeout = 5000L)
public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
{
- final List<OrderedPartitionableRecord<String, String>> records =
ImmutableList.of(
- new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "5", JB("2012", "a",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "6", JB("2013", "b",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "7", JB("2010", "c",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "8", JB("2011", "d",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "9", JB("2011", "e",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "10", JB("2008", "a",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "11", JB("2009", "b",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "12", JB("2010", "c",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "13", JB("2012", "d",
"y", "10", "20.0", "1.0")),
- new OrderedPartitionableRecord<>(stream, "1", "14", JB("2013", "e",
"y", "10", "20.0", "1.0"))
- );
+ records = generateSinglePartitionRecords(stream);
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing
should happen
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 9c5352f..00c708c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -20,8 +20,6 @@
package org.apache.druid.indexing.seekablestream;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -53,7 +51,6 @@ import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import
org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
-import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -75,7 +72,6 @@ import
org.apache.druid.segment.realtime.appenderator.Appenderator;
import
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
-import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.Access;
@@ -142,8 +138,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
private static final EmittingLogger log = new
EmittingLogger(SeekableStreamIndexTaskRunner.class);
- private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
- private static final String METADATA_PUBLISH_PARTITIONS =
"publishPartitions";
+ static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
+ static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
private final Map<PartitionIdType, SequenceOffsetType> endOffsets;
private final ConcurrentMap<PartitionIdType, SequenceOffsetType> currOffsets
= new ConcurrentHashMap<>();
@@ -210,7 +206,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
protected volatile boolean pauseRequested = false;
private volatile long nextCheckpointTime;
- private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
+ private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType,
SequenceOffsetType>> sequences;
private volatile Throwable backgroundThreadException;
public SeekableStreamIndexTaskRunner(
@@ -276,7 +272,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous
= sequenceOffsets.next();
while (sequenceOffsets.hasNext()) {
Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current
= sequenceOffsets.next();
- sequences.add(new SequenceMetadata(
+ sequences.add(new SequenceMetadata<>(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(),
previous.getKey()),
previous.getValue(),
@@ -287,7 +283,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
previous = current;
exclusive = true;
}
- sequences.add(new SequenceMetadata(
+ sequences.add(new SequenceMetadata<>(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(),
previous.getKey()),
previous.getValue(),
@@ -296,7 +292,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
exclusive ? previous.getValue().keySet() : null
));
} else {
- sequences.add(new SequenceMetadata(
+ sequences.add(new SequenceMetadata<>(
0,
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
ioConfig.getStartPartitions().getPartitionSequenceNumberMap(),
@@ -369,7 +365,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
} else {
@SuppressWarnings("unchecked")
final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
- final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
restoredNextPartitions = deserializeSeekableStreamPartitionsFromMetadata(
+ final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
restoredNextPartitions = deserializePartitionsFromMetadata(
toolbox.getObjectMapper(),
restoredMetadataMap.get(METADATA_NEXT_PARTITIONS)
);
@@ -553,9 +549,9 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
boolean isPersistRequired = false;
- final SequenceMetadata sequenceToUse = sequences
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceToUse = sequences
.stream()
- .filter(sequenceMetadata ->
sequenceMetadata.canHandle(record))
+ .filter(sequenceMetadata ->
sequenceMetadata.canHandle(this, record))
.findFirst()
.orElse(null);
@@ -702,11 +698,11 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
status = Status.PUBLISHING;
}
- for (SequenceMetadata sequenceMetadata : sequences) {
+ for (SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceMetadata : sequences) {
if (!publishingSequences.contains(sequenceMetadata.getSequenceName()))
{
// this is done to prevent checks in sequence specific commit
supplier from failing
sequenceMetadata.setEndOffsets(currOffsets);
- sequenceMetadata.updateAssignments(currOffsets);
+ sequenceMetadata.updateAssignments(this, currOffsets);
publishingSequences.add(sequenceMetadata.getSequenceName());
// persist already done in finally, so directly add to publishQueue
publishAndRegisterHandoff(sequenceMetadata);
@@ -822,11 +818,6 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return TaskStatus.success(task.getId());
}
- /**
- * checks if the input seqNum marks end of shard. Used by Kinesis only
- */
- protected abstract boolean isEndOfShard(SequenceOffsetType seqNum);
-
private void checkPublishAndHandoffFailure() throws ExecutionException,
InterruptedException
{
// Check if any publishFuture failed.
@@ -856,14 +847,14 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
handOffWaitList.removeAll(handoffFinished);
}
- private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
+ private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType,
SequenceOffsetType> sequenceMetadata)
{
log.info("Publishing segments for sequence [%s]", sequenceMetadata);
final ListenableFuture<SegmentsAndMetadata> publishFuture =
Futures.transform(
driver.publish(
- sequenceMetadata.createPublisher(toolbox,
ioConfig.isUseTransaction()),
- sequenceMetadata.getCommitterSupplier(stream,
lastPersistedOffsets).get(),
+ sequenceMetadata.createPublisher(this, toolbox,
ioConfig.isUseTransaction()),
+ sequenceMetadata.getCommitterSupplier(this, stream,
lastPersistedOffsets).get(),
Collections.singletonList(sequenceMetadata.getSequenceName())
),
(Function<SegmentsAndMetadata, SegmentsAndMetadata>)
publishedSegmentsAndMetadata -> {
@@ -948,11 +939,9 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final File sequencesPersistFile = getSequencesPersistFile(toolbox);
if (sequencesPersistFile.exists()) {
sequences = new CopyOnWriteArrayList<>(
- toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
+ toolbox.getObjectMapper().<List<SequenceMetadata<PartitionIdType,
SequenceOffsetType>>>readValue(
sequencesPersistFile,
- new TypeReference<List<SequenceMetadata>>()
- {
- }
+ getSequenceMetadataTypeReference()
)
);
return true;
@@ -965,9 +954,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
{
log.info("Persisting Sequences Metadata [%s]", sequences);
toolbox.getObjectMapper().writerWithType(
- new TypeReference<List<SequenceMetadata>>()
- {
- }
+ getSequenceMetadataTypeReference()
).writeValue(getSequencesPersistFile(toolbox), sequences);
}
@@ -1012,8 +999,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private void maybePersistAndPublishSequences(Supplier<Committer>
committerSupplier)
throws InterruptedException
{
- for (SequenceMetadata sequenceMetadata : sequences) {
- sequenceMetadata.updateAssignments(currOffsets);
+ for (SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceMetadata : sequences) {
+ sequenceMetadata.updateAssignments(this, currOffsets);
if (!sequenceMetadata.isOpen() &&
!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
publishingSequences.add(sequenceMetadata.getSequenceName());
try {
@@ -1388,7 +1375,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// and after acquiring pauseLock to correctly guard against duplicate
requests
Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No
Sequences found to set end sequences");
- final SequenceMetadata latestSequence = sequences.get(sequences.size()
- 1);
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
latestSequence = sequences.get(sequences.size() - 1);
// if a partition has not been read yet (contained in
initialOffsetsSnapshot), then
// do not mark the starting sequence number as exclusive
Set<PartitionIdType> exclusivePartitions = sequenceNumbers.keySet()
@@ -1399,7 +1386,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
.collect(Collectors.toSet());
if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
- &&
latestSequence.exclusiveStartPartitions.equals(exclusivePartitions)
+ &&
latestSequence.getExclusiveStartPartitions().equals(exclusivePartitions)
&& !finish)
|| (latestSequence.getEndOffsets().equals(sequenceNumbers) &&
finish)) {
log.warn("Ignoring duplicate request, end sequences already set for
sequences [%s]", sequenceNumbers);
@@ -1419,8 +1406,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
for (Map.Entry<PartitionIdType, SequenceOffsetType> entry :
sequenceNumbers.entrySet()) {
- if
(createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
- < 0) {
+ if
(createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
< 0) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(
StringUtils.format(
@@ -1443,7 +1429,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
exclusiveStartingPartitions.addAll(exclusivePartitions);
// create new sequence
- final SequenceMetadata newSequence = new SequenceMetadata(
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
newSequence = new SequenceMetadata<>(
latestSequence.getSequenceId() + 1,
StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(),
latestSequence.getSequenceId() + 1),
sequenceNumbers,
@@ -1606,291 +1592,6 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return startTime;
}
- private class SequenceMetadata
- {
- private final int sequenceId;
- private final String sequenceName;
- private final Set<PartitionIdType> exclusiveStartPartitions;
- private final Set<PartitionIdType> assignments;
- private final boolean sentinel;
- private boolean checkpointed;
- /**
- * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This
lock is required because
- * {@link #setEndOffsets)} can be called by both the main thread and the
HTTP thread.
- */
- private final ReentrantLock lock = new ReentrantLock();
-
- final Map<PartitionIdType, SequenceOffsetType> startOffsets;
- final Map<PartitionIdType, SequenceOffsetType> endOffsets;
-
- @JsonCreator
- public SequenceMetadata(
- @JsonProperty("sequenceId") int sequenceId,
- @JsonProperty("sequenceName") String sequenceName,
- @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType>
startOffsets,
- @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType>
endOffsets,
- @JsonProperty("checkpointed") boolean checkpointed,
- @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType>
exclusiveStartPartitions
- )
- {
- Preconditions.checkNotNull(sequenceName);
- Preconditions.checkNotNull(startOffsets);
- Preconditions.checkNotNull(endOffsets);
- this.sequenceId = sequenceId;
- this.sequenceName = sequenceName;
- this.startOffsets = ImmutableMap.copyOf(startOffsets);
- this.endOffsets = new HashMap<>(endOffsets);
- this.assignments = new HashSet<>(startOffsets.keySet());
- this.checkpointed = checkpointed;
- this.sentinel = false;
- this.exclusiveStartPartitions = exclusiveStartPartitions == null
- ? Collections.emptySet()
- : exclusiveStartPartitions;
- }
-
- @JsonProperty
- public Set<PartitionIdType> getExclusiveStartPartitions()
- {
- return exclusiveStartPartitions;
- }
-
- @JsonProperty
- public int getSequenceId()
- {
- return sequenceId;
- }
-
- @JsonProperty
- public boolean isCheckpointed()
- {
- lock.lock();
- try {
- return checkpointed;
- }
- finally {
- lock.unlock();
- }
- }
-
- @JsonProperty
- public String getSequenceName()
- {
- return sequenceName;
- }
-
- @JsonProperty
- public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
- {
- return startOffsets;
- }
-
- @JsonProperty
- public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
- {
- lock.lock();
- try {
- return endOffsets;
- }
- finally {
- lock.unlock();
- }
- }
-
- @JsonProperty
- public boolean isSentinel()
- {
- return sentinel;
- }
-
- void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
- {
- lock.lock();
- try {
- endOffsets.putAll(newEndOffsets);
- checkpointed = true;
- }
- finally {
- lock.unlock();
- }
- }
-
- void updateAssignments(Map<PartitionIdType, SequenceOffsetType>
nextPartitionOffset)
- {
- lock.lock();
- try {
- assignments.clear();
- nextPartitionOffset.forEach((key, value) -> {
- if
(endOffsets.get(key).equals(SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER)
- ||
createSequenceNumber(endOffsets.get(key)).compareTo(createSequenceNumber(nextPartitionOffset.get(key)))
- > 0) {
- assignments.add(key);
- }
- });
- }
- finally {
- lock.unlock();
- }
- }
-
- boolean isOpen()
- {
- return !assignments.isEmpty();
- }
-
- boolean canHandle(OrderedPartitionableRecord<PartitionIdType,
SequenceOffsetType> record)
- {
- lock.lock();
- try {
- final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset =
createSequenceNumber(endOffsets.get(record.getPartitionId()));
- final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset =
createSequenceNumber(startOffsets.get(
- record.getPartitionId()));
- final OrderedSequenceNumber<SequenceOffsetType> recordOffset =
createSequenceNumber(record.getSequenceNumber());
- if (!isOpen() || recordOffset == null || partitionEndOffset == null ||
partitionStartOffset == null) {
- return false;
- }
- boolean ret;
- if (isStartingSequenceOffsetsExclusive()) {
- ret = recordOffset.compareTo(partitionStartOffset)
- >=
(getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
- } else {
- ret = recordOffset.compareTo(partitionStartOffset) >= 0;
- }
-
- if (isEndSequenceOffsetsExclusive()) {
- ret &= recordOffset.compareTo(partitionEndOffset) < 0;
- } else {
- ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
- }
-
- return ret;
- }
- finally {
- lock.unlock();
- }
- }
-
- @Override
- public String toString()
- {
- lock.lock();
- try {
- return "SequenceMetadata{" +
- "sequenceName='" + sequenceName + '\'' +
- ", sequenceId=" + sequenceId +
- ", startOffsets=" + startOffsets +
- ", endOffsets=" + endOffsets +
- ", assignments=" + assignments +
- ", sentinel=" + sentinel +
- ", checkpointed=" + checkpointed +
- '}';
- }
- finally {
- lock.unlock();
- }
- }
-
- Supplier<Committer> getCommitterSupplier(
- String stream,
- Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
- )
- {
- // Set up committer.
- return () ->
- new Committer()
- {
- @Override
- public Object getMetadata()
- {
- lock.lock();
-
- try {
- Preconditions.checkState(
- assignments.isEmpty(),
- "This committer can be used only once all the records till
sequences [%s] have been consumed, also make"
- + " sure to call updateAssignments before using this
committer",
- endOffsets
- );
-
-
- // merge endOffsets for this sequence with globally
lastPersistedOffsets
- // This is done because this committer would be persisting
only sub set of segments
- // corresponding to the current sequence. Generally,
lastPersistedOffsets should already
- // cover endOffsets but just to be sure take max of sequences
and persist that
- for (Map.Entry<PartitionIdType, SequenceOffsetType>
partitionOffset : endOffsets.entrySet()) {
- SequenceOffsetType newOffsets = partitionOffset.getValue();
- if
(lastPersistedOffsets.containsKey(partitionOffset.getKey()) &&
-
createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey())).compareTo(
- createSequenceNumber(newOffsets)) > 0) {
- newOffsets =
lastPersistedOffsets.get(partitionOffset.getKey());
- }
- lastPersistedOffsets.put(
- partitionOffset.getKey(),
- newOffsets
- );
- }
-
- // Publish metadata can be different from persist metadata as
we are going to publish only
- // subset of segments
- return ImmutableMap.of(
- METADATA_NEXT_PARTITIONS, new
SeekableStreamPartitions<>(stream, lastPersistedOffsets),
- METADATA_PUBLISH_PARTITIONS, new
SeekableStreamPartitions<>(stream, endOffsets)
- );
- }
- finally {
- lock.unlock();
- }
- }
-
- @Override
- public void run()
- {
- // Do nothing.
- }
- };
-
- }
-
- TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean
useTransaction)
- {
- return (segments, commitMetadata) -> {
- final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
finalPartitions = deserializeSeekableStreamPartitionsFromMetadata(
- toolbox.getObjectMapper(),
- ((Map) Preconditions
- .checkNotNull(commitMetadata,
"commitMetadata")).get(METADATA_PUBLISH_PARTITIONS)
- );
-
- // Sanity check, we should only be publishing things that match our
desired end state.
- if
(!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
- throw new ISE(
- "WTF?! Driver for sequence [%s], attempted to publish invalid
metadata[%s].",
- toString(),
- commitMetadata
- );
- }
-
- final SegmentTransactionalInsertAction action;
-
- if (useTransaction) {
- action = new SegmentTransactionalInsertAction(
- segments,
- createDataSourceMetadata(new SeekableStreamPartitions<>(
- finalPartitions.getStream(),
- getStartOffsets()
- )),
- createDataSourceMetadata(finalPartitions)
- );
- } else {
- action = new SegmentTransactionalInsertAction(segments, null, null);
- }
-
- log.info("Publishing with isTransaction[%s].", useTransaction);
-
- return toolbox.getTaskActionClient().submit(action);
- };
- }
-
- }
-
private boolean verifyInitialRecordAndSkipExclusivePartition(
final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>
record
)
@@ -1933,7 +1634,12 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
/**
- * deserailizes the checkpoints into of Map<sequenceId, Map<PartitionIdType,
SequenceOffsetType>>
+ * checks if the input seqNum marks end of shard. Used by Kinesis only
+ */
+ protected abstract boolean isEndOfShard(SequenceOffsetType seqNum);
+
+ /**
+ * deserializes the checkpoints into a Map<sequenceId, Map<PartitionIdType,
SequenceOffsetType>>
*
* @param toolbox task toolbox
* @param checkpointsString the json-serialized checkpoint string
@@ -1949,7 +1655,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
) throws IOException;
/**
- * Calculates the sequence number used to update `currentOffsets` after
finishing reading a record.
+ * Calculates the sequence number used to update `currentOffsets` after
finishing reading a record.
* In Kafka this returns sequenceNumber + 1 since that's the next expected
offset
* In Kinesis this simply returns sequenceNumber, since the sequence numbers
in Kinesis are not
* contiguous and finding the next sequence number requires an expensive API
call
@@ -1961,14 +1667,14 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
protected abstract SequenceOffsetType
getSequenceNumberToStoreAfterRead(SequenceOffsetType sequenceNumber);
/**
- * deserialzies stored metadata into SeekableStreamPartitions
+ * deserializes stored metadata into SeekableStreamPartitions
*
* @param mapper json objectMapper
* @param object metadata
*
* @return SeekableStreamPartitions
*/
- protected abstract SeekableStreamPartitions<PartitionIdType,
SequenceOffsetType> deserializeSeekableStreamPartitionsFromMetadata(
+ protected abstract SeekableStreamPartitions<PartitionIdType,
SequenceOffsetType> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
);
@@ -2038,4 +1744,6 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
* partition we read from stream
*/
protected abstract boolean isStartingSequenceOffsetsExclusive();
+
+ protected abstract TypeReference<List<SequenceMetadata<PartitionIdType,
SequenceOffsetType>>> getSequenceMetadataTypeReference();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
new file mode 100644
index 0000000..7fbc800
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.indexing.common.TaskToolbox;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.java.util.common.ISE;
+import
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
+{
+ private final int sequenceId;
+ private final String sequenceName;
+ private final Set<PartitionIdType> exclusiveStartPartitions;
+ private final Set<PartitionIdType> assignments;
+ private final boolean sentinel;
+ private boolean checkpointed;
+ /**
+ * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This
lock is required because
+ * {@link #setEndOffsets} can be called by both the main thread and the
HTTP thread.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ final Map<PartitionIdType, SequenceOffsetType> startOffsets;
+ final Map<PartitionIdType, SequenceOffsetType> endOffsets;
+
+ @JsonCreator
+ public SequenceMetadata(
+ @JsonProperty("sequenceId") int sequenceId,
+ @JsonProperty("sequenceName") String sequenceName,
+ @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType>
startOffsets,
+ @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType>
endOffsets,
+ @JsonProperty("checkpointed") boolean checkpointed,
+ @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType>
exclusiveStartPartitions
+ )
+ {
+ Preconditions.checkNotNull(sequenceName);
+ Preconditions.checkNotNull(startOffsets);
+ Preconditions.checkNotNull(endOffsets);
+ this.sequenceId = sequenceId;
+ this.sequenceName = sequenceName;
+ this.startOffsets = ImmutableMap.copyOf(startOffsets);
+ this.endOffsets = new HashMap<>(endOffsets);
+ this.assignments = new HashSet<>(startOffsets.keySet());
+ this.checkpointed = checkpointed;
+ this.sentinel = false;
+ this.exclusiveStartPartitions = exclusiveStartPartitions == null
+ ? Collections.emptySet()
+ : exclusiveStartPartitions;
+ }
+
+ @JsonProperty
+ public Set<PartitionIdType> getExclusiveStartPartitions()
+ {
+ return exclusiveStartPartitions;
+ }
+
+ @JsonProperty
+ public int getSequenceId()
+ {
+ return sequenceId;
+ }
+
+ @JsonProperty
+ public boolean isCheckpointed()
+ {
+ lock.lock();
+ try {
+ return checkpointed;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @JsonProperty
+ public String getSequenceName()
+ {
+ return sequenceName;
+ }
+
+ @JsonProperty
+ public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
+ {
+ return startOffsets;
+ }
+
+ @JsonProperty
+ public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
+ {
+ lock.lock();
+ try {
+ return endOffsets;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @JsonProperty
+ public boolean isSentinel()
+ {
+ return sentinel;
+ }
+
+ void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
+ {
+ lock.lock();
+ try {
+ endOffsets.putAll(newEndOffsets);
+ checkpointed = true;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ void updateAssignments(
+ SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
runner,
+ Map<PartitionIdType, SequenceOffsetType> nextPartitionOffset
+ )
+ {
+ lock.lock();
+ try {
+ assignments.clear();
+ nextPartitionOffset.forEach((key, value) -> {
+ SequenceOffsetType endOffset = endOffsets.get(key);
+ if (SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset)
+ ||
runner.createSequenceNumber(endOffset).compareTo(runner.createSequenceNumber(nextPartitionOffset.get(key)))
> 0) {
+ assignments.add(key);
+ }
+ });
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ boolean isOpen()
+ {
+ return !assignments.isEmpty();
+ }
+
+ boolean canHandle(
+ SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
runner,
+ OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
+ )
+ {
+ lock.lock();
+ try {
+ final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset =
runner.createSequenceNumber(endOffsets.get(record.getPartitionId()));
+ final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset =
runner.createSequenceNumber(startOffsets.get(
+ record.getPartitionId()));
+ final OrderedSequenceNumber<SequenceOffsetType> recordOffset =
runner.createSequenceNumber(record.getSequenceNumber());
+ if (!isOpen() || recordOffset == null || partitionEndOffset == null ||
partitionStartOffset == null) {
+ return false;
+ }
+ boolean ret;
+ if (runner.isStartingSequenceOffsetsExclusive()) {
+ ret = recordOffset.compareTo(partitionStartOffset)
+ >=
(getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
+ } else {
+ ret = recordOffset.compareTo(partitionStartOffset) >= 0;
+ }
+
+ if (runner.isEndSequenceOffsetsExclusive()) {
+ ret &= recordOffset.compareTo(partitionEndOffset) < 0;
+ } else {
+ ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
+ }
+
+ return ret;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ lock.lock();
+ try {
+ return "SequenceMetadata{" +
+ "sequenceName='" + sequenceName + '\'' +
+ ", sequenceId=" + sequenceId +
+ ", startOffsets=" + startOffsets +
+ ", endOffsets=" + endOffsets +
+ ", assignments=" + assignments +
+ ", sentinel=" + sentinel +
+ ", checkpointed=" + checkpointed +
+ '}';
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ Supplier<Committer> getCommitterSupplier(
+ SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
runner,
+ String stream,
+ Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
+ )
+ {
+ // Set up committer.
+ return () ->
+ new Committer()
+ {
+ @Override
+ public Object getMetadata()
+ {
+ lock.lock();
+
+ try {
+ Preconditions.checkState(
+ assignments.isEmpty(),
+ "This committer can be used only once all the records till
sequences [%s] have been consumed, also make"
+ + " sure to call updateAssignments before using this
committer",
+ endOffsets
+ );
+
+
+ // merge endOffsets for this sequence with globally
lastPersistedOffsets
+ // This is done because this committer would be persisting only
sub set of segments
+ // corresponding to the current sequence. Generally,
lastPersistedOffsets should already
+ // cover endOffsets but just to be sure take max of sequences
and persist that
+ for (Map.Entry<PartitionIdType, SequenceOffsetType>
partitionOffset : endOffsets.entrySet()) {
+ SequenceOffsetType newOffsets = partitionOffset.getValue();
+ if (lastPersistedOffsets.containsKey(partitionOffset.getKey())
+ &&
runner.createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey()))
+
.compareTo(runner.createSequenceNumber(newOffsets)) > 0) {
+ newOffsets =
lastPersistedOffsets.get(partitionOffset.getKey());
+ }
+ lastPersistedOffsets.put(
+ partitionOffset.getKey(),
+ newOffsets
+ );
+ }
+
+ // Publish metadata can be different from persist metadata as we
are going to publish only
+ // subset of segments
+ return ImmutableMap.of(
+ SeekableStreamIndexTaskRunner.METADATA_NEXT_PARTITIONS,
+ new SeekableStreamPartitions<>(stream, lastPersistedOffsets),
+ SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS,
+ new SeekableStreamPartitions<>(stream, endOffsets)
+ );
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ // Do nothing.
+ }
+ };
+
+ }
+
+ TransactionalSegmentPublisher createPublisher(
+ SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
runner,
+ TaskToolbox toolbox,
+ boolean useTransaction
+ )
+ {
+ return (segments, commitMetadata) -> {
+ final Map commitMetaMap = (Map)
Preconditions.checkNotNull(commitMetadata, "commitMetadata");
+ final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
finalPartitions =
+ runner.deserializePartitionsFromMetadata(
+ toolbox.getObjectMapper(),
+
commitMetaMap.get(SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS)
+ );
+
+ // Sanity check, we should only be publishing things that match our
desired end state.
+ if
(!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
+ throw new ISE(
+ "WTF?! Driver for sequence [%s], attempted to publish invalid
metadata[%s].",
+ toString(),
+ commitMetadata
+ );
+ }
+
+ final SegmentTransactionalInsertAction action;
+
+ if (useTransaction) {
+ action = new SegmentTransactionalInsertAction(
+ segments,
+ runner.createDataSourceMetadata(
+ new SeekableStreamPartitions<>(finalPartitions.getStream(),
getStartOffsets())
+ ),
+ runner.createDataSourceMetadata(finalPartitions)
+ );
+ } else {
+ action = new SegmentTransactionalInsertAction(segments, null, null);
+ }
+
+ return toolbox.getTaskActionClient().submit(action);
+ };
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 25250ac..b93d15f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -119,8 +119,7 @@ import java.util.stream.Stream;
* @param <PartitionIdType> the type of the partition id, for example,
partitions in Kafka are int type while partitions in Kinesis are String type
* @param <SequenceOffsetType> the type of the sequence number or offsets, for
example, Kafka uses long offsets while Kinesis uses String sequence numbers
*/
-public abstract class SeekableStreamSupervisor<PartitionIdType,
SequenceOffsetType>
- implements Supervisor
+public abstract class SeekableStreamSupervisor<PartitionIdType,
SequenceOffsetType> implements Supervisor
{
public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED =
"IS_INCREMENTAL_HANDOFF_SUPPORTED";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]