This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e62187ebbf SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. (#37007)
1e62187ebbf is described below

commit 1e62187ebbf115769219ca829e211669e73cf75e
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Mon Dec 8 18:29:44 2025 +0100

    SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. (#37007)
---
 .../sdk/io/solace/read/SolaceCheckpointMark.java   |  9 ++-
 .../sdk/io/solace/read/UnboundedSolaceReader.java  | 33 ++--------
 .../beam/sdk/io/solace/SolaceIOReadTest.java       | 73 +++++++++++++++++++++-
 3 files changed, 78 insertions(+), 37 deletions(-)

diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
index a913fd6133e..eb2d4b3006a 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
@@ -18,8 +18,8 @@
 package org.apache.beam.sdk.io.solace.read;
 
 import com.solacesystems.jcsmp.BytesXMLMessage;
+import java.util.List;
 import java.util.Objects;
-import java.util.Queue;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 @VisibleForTesting
 public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
   private static final Logger LOG = 
LoggerFactory.getLogger(SolaceCheckpointMark.class);
-  private transient Queue<BytesXMLMessage> safeToAck;
+  private transient List<BytesXMLMessage> safeToAck;
 
   @SuppressWarnings("initialization") // Avro will set the fields by breaking 
abstraction
   private SolaceCheckpointMark() {}
@@ -48,14 +48,13 @@ public class SolaceCheckpointMark implements 
UnboundedSource.CheckpointMark {
    *
    * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
    */
-  SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
+  SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
     this.safeToAck = safeToAck;
   }
 
   @Override
   public void finalizeCheckpoint() {
-    BytesXMLMessage msg;
-    while ((msg = safeToAck.poll()) != null) {
+    for (BytesXMLMessage msg : safeToAck) {
       try {
         msg.ackMessage();
       } catch (IllegalStateException e) {
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
index dc84e0a0701..7c756169ef3 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
@@ -27,7 +27,6 @@ import java.util.ArrayDeque;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -42,6 +41,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -60,12 +60,6 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
   private @Nullable BytesXMLMessage solaceOriginalRecord;
   private @Nullable T solaceMappedRecord;
 
-  /**
-   * Queue to place advanced messages before {@link #getCheckpointMark()} is 
called. CAUTION:
-   * Accessed by both reader and checkpointing threads.
-   */
-  private final Queue<BytesXMLMessage> safeToAckMessages = new 
ConcurrentLinkedQueue<>();
-
   /**
    * Queue for messages that were ingested in the {@link #advance()} method, 
but not sent yet to a
    * {@link SolaceCheckpointMark}.
@@ -136,8 +130,6 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
 
   @Override
   public boolean advance() {
-    finalizeReadyMessages();
-
     BytesXMLMessage receivedXmlMessage;
     try {
       receivedXmlMessage = getSessionService().getReceiver().receive();
@@ -158,27 +150,9 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
 
   @Override
   public void close() {
-    finalizeReadyMessages();
     sessionServiceCache.invalidate(readerUuid);
   }
 
-  public void finalizeReadyMessages() {
-    BytesXMLMessage msg;
-    while ((msg = safeToAckMessages.poll()) != null) {
-      try {
-        msg.ackMessage();
-      } catch (IllegalStateException e) {
-        LOG.error(
-            "SolaceIO.Read: failed to acknowledge the message with 
applicationMessageId={}, ackMessageId={}. Returning the message to queue to 
retry.",
-            msg.getApplicationMessageId(),
-            msg.getAckMessageId(),
-            e);
-        safeToAckMessages.add(msg); // In case the error was transient, might 
succeed later
-        break; // Commit is only best effort
-      }
-    }
-  }
-
   @Override
   public Instant getWatermark() {
     // should be only used by a test receiver
@@ -190,9 +164,10 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
 
   @Override
   public UnboundedSource.CheckpointMark getCheckpointMark() {
-    safeToAckMessages.addAll(receivedMessages);
+
+    ImmutableList<BytesXMLMessage> bytesXMLMessages = 
ImmutableList.copyOf(receivedMessages);
     receivedMessages.clear();
-    return new SolaceCheckpointMark(safeToAckMessages);
+    return new SolaceCheckpointMark(bytesXMLMessages);
   }
 
   @Override
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
index a1f80932edd..c17ec3e128d 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
@@ -458,13 +458,13 @@ public class SolaceIOReadTest {
     // mark all consumed messages as ready to be acknowledged
     CheckpointMark checkpointMark = reader.getCheckpointMark();
 
-    // consume 1 more message. This will call #ackMsg() on messages that were 
ready to be acked.
+    // consume 1 more message.
     reader.advance();
-    assertEquals(4, countAckMessages.get());
+    assertEquals(0, countAckMessages.get());
 
     // consume 1 more message. No change in the acknowledged messages.
     reader.advance();
-    assertEquals(4, countAckMessages.get());
+    assertEquals(0, countAckMessages.get());
 
     // acknowledge from the first checkpoint
     checkpointMark.finalizeCheckpoint();
@@ -473,6 +473,73 @@ public class SolaceIOReadTest {
     assertEquals(4, countAckMessages.get());
   }
 
+  @Test
+  public void testLateCheckpointOverlappingFlushingOfNextBundle() throws 
Exception {
+    AtomicInteger countConsumedMessages = new AtomicInteger(0);
+    AtomicInteger countAckMessages = new AtomicInteger(0);
+
+    // Broker that creates input data
+    SerializableFunction<Integer, BytesXMLMessage> recordFn =
+        index -> {
+          List<BytesXMLMessage> messages = new ArrayList<>();
+          for (int i = 0; i < 10; i++) {
+            messages.add(
+                SolaceDataUtils.getBytesXmlMessage(
+                    "payload_test" + i, "45" + i, (num) -> 
countAckMessages.incrementAndGet()));
+          }
+          countConsumedMessages.incrementAndGet();
+          return getOrNull(index, messages);
+        };
+
+    SessionServiceFactory fakeSessionServiceFactory =
+        
MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
+
+    Read<Record> spec =
+        getDefaultRead()
+            .withSessionServiceFactory(fakeSessionServiceFactory)
+            .withMaxNumConnections(4);
+
+    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+
+    UnboundedReader<Record> reader =
+        initialSource.createReader(PipelineOptionsFactory.create(), null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // consume 3 messages (NB: #start() already consumed the first message)
+    for (int i = 0; i < 3; i++) {
+      assertTrue(String.format("Failed at %d-th message", i), 
reader.advance());
+    }
+
+    // #advance() was called, but the messages were not ready to be 
acknowledged.
+    assertEquals(0, countAckMessages.get());
+
+    // mark all consumed messages as ready to be acknowledged
+    CheckpointMark checkpointMark = reader.getCheckpointMark();
+
+    // data is flushed
+
+    // consume 1 more message.
+    reader.advance();
+    assertEquals(0, countAckMessages.get());
+
+    // consume 1 more message. No change in the acknowledged messages.
+    reader.advance();
+    assertEquals(0, countAckMessages.get());
+
+    CheckpointMark checkpointMark2 = reader.getCheckpointMark();
+    // data is prepared for flushing that will be rejected
+
+    // acknowledge from the first checkpoint may arrive late
+    checkpointMark.finalizeCheckpoint();
+
+    assertEquals(4, countAckMessages.get());
+
+    checkpointMark2.finalizeCheckpoint();
+    assertEquals(6, countAckMessages.get());
+  }
+
   @Test
   public void testCheckpointMarkSafety() throws Exception {
 

Reply via email to