This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new 49796a9  fix SequenceMetadata deserialization (#7256) (#7262)
49796a9 is described below

commit 49796a917120256bcf39c6f3d2446c066a1c2e39
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Mar 13 20:54:54 2019 -0700

    fix SequenceMetadata deserialization (#7256) (#7262)
    
    * wip
    
    * fix tests, stop reading if we are at end offset
    
    * fix build
    
    * remove restore at end offsets fix in favor of a separate PR
    
    * use typereference from method for serialization too
---
 .../IncrementalPublishingKafkaIndexTaskRunner.java |  11 +-
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java |  12 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 171 ++++++++--
 .../indexing/kinesis/KinesisIndexTaskRunner.java   |  11 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     | 247 +++++++++++---
 .../SeekableStreamIndexTaskRunner.java             | 360 ++-------------------
 .../indexing/seekablestream/SequenceMetadata.java  | 335 +++++++++++++++++++
 .../supervisor/SeekableStreamSupervisor.java       |   3 +-
 8 files changed, 747 insertions(+), 403 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 6424c29..ae092d5 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -111,7 +112,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
   }
 
   @Override
-  protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
+  protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   )
@@ -225,6 +226,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
     return false;
   }
 
+  @Override
+  public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
+  {
+    return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+    {
+    };
+  }
+
   @Nullable
   @Override
   protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index b065361..61ebf49 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.kafka;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -50,6 +51,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -627,6 +629,14 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
     return false;
   }
 
+  @Override
+  public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
+  {
+    return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+    {
+    };
+  }
+
   @Nonnull
   @Override
   protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
@@ -726,7 +736,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
 
 
   @Override
-  protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
+  protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   )
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 76433c2..a2224e5 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -282,6 +282,26 @@ public class KafkaIndexTaskTest
     );
   }
 
+  private static List<ProducerRecord<byte[], byte[]>> 
generateSinglePartitionRecords(String topic)
+  {
+    return ImmutableList.of(
+        new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2008", "A", "x", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "x", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2010", "C", "x", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "D", "x", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "x", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2012", "E", "x", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "x", "10", 
"20.0", "1.0"))
+    );
+  }
+
   private static String getTopicName()
   {
     return "topic" + topicPostfix++;
@@ -863,23 +883,7 @@ public class KafkaIndexTaskTest
     if (!isIncrementalHandoffSupported) {
       return;
     }
-
-    List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
-        new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2008", "A", "x", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "x", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2010", "C", "x", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2011", "D", "x", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "x", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2012", "E", "x", "10", 
"20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "x", "10", 
"20.0", "1.0"))
-    );
+    records = generateSinglePartitionRecords(topic);
 
     final String baseSequenceName = "sequence0";
     // as soon as any segment has more than one record, incremental publishing 
should happen
@@ -894,22 +898,14 @@ public class KafkaIndexTaskTest
     Map<String, Object> consumerProps = kafkaServer.consumerProperties();
     consumerProps.put("max.poll.records", "1");
 
-    final SeekableStreamPartitions<Integer, Long> startPartitions = new 
SeekableStreamPartitions<>(
-        topic,
-        ImmutableMap.of(0, 0L)
-    );
-    final SeekableStreamPartitions<Integer, Long> checkpoint1 = new 
SeekableStreamPartitions<>(
-        topic,
-        ImmutableMap.of(0, 5L)
-    );
-    final SeekableStreamPartitions<Integer, Long> checkpoint2 = new 
SeekableStreamPartitions<>(
-        topic,
-        ImmutableMap.of(0, 12L)
-    );
-    final SeekableStreamPartitions<Integer, Long> endPartitions = new 
SeekableStreamPartitions<>(
-        topic,
-        ImmutableMap.of(0, Long.MAX_VALUE)
-    );
+    final SeekableStreamPartitions<Integer, Long> startPartitions =
+        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L));
+    final SeekableStreamPartitions<Integer, Long> checkpoint1 =
+        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L));
+    final SeekableStreamPartitions<Integer, Long> checkpoint2 =
+        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L));
+    final SeekableStreamPartitions<Integer, Long> endPartitions =
+        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 
Long.MAX_VALUE));
 
     final KafkaIndexTask task = createTask(
         null,
@@ -1924,6 +1920,115 @@ public class KafkaIndexTaskTest
   }
 
   @Test(timeout = 60_000L)
+  public void testRestoreAfterPersistingSequences() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+
+    records = generateSinglePartitionRecords(topic);
+    maxRowsPerSegment = 2;
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final KafkaIndexTask task1 = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L)),
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+
+    final SeekableStreamPartitions<Integer, Long> checkpoint = new 
SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(0, 5L)
+    );
+
+    final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+    // Insert some data, but not enough for the task to finish
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 
5)) {
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    while (task1.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    final Map<Integer, Long> currentOffsets = 
ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint.getPartitionSequenceNumberMap(), 
currentOffsets);
+    // Set endOffsets to persist sequences
+    task1.getRunner().setEndOffsets(ImmutableMap.of(0, 5L), false);
+
+    // Stop without publishing segment
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+    unlockAppenderatorBasePersistDirForTask(task1);
+
+    Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+    // Start a new task
+    final KafkaIndexTask task2 = createTask(
+        task1.getId(),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L)),
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future2 = runTask(task2);
+    // Wait for the task to start reading
+
+    // Insert remaining data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5)) 
{
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    // Wait for task to exit
+    Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
+
+    // Check metrics
+    Assert.assertEquals(5, 
task1.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, 
task1.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, 
task1.getRunner().getRowIngestionMeters().getThrownAway());
+    Assert.assertEquals(4, 
task2.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, 
task2.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, 
task2.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published segments & metadata
+    SegmentDescriptor desc1 = SD(task1, "2008/P1D", 0);
+    SegmentDescriptor desc2 = SD(task1, "2008/P1D", 1);
+    SegmentDescriptor desc3 = SD(task1, "2009/P1D", 0);
+    SegmentDescriptor desc4 = SD(task1, "2009/P1D", 1);
+    SegmentDescriptor desc5 = SD(task1, "2010/P1D", 0);
+    SegmentDescriptor desc6 = SD(task1, "2011/P1D", 0);
+    SegmentDescriptor desc7 = SD(task1, "2012/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, 
desc6, desc7), publishedDescriptors());
+    Assert.assertEquals(
+        new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, 
ImmutableMap.of(0, 9L))),
+        
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+  }
+
+  @Test(timeout = 60_000L)
   public void testRunWithPauseAndResume() throws Exception
   {
     final KafkaIndexTask task = createTask(
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 3e7e5e7..50326f7 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -92,7 +93,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
   }
 
   @Override
-  protected SeekableStreamPartitions<String, String> deserializeSeekableStreamPartitionsFromMetadata(
+  protected SeekableStreamPartitions<String, String> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   )
@@ -176,6 +177,14 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
     return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum);
   }
 
+  @Override
+  public TypeReference<List<SequenceMetadata<String, String>>> getSequenceMetadataTypeReference()
+  {
+    return new TypeReference<List<SequenceMetadata<String, String>>>()
+    {
+    };
+  }
+
   @Nullable
   @Override
   protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 58fa72a..f60fe50 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -199,28 +199,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   private static String shardId1 = "1";
   private static String shardId0 = "0";
   private static KinesisRecordSupplier recordSupplier;
-  private static List<OrderedPartitionableRecord<String, String>> records = 
ImmutableList.of(
-      new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a", "y", 
"10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b", "y", 
"10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c", "y", 
"10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d", "y", 
"10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e", "y", 
"10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(
-          stream,
-          "1",
-          "5",
-          JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
-      ),
-      new OrderedPartitionableRecord<>(stream, "1", "6", 
Collections.singletonList(StringUtils.toUtf8("unparseable"))),
-      new OrderedPartitionableRecord<>(stream, "1", "7", 
Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
-      new OrderedPartitionableRecord<>(stream, "1", "8", 
Collections.singletonList(StringUtils.toUtf8("{}"))),
-      new OrderedPartitionableRecord<>(stream, "1", "9", JB("2013", "f", "y", 
"10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "10", JB("2049", "f", "y", 
"notanumber", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "11", JB("2049", "f", "y", 
"10", "notanumber", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "1", "12", JB("2049", "f", "y", 
"10", "20.0", "notanumber")),
-      new OrderedPartitionableRecord<>(stream, "0", "0", JB("2012", "g", "y", 
"10", "20.0", "1.0")),
-      new OrderedPartitionableRecord<>(stream, "0", "1", JB("2011", "h", "y", 
"10", "20.0", "1.0"))
-  );
+  private static List<OrderedPartitionableRecord<String, String>> records;
 
   private static ServiceEmitter emitter;
   private static ListeningExecutorService taskExec;
@@ -316,6 +295,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     maxSavedParseExceptions = null;
     skipAvailabilityCheck = false;
     doHandoff = true;
+    records = generateRecords(stream);
     reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" + 
System.currentTimeMillis(), "json");
     maxRecordsPerPoll = 1;
 
@@ -348,6 +328,52 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     emitter.close();
   }
 
+  private static List<OrderedPartitionableRecord<String, String>> 
generateRecords(String stream)
+  {
+    return ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "5",
+            JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+        ),
+        new OrderedPartitionableRecord<>(stream, "1", "6", 
Collections.singletonList(StringUtils.toUtf8("unparseable"))),
+        new OrderedPartitionableRecord<>(stream, "1", "7", 
Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
+        new OrderedPartitionableRecord<>(stream, "1", "8", 
Collections.singletonList(StringUtils.toUtf8("{}"))),
+        new OrderedPartitionableRecord<>(stream, "1", "9", JB("2013", "f", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "10", JB("2049", "f", 
"y", "notanumber", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "11", JB("2049", "f", 
"y", "10", "notanumber", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "12", JB("2049", "f", 
"y", "10", "20.0", "notanumber")),
+        new OrderedPartitionableRecord<>(stream, "0", "0", JB("2012", "g", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "0", "1", JB("2011", "h", 
"y", "10", "20.0", "1.0"))
+    );
+  }
+
+  private static List<OrderedPartitionableRecord<String, String>> 
generateSinglePartitionRecords(String stream)
+  {
+    return ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "5", JB("2012", "a", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "6", JB("2013", "b", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "7", JB("2010", "c", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "8", JB("2011", "d", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "9", JB("2011", "e", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "10", JB("2008", "a", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "11", JB("2009", "b", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "12", JB("2010", "c", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "13", JB("2012", "d", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "14", JB("2013", "e", 
"y", "10", "20.0", "1.0"))
+    );
+  }
   @Test(timeout = 120_000L)
   public void testRunAfterDataInserted() throws Exception
   {
@@ -2214,6 +2240,165 @@ public class KinesisIndexTaskTest extends 
EasyMockSupport
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", 
desc2));
   }
 
+  @Test(timeout = 120_000L)
+  public void testRestoreAfterPersistingSequences() throws Exception
+  {
+    maxRowsPerSegment = 2;
+    maxRecordsPerPoll = 1;
+    records = generateSinglePartitionRecords(stream);
+
+    recordSupplier.assign(anyObject());
+    expectLastCall().anyTimes();
+
+    
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+
+    recordSupplier.seek(anyObject(), anyString());
+    expectLastCall().anyTimes();
+
+    // simulate 1 record at a time
+    
expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(0)))
+                                          .once()
+                                          
.andReturn(Collections.singletonList(records.get(1)))
+                                          .once()
+                                          
.andReturn(Collections.singletonList(records.get(2)))
+                                          .once()
+                                          
.andReturn(Collections.singletonList(records.get(3)))
+                                          .once()
+                                          
.andReturn(Collections.singletonList(records.get(4)))
+                                          .once()
+                                          .andReturn(Collections.emptyList())
+                                          .anyTimes();
+
+    replayAll();
+
+    final KinesisIndexTask task1 = createTask(
+        "task1",
+        new KinesisIndexTaskIOConfig(
+            null,
+            "sequence0",
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "0"
+            )),
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "6"
+            )),
+            true,
+            null,
+            null,
+            "awsEndpoint",
+            null,
+            null,
+            null,
+            null,
+            null,
+            false
+        )
+    );
+
+    final SeekableStreamPartitions<String, String> checkpoint1 = new 
SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(shardId1, "4")
+    );
+
+    final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+    while (task1.getRunner().getStatus() != 
SeekableStreamIndexTaskRunner.Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    final Map<String, String> currentOffsets = 
ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), 
currentOffsets);
+    task1.getRunner().setEndOffsets(currentOffsets, false);
+
+    // Stop without publishing segment
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+    unlockAppenderatorBasePersistDirForTask(task1);
+
+    Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+    verifyAll();
+    reset(recordSupplier);
+
+    recordSupplier.assign(anyObject());
+    expectLastCall().anyTimes();
+
+    
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+
+
+    recordSupplier.seek(anyObject(), anyString());
+    expectLastCall().anyTimes();
+
+    
expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(5)))
+                                          .once()
+                                          
.andReturn(Collections.singletonList(records.get(6)))
+                                          .once()
+                                          .andReturn(Collections.emptyList())
+                                          .anyTimes();
+
+    recordSupplier.close();
+    expectLastCall();
+
+    replayAll();
+
+    // Start a new task
+    final KinesisIndexTask task2 = createTask(
+        task1.getId(),
+        new KinesisIndexTaskIOConfig(
+            null,
+            "sequence0",
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "0"
+            )),
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "6"
+            )),
+            true,
+            null,
+            null,
+            "awsEndpoint",
+            null,
+            null,
+            ImmutableSet.of(shardId1),
+            null,
+            null,
+            false
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future2 = runTask(task2);
+
+    // Wait for task to exit
+    Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
+
+    verifyAll();
+
+    // Check metrics
+    Assert.assertEquals(5, 
task1.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, 
task1.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, 
task1.getRunner().getRowIngestionMeters().getThrownAway());
+    Assert.assertEquals(1, 
task2.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, 
task2.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, 
task2.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published segments & metadata
+    SegmentDescriptor desc1 = SD(task1, "2008/P1D", 0);
+    SegmentDescriptor desc2 = SD(task1, "2009/P1D", 0);
+    SegmentDescriptor desc3 = SD(task1, "2010/P1D", 0);
+    SegmentDescriptor desc4 = SD(task1, "2011/P1D", 0);
+    SegmentDescriptor desc5 = SD(task1, "2013/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5), 
publishedDescriptors());
+    Assert.assertEquals(
+        new KinesisDataSourceMetadata(
+            new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+                shardId1,
+                "6"
+            ))),
+        
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+  }
 
   @Test(timeout = 120_000L)
   public void testRunWithPauseAndResume() throws Exception
@@ -2427,23 +2612,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   @Test(timeout = 5000L)
   public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
   {
-    final List<OrderedPartitionableRecord<String, String>> records = 
ImmutableList.of(
-        new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "5", JB("2012", "a", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "6", JB("2013", "b", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "7", JB("2010", "c", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "8", JB("2011", "d", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "9", JB("2011", "e", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "10", JB("2008", "a", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "11", JB("2009", "b", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "12", JB("2010", "c", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "13", JB("2012", "d", 
"y", "10", "20.0", "1.0")),
-        new OrderedPartitionableRecord<>(stream, "1", "14", JB("2013", "e", 
"y", "10", "20.0", "1.0"))
-    );
+    records = generateSinglePartitionRecords(stream);
 
     final String baseSequenceName = "sequence0";
     // as soon as any segment has more than one record, incremental publishing 
should happen
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 9c5352f..00c708c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -20,8 +20,6 @@
 package org.apache.druid.indexing.seekablestream;
 
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -53,7 +51,6 @@ import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import 
org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
 import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
-import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -75,7 +72,6 @@ import 
org.apache.druid.segment.realtime.appenderator.Appenderator;
 import 
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
-import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.Access;
@@ -142,8 +138,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   }
 
   private static final EmittingLogger log = new 
EmittingLogger(SeekableStreamIndexTaskRunner.class);
-  private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
-  private static final String METADATA_PUBLISH_PARTITIONS = 
"publishPartitions";
+  static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
+  static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
 
   private final Map<PartitionIdType, SequenceOffsetType> endOffsets;
   private final ConcurrentMap<PartitionIdType, SequenceOffsetType> currOffsets 
= new ConcurrentHashMap<>();
@@ -210,7 +206,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   protected volatile boolean pauseRequested = false;
   private volatile long nextCheckpointTime;
 
-  private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
+  private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, 
SequenceOffsetType>> sequences;
   private volatile Throwable backgroundThreadException;
 
   public SeekableStreamIndexTaskRunner(
@@ -276,7 +272,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous 
= sequenceOffsets.next();
         while (sequenceOffsets.hasNext()) {
           Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current 
= sequenceOffsets.next();
-          sequences.add(new SequenceMetadata(
+          sequences.add(new SequenceMetadata<>(
               previous.getKey(),
               StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 
previous.getKey()),
               previous.getValue(),
@@ -287,7 +283,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           previous = current;
           exclusive = true;
         }
-        sequences.add(new SequenceMetadata(
+        sequences.add(new SequenceMetadata<>(
             previous.getKey(),
             StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 
previous.getKey()),
             previous.getValue(),
@@ -296,7 +292,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             exclusive ? previous.getValue().keySet() : null
         ));
       } else {
-        sequences.add(new SequenceMetadata(
+        sequences.add(new SequenceMetadata<>(
             0,
             StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
             ioConfig.getStartPartitions().getPartitionSequenceNumberMap(),
@@ -369,7 +365,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       } else {
         @SuppressWarnings("unchecked")
         final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
-        final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> 
restoredNextPartitions = deserializeSeekableStreamPartitionsFromMetadata(
+        final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> 
restoredNextPartitions = deserializePartitionsFromMetadata(
             toolbox.getObjectMapper(),
             restoredMetadataMap.get(METADATA_NEXT_PARTITIONS)
         );
@@ -553,9 +549,9 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 }
                 boolean isPersistRequired = false;
 
-                final SequenceMetadata sequenceToUse = sequences
+                final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceToUse = sequences
                     .stream()
-                    .filter(sequenceMetadata -> 
sequenceMetadata.canHandle(record))
+                    .filter(sequenceMetadata -> 
sequenceMetadata.canHandle(this, record))
                     .findFirst()
                     .orElse(null);
 
@@ -702,11 +698,11 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         status = Status.PUBLISHING;
       }
 
-      for (SequenceMetadata sequenceMetadata : sequences) {
+      for (SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceMetadata : sequences) {
         if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) 
{
           // this is done to prevent checks in sequence specific commit 
supplier from failing
           sequenceMetadata.setEndOffsets(currOffsets);
-          sequenceMetadata.updateAssignments(currOffsets);
+          sequenceMetadata.updateAssignments(this, currOffsets);
           publishingSequences.add(sequenceMetadata.getSequenceName());
           // persist already done in finally, so directly add to publishQueue
           publishAndRegisterHandoff(sequenceMetadata);
@@ -822,11 +818,6 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return TaskStatus.success(task.getId());
   }
 
-  /**
-   * checks if the input seqNum marks end of shard. Used by Kinesis only
-   */
-  protected abstract boolean isEndOfShard(SequenceOffsetType seqNum);
-
   private void checkPublishAndHandoffFailure() throws ExecutionException, 
InterruptedException
   {
     // Check if any publishFuture failed.
@@ -856,14 +847,14 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     handOffWaitList.removeAll(handoffFinished);
   }
 
-  private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
+  private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType, 
SequenceOffsetType> sequenceMetadata)
   {
     log.info("Publishing segments for sequence [%s]", sequenceMetadata);
 
     final ListenableFuture<SegmentsAndMetadata> publishFuture = 
Futures.transform(
         driver.publish(
-            sequenceMetadata.createPublisher(toolbox, 
ioConfig.isUseTransaction()),
-            sequenceMetadata.getCommitterSupplier(stream, 
lastPersistedOffsets).get(),
+            sequenceMetadata.createPublisher(this, toolbox, 
ioConfig.isUseTransaction()),
+            sequenceMetadata.getCommitterSupplier(this, stream, 
lastPersistedOffsets).get(),
             Collections.singletonList(sequenceMetadata.getSequenceName())
         ),
         (Function<SegmentsAndMetadata, SegmentsAndMetadata>) 
publishedSegmentsAndMetadata -> {
@@ -948,11 +939,9 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     final File sequencesPersistFile = getSequencesPersistFile(toolbox);
     if (sequencesPersistFile.exists()) {
       sequences = new CopyOnWriteArrayList<>(
-          toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
+          toolbox.getObjectMapper().<List<SequenceMetadata<PartitionIdType, 
SequenceOffsetType>>>readValue(
               sequencesPersistFile,
-              new TypeReference<List<SequenceMetadata>>()
-              {
-              }
+              getSequenceMetadataTypeReference()
           )
       );
       return true;
@@ -965,9 +954,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   {
     log.info("Persisting Sequences Metadata [%s]", sequences);
     toolbox.getObjectMapper().writerWithType(
-        new TypeReference<List<SequenceMetadata>>()
-        {
-        }
+        getSequenceMetadataTypeReference()
     ).writeValue(getSequencesPersistFile(toolbox), sequences);
   }
 
@@ -1012,8 +999,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private void maybePersistAndPublishSequences(Supplier<Committer> 
committerSupplier)
       throws InterruptedException
   {
-    for (SequenceMetadata sequenceMetadata : sequences) {
-      sequenceMetadata.updateAssignments(currOffsets);
+    for (SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceMetadata : sequences) {
+      sequenceMetadata.updateAssignments(this, currOffsets);
       if (!sequenceMetadata.isOpen() && 
!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
         publishingSequences.add(sequenceMetadata.getSequenceName());
         try {
@@ -1388,7 +1375,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         // and after acquiring pauseLock to correctly guard against duplicate 
requests
         Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No 
Sequences found to set end sequences");
 
-        final SequenceMetadata latestSequence = sequences.get(sequences.size() 
- 1);
+        final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
latestSequence = sequences.get(sequences.size() - 1);
         // if a partition has not been read yet (contained in 
initialOffsetsSnapshot), then
         // do not mark the starting sequence number as exclusive
         Set<PartitionIdType> exclusivePartitions = sequenceNumbers.keySet()
@@ -1399,7 +1386,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                                                                   
.collect(Collectors.toSet());
 
         if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
-             && 
latestSequence.exclusiveStartPartitions.equals(exclusivePartitions)
+             && 
latestSequence.getExclusiveStartPartitions().equals(exclusivePartitions)
              && !finish)
             || (latestSequence.getEndOffsets().equals(sequenceNumbers) && 
finish)) {
           log.warn("Ignoring duplicate request, end sequences already set for 
sequences [%s]", sequenceNumbers);
@@ -1419,8 +1406,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         }
 
         for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : 
sequenceNumbers.entrySet()) {
-          if 
(createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
-              < 0) {
+          if 
(createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
 < 0) {
             return Response.status(Response.Status.BAD_REQUEST)
                            .entity(
                                StringUtils.format(
@@ -1443,7 +1429,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           exclusiveStartingPartitions.addAll(exclusivePartitions);
 
           // create new sequence
-          final SequenceMetadata newSequence = new SequenceMetadata(
+          final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
newSequence = new SequenceMetadata<>(
               latestSequence.getSequenceId() + 1,
               StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), 
latestSequence.getSequenceId() + 1),
               sequenceNumbers,
@@ -1606,291 +1592,6 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return startTime;
   }
 
-  private class SequenceMetadata
-  {
-    private final int sequenceId;
-    private final String sequenceName;
-    private final Set<PartitionIdType> exclusiveStartPartitions;
-    private final Set<PartitionIdType> assignments;
-    private final boolean sentinel;
-    private boolean checkpointed;
-    /**
-     * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This 
lock is required because
-     * {@link #setEndOffsets)} can be called by both the main thread and the 
HTTP thread.
-     */
-    private final ReentrantLock lock = new ReentrantLock();
-
-    final Map<PartitionIdType, SequenceOffsetType> startOffsets;
-    final Map<PartitionIdType, SequenceOffsetType> endOffsets;
-
-    @JsonCreator
-    public SequenceMetadata(
-        @JsonProperty("sequenceId") int sequenceId,
-        @JsonProperty("sequenceName") String sequenceName,
-        @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> 
startOffsets,
-        @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> 
endOffsets,
-        @JsonProperty("checkpointed") boolean checkpointed,
-        @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> 
exclusiveStartPartitions
-    )
-    {
-      Preconditions.checkNotNull(sequenceName);
-      Preconditions.checkNotNull(startOffsets);
-      Preconditions.checkNotNull(endOffsets);
-      this.sequenceId = sequenceId;
-      this.sequenceName = sequenceName;
-      this.startOffsets = ImmutableMap.copyOf(startOffsets);
-      this.endOffsets = new HashMap<>(endOffsets);
-      this.assignments = new HashSet<>(startOffsets.keySet());
-      this.checkpointed = checkpointed;
-      this.sentinel = false;
-      this.exclusiveStartPartitions = exclusiveStartPartitions == null
-                                      ? Collections.emptySet()
-                                      : exclusiveStartPartitions;
-    }
-
-    @JsonProperty
-    public Set<PartitionIdType> getExclusiveStartPartitions()
-    {
-      return exclusiveStartPartitions;
-    }
-
-    @JsonProperty
-    public int getSequenceId()
-    {
-      return sequenceId;
-    }
-
-    @JsonProperty
-    public boolean isCheckpointed()
-    {
-      lock.lock();
-      try {
-        return checkpointed;
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    @JsonProperty
-    public String getSequenceName()
-    {
-      return sequenceName;
-    }
-
-    @JsonProperty
-    public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
-    {
-      return startOffsets;
-    }
-
-    @JsonProperty
-    public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
-    {
-      lock.lock();
-      try {
-        return endOffsets;
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    @JsonProperty
-    public boolean isSentinel()
-    {
-      return sentinel;
-    }
-
-    void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
-    {
-      lock.lock();
-      try {
-        endOffsets.putAll(newEndOffsets);
-        checkpointed = true;
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    void updateAssignments(Map<PartitionIdType, SequenceOffsetType> 
nextPartitionOffset)
-    {
-      lock.lock();
-      try {
-        assignments.clear();
-        nextPartitionOffset.forEach((key, value) -> {
-          if 
(endOffsets.get(key).equals(SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER)
-              || 
createSequenceNumber(endOffsets.get(key)).compareTo(createSequenceNumber(nextPartitionOffset.get(key)))
-                 > 0) {
-            assignments.add(key);
-          }
-        });
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    boolean isOpen()
-    {
-      return !assignments.isEmpty();
-    }
-
-    boolean canHandle(OrderedPartitionableRecord<PartitionIdType, 
SequenceOffsetType> record)
-    {
-      lock.lock();
-      try {
-        final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset = 
createSequenceNumber(endOffsets.get(record.getPartitionId()));
-        final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset = 
createSequenceNumber(startOffsets.get(
-            record.getPartitionId()));
-        final OrderedSequenceNumber<SequenceOffsetType> recordOffset = 
createSequenceNumber(record.getSequenceNumber());
-        if (!isOpen() || recordOffset == null || partitionEndOffset == null || 
partitionStartOffset == null) {
-          return false;
-        }
-        boolean ret;
-        if (isStartingSequenceOffsetsExclusive()) {
-          ret = recordOffset.compareTo(partitionStartOffset)
-                >= 
(getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
-        } else {
-          ret = recordOffset.compareTo(partitionStartOffset) >= 0;
-        }
-
-        if (isEndSequenceOffsetsExclusive()) {
-          ret &= recordOffset.compareTo(partitionEndOffset) < 0;
-        } else {
-          ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
-        }
-
-        return ret;
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    @Override
-    public String toString()
-    {
-      lock.lock();
-      try {
-        return "SequenceMetadata{" +
-               "sequenceName='" + sequenceName + '\'' +
-               ", sequenceId=" + sequenceId +
-               ", startOffsets=" + startOffsets +
-               ", endOffsets=" + endOffsets +
-               ", assignments=" + assignments +
-               ", sentinel=" + sentinel +
-               ", checkpointed=" + checkpointed +
-               '}';
-      }
-      finally {
-        lock.unlock();
-      }
-    }
-
-    Supplier<Committer> getCommitterSupplier(
-        String stream,
-        Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
-    )
-    {
-      // Set up committer.
-      return () ->
-          new Committer()
-          {
-            @Override
-            public Object getMetadata()
-            {
-              lock.lock();
-
-              try {
-                Preconditions.checkState(
-                    assignments.isEmpty(),
-                    "This committer can be used only once all the records till 
sequences [%s] have been consumed, also make"
-                    + " sure to call updateAssignments before using this 
committer",
-                    endOffsets
-                );
-
-
-                // merge endOffsets for this sequence with globally 
lastPersistedOffsets
-                // This is done because this committer would be persisting 
only sub set of segments
-                // corresponding to the current sequence. Generally, 
lastPersistedOffsets should already
-                // cover endOffsets but just to be sure take max of sequences 
and persist that
-                for (Map.Entry<PartitionIdType, SequenceOffsetType> 
partitionOffset : endOffsets.entrySet()) {
-                  SequenceOffsetType newOffsets = partitionOffset.getValue();
-                  if 
(lastPersistedOffsets.containsKey(partitionOffset.getKey()) &&
-                      
createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey())).compareTo(
-                          createSequenceNumber(newOffsets)) > 0) {
-                    newOffsets = 
lastPersistedOffsets.get(partitionOffset.getKey());
-                  }
-                  lastPersistedOffsets.put(
-                      partitionOffset.getKey(),
-                      newOffsets
-                  );
-                }
-
-                // Publish metadata can be different from persist metadata as 
we are going to publish only
-                // subset of segments
-                return ImmutableMap.of(
-                    METADATA_NEXT_PARTITIONS, new 
SeekableStreamPartitions<>(stream, lastPersistedOffsets),
-                    METADATA_PUBLISH_PARTITIONS, new 
SeekableStreamPartitions<>(stream, endOffsets)
-                );
-              }
-              finally {
-                lock.unlock();
-              }
-            }
-
-            @Override
-            public void run()
-            {
-              // Do nothing.
-            }
-          };
-
-    }
-
-    TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean 
useTransaction)
-    {
-      return (segments, commitMetadata) -> {
-        final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> 
finalPartitions = deserializeSeekableStreamPartitionsFromMetadata(
-            toolbox.getObjectMapper(),
-            ((Map) Preconditions
-                .checkNotNull(commitMetadata, 
"commitMetadata")).get(METADATA_PUBLISH_PARTITIONS)
-        );
-
-        // Sanity check, we should only be publishing things that match our 
desired end state.
-        if 
(!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
-          throw new ISE(
-              "WTF?! Driver for sequence [%s], attempted to publish invalid 
metadata[%s].",
-              toString(),
-              commitMetadata
-          );
-        }
-
-        final SegmentTransactionalInsertAction action;
-
-        if (useTransaction) {
-          action = new SegmentTransactionalInsertAction(
-              segments,
-              createDataSourceMetadata(new SeekableStreamPartitions<>(
-                  finalPartitions.getStream(),
-                  getStartOffsets()
-              )),
-              createDataSourceMetadata(finalPartitions)
-          );
-        } else {
-          action = new SegmentTransactionalInsertAction(segments, null, null);
-        }
-
-        log.info("Publishing with isTransaction[%s].", useTransaction);
-
-        return toolbox.getTaskActionClient().submit(action);
-      };
-    }
-
-  }
-
   private boolean verifyInitialRecordAndSkipExclusivePartition(
       final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> 
record
   )
@@ -1933,7 +1634,12 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   }
 
   /**
-   * deserailizes the checkpoints into of Map<sequenceId, Map<PartitionIdType, 
SequenceOffsetType>>
+   * checks if the input seqNum marks end of shard. Used by Kinesis only
+   */
+  protected abstract boolean isEndOfShard(SequenceOffsetType seqNum);
+
+  /**
+   * deserializes the checkpoints into a Map<sequenceId, Map<PartitionIdType, 
SequenceOffsetType>>
    *
    * @param toolbox           task toolbox
    * @param checkpointsString the json-serialized checkpoint string
@@ -1949,7 +1655,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   ) throws IOException;
 
   /**
-   * Calculates the sequence number used to update `currentOffsets` after 
finishing reading a record.
+   * Calculates the sequence number used to update `currentOffsets` after a 
record has been read.
    * In Kafka this returns sequenceNumeber + 1 since that's the next expected 
offset
    * In Kinesis this simply returns sequenceNumber, since the sequence numbers 
in Kinesis are not
    * contiguous and finding the next sequence number requires an expensive API 
call
@@ -1961,14 +1667,14 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   protected abstract SequenceOffsetType 
getSequenceNumberToStoreAfterRead(SequenceOffsetType sequenceNumber);
 
   /**
-   * deserialzies stored metadata into SeekableStreamPartitions
+   * deserializes stored metadata into SeekableStreamPartitions
    *
    * @param mapper json objectMapper
    * @param object metadata
    *
    * @return SeekableStreamPartitions
    */
-  protected abstract SeekableStreamPartitions<PartitionIdType, 
SequenceOffsetType> deserializeSeekableStreamPartitionsFromMetadata(
+  protected abstract SeekableStreamPartitions<PartitionIdType, 
SequenceOffsetType> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   );
@@ -2038,4 +1744,6 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
    * partition we read from stream
    */
   protected abstract boolean isStartingSequenceOffsetsExclusive();
+
+  protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, 
SequenceOffsetType>>> getSequenceMetadataTypeReference();
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
new file mode 100644
index 0000000..7fbc800
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.indexing.common.TaskToolbox;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.java.util.common.ISE;
+import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
+{
+  private final int sequenceId;
+  private final String sequenceName;
+  private final Set<PartitionIdType> exclusiveStartPartitions;
+  private final Set<PartitionIdType> assignments;
+  private final boolean sentinel;
+  private boolean checkpointed;
+  /**
+   * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This 
lock is required because
+   * {@link #setEndOffsets} can be called by both the main thread and the 
HTTP thread.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  final Map<PartitionIdType, SequenceOffsetType> startOffsets;
+  final Map<PartitionIdType, SequenceOffsetType> endOffsets;
+
+  @JsonCreator
+  public SequenceMetadata(
+      @JsonProperty("sequenceId") int sequenceId,
+      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> 
startOffsets,
+      @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> 
endOffsets,
+      @JsonProperty("checkpointed") boolean checkpointed,
+      @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> 
exclusiveStartPartitions
+  )
+  {
+    Preconditions.checkNotNull(sequenceName);
+    Preconditions.checkNotNull(startOffsets);
+    Preconditions.checkNotNull(endOffsets);
+    this.sequenceId = sequenceId;
+    this.sequenceName = sequenceName;
+    this.startOffsets = ImmutableMap.copyOf(startOffsets);
+    this.endOffsets = new HashMap<>(endOffsets);
+    this.assignments = new HashSet<>(startOffsets.keySet());
+    this.checkpointed = checkpointed;
+    this.sentinel = false;
+    this.exclusiveStartPartitions = exclusiveStartPartitions == null
+                                    ? Collections.emptySet()
+                                    : exclusiveStartPartitions;
+  }
+
+  @JsonProperty
+  public Set<PartitionIdType> getExclusiveStartPartitions()
+  {
+    return exclusiveStartPartitions;
+  }
+
+  @JsonProperty
+  public int getSequenceId()
+  {
+    return sequenceId;
+  }
+
+  @JsonProperty
+  public boolean isCheckpointed()
+  {
+    lock.lock();
+    try {
+      return checkpointed;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  @JsonProperty
+  public String getSequenceName()
+  {
+    return sequenceName;
+  }
+
+  @JsonProperty
+  public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
+  {
+    return startOffsets;
+  }
+
+  @JsonProperty
+  public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
+  {
+    lock.lock();
+    try {
+      return endOffsets;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  @JsonProperty
+  public boolean isSentinel()
+  {
+    return sentinel;
+  }
+
+  void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
+  {
+    lock.lock();
+    try {
+      endOffsets.putAll(newEndOffsets);
+      checkpointed = true;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  void updateAssignments(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> 
runner,
+      Map<PartitionIdType, SequenceOffsetType> nextPartitionOffset
+  )
+  {
+    lock.lock();
+    try {
+      assignments.clear();
+      nextPartitionOffset.forEach((key, value) -> {
+        SequenceOffsetType endOffset = endOffsets.get(key);
+        if (SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset)
+            || 
runner.createSequenceNumber(endOffset).compareTo(runner.createSequenceNumber(nextPartitionOffset.get(key)))
 > 0) {
+          assignments.add(key);
+        }
+      });
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  boolean isOpen()
+  {
+    return !assignments.isEmpty();
+  }
+
+  /**
+   * Decides whether the given record falls inside the offset range covered by this sequence.
+   * <p>
+   * The answer is {@code false} when the sequence is not open, or when any of the record, start or end sequence
+   * numbers produced by the runner is {@code null}. Otherwise the record must satisfy both bounds:
+   * <ul>
+   * <li>start: strictly after the partition's start offset when the runner reports exclusive starting offsets and
+   * the partition is listed in {@code getExclusiveStartPartitions()}; at or after the start offset otherwise</li>
+   * <li>end: strictly before the partition's end offset when the runner reports exclusive end offsets; at or
+   * before the end offset otherwise</li>
+   * </ul>
+   * The check runs while holding {@code lock}.
+   *
+   * @param runner used to create comparable sequence numbers and to query start/end exclusivity
+   * @param record the record to test
+   *
+   * @return {@code true} if the record belongs to this sequence
+   */
+  boolean canHandle(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
+  )
+  {
+    lock.lock();
+    try {
+      final PartitionIdType partitionId = record.getPartitionId();
+      final OrderedSequenceNumber<SequenceOffsetType> end =
+          runner.createSequenceNumber(endOffsets.get(partitionId));
+      final OrderedSequenceNumber<SequenceOffsetType> start =
+          runner.createSequenceNumber(startOffsets.get(partitionId));
+      final OrderedSequenceNumber<SequenceOffsetType> current =
+          runner.createSequenceNumber(record.getSequenceNumber());
+
+      final boolean comparable = current != null && end != null && start != null;
+      if (!isOpen() || !comparable) {
+        return false;
+      }
+
+      // Start bound: the required comparison result is 1 (strictly after) only for exclusive-start partitions.
+      final boolean startsExclusive = runner.isStartingSequenceOffsetsExclusive();
+      final int comparedToStart = current.compareTo(start);
+      final int requiredAtLeast = startsExclusive && getExclusiveStartPartitions().contains(partitionId) ? 1 : 0;
+      final boolean withinStart = comparedToStart >= requiredAtLeast;
+
+      // End bound: always evaluated, whatever the outcome of the start bound.
+      final boolean endsExclusive = runner.isEndSequenceOffsetsExclusive();
+      final int comparedToEnd = current.compareTo(end);
+      final boolean withinEnd = endsExclusive ? comparedToEnd < 0 : comparedToEnd <= 0;
+
+      return withinStart && withinEnd;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Renders the sequence name, id, start/end offsets, assignments, sentinel flag and checkpointed flag.
+   * The fields are read while holding {@code lock}.
+   */
+  @Override
+  public String toString()
+  {
+    lock.lock();
+    try {
+      final StringBuilder text = new StringBuilder("SequenceMetadata{");
+      text.append("sequenceName='").append(sequenceName).append('\'');
+      text.append(", sequenceId=").append(sequenceId);
+      text.append(", startOffsets=").append(startOffsets);
+      text.append(", endOffsets=").append(endOffsets);
+      text.append(", assignments=").append(assignments);
+      text.append(", sentinel=").append(sentinel);
+      text.append(", checkpointed=").append(checkpointed);
+      text.append('}');
+      return text.toString();
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns a supplier that creates a new {@link Committer} for this sequence on every call.
+   * <p>
+   * {@code getMetadata()} of the created committer runs while holding {@code lock} and
+   * <ol>
+   * <li>fails its state check unless {@code assignments} is empty,</li>
+   * <li>writes into {@code lastPersistedOffsets}, for every partition of this sequence, the greater of the already
+   * recorded offset and the sequence's end offset (the caller's map is modified in place),</li>
+   * <li>returns a map holding the merged offsets under
+   * {@link SeekableStreamIndexTaskRunner#METADATA_NEXT_PARTITIONS} and this sequence's end offsets under
+   * {@link SeekableStreamIndexTaskRunner#METADATA_PUBLISH_PARTITIONS}.</li>
+   * </ol>
+   * {@code run()} of the created committer does nothing.
+   *
+   * @param runner               used to turn raw offsets into comparable sequence numbers
+   * @param stream               stream name recorded in both partition descriptors
+   * @param lastPersistedOffsets last persisted offsets, keyed by partition; updated by {@code getMetadata()}
+   *
+   * @return supplier of committers for this sequence
+   */
+  Supplier<Committer> getCommitterSupplier(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      String stream,
+      Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
+  )
+  {
+    return () ->
+        new Committer()
+        {
+          @Override
+          public Object getMetadata()
+          {
+            lock.lock();
+
+            try {
+              Preconditions.checkState(
+                  assignments.isEmpty(),
+                  "This committer can be used only once all the records till sequences [%s] have been consumed, also make"
+                  + " sure to call updateAssignments before using this committer",
+                  endOffsets
+              );
+
+              // This committer persists only the segments of the current sequence, so fold the sequence's end
+              // offsets into lastPersistedOffsets. The persisted offsets are generally expected to cover them
+              // already; the larger of the two is kept per partition to be sure.
+              for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
+                final PartitionIdType partition = entry.getKey();
+                final SequenceOffsetType sequenceEnd = entry.getValue();
+                final boolean persistedIsAhead =
+                    lastPersistedOffsets.containsKey(partition)
+                    && runner.createSequenceNumber(lastPersistedOffsets.get(partition))
+                             .compareTo(runner.createSequenceNumber(sequenceEnd)) > 0;
+                lastPersistedOffsets.put(
+                    partition,
+                    persistedIsAhead ? lastPersistedOffsets.get(partition) : sequenceEnd
+                );
+              }
+
+              // The publish metadata may differ from the persist metadata because only a subset of segments
+              // is going to be published.
+              final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> nextPartitions =
+                  new SeekableStreamPartitions<>(stream, lastPersistedOffsets);
+              final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> publishPartitions =
+                  new SeekableStreamPartitions<>(stream, endOffsets);
+              return ImmutableMap.of(
+                  SeekableStreamIndexTaskRunner.METADATA_NEXT_PARTITIONS,
+                  nextPartitions,
+                  SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS,
+                  publishPartitions
+              );
+            }
+            finally {
+              lock.unlock();
+            }
+          }
+
+          @Override
+          public void run()
+          {
+            // Intentionally empty.
+          }
+        };
+
+  }
+
+  /**
+   * Creates the publisher used to push the segments of this sequence.
+   * <p>
+   * The returned publisher reads the partitions stored under
+   * {@link SeekableStreamIndexTaskRunner#METADATA_PUBLISH_PARTITIONS} in the commit metadata and verifies that
+   * they equal this sequence's end offsets. It then submits a {@link SegmentTransactionalInsertAction} through
+   * the toolbox's task action client and returns the result of that submission.
+   *
+   * @param runner         used to deserialize the published partitions and to build data source metadata
+   * @param toolbox        supplies the object mapper and the task action client
+   * @param useTransaction when {@code true}, the action carries start metadata built from this sequence's start
+   *                       offsets and end metadata built from the published partitions; when {@code false},
+   *                       both are {@code null}
+   *
+   * @return publisher bound to this sequence; it fails on {@code null} commit metadata and throws {@link ISE}
+   * when the published partitions do not match the end offsets
+   */
+  TransactionalSegmentPublisher createPublisher(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      TaskToolbox toolbox,
+      boolean useTransaction
+  )
+  {
+    return (segments, commitMetadata) -> {
+      // Wildcard-typed instead of raw: only get(Object) is needed, so no unchecked operations are involved.
+      final Map<?, ?> commitMetaMap = (Map<?, ?>) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
+      final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> finalPartitions =
+          runner.deserializePartitionsFromMetadata(
+              toolbox.getObjectMapper(),
+              commitMetaMap.get(SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS)
+          );
+
+      // Sanity check, we should only be publishing things that match our desired end state.
+      if (!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
+        throw new ISE(
+            "WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].",
+            toString(),
+            commitMetadata
+        );
+      }
+
+      final SegmentTransactionalInsertAction action;
+
+      if (useTransaction) {
+        action = new SegmentTransactionalInsertAction(
+            segments,
+            runner.createDataSourceMetadata(
+                new SeekableStreamPartitions<>(finalPartitions.getStream(), getStartOffsets())
+            ),
+            runner.createDataSourceMetadata(finalPartitions)
+        );
+      } else {
+        action = new SegmentTransactionalInsertAction(segments, null, null);
+      }
+
+      return toolbox.getTaskActionClient().submit(action);
+    };
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 25250ac..b93d15f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -119,8 +119,7 @@ import java.util.stream.Stream;
  * @param <PartitionIdType>    the type of the partition id, for example, 
partitions in Kafka are int type while partitions in Kinesis are String type
  * @param <SequenceOffsetType> the type of the sequence number or offsets, for 
example, Kafka uses long offsets while Kinesis uses String sequence numbers
  */
-public abstract class SeekableStreamSupervisor<PartitionIdType, 
SequenceOffsetType>
-    implements Supervisor
+public abstract class SeekableStreamSupervisor<PartitionIdType, 
SequenceOffsetType> implements Supervisor
 {
   public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = 
"IS_INCREMENTAL_HANDOFF_SUPPORTED";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to