jihoonson commented on issue #7239: Kafka tasks fail after resuming for incremental handoff
URL: https://github.com/apache/incubator-druid/issues/7239#issuecomment-472072934

Yeah, `verifyInitialRecordAndSkipExclusivePartition` was added in #6431. Before that PR, a similar check was done as shown below.

```java
if (record.offset() < endOffsets.get(record.partition())) {
  if (record.offset() != nextOffsets.get(record.partition())) {
    if (ioConfig.isSkipOffsetGaps()) {
      log.warn(
          "Skipped to offset[%,d] after offset[%,d] in partition[%d].",
          record.offset(),
          nextOffsets.get(record.partition()),
          record.partition()
      );
    } else {
      throw new ISE(
          "WTF?! Got offset[%,d] after offset[%,d] in partition[%d].",
          record.offset(),
          nextOffsets.get(record.partition()),
          record.partition()
      );
    }
  }
```

`nextOffsets` was only advanced by 1 each time a record was read; it was not updated in `setEndOffsets()`. So, after being resumed, the task could keep reading records to catch up to the assigned end offsets.

Now, `verifyInitialRecordAndSkipExclusivePartition` checks `initialOffsetsSnapshot`, which is updated in `setEndOffsets()`. Since it checks that `record.offset >= initialOffsetsSnapshot`, it always throws an error if the task tries to read after resume.
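To make the difference concrete, here is a minimal sketch of the two checks as described above. It is a simplified model, not the actual Druid code: the class `OffsetCheckSketch` and the methods `oldCheckPasses`, `newCheckPasses`, and `simulateResume` are hypothetical names, and it assumes a single partition whose snapshot is overwritten with the assigned end offset when end offsets are set.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of the two checks; not the real Druid implementation. */
class OffsetCheckSketch {

    /** Old-style check: the record must sit exactly at the next expected offset. */
    static boolean oldCheckPasses(Map<Integer, Long> nextOffsets, int partition, long recordOffset) {
        return recordOffset == nextOffsets.get(partition);
    }

    /** New-style check as described in the comment: record offset must be >= the snapshot. */
    static boolean newCheckPasses(Map<Integer, Long> initialOffsetsSnapshot, int partition, long recordOffset) {
        return recordOffset >= initialOffsetsSnapshot.get(partition);
    }

    /**
     * Models a paused task at currentOffset that is given assignedEndOffset and then resumed.
     * Returns {oldCheckPasses, newCheckPasses} for the first record read after the resume.
     */
    static boolean[] simulateResume(long currentOffset, long assignedEndOffset) {
        final int partition = 0;

        Map<Integer, Long> nextOffsets = new HashMap<>();
        nextOffsets.put(partition, currentOffset);

        Map<Integer, Long> initialOffsetsSnapshot = new HashMap<>();
        initialOffsetsSnapshot.put(partition, currentOffset);

        // Setting end offsets: nextOffsets is left alone (old behavior), while
        // the snapshot is assumed to move to the assigned end offset (new behavior).
        initialOffsetsSnapshot.put(partition, assignedEndOffset);

        // After resuming, the first record read is the one at currentOffset.
        long recordOffset = currentOffset;

        return new boolean[] {
            oldCheckPasses(nextOffsets, partition, recordOffset),
            newCheckPasses(initialOffsetsSnapshot, partition, recordOffset)
        };
    }
}
```

For example, `OffsetCheckSketch.simulateResume(50L, 100L)` models a task that is 50 offsets behind its assigned end offset: under these assumptions the old check accepts the next record, while the new check rejects it because 50 is less than the snapshot value of 100.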
