This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new f6edd5b  Logic adjustments to SeekableStreamIndexTaskRunner. (#7267) (#7272)
f6edd5b is described below

commit f6edd5b048862b5bd7828131daccee6e64af7fca
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Mar 15 14:37:34 2019 -0700

    Logic adjustments to SeekableStreamIndexTaskRunner. (#7267) (#7272)
    
    * Logic adjustments to SeekableStreamIndexTaskRunner.
    
    A mix of simplifications and bug fixes. They are intermingled because
    some of the bugs were made difficult to fix, and also more likely to
    happen in the first place, by how the code was structured. I tried to
    keep restructuring to a minimum. The changes are:
    
    - Remove "initialOffsetsSnapshot", which was used to determine when to
      skip start offsets. Replace it with "lastReadOffsets", which I hope
      is more intuitive. (There is a connection: start offsets must be
      skipped if and only if they have already been read, either by a
      previous task or by a previous sequence in the same task, post-restoring.)
    - Remove "isStartingSequenceOffsetsExclusive", because it should always
      be the opposite of isEndOffsetExclusive. The reason is that starts are
      exclusive exactly when the prior ends are inclusive: they must match
      up in that way for adjacent reads to link up properly.
    - Don't call "seekToStartingSequence" after the initial seek. There is
      no reason to, since we expect to read continuous message streams
      throughout the task. And calling it makes offset-tracking logic
      trickier, so better to avoid the need for trickiness. I believe the
      call being here was causing a bug in Kinesis ingestion where a
      message might get double-read.
    - Remove the "continue" calls in the main read loop. They are bad
      because they prevent keeping currOffsets and lastReadOffsets up to
      date, and prevent us from detecting that we have finished reading.
    - Rework "verifyInitialRecordAndSkipExclusivePartition" into
      "verifyRecordInRange". It no longer has side effects. It does a sanity
      check on the message offset and also makes sure that it is not past
      the endOffsets.
    - Rework "assignPartitions" to replace inline comparisons with
      "isRecordAlreadyRead" and "isMoreToReadBeforeReadingRecord" calls. I
      believe this fixes an off-by-one error with Kinesis where the last
      record would not get read. It also makes the logic easier to read.
    - When doing the final publish, only adjust end offsets of the final
      sequence, rather than potentially adjusting any unpublished sequence.
      Adjusting sequences other than the last one is a mistake since it
      will extend their endOffsets beyond what they actually read. (I'm not
      sure if this was an issue in practice, since I'm not sure if real
      world situations would have more than one unpublished sequence.)
    - Rename "isEndSequenceOffsetsExclusive" to "isEndOffsetExclusive". It's
      shorter and clearer, I think.
    - Add equals/hashCode/toString methods to OrderedSequenceNumber.
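
    As a rough illustration of why starts must be exclusive exactly when
    the prior ends are inclusive (a sketch under simplifying assumptions,
    not code from this patch): the class and method names below are
    hypothetical, and offsets are modeled as consecutive longs, which is
    true for Kafka but not for real Kinesis sequence numbers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Druid.
class AdjacentReads
{
  // Returns the offsets a reader would process for a range from start to end.
  // endExclusive: whether the end offset itself is left unread (Kafka-style).
  // startAlreadyRead: whether a prior reader already consumed the start offset,
  // which can only happen when the prior end was inclusive (Kinesis-style).
  static List<Long> offsetsToRead(long start, long end, boolean endExclusive, boolean startAlreadyRead)
  {
    final List<Long> result = new ArrayList<>();
    final long first = startAlreadyRead ? start + 1 : start;
    final long last = endExclusive ? end - 1 : end;
    for (long offset = first; offset <= last; offset++) {
      result.add(offset);
    }
    return result;
  }

  public static void main(String[] args)
  {
    // Exclusive ends: the next reader starts at the prior end and does not skip it.
    System.out.println(offsetsToRead(0, 3, true, false));  // [0, 1, 2]
    System.out.println(offsetsToRead(3, 6, true, false));  // [3, 4, 5]

    // Inclusive ends: the next reader starts at the prior end and must skip it.
    System.out.println(offsetsToRead(0, 3, false, false)); // [0, 1, 2, 3]
    System.out.println(offsetsToRead(3, 6, false, true));  // [4, 5, 6]
  }
}
```

    In this model, each pair of adjacent reads covers every offset exactly
    once. Skipping the start after an exclusive end would drop a message,
    and not skipping it after an inclusive end would read one twice.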
    
    Kafka test changes:
    
    - Added a Kafka "testRestoreAtEndOffset" test to verify that restores at
      the very end of the task lifecycle still work properly.
    
    Kinesis test changes:
    
    - Renamed "testRunOnNothing" to "testRunOnSingletonRange". I think that
      given Kinesis semantics, the right behavior when start offset equals
      end offset (and there aren't exclusive partitions set) is to read that
      single offset. This is because they are both meant to be treated as
      inclusive.
    - Adjusted "testRestoreAfterPersistingSequences" to expect one more
      message read. I believe the old test was wrong; it expected the task
      not to read message number 5.
    - Adjusted "testRunContextSequenceAheadOfStartingOffsets" to use a
      checkpoint starting from 1 rather than 2. I believe the old test was
      wrong here too; it was expecting the task to start reading from the
      checkpointed offset, but it actually should have started reading from
      one past the checkpointed offset.
    - Adjusted "testIncrementalHandOffReadsThroughEndOffsets" to expect
      11 messages read instead of 12. It's starting at message 0 and reading
      up to 10, which should be 11 messages.
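
    The expected message counts in these tests follow from treating both
    ends as inclusive. A minimal sketch of that arithmetic, assuming
    consecutive integer sequence numbers for simplicity (the class and
    method names are hypothetical and do not appear in the patch):

```java
// Hypothetical illustration only; not part of the Druid code base.
class InclusiveRangeSketch
{
  // Counts records processed when reading from start to end, both inclusive.
  // lastRead is the last sequence number already processed by a previous
  // reader, or null if nothing has been read yet; such records are skipped.
  static int countProcessed(long start, long end, Long lastRead)
  {
    int processed = 0;
    for (long seq = start; seq <= end; seq++) {
      final boolean alreadyRead = lastRead != null && seq <= lastRead;
      if (!alreadyRead) {
        processed++;
      }
    }
    return processed;
  }

  public static void main(String[] args)
  {
    // Reading 0 through 10 inclusive yields 11 messages.
    System.out.println(countProcessed(0, 10, null));
    // A singleton range (start equals end, nothing read yet) yields 1 message.
    System.out.println(countProcessed(2, 2, null));
    // Restoring from a checkpoint at 1 that was already read resumes at 2.
    System.out.println(countProcessed(1, 4, 1L));
  }
}
```

    The three calls correspond loosely to the incremental hand-off,
    singleton-range, and checkpoint cases above.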
    
    * Changes from code review.
---
 .../IncrementalPublishingKafkaIndexTaskRunner.java |  10 +-
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java |  13 +-
 .../indexing/kinesis/KinesisIndexTaskRunner.java   |  10 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  80 +++--
 .../SeekableStreamIndexTaskRunner.java             | 327 +++++++++++++--------
 .../seekablestream/SeekableStreamPartitions.java   |   6 +-
 .../indexing/seekablestream/SequenceMetadata.java  |  15 +-
 .../common/OrderedSequenceNumber.java              |  30 ++
 8 files changed, 279 insertions(+), 212 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index ae092d5..9e38a97 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -84,7 +84,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
   }
 
   @Override
-  protected Long getSequenceNumberToStoreAfterRead(@NotNull Long sequenceNumber)
+  protected Long getNextStartOffset(@NotNull Long sequenceNumber)
   {
     return sequenceNumber + 1;
   }
@@ -209,18 +209,12 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
   }
 
   @Override
-  protected boolean isEndSequenceOffsetsExclusive()
+  protected boolean isEndOffsetExclusive()
   {
     return true;
   }
 
   @Override
-  protected boolean isStartingSequenceOffsetsExclusive()
-  {
-    return false;
-  }
-
-  @Override
   protected boolean isEndOfShard(Long seqNum)
   {
     return false;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index 61ebf49..528780d 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -723,19 +723,12 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
   }
 
   @Override
-  protected boolean isEndSequenceOffsetsExclusive()
+  protected boolean isEndOffsetExclusive()
   {
-    return false;
+    return true;
   }
 
   @Override
-  protected boolean isStartingSequenceOffsetsExclusive()
-  {
-    return false;
-  }
-
-
-  @Override
   protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
@@ -822,7 +815,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
   }
 
   @Override
-  protected Long getSequenceNumberToStoreAfterRead(Long sequenceNumber)
+  protected Long getNextStartOffset(Long sequenceNumber)
   {
     throw new UnsupportedOperationException();
   }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 50326f7..247f6d7 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -78,7 +78,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
 
 
   @Override
-  protected String getSequenceNumberToStoreAfterRead(String sequenceNumber)
+  protected String getNextStartOffset(String sequenceNumber)
   {
     return sequenceNumber;
   }
@@ -160,18 +160,12 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
   }
 
   @Override
-  protected boolean isEndSequenceOffsetsExclusive()
+  protected boolean isEndOffsetExclusive()
   {
     return false;
   }
 
   @Override
-  protected boolean isStartingSequenceOffsetsExclusive()
-  {
-    return true;
-  }
-
-  @Override
   protected boolean isEndOfShard(String seqNum)
   {
     return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum);
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index c4c3adb..1823d93 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -83,6 +83,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
 import org.apache.druid.indexing.test.TestDataSegmentKiller;
@@ -1082,7 +1083,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
 
 
   @Test(timeout = 120_000L)
-  public void testRunOnNothing() throws Exception
+  public void testRunOnSingletonRange() throws Exception
   {
     recordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
@@ -1092,11 +1093,15 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     recordSupplier.seek(anyObject(), anyString());
     expectLastCall().anyTimes();
 
+    expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 3)).once();
+
     recordSupplier.close();
     expectLastCall().once();
 
     replayAll();
 
+    // When start and end offsets are the same, it means we need to read one message (since in Kinesis, end offsets
+    // are inclusive).
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
@@ -1131,12 +1136,12 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     verifyAll();
 
     // Check metrics
-    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
 
     // Check published metadata
-    Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
+    Assert.assertEquals(ImmutableSet.of(SD(task, "2010/P1D", 0)), publishedDescriptors());
   }
 
 
@@ -2102,14 +2107,11 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   @Test(timeout = 120_000L)
   public void testRestore() throws Exception
   {
-    recordSupplier.assign(anyObject());
-    expectLastCall().anyTimes();
-
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-    recordSupplier.seek(anyObject(), anyString());
-    expectLastCall().anyTimes();
-
+    final StreamPartition<String> streamPartition = StreamPartition.of(stream, shardId1);
+    recordSupplier.assign(ImmutableSet.of(streamPartition));
+    expectLastCall();
+    recordSupplier.seek(streamPartition, "2");
+    expectLastCall();
     expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 4))
                                           .once()
                                           .andReturn(Collections.emptyList())
@@ -2160,16 +2162,13 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     verifyAll();
     reset(recordSupplier);
 
-    recordSupplier.assign(anyObject());
-    expectLastCall().anyTimes();
-
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-    recordSupplier.seek(anyObject(), anyString());
-    expectLastCall().anyTimes();
-
+    recordSupplier.assign(ImmutableSet.of(streamPartition));
+    expectLastCall();
+    recordSupplier.seek(streamPartition, "3");
+    expectLastCall();
     expect(recordSupplier.poll(anyLong())).andReturn(records.subList(3, 6)).once();
-
+    recordSupplier.assign(ImmutableSet.of());
+    expectLastCall();
     recordSupplier.close();
     expectLastCall();
 
@@ -2251,8 +2250,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     recordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
 
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
     recordSupplier.seek(anyObject(), anyString());
     expectLastCall().anyTimes();
 
@@ -2324,9 +2321,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     recordSupplier.assign(anyObject());
     expectLastCall().anyTimes();
 
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-
     recordSupplier.seek(anyObject(), anyString());
     expectLastCall().anyTimes();
 
@@ -2380,7 +2374,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
-    Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(2, task2.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
 
@@ -2389,8 +2383,9 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     SegmentDescriptor desc2 = SD(task1, "2009/P1D", 0);
     SegmentDescriptor desc3 = SD(task1, "2010/P1D", 0);
     SegmentDescriptor desc4 = SD(task1, "2011/P1D", 0);
-    SegmentDescriptor desc5 = SD(task1, "2013/P1D", 0);
-    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5), publishedDescriptors());
+    SegmentDescriptor desc5 = SD(task1, "2012/P1D", 0);
+    SegmentDescriptor desc6 = SD(task1, "2013/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6), publishedDescriptors());
     Assert.assertEquals(
         new KinesisDataSourceMetadata(
             new SeekableStreamPartitions<>(stream, ImmutableMap.of(
@@ -2404,14 +2399,11 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   @Test(timeout = 120_000L)
   public void testRunWithPauseAndResume() throws Exception
   {
-    recordSupplier.assign(anyObject());
-    expectLastCall().anyTimes();
-
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-    recordSupplier.seek(anyObject(), anyString());
-    expectLastCall().anyTimes();
-
+    final StreamPartition<String> streamPartition = StreamPartition.of(stream, shardId1);
+    recordSupplier.assign(ImmutableSet.of(streamPartition));
+    expectLastCall();
+    recordSupplier.seek(streamPartition, "2");
+    expectLastCall();
     expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 5))
                                           .once()
                                           .andReturn(Collections.emptyList())
@@ -2478,14 +2470,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
 
     reset(recordSupplier);
 
-    recordSupplier.assign(anyObject());
-    expectLastCall().anyTimes();
-
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-    recordSupplier.seek(anyObject(), anyString());
-    expectLastCall().anyTimes();
-
+    recordSupplier.assign(ImmutableSet.of());
+    expectLastCall();
     recordSupplier.close();
     expectLastCall().once();
 
@@ -2549,8 +2535,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
 
     final TreeMap<Integer, Map<String, String>> sequences = new TreeMap<>();
     // Here the sequence number is 1 meaning that one incremental handoff was done by the failed task
-    // and this task should start reading from stream 2 for partition 0
-    sequences.put(1, ImmutableMap.of(shardId1, "2"));
+    // and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
+    sequences.put(1, ImmutableMap.of(shardId1, "1"));
     final Map<String, Object> context = new HashMap<>();
     context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<String, String>>>()
     {
@@ -2787,7 +2773,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
               throw new ISE("Task is not ready");
             }
           }
-          catch (Exception e) {
+          catch (Throwable e) {
             log.warn(e, "Task failed");
             return TaskStatus.failure(task.getId(), Throwables.getStackTraceAsString(e));
           }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 00c708c..ee9a7e2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -142,6 +142,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
 
   private final Map<PartitionIdType, SequenceOffsetType> endOffsets;
+
+  // lastReadOffsets are the last offsets that were read and processed.
+  private final Map<PartitionIdType, SequenceOffsetType> lastReadOffsets = new HashMap<>();
+
+  // currOffsets are what should become the start offsets of the next reader, if we stopped reading now. They are
+  // initialized to the start offsets when the task begins.
   private final ConcurrentMap<PartitionIdType, SequenceOffsetType> currOffsets = new ConcurrentHashMap<>();
   private final ConcurrentMap<PartitionIdType, SequenceOffsetType> lastPersistedOffsets = new ConcurrentHashMap<>();
 
@@ -192,8 +198,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
   private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new ArrayList<>();
   private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();
-  private final Set<PartitionIdType> initialOffsetsSnapshot = new HashSet<>();
-  private final Set<PartitionIdType> exclusiveStartingPartitions = new HashSet<>();
 
   private volatile DateTime startTime;
   private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
@@ -272,7 +276,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = sequenceOffsets.next();
         while (sequenceOffsets.hasNext()) {
           Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = sequenceOffsets.next();
-          sequences.add(new SequenceMetadata<>(
+          addSequence(new SequenceMetadata<>(
               previous.getKey(),
               StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
               previous.getValue(),
@@ -283,7 +287,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           previous = current;
           exclusive = true;
         }
-        sequences.add(new SequenceMetadata<>(
+        addSequence(new SequenceMetadata<>(
             previous.getKey(),
             StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
             previous.getValue(),
@@ -292,7 +296,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             exclusive ? previous.getValue().keySet() : null
         ));
       } else {
-        sequences.add(new SequenceMetadata<>(
+        addSequence(new SequenceMetadata<>(
             0,
             StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
             ioConfig.getStartPartitions().getPartitionSequenceNumberMap(),
@@ -408,6 +412,21 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         );
       }
 
+      // Initialize lastReadOffsets immediately after restoring currOffsets. This is only done when end offsets are
+      // inclusive, because the point of initializing lastReadOffsets here is so we know when to skip the start record.
+      // When end offsets are exclusive, we never skip the start record.
+      if (!isEndOffsetExclusive()) {
+        for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : currOffsets.entrySet()) {
+          final boolean isAtStart = entry.getValue().equals(
+              ioConfig.getStartPartitions().getPartitionSequenceNumberMap().get(entry.getKey())
+          );
+
+          if (!isAtStart || ioConfig.getExclusiveStartSequenceNumberPartitions().contains(entry.getKey())) {
+            lastReadOffsets.put(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+
       // Set up committer.
       final Supplier<Committer> committerSupplier = () -> {
         final Map<PartitionIdType, SequenceOffsetType> snapshot = ImmutableMap.copyOf(currOffsets);
@@ -450,17 +469,14 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       status = Status.READING;
       Throwable caughtExceptionInner = null;
 
-      initialOffsetsSnapshot.addAll(currOffsets.keySet());
-      exclusiveStartingPartitions.addAll(ioConfig.getExclusiveStartSequenceNumberPartitions());
-
       try {
         while (stillReading) {
           if (possiblyPause()) {
             // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
-            // partitions upon resuming. This is safe even if the end sequences have not been modified.
+            // partitions upon resuming. Don't call "seekToStartingSequence" after "assignPartitions", because there's
+            // no need to re-seek here. All we're going to be doing is dropping partitions.
             assignment = assignPartitions(recordSupplier);
             possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment, currOffsets);
-            seekToStartingSequence(recordSupplier, assignment);
 
             if (assignment.isEmpty()) {
               log.info("All partitions have been fully read");
@@ -470,7 +486,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           }
 
           // if stop is requested or task's end sequence is set by call to setEndOffsets method with finish set to true
-          if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
+          if (stopRequested.get() || sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
             status = Status.PUBLISHING;
           }
 
@@ -498,44 +514,17 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
           SequenceMetadata sequenceToCheckpoint = null;
           for (OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record : records) {
-
-            // for Kafka, the end offsets are exclusive, so skip it
-            if (isEndSequenceOffsetsExclusive() &&
-                createSequenceNumber(record.getSequenceNumber()).compareTo(
-                    createSequenceNumber(endOffsets.get(record.getPartitionId()))) == 0) {
-              continue;
-            }
-
-            // for the first message we receive, check that we were given a message with a sequenceNumber that matches
-            // our expected starting sequenceNumber
-            if (!verifyInitialRecordAndSkipExclusivePartition(record)) {
-              continue;
-            }
+            final boolean shouldProcess = verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber());
 
             log.trace(
-                "Got stream[%s] partition[%s] sequence[%s].",
+                "Got stream[%s] partition[%s] sequenceNumber[%s], shouldProcess[%s].",
                 record.getStream(),
                 record.getPartitionId(),
-                record.getSequenceNumber()
+                record.getSequenceNumber(),
+                shouldProcess
             );
 
-            if (isEndOfShard(record.getSequenceNumber())) {
-              // shard is closed, applies to Kinesis only
-              currOffsets.put(record.getPartitionId(), record.getSequenceNumber());
-            } else if (createSequenceNumber(record.getSequenceNumber()).compareTo(
-                createSequenceNumber(endOffsets.get(record.getPartitionId()))) <= 0) {
-
-
-              if (!record.getSequenceNumber().equals(currOffsets.get(record.getPartitionId()))
-                  && !ioConfig.isSkipOffsetGaps()) {
-                throw new ISE(
-                    "WTF?! Got sequence[%s] after sequence[%s] in partition[%s].",
-                    record.getSequenceNumber(),
-                    currOffsets.get(record.getPartitionId()),
-                    record.getPartitionId()
-                );
-              }
-
+            if (shouldProcess) {
               try {
                 final List<byte[]> valueBytess = record.getData();
                 final List<InputRow> rows;
@@ -557,7 +546,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
                 if (sequenceToUse == null) {
                   throw new ISE(
-                      "WTH?! cannot find any valid sequence for record with partition [%d] and sequence [%d]. Current sequences: %s",
+                      "WTH?! cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s",
                       record.getPartitionId(),
                       record.getSequenceNumber(),
                       sequences
@@ -627,12 +616,18 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
               // in kafka, we can easily get the next offset by adding 1, but for kinesis, there's no way
               // to get the next sequence number without having to make an expensive api call. So the behavior
               // here for kafka is to +1 while for kinesis we simply save the current sequence number
-              currOffsets.put(record.getPartitionId(), getSequenceNumberToStoreAfterRead(record.getSequenceNumber()));
+              lastReadOffsets.put(record.getPartitionId(), record.getSequenceNumber());
+              currOffsets.put(record.getPartitionId(), getNextStartOffset(record.getSequenceNumber()));
             }
 
-            if ((currOffsets.get(record.getPartitionId()).equals(endOffsets.get(record.getPartitionId()))
-                 || isEndOfShard(currOffsets.get(record.getPartitionId())))
-                && assignment.remove(record.getStreamPartition())) {
+            // Use record.getSequenceNumber() in the moreToRead check, since currOffsets might not have been
+            // updated if we were skipping records for being beyond the end.
+            final boolean moreToReadAfterThisRecord = isMoreToReadAfterReadingRecord(
+                record.getSequenceNumber(),
+                endOffsets.get(record.getPartitionId())
+            );
+
+            if (!moreToReadAfterThisRecord && assignment.remove(record.getStreamPartition())) {
               log.info("Finished reading stream[%s], partition[%s].", record.getStream(), record.getPartitionId());
               recordSupplier.assign(assignment);
               stillReading = !assignment.isEmpty();
@@ -698,11 +693,18 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         status = Status.PUBLISHING;
       }
 
-      for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata : sequences) {
+      for (int i = 0; i < sequences.size(); i++) {
+        final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata = sequences.get(i);
         if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
-          // this is done to prevent checks in sequence specific commit supplier from failing
-          sequenceMetadata.setEndOffsets(currOffsets);
-          sequenceMetadata.updateAssignments(this, currOffsets);
+          final boolean isLast = i == (sequences.size() - 1);
+          if (isLast) {
+            // Shorten endOffsets of the last sequence to match currOffsets.
+            sequenceMetadata.setEndOffsets(currOffsets);
+          }
+
+          // Update assignments of the sequence, which should clear them. (This will be checked later, when the
+          // Committer is built.)
+          sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord);
           publishingSequences.add(sequenceMetadata.getSequenceName());
           // persist already done in finally, so directly add to publishQueue
           publishAndRegisterHandoff(sequenceMetadata);
@@ -805,7 +807,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
         toolbox.getDataSegmentServerAnnouncer().unannounce();
       }
-      catch (Exception e) {
+      catch (Throwable e) {
         if (caughtExceptionOuter != null) {
           caughtExceptionOuter.addSuppressed(e);
         } else {
@@ -922,7 +924,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           @Override
           public void onFailure(Throwable t)
           {
-            log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata);
+            log.error(t, "Error while publishing segments for sequenceNumber[%s]", sequenceMetadata);
             handoffFuture.setException(t);
           }
         }
@@ -1000,7 +1002,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       throws InterruptedException
   {
     for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata : sequences) {
-      sequenceMetadata.updateAssignments(this, currOffsets);
+      sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadBeforeReadingRecord);
       if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) {
         publishingSequences.add(sequenceMetadata.getSequenceName());
         try {
@@ -1026,19 +1028,21 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   {
     final Set<StreamPartition<PartitionIdType>> assignment = new HashSet<>();
     for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : currOffsets.entrySet()) {
-      final SequenceOffsetType endOffset = endOffsets.get(entry.getKey());
-      if (isEndOfShard(endOffset)
-          || SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset)
-          || createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(endOffset)) < 0) {
-        assignment.add(StreamPartition.of(stream, entry.getKey()));
-      } else if (entry.getValue().equals(endOffset)) {
-        log.info("Finished reading partition[%s].", entry.getKey());
-      } else {
-        throw new ISE(
-            "WTF?! Cannot start from sequence[%,d] > endOffset[%,d]",
-            entry.getValue(),
+      final PartitionIdType partition = entry.getKey();
+      final SequenceOffsetType currOffset = entry.getValue();
+      final SequenceOffsetType endOffset = endOffsets.get(partition);
+
+      if (!isRecordAlreadyRead(partition, endOffset) && isMoreToReadBeforeReadingRecord(currOffset, endOffset)) {
+        log.info(
+            "Adding partition[%s], start[%s] -> end[%s] to assignment.",
+            partition,
+            currOffset,
             endOffset
         );
+
+        assignment.add(StreamPartition.of(stream, partition));
+      } else {
+        log.info("Finished reading partition[%s].", partition);
       }
     }
 
@@ -1047,6 +1051,77 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return assignment;
   }
 
+  private void addSequence(final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata)
+  {
+    // Sanity check that the start of the new sequence matches up with the end of the prior sequence.
+    for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : sequenceMetadata.getStartOffsets().entrySet()) {
+      final PartitionIdType partition = entry.getKey();
+      final SequenceOffsetType startOffset = entry.getValue();
+
+      if (!sequences.isEmpty()) {
+        final SequenceOffsetType priorOffset = sequences.get(sequences.size() - 1).endOffsets.get(partition);
+
+        if (!startOffset.equals(priorOffset)) {
+          throw new ISE(
+              "New sequence startOffset[%s] does not equal expected prior offset[%s]",
+              startOffset,
+              priorOffset
+          );
+        }
+      }
+    }
+
+    // Actually do the add.
+    sequences.add(sequenceMetadata);
+  }
+
+  /**
+   * Returns true if the given record has already been read, based on lastReadOffsets.
+   */
+  private boolean isRecordAlreadyRead(
+      final PartitionIdType recordPartition,
+      final SequenceOffsetType recordSequenceNumber
+  )
+  {
+    final SequenceOffsetType lastReadOffset = lastReadOffsets.get(recordPartition);
+
+    if (lastReadOffset == null) {
+      return false;
+    } else {
+      return createSequenceNumber(recordSequenceNumber).compareTo(createSequenceNumber(lastReadOffset)) <= 0;
+    }
+  }
+
+  /**
+   * Returns true if, given that we want to start reading from recordSequenceNumber and end at endSequenceNumber, there
+   * is more left to read. Used in pre-read checks to determine if there is anything left to read.
+   */
+  private boolean isMoreToReadBeforeReadingRecord(
+      final SequenceOffsetType recordSequenceNumber,
+      final SequenceOffsetType endSequenceNumber
+  )
+  {
+    final int compareToEnd = createSequenceNumber(recordSequenceNumber)
+        .compareTo(createSequenceNumber(endSequenceNumber));
+
+    return isEndOffsetExclusive() ? compareToEnd < 0 : compareToEnd <= 0;
+  }
+
+  /**
+   * Returns true if, given that recordSequenceNumber has already been read and we want to end at endSequenceNumber,
+   * there is more left to read. Used in post-read checks to determine if there is anything left to read.
+   */
+  private boolean isMoreToReadAfterReadingRecord(
+      final SequenceOffsetType recordSequenceNumber,
+      final SequenceOffsetType endSequenceNumber
+  )
+  {
+    final int compareNextToEnd = createSequenceNumber(getNextStartOffset(recordSequenceNumber))
+        .compareTo(createSequenceNumber(endSequenceNumber));
+
+    // Unlike isMoreToReadBeforeReadingRecord, we don't care if the end is exclusive or not. If we read it, we're done.
+    return compareNextToEnd < 0;
+  }
 
   private void seekToStartingSequence(
       RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier,
@@ -1055,7 +1130,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   {
     for (final StreamPartition<PartitionIdType> partition : partitions) {
       final SequenceOffsetType sequence = currOffsets.get(partition.getPartitionId());
-      log.info("Seeking partition[%s] to sequence[%s].", partition.getPartitionId(), sequence);
+      log.info("Seeking partition[%s] to sequenceNumber[%s].", partition.getPartitionId(), sequence);
       recordSupplier.seek(partition, sequence);
     }
   }
@@ -1114,7 +1189,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     if (tuningConfig.isLogParseExceptions()) {
       log.error(
           pe,
-          "Encountered parse exception on row from partition[%s] sequence[%s]",
+          "Encountered parse exception on row from partition[%s] sequenceNumber[%s]",
           record.getPartitionId(),
           record.getSequenceNumber()
       );
@@ -1240,7 +1315,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       }
     }
     catch (Exception e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1376,17 +1451,20 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No Sequences found to set end sequences");
 
         final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = sequences.get(sequences.size() - 1);
-        // if a partition has not been read yet (contained in initialOffsetsSnapshot), then
-        // do not mark the starting sequence number as exclusive
-        Set<PartitionIdType> exclusivePartitions = sequenceNumbers.keySet()
-                                                                  .stream()
-                                                                  .filter(x -> !initialOffsetsSnapshot.contains(x)
-                                                                               || ioConfig.getExclusiveStartSequenceNumberPartitions()
-                                                                                          .contains(x))
-                                                                  .collect(Collectors.toSet());
+        final Set<PartitionIdType> exclusiveStartPartitions;
+
+        if (isEndOffsetExclusive()) {
+          // When end offsets are exclusive, there's no need for marking the next sequence as having any
+          // exclusive-start partitions. It should always start from the end offsets of the prior sequence.
+          exclusiveStartPartitions = Collections.emptySet();
+        } else {
+          // When end offsets are inclusive, we must mark all partitions as exclusive-start, to avoid reading
+          // their final messages (which have already been read).
+          exclusiveStartPartitions = sequenceNumbers.keySet();
+        }
 
         if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
-             && latestSequence.getExclusiveStartPartitions().equals(exclusivePartitions)
+             && latestSequence.getExclusiveStartPartitions().equals(exclusiveStartPartitions)
              && !finish)
             || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
           log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers);
@@ -1426,19 +1504,17 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           log.info("Updating endOffsets from [%s] to [%s]", endOffsets, sequenceNumbers);
           endOffsets.putAll(sequenceNumbers);
         } else {
-          exclusiveStartingPartitions.addAll(exclusivePartitions);
-
           // create new sequence
+          log.info("Creating new sequence with startOffsets [%s] and endOffsets [%s]", sequenceNumbers, endOffsets);
           final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<>(
               latestSequence.getSequenceId() + 1,
               StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
               sequenceNumbers,
               endOffsets,
               false,
-              exclusivePartitions
+              exclusiveStartPartitions
           );
-          sequences.add(newSequence);
-          initialOffsetsSnapshot.addAll(sequenceNumbers.keySet());
+          addSequence(newSequence);
         }
         persistSequences();
       }
@@ -1547,7 +1623,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       return Response.ok().entity(toolbox.getObjectMapper().writeValueAsString(getCurrentOffsets())).build();
     }
     catch (JsonProcessingException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1592,45 +1668,47 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return startTime;
   }
 
-  private boolean verifyInitialRecordAndSkipExclusivePartition(
-      final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
+  /**
+   * This method does two things:
+   *
+   * 1) Verifies that the sequence numbers we read are at least as high as those read previously, and throws an
+   * exception if not.
+   * 2) Returns false if we should skip this record because it's either (a) the first record in a partition that we are
+   * needing to be exclusive on; (b) too late to read, past the endOffsets.
+   */
+  private boolean verifyRecordInRange(
+      final PartitionIdType partition,
+      final SequenceOffsetType recordOffset
   )
   {
-    // Check only for the first record among the record batch.
-    if (initialOffsetsSnapshot.contains(record.getPartitionId())) {
-      final SequenceOffsetType currOffset = Preconditions.checkNotNull(
-          currOffsets.get(record.getPartitionId()),
-          "Current offset is null for sequenceNumber[%s] and partitionId[%s]",
-          record.getSequenceNumber(),
-          record.getPartitionId()
-      );
-      final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(
-          record.getSequenceNumber()
-      );
-      final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(
-          currOffset
-      );
-      if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
-        throw new ISE(
-            "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition[%s]",
-            record.getSequenceNumber(),
-            currOffset,
-            record.getPartitionId()
-        );
-      }
+    // Verify that the record is at least as high as its currOffset.
+    final SequenceOffsetType currOffset = Preconditions.checkNotNull(
+        currOffsets.get(partition),
+        "Current offset is null for sequenceNumber[%s] and partition[%s]",
+        recordOffset,
+        partition
+    );
 
-      // Remove the mark to notify that this partition has been read.
-      initialOffsetsSnapshot.remove(record.getPartitionId());
+    final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(recordOffset);
+    final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(currOffset);
 
-      // check exclusive starting sequence
-      if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) {
-        log.info("Skipping starting sequenceNumber for partition[%s] marked exclusive", record.getPartitionId());
+    final int comparisonToCurrent = recordSequenceNumber.compareTo(currentSequenceNumber);
+    if (comparisonToCurrent < 0) {
+      throw new ISE(
+          "Record sequenceNumber[%s] is smaller than current sequenceNumber[%s] for partition[%s]",
+          recordOffset,
+          currOffset,
+          partition
+      );
+    }
 
-        return false;
-      }
+    // Check if the record has already been read.
+    if (isRecordAlreadyRead(partition, recordOffset)) {
+      return false;
     }
 
-    return true;
+    // Finally, check if this record comes before the endOffsets for this partition.
+    return isMoreToReadBeforeReadingRecord(recordSequenceNumber.get(), endOffsets.get(partition));
   }
 
   /**
@@ -1655,16 +1733,14 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   ) throws IOException;
 
   /**
-   * Calculates the sequence number used to update `currentOffsets` after finished reading a record.
-   * In Kafka this returns sequenceNumeber + 1 since that's the next expected offset
-   * In Kinesis this simply returns sequenceNumber, since the sequence numbers in Kinesis are not
-   * contiguous and finding the next sequence number requires an expensive API call
+   * Calculates the sequence number used to update currOffsets after finished reading a record.
+   * This is what would become the start offsets of the next reader, if we stopped reading now.
    *
    * @param sequenceNumber the sequence number that has already been processed
    *
    * @return next sequence number to be stored
    */
-  protected abstract SequenceOffsetType getSequenceNumberToStoreAfterRead(SequenceOffsetType sequenceNumber);
+  protected abstract SequenceOffsetType getNextStartOffset(SequenceOffsetType sequenceNumber);
 
   /**
    * deserializes stored metadata into SeekableStreamPartitions
@@ -1736,14 +1812,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
    * In Kafka, the endOffsets are exclusive, so skip it.
    * In Kinesis the endOffsets are inclusive
    */
-  protected abstract boolean isEndSequenceOffsetsExclusive();
-
-  /**
-   * In Kafka, the startingOffsets are inclusive.
-   * In Kinesis, the startingOffsets are exclusive, except for the first
-   * partition we read from stream
-   */
-  protected abstract boolean isStartingSequenceOffsetsExclusive();
+  protected abstract boolean isEndOffsetExclusive();
 
   protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
index 28f5dde..dc3ff87 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
@@ -143,9 +143,9 @@ public class SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
   @Override
   public String toString()
   {
-    return "SeekableStreamPartitions{" +
-           "stream/topic='" + stream + '\'' +
-           ", partitionSequenceNumberMap/partitionOffsetMap=" + partitionIdToSequenceNumberMap +
+    return getClass().getSimpleName() + "{" +
+           "stream='" + stream + '\'' +
+           ", partitionSequenceNumberMap=" + partitionIdToSequenceNumberMap +
            '}';
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 7fbc800..61bb35a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
 
 public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
 {
@@ -148,17 +149,16 @@ public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
   }
 
   void updateAssignments(
-      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
-      Map<PartitionIdType, SequenceOffsetType> nextPartitionOffset
+      Map<PartitionIdType, SequenceOffsetType> currOffsets,
+      BiFunction<SequenceOffsetType, SequenceOffsetType, Boolean> moreToReadFn
   )
   {
     lock.lock();
     try {
       assignments.clear();
-      nextPartitionOffset.forEach((key, value) -> {
+      currOffsets.forEach((key, value) -> {
         SequenceOffsetType endOffset = endOffsets.get(key);
-        if (SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset)
-            || runner.createSequenceNumber(endOffset).compareTo(runner.createSequenceNumber(nextPartitionOffset.get(key))) > 0) {
+        if (moreToReadFn.apply(value, endOffset)) {
           assignments.add(key);
         }
       });
@@ -188,14 +188,15 @@ public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
         return false;
       }
       boolean ret;
-      if (runner.isStartingSequenceOffsetsExclusive()) {
+      if (!runner.isEndOffsetExclusive()) {
+        // Inclusive endOffsets mean that we must skip the first record of any partition that has been read before.
         ret = recordOffset.compareTo(partitionStartOffset)
               >= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
       } else {
         ret = recordOffset.compareTo(partitionStartOffset) >= 0;
       }
 
-      if (runner.isEndSequenceOffsetsExclusive()) {
+      if (runner.isEndOffsetExclusive()) {
         ret &= recordOffset.compareTo(partitionEndOffset) < 0;
       } else {
         ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
index f193488..74fd08d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.seekablestream.common;
 
+import java.util.Objects;
 
 /**
  * Represents a Kafka/Kinesis stream sequence number. Mainly used to do
@@ -51,4 +52,33 @@ public abstract class OrderedSequenceNumber<SequenceOffsetType>
   {
     return isExclusive;
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OrderedSequenceNumber<?> that = (OrderedSequenceNumber<?>) o;
+    return isExclusive == that.isExclusive &&
+           Objects.equals(sequenceNumber, that.sequenceNumber);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(sequenceNumber, isExclusive);
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + "{" +
+           "sequenceNumber=" + sequenceNumber +
+           ", isExclusive=" + isExclusive +
+           '}';
+  }
 }
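
Editorial note, not part of the commit: the pre-read and post-read checks added above (isMoreToReadBeforeReadingRecord and isMoreToReadAfterReadingRecord) can be easier to follow with plain numbers. The sketch below is a simplified stand-in that assumes offsets are longs; the class name OffsetRangeSketch, the boolean parameter, and the LongUnaryOperator parameter are hypothetical and replace the runner's isEndOffsetExclusive() and getNextStartOffset(). The Kafka-like and Kinesis-like settings follow the Javadoc removed in this diff (Kafka stores sequenceNumber + 1 after a read, Kinesis stores sequenceNumber).

```java
import java.util.function.LongUnaryOperator;

// Hypothetical, simplified stand-in for the runner's offset checks; not Druid code.
final class OffsetRangeSketch
{
  // Pre-read check: is recordOffset still inside the range that ends at endOffset?
  static boolean isMoreToReadBeforeReadingRecord(long recordOffset, long endOffset, boolean endOffsetExclusive)
  {
    final int compareToEnd = Long.compare(recordOffset, endOffset);
    return endOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
  }

  // Post-read check: recordOffset has been read; would the next start offset still be before the end?
  static boolean isMoreToReadAfterReadingRecord(long recordOffset, long endOffset, LongUnaryOperator nextStartOffset)
  {
    return Long.compare(nextStartOffset.applyAsLong(recordOffset), endOffset) < 0;
  }

  public static void main(String[] args)
  {
    // Kafka-like: end offsets are exclusive, next start offset is offset + 1.
    System.out.println(isMoreToReadBeforeReadingRecord(9, 10, true));       // true
    System.out.println(isMoreToReadBeforeReadingRecord(10, 10, true));      // false
    System.out.println(isMoreToReadAfterReadingRecord(8, 10, o -> o + 1));  // true
    System.out.println(isMoreToReadAfterReadingRecord(9, 10, o -> o + 1));  // false

    // Kinesis-like: end offsets are inclusive, next start offset is the offset just read.
    System.out.println(isMoreToReadBeforeReadingRecord(10, 10, false));     // true
    System.out.println(isMoreToReadAfterReadingRecord(9, 10, o -> o));      // true
    System.out.println(isMoreToReadAfterReadingRecord(10, 10, o -> o));     // false
  }
}
```

In both settings the post-read check returns false once the last record in the range has been read, which is consistent with the comment in the diff that the post-read check does not need to consult end-offset exclusivity.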

