yashmayya commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1219596953
########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ########## @@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb throw new IllegalStateException("At least one non-null offset store must be provided"); } Review Comment: This `ConnectorOffsetBackingStore::set` method's Javadoc also needs to be updated to mention the special case handling for batches with `null` offsets since it currently states the following: ``` * <p>If configured to use a connector-specific offset store, the returned {@link Future} corresponds to a * write to that store, and the passed-in {@link Callback} is invoked once that write completes. If a worker-global * store is provided, a secondary write is made to that store if the write to the connector-specific store * succeeds. Errors with this secondary write are not reflected in the returned {@link Future} or the passed-in * {@link Callback}; they are only logged as a warning to users. ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ########## @@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb throw new IllegalStateException("At least one non-null offset store must be provided"); } + boolean containsTombstones = values.entrySet() + .stream() + .anyMatch(offset -> offset.getValue() == null); Review Comment: nit: can be simplified ```suggestion boolean containsTombstones = values.containsValue(null); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ########## @@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb throw new IllegalStateException("At least one non-null offset store must be provided"); } + boolean containsTombstones = values.entrySet() + .stream() + .anyMatch(offset -> offset.getValue() == null); + + AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>(); + + // If there are tombstone offsets, then the failure to write to secondary store will + // not be ignored. Also, for tombstone records, we first write to secondary store and + // then to primary stores. + if (secondaryStore != null && containsTombstones) { + secondaryStore.set(values, (secondaryWriteError, ignored) -> { + try (LoggingContext context = loggingContext()) { + if (secondaryWriteError != null) { + log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError); + secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError); + } else { + log.debug("Successfully flushed tombstone offsets to secondary backing store"); + } + } + }); + } + return primaryStore.set(values, (primaryWriteError, ignored) -> { - if (secondaryStore != null) { + // Secondary store writes have already happened for tombstone records Review Comment: How do we know this if we aren't blocking on the write to the secondary store above? I believe we should do a synchronous write to the secondary store in this tombstone offset case. ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ########## @@ -302,7 +326,12 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb } } try (LoggingContext context = loggingContext()) { - callback.onCompletion(primaryWriteError, ignored); + Throwable secondaryWriteError = secondaryStoreTombstoneWriteError.get(); + if (secondaryStore != null && containsTombstones && secondaryWriteError != null) { Review Comment: Same as above - we aren't blocking on the write to the secondary store, so we can't be sure that it has completed at this point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org