gharris1727 commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########

```diff
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
```

Review Comment:
@urbandan

> Monotonicity is an optimization - it tries to minimize re-processing after a failover. (Please correct me if I'm wrong, but I don't really see any other user stories behind it.)

Yes, this is correct, but I think it misses some nuance. The overall feature (offset translation, checkpoints, syncing offsets to the consumer group) is an optimization: users could just replicate their data and start reading the replica from the beginning. And relevant to your earlier simplicity argument: it would be simpler not to offer offset translation at all, but that would require users to re-deliver more data.

In the same way, monotonicity is an optimization that avoids rewinding checkpoints when we have already translated better ones. We could choose not to offer monotonicity for the sake of simplicity, but (with this specific in-memory store implementation) that means that if you are N offsets behind the replication flow, your offset could fall backward by another N offsets, doubling the data re-delivery.

I would also consider the user story of someone monitoring their cluster and noticing that, despite consumers and replication moving forward, the checkpoints topic is moving _backwards_. Though no error has occurred, the result of the offset translation is getting worse. To someone unfamiliar with the internal workings of this algorithm, it looks like we're just generating arbitrary offsets. Actually, we were already doing that before KAFKA-13659, and someone went to the effort of figuring out why their offsets went backwards each time the connector restarted in order to open a ticket. Additionally, breaking monotonicity increases the difference in semantics between using the raw checkpoints topic and synced consumer offsets, which I think is more likely to lead to confusion.
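To make the monotonicity argument concrete, here is a hypothetical sketch (the class and method names are invented for illustration, not MirrorMaker 2 code) of a checkpoint emitter that suppresses backwards checkpoints, which is the behavior being discussed:

```java
import java.util.OptionalLong;

// Hypothetical sketch: a checkpoint emitter that never lets the emitted
// downstream offset move backwards, even if the in-memory store produces
// an older (smaller) translation after a restart.
public class MonotonicCheckpointSketch {
    private long lastEmittedDownstream = -1L; // highest downstream offset checkpointed so far

    // 'translated' is whatever the offset store produced for the group's
    // upstream position; after a restart it may be older than what we
    // already checkpointed.
    public OptionalLong nextCheckpoint(long translated) {
        if (translated < lastEmittedDownstream) {
            // Without this clamp we would emit the backwards checkpoint, and a
            // failed-over consumer would rewind and re-read data it already processed.
            return OptionalLong.empty();
        }
        lastEmittedDownstream = translated;
        return OptionalLong.of(translated);
    }

    public static void main(String[] args) {
        MonotonicCheckpointSketch cp = new MonotonicCheckpointSketch();
        System.out.println(cp.nextCheckpoint(100)); // emitted
        System.out.println(cp.nextCheckpoint(90));  // suppressed: would rewind the group
        System.out.println(cp.nextCheckpoint(120)); // emitted
    }
}
```

The trade-off described above is visible here: suppressing the backwards value keeps monitoring sane, at the cost of not refreshing the checkpoint until translation catches back up.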
> Being able to checkpoint old offsets after a restart is a feature, and probably a good one. If cluster restarts/rebalances are frequent enough, and some consumers are lagging behind consistently, they might never get their checkpoints translated, ever.

I completely agree. I think that offset translation _must_ be extended to offer translation of very old offsets, and the solution in this PR is just part of that overall solution. The limitation you pointed out is a very important one, as it's the core limitation that this PR is trying to solve. Before this PR, consumers lagging behind the latest offset sync couldn't be translated. After this PR, consumers lagging behind the latest checkpoint task restart can't be translated. I think that is a strict improvement, and even if this PR doesn't address the issue completely, it is worth merging to get a release out the door.

> Based on our discussion so far, it seems that monotonicity will not allow us to implement "old offset checkpointing after restart", which is not ideal. Worst case of breaking monotonicity is sub-optimal failover and extra re-processing. Worst case for not translating old offsets after restart is never checkpointing a group - then, with the default consumer config (auto.offset.reset=latest), failover results in data loss.

I don't think the `auto.offset.reset=latest` situation is a fair criticism of this PR, as that can happen with or without this change. Actually, it's more likely to happen without this change, since the translation window is smaller.

You are welcome to help design and implement the full back-translation algorithm and get it landed in 3.5.1/3.6. This PR is a tactical change to get us over the line for 3.5.0, and is necessarily going to leave some things on the table. I'm glad that you are interested in MM2 offset translation, as I think full back-translation may be more than I can take on alone.
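For intuition on why retaining older syncs widens the translation window, here is a simplified, hypothetical sketch (invented names and a plain sorted array, not the actual `OffsetSyncStore` code or its spacing scheme) of translating a lagging consumer against a small set of retained offset syncs:

```java
// Simplified illustration: a lagging consumer can still be translated,
// conservatively, as long as some sync at or before its position is retained.
public class SyncArraySketch {
    // Each sync pairs an upstream offset with the matching downstream offset.
    record Sync(long upstream, long downstream) {}

    // syncs[0] is the most recent; later entries are progressively older.
    // Returns the downstream offset of the newest sync at or before the
    // consumer's upstream position, or -1 if the position predates everything retained.
    static long translateDownstream(Sync[] syncs, long upstreamOffset) {
        for (Sync s : syncs) {
            if (s.upstream <= upstreamOffset) {
                return s.downstream; // conservative: may re-deliver a little data
            }
        }
        return -1; // older than every retained sync: cannot translate
    }

    public static void main(String[] args) {
        Sync[] syncs = {
            new Sync(1000, 900), new Sync(800, 700), new Sync(400, 350), new Sync(0, 0)
        };
        // A consumer at upstream 850 lags the newest sync (1000) but still translates.
        System.out.println(translateDownstream(syncs, 850)); // 700
        // Even a very old position translates, just very conservatively.
        System.out.println(translateDownstream(syncs, 50));  // 0
    }
}
```

Keeping only the single latest sync (the pre-PR behavior) corresponds to an array of length one here, which is exactly why lagging consumers previously could not be translated at all.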
Are you interested in exploring these ideas yourself and opening a follow-up PR?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org