This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new be8cf18d440 IGNITE-27335 Fix client data streamer observable timestamp 
(#7224)
be8cf18d440 is described below

commit be8cf18d44055c72304a59f78873571c2848667b
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Dec 12 17:02:06 2025 +0200

    IGNITE-27335 Fix client data streamer observable timestamp (#7224)
    
    Propagate observable timestamp from `STREAMER_BATCH_SEND` client operation 
so that clients observe changes made by the streamer.
---
 .../handler/ClientInboundMessageHandler.java       |  5 +++-
 .../streamer/ItAbstractDataStreamerTest.java       | 32 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 183713a994e..e85b3d33f1b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -1009,7 +1009,10 @@ public class ClientInboundMessageHandler
                 );
 
             case ClientOp.STREAMER_BATCH_SEND:
-                return ClientStreamerBatchSendRequest.process(in, 
igniteTables);
+                return ClientStreamerBatchSendRequest.process(in, 
igniteTables).thenApply(w -> {
+                    tsTracker.update(clockService.current());
+                    return w;
+                });
 
             case ClientOp.PRIMARY_REPLICAS_GET:
                 return 
ClientTablePartitionPrimaryReplicasNodesGetRequest.process(in, igniteTables);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index ebe2c0f1c1e..9e227c7301b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -63,6 +63,8 @@ import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerException;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOperationType;
@@ -424,6 +426,36 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
         }
     }
 
+    @Test
+    public void testManyItemsWithSql() {
+        ignite().sql().executeScript("delete from " + TABLE_NAME);
+
+        int count = 10_000;
+        RecordView<Tuple> view = defaultTable().recordView();
+        CompletableFuture<Void> streamerFut;
+
+        try (var publisher = new 
SubmissionPublisher<DataStreamerItem<Tuple>>()) {
+            var options = DataStreamerOptions.builder().pageSize(100).build();
+            streamerFut = view.streamData(publisher, options);
+
+            for (int i = 0; i < count; i++) {
+                publisher.submit(DataStreamerItem.of(tuple(i, "foo-" + i)));
+            }
+        }
+
+        streamerFut.orTimeout(30, TimeUnit.SECONDS).join();
+
+        ArrayList<String> sqlRes = new ArrayList<>(count);
+        ResultSet<SqlRow> resultSet = ignite().sql().execute(null, "SELECT 
name FROM " + TABLE_NAME + " order by id");
+        resultSet.forEachRemaining(row -> sqlRes.add(row.stringValue(0)));
+
+        assertEquals(count, sqlRes.size());
+
+        for (int i = 0; i < sqlRes.size(); i++) {
+            assertEquals("foo-" + i, sqlRes.get(i));
+        }
+    }
+
     @ParameterizedTest
     @CsvSource({"100, false", "100, true", "1000, false", "1000, true"})
     public void testSameItemMultipleUpdatesOrder(int pageSize, boolean 
existingKey) {

Reply via email to