gianm opened a new pull request #7267: Logic adjustments to SeekableStreamIndexTaskRunner.
URL: https://github.com/apache/incubator-druid/pull/7267
 
 
   A mix of simplifications and bug fixes. They are intermingled because
   the way the code was structured made some of the bugs both more likely
   to happen in the first place and harder to fix. I tried to keep
   restructuring to a minimum. The changes are:
   
   - Remove "initialOffsetsSnapshot", which was used to determine when to
     skip start offsets. Replace it with "lastReadOffsets", which I hope
     is more intuitive. (The two are connected: start offsets must be
     skipped if and only if they have already been read, either by a
     previous task or, after a restore, by a previous sequence in the
     same task.)
   - Remove "isStartingSequenceOffsetsExclusive", because it should always
     be the opposite of isEndOffsetExclusive. The reason is that starts are
     exclusive exactly when the prior ends are inclusive: they must match
     up in that way for adjacent reads to link up properly.
   - Don't call "seekToStartingSequence" after the initial seek. There is
     no reason to, since we expect to read continuous message streams
     throughout the task. And calling it makes offset-tracking logic
     trickier, so better to avoid the need for trickiness. I believe this
     call was causing a bug in Kinesis ingestion where a message might
     get read twice.
   - Remove the "continue" calls in the main read loop. They are bad
     because they prevent keeping currOffsets and lastReadOffsets up to
     date, and prevent us from detecting that we have finished reading.
   - Rework "verifyInitialRecordAndSkipExclusivePartition" into
     "verifyRecordInRange". It no longer has side effects. It does a sanity
     check on the message offset and also makes sure that it is not past
     the endOffsets.
   - Rework "assignPartitions" to replace inline comparisons with
     "isRecordAlreadyRead" and "isMoreToReadBeforeReadingRecord" calls. I
     believe this fixes an off-by-one error with Kinesis where the last
     record would not get read. It also makes the logic easier to read.
   - When doing the final publish, only adjust end offsets of the final
     sequence, rather than potentially adjusting any unpublished sequence.
     Adjusting sequences other than the last one is a mistake since it
     will extend their endOffsets beyond what they actually read. (I'm not
     sure if this was an issue in practice, since I'm not sure if real-world
     situations would have more than one unpublished sequence.)
   - Rename "isEndSequenceOffsetsExclusive" to "isEndOffsetExclusive". It's
     shorter and clearer, I think.
   - Add equals/hashCode/toString methods to OrderedSequenceNumber.
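
   The following is a minimal sketch of the offset rules above. It assumes a single partition with consecutive long offsets. The class OffsetRulesSketch and its markRead and readSequence methods are hypothetical. The names isRecordAlreadyRead, isMoreToReadBeforeReadingRecord, isEndOffsetExclusive and lastReadOffsets come from this PR, but the signatures and bodies here are illustrative guesses, not the actual SeekableStreamIndexTaskRunner code. The sketch shows three things:
   - lastReadOffsets deciding which start offsets to skip.
   - The rule that starts are exclusive exactly when ends are inclusive.
   - A read loop with no "continue".

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical sketch; names echo the PR, signatures and bodies are assumptions.
   class OffsetRulesSketch {
       // Kafka-like streams: exclusive end offsets. Kinesis-like streams: inclusive.
       private final boolean isEndOffsetExclusive;
       // Partition -> last offset read, by an earlier sequence or a previous task.
       private final Map<Integer, Long> lastReadOffsets = new HashMap<>();

       OffsetRulesSketch(boolean isEndOffsetExclusive) {
           this.isEndOffsetExclusive = isEndOffsetExclusive;
       }

       // Starts are exclusive exactly when prior ends are inclusive. In this sketch
       // the exclusive start is enforced through lastReadOffsets, not a separate flag.
       boolean isStartOffsetExclusive() {
           return !isEndOffsetExclusive;
       }

       // Records that an offset has been read (also usable to seed state on restore).
       void markRead(int partition, long offset) {
           lastReadOffsets.merge(partition, offset, Long::max);
       }

       // A record is skipped if and only if it has already been read.
       boolean isRecordAlreadyRead(int partition, long offset) {
           Long last = lastReadOffsets.get(partition);
           return last != null && offset <= last;
       }

       // Whether the record at this offset still falls inside the sequence.
       boolean isMoreToReadBeforeReadingRecord(long offset, long endOffset) {
           return isEndOffsetExclusive ? offset < endOffset : offset <= endOffset;
       }

       // Reads one sequence from a simulated stream with offsets 0..streamLength-1.
       // No "continue": each record is either read (updating lastReadOffsets) or
       // skipped as already read, and the loop condition alone detects the end.
       List<Long> readSequence(int partition, long start, long end, long streamLength) {
           List<Long> read = new ArrayList<>();
           for (long offset = start;
                offset < streamLength && isMoreToReadBeforeReadingRecord(offset, end);
                offset++) {
               if (!isRecordAlreadyRead(partition, offset)) {
                   read.add(offset);
                   markRead(partition, offset);
               }
           }
           return read;
       }

       public static void main(String[] args) {
           // Kafka-like: [0, 5) then [5, 8); the shared boundary 5 is read once.
           OffsetRulesSketch kafka = new OffsetRulesSketch(true);
           System.out.println(kafka.readSequence(0, 0, 5, 100)); // [0, 1, 2, 3, 4]
           System.out.println(kafka.readSequence(0, 5, 8, 100)); // [5, 6, 7]

           // Kinesis-like: [0, 5] then (5, 8]; offset 5 is skipped the second time.
           OffsetRulesSketch kinesis = new OffsetRulesSketch(false);
           System.out.println(kinesis.readSequence(0, 0, 5, 100)); // [0, 1, 2, 3, 4, 5]
           System.out.println(kinesis.readSequence(0, 5, 8, 100)); // [6, 7, 8]

           // Singleton range, start == end, nothing read yet: exactly one record.
           System.out.println(new OffsetRulesSketch(false).readSequence(0, 3, 3, 100)); // [3]
       }
   }
   ```

   With an exclusive end, the second sequence's start offset 5 has not been read yet, so it is read. With an inclusive end, the first sequence already read offset 5, so lastReadOffsets makes the second sequence skip it. Either way, each offset is read exactly once.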
   
   Kafka test changes:
   
   - Added a Kafka "testRestoreAtEndOffset" test to verify that restores at
     the very end of the task lifecycle still work properly.
   
   Kinesis test changes:
   
   - Renamed "testRunOnNothing" to "testRunOnSingletonRange". I think that
     given Kinesis semantics, the right behavior when start offset equals
     end offset (and there aren't exclusive partitions set) is to read that
     single offset. This is because they are both meant to be treated as
     inclusive.
   - Adjusted "testRestoreAfterPersistingSequences" to expect one more
     message read. I believe the old test was wrong; it expected the task
     not to read message number 5.
   - Adjusted "testRunContextSequenceAheadOfStartingOffsets" to use a
     checkpoint starting from 1 rather than 2. I believe the old test was
     wrong here too; it was expecting the task to start reading from the
     checkpointed offset, but it actually should have started reading from
     one past the checkpointed offset.
   - Adjusted "testIncrementalHandOffReadsThroughEndOffsets" to expect
     11 messages read instead of 12. It starts at message 0 and reads
     through message 10 inclusive, which should be 11 messages.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.