C0urante commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1175123889
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -139,10 +171,103 @@ public void close() { protected void handleRecord(ConsumerRecord<byte[], byte[]> record) { OffsetSync offsetSync = OffsetSync.deserializeRecord(record); TopicPartition sourceTopicPartition = offsetSync.topicPartition(); - offsetSyncs.put(sourceTopicPartition, offsetSync); + offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync)); + offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync)); + } + + private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) { + // Make a copy of the array before mutating it, so that readers do not see inconsistent data + // TODO: batch updates so that this copy can be performed less often for high-volume sync topics. + OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE); + updateSyncArray(mutableSyncs, offsetSync); + if (log.isTraceEnabled()) { + StringBuilder stateString = new StringBuilder(); + stateString.append("["); + for (int i = 0; i < Long.SIZE; i++) { + if (i != 0) { + stateString.append(","); + } + if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) { + // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,, + stateString.append(mutableSyncs[i].upstreamOffset()); + stateString.append(":"); + stateString.append(mutableSyncs[i].downstreamOffset()); + } + } + stateString.append("]"); + log.trace("New sync {} applied, new state is {}", offsetSync, stateString); + } + return mutableSyncs; + } + + private OffsetSync[] createInitialSyncs(OffsetSync firstSync) { + OffsetSync[] syncs = new OffsetSync[Long.SIZE]; + clearSyncArray(syncs, firstSync); + return syncs; + } + + private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + for (int i = 0; i < Long.SIZE; i++) { + syncs[i] = offsetSync; + } + } + + private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) { + long upstreamOffset = offsetSync.upstreamOffset(); + // Old offsets are invalid, so overwrite them all. + if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { Review Comment: I think the most convincing point in favor of monotonicity is that violations of it will seem like a bug to users and will almost certainly lead to Jira tickets being logged in the future asking us to fix that behavior. I think it's important to keep in mind that exactly-once offset syncing is impossible no matter what, and in addition, any consumer applications that cannot tolerate data loss should already have `auto.offset.reset` set to `earliest`, regardless of whether we make this change or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org