scwhittle commented on code in PR #37007:
URL: https://github.com/apache/beam/pull/37007#discussion_r2597707328


##########
sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java:
##########
@@ -473,6 +473,70 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
     assertEquals(4, countAckMessages.get());
   }
 
+  @Test
+  public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception {
+    AtomicInteger countConsumedMessages = new AtomicInteger(0);
+    AtomicInteger countAckMessages = new AtomicInteger(0);
+
+    // Broker that creates input data
+    SerializableFunction<Integer, BytesXMLMessage> recordFn =
+        index -> {
+          List<BytesXMLMessage> messages = new ArrayList<>();
+          for (int i = 0; i < 10; i++) {
+            messages.add(
+                SolaceDataUtils.getBytesXmlMessage(
+                    "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
+          }
+          countConsumedMessages.incrementAndGet();
+          return getOrNull(index, messages);
+        };
+
+    SessionServiceFactory fakeSessionServiceFactory =
+        MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
+
+    Read<Record> spec =
+        getDefaultRead()
+            .withSessionServiceFactory(fakeSessionServiceFactory)
+            .withMaxNumConnections(4);
+
+    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+
+    UnboundedReader<Record> reader =
+        initialSource.createReader(PipelineOptionsFactory.create(), null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // consume 3 messages (NB: #start() already consumed the first message)
+    for (int i = 0; i < 3; i++) {
+      assertTrue(String.format("Failed at %d-th message", i), reader.advance());
+    }
+
+    // #advance() was called, but the messages were not ready to be acknowledged.
+    assertEquals(0, countAckMessages.get());
+
+    // mark all consumed messages as ready to be acknowledged
+    CheckpointMark checkpointMark = reader.getCheckpointMark();
+
+    // data is flushed
+
+    // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.

Review Comment:
   Why are these messages ready to be acked? We haven't received notification that the checkpoint completed, so this looks like a data-loss bug.
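A minimal, standalone sketch of the ordering I'd expect. The names `AckableMessage`, `PendingCheckpoint`, and `DeferredAckReader` are hypothetical, and the sketch deliberately does not implement Beam's `UnboundedSource.CheckpointMark` so it can run on its own. Messages read before `getCheckpointMark()` are handed to that checkpoint and acked only when it is finalized, never by a later `advance()`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a broker message whose ack is observable.
final class AckableMessage {
  private final AtomicInteger ackCounter;

  AckableMessage(AtomicInteger ackCounter) {
    this.ackCounter = ackCounter;
  }

  void ack() {
    ackCounter.incrementAndGet();
  }
}

// Hypothetical checkpoint: owns exactly the messages read before it was taken
// and acks them only when the runner reports the checkpoint as durable.
final class PendingCheckpoint {
  private final List<AckableMessage> owned;

  PendingCheckpoint(List<AckableMessage> owned) {
    this.owned = owned;
  }

  // Plays the role of CheckpointMark#finalizeCheckpoint(): the only place that acks.
  void finalizeCheckpoint() {
    for (AckableMessage m : owned) {
      m.ack();
    }
    owned.clear();
  }
}

// Hypothetical reader: advance() records messages but never acks them.
final class DeferredAckReader {
  private List<AckableMessage> sinceLastCheckpoint = new ArrayList<>();

  void advance(AckableMessage m) {
    sinceLastCheckpoint.add(m);
  }

  PendingCheckpoint getCheckpointMark() {
    // Hand the current batch to the checkpoint and start a fresh batch, so
    // messages read after this point belong to the next checkpoint.
    PendingCheckpoint mark = new PendingCheckpoint(sinceLastCheckpoint);
    sinceLastCheckpoint = new ArrayList<>();
    return mark;
  }
}
```

After the handoff the reader and the checkpoint share no list, so a `finalizeCheckpoint()` call arriving on another thread does not race with `advance()`.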



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -190,6 +190,7 @@ public Instant getWatermark() {
 
   @Override
   public UnboundedSource.CheckpointMark getCheckpointMark() {
+    safeToAckMessages = new ConcurrentLinkedQueue<>();

Review Comment:
   I can't leave a comment on the lines above, but I think the calls to finalizeReadyMessages in advance() and close() are also problematic.
   
   Dataflow streaming, for example, keeps a cache of UnboundedReaders and closes readers that have been idle for a while. That can happen even when messages are available, if other nodes in the graph are taking up all the processing threads.
   
   See test comment
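A minimal sketch of the `close()` behaviour I'd expect, under stated assumptions: `EvictableReader` and `SketchBroker` are hypothetical names, a `Runnable` stands in for the checkpoint's `finalizeCheckpoint()`, and "released" models the broker redelivering messages that were never acked once the session goes away (my understanding of guaranteed delivery, worth confirming for Solace). The point is that closing an idle, evicted reader drops its uncheckpointed messages without acking them, so only a finalized checkpoint ever acks:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical broker model: records which message ids were acked and which
// were released unacknowledged (and would therefore be redelivered).
final class SketchBroker {
  final List<String> acked = new ArrayList<>();
  final List<String> released = new ArrayList<>();
}

// Hypothetical reader that a runner may close at any time, e.g. on cache eviction.
final class EvictableReader implements AutoCloseable {
  private final SketchBroker broker;
  private List<String> sinceLastCheckpoint = new ArrayList<>();

  EvictableReader(SketchBroker broker) {
    this.broker = broker;
  }

  // advance() only remembers the message; it never acks.
  void advance(String msgId) {
    sinceLastCheckpoint.add(msgId);
  }

  // The returned Runnable stands in for finalizeCheckpoint(): the only path that acks.
  Runnable getCheckpointMark() {
    List<String> owned = sinceLastCheckpoint;
    sinceLastCheckpoint = new ArrayList<>();
    return () -> broker.acked.addAll(owned);
  }

  // close() releases messages not covered by any checkpoint instead of acking them.
  // Messages owned by an already-issued checkpoint are left to that checkpoint.
  @Override
  public void close() {
    broker.released.addAll(sinceLastCheckpoint);
    sinceLastCheckpoint = new ArrayList<>();
  }
}
```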



-- 
This is an automated message from the Apache Git Service.