This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new be8cf18d440 IGNITE-27335 Fix client data streamer observable timestamp (#7224)
be8cf18d440 is described below
commit be8cf18d44055c72304a59f78873571c2848667b
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Dec 12 17:02:06 2025 +0200
IGNITE-27335 Fix client data streamer observable timestamp (#7224)
Propagate observable timestamp from `STREAMER_BATCH_SEND` client operation
so that clients observe changes made by the streamer.
---
.../handler/ClientInboundMessageHandler.java | 5 +++-
.../streamer/ItAbstractDataStreamerTest.java | 32 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 183713a994e..e85b3d33f1b 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -1009,7 +1009,10 @@ public class ClientInboundMessageHandler
);
case ClientOp.STREAMER_BATCH_SEND:
- return ClientStreamerBatchSendRequest.process(in,
igniteTables);
+ return ClientStreamerBatchSendRequest.process(in,
igniteTables).thenApply(w -> {
+ tsTracker.update(clockService.current());
+ return w;
+ });
case ClientOp.PRIMARY_REPLICAS_GET:
return
ClientTablePartitionPrimaryReplicasNodesGetRequest.process(in, igniteTables);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index ebe2c0f1c1e..9e227c7301b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -63,6 +63,8 @@ import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.DataStreamerException;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOperationType;
@@ -424,6 +426,36 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
}
}
+ @Test
+ public void testManyItemsWithSql() {
+ ignite().sql().executeScript("delete from " + TABLE_NAME);
+
+ int count = 10_000;
+ RecordView<Tuple> view = defaultTable().recordView();
+ CompletableFuture<Void> streamerFut;
+
+ try (var publisher = new
SubmissionPublisher<DataStreamerItem<Tuple>>()) {
+ var options = DataStreamerOptions.builder().pageSize(100).build();
+ streamerFut = view.streamData(publisher, options);
+
+ for (int i = 0; i < count; i++) {
+ publisher.submit(DataStreamerItem.of(tuple(i, "foo-" + i)));
+ }
+ }
+
+ streamerFut.orTimeout(30, TimeUnit.SECONDS).join();
+
+ ArrayList<String> sqlRes = new ArrayList<>(count);
+ ResultSet<SqlRow> resultSet = ignite().sql().execute(null, "SELECT name FROM " + TABLE_NAME + " order by id");
+ resultSet.forEachRemaining(row -> sqlRes.add(row.stringValue(0)));
+
+ assertEquals(count, sqlRes.size());
+
+ for (int i = 0; i < sqlRes.size(); i++) {
+ assertEquals("foo-" + i, sqlRes.get(i));
+ }
+ }
+
@ParameterizedTest
@CsvSource({"100, false", "100, true", "1000, false", "1000, true"})
public void testSameItemMultipleUpdatesOrder(int pageSize, boolean
existingKey) {