yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1219596953


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 

Review Comment:
   This `ConnectorOffsetBackingStore::set` method's Javadoc also needs to be updated to mention the special-case handling for batches with `null` offsets, since it currently states the following:
   
   ```
     * <p>If configured to use a connector-specific offset store, the returned {@link Future} corresponds to a
     * write to that store, and the passed-in {@link Callback} is invoked once that write completes. If a worker-global
     * store is provided, a secondary write is made to that store if the write to the connector-specific store
     * succeeds. Errors with this secondary write are not reflected in the returned {@link Future} or the passed-in
     * {@link Callback}; they are only logged as a warning to users.
   ```
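
   A possible addition to that Javadoc (this wording is only a sketch of what the update might say, based on the behavior in this PR, not the PR's actual text):

   ```
        * <p>If the batch of offsets contains tombstone (i.e. {@code null}) values and a worker-global
        * store is provided, the write to the worker-global store happens first, and a failure of that
        * write is reflected in the returned {@link Future} and the passed-in {@link Callback}.
   ```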



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.entrySet()
+                .stream()
+                .anyMatch(offset -> offset.getValue() == null);

Review Comment:
   nit: can be simplified
   ```suggestion
           boolean containsTombstones = values.containsValue(null);
   ```
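
   For reference, a quick standalone check that the two forms agree (the class name and map contents here are made up for illustration; `HashMap` permits `null` values, which is what makes `containsValue(null)` a valid tombstone check here):

   ```java
   import java.nio.ByteBuffer;
   import java.util.HashMap;
   import java.util.Map;

   public class ContainsTombstonesDemo {
       // The original form: stream over entries looking for a null value.
       static boolean viaStream(Map<ByteBuffer, ByteBuffer> values) {
           return values.entrySet().stream().anyMatch(e -> e.getValue() == null);
       }

       // The suggested simplification: a direct containsValue(null) lookup.
       // Note that Map implementations that forbid null values are permitted
       // to throw on this call, but HashMap (and the offset maps here) allow it.
       static boolean viaContainsValue(Map<ByteBuffer, ByteBuffer> values) {
           return values.containsValue(null);
       }

       public static void main(String[] args) {
           Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
           values.put(ByteBuffer.wrap(new byte[]{1}), null); // a tombstone offset
           System.out.println(viaStream(values) + " " + viaContainsValue(values));
       }
   }
   ```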



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.entrySet()
+                .stream()
+                .anyMatch(offset -> offset.getValue() == null);
+
+        AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone offsets to secondary backing store");
+                    }
+                }
+            });
+        }
+
         return primaryStore.set(values, (primaryWriteError, ignored) -> {
-            if (secondaryStore != null) {
+            // Secondary store writes have already happened for tombstone records

Review Comment:
   How do we know this if we aren't blocking on the write to the secondary store above? I believe we should do a synchronous write to the secondary store in this tombstone offset case.
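
   The blocking variant could look roughly like this (the store stand-in, method names, and timeout are hypothetical; the real `OffsetBackingStore::set` does return a `Future<Void>` that can be waited on):

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class SyncSecondaryWriteSketch {
       // Stand-in for the worker-global (secondary) store write; the real code
       // would call secondaryStore.set(values, callback), which returns a Future<Void>.
       static Future<Void> secondaryStoreSet() {
           return CompletableFuture.completedFuture(null);
       }

       // Blocking on the secondary write before touching the primary store
       // guarantees it has completed, so its error (if any) can be surfaced
       // through the primary store's callback instead of being lost.
       static boolean writeTombstonesSynchronously() {
           try {
               secondaryStoreSet().get(5, TimeUnit.SECONDS); // timeout value is illustrative
               return true;
           } catch (ExecutionException | InterruptedException | TimeoutException e) {
               // A real implementation would propagate this through the
               // caller's Callback and the returned Future
               return false;
           }
       }

       public static void main(String[] args) {
           System.out.println("secondary write ok: " + writeTombstonesSynchronously());
       }
   }
   ```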



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -302,7 +326,12 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
                 }
             }
             try (LoggingContext context = loggingContext()) {
-                callback.onCompletion(primaryWriteError, ignored);
+                Throwable secondaryWriteError = secondaryStoreTombstoneWriteError.get();
+                if (secondaryStore != null && containsTombstones && secondaryWriteError != null) {

Review Comment:
   Same as above - we aren't blocking on the write to the secondary store, so we can't be sure that it has completed at this point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
