This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2bf1e14b392 IGNITE-27311 Fix client KeyValueView DataStreamer column 
order mixup (#7205)
2bf1e14b392 is described below

commit 2bf1e14b3928629f7f7c28d934049ca940ad4d49
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Dec 10 16:58:51 2025 +0200

    IGNITE-27311 Fix client KeyValueView DataStreamer column order mixup (#7205)
    
    Write columns in schema order.
---
 .../internal/client/table/ClientKeyValueView.java  |  14 +-
 .../streamer/ItAbstractDataStreamerTest.java       | 211 +++++++++++++++++++++
 2 files changed, 219 insertions(+), 6 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
index 15eee0da91d..8897c05c5ca 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
@@ -749,16 +749,18 @@ public class ClientKeyValueView<K, V> extends 
AbstractClientView<Entry<K, V>> im
 
                     for (Entry<K, V> e : items) {
                         boolean del = deleted != null && deleted.get(i++);
-                        int colCount = del ? s.keyColumns().length : 
s.columns().length;
+                        ClientColumn[] columns = del ? s.keyColumns() : 
s.columns();
 
                         noValueSet.clear();
-                        var builder = new BinaryTupleBuilder(colCount);
+                        var builder = new BinaryTupleBuilder(columns.length);
                         ClientMarshallerWriter writer = new 
ClientMarshallerWriter(builder, noValueSet);
 
-                        keyMarsh.writeObject(e.getKey(), writer);
-
-                        if (!del) {
-                            valMarsh.writeObject(e.getValue(), writer);
+                        for (var column : columns) {
+                            if (column.key()) {
+                                keyMarsh.writeField(e.getKey(), writer, 
column.keyIndex());
+                            } else {
+                                valMarsh.writeField(e.getValue(), writer, 
column.valIndex());
+                            }
                         }
 
                         w.out().packBinaryTuple(builder, noValueSet);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index a631918fa74..8723b43bbb8 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -94,11 +94,20 @@ import org.junit.jupiter.params.provider.ValueSource;
 public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrationTest {
     public static final String TABLE_NAME = "test_table";
 
+    private static final String TABLE_NAME_COMPOSITE_KEY = 
"test_table_composite_key";
+
     abstract Ignite ignite();
 
     @BeforeAll
     public void createTable() {
         createTable(TABLE_NAME, 2, 10);
+        sql("CREATE TABLE test_table_composite_key (\n"
+                + "    name VARCHAR,\n"
+                + "    data VARCHAR,\n"
+                + "    uniqueId VARCHAR,\n"
+                + "    foo VARCHAR,\n"
+                + "    PRIMARY KEY (uniqueId, name)\n"
+                + ")");
     }
 
     @BeforeEach
@@ -202,6 +211,127 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
         assertNull(view.get(null, 3));
     }
 
+    @Test
+    public void testBasicStreamingCompositeKeyRecordBinaryView() {
+        RecordView<Tuple> view = compositeKeyTable().recordView();
+        view.upsert(null, compositeKeyTuple(1));
+        view.upsert(null, compositeKeyTuple(2));
+
+        CompletableFuture<Void> streamerFut;
+
+        try (var publisher = new 
SubmissionPublisher<DataStreamerItem<Tuple>>()) {
+            streamerFut = view.streamData(publisher, null);
+
+            publisher.submit(DataStreamerItem.of(compositeKeyTuple(3)));
+            publisher.submit(DataStreamerItem.of(compositeKeyTuple(4)));
+
+            
publisher.submit(DataStreamerItem.removed(compositeKeyTupleKey(1)));
+        }
+
+        streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+        assertNull(view.get(null, compositeKeyTupleKey(1)));
+        assertNotNull(view.get(null, compositeKeyTupleKey(2)));
+        assertNotNull(view.get(null, compositeKeyTupleKey(3)));
+        assertNotNull(view.get(null, compositeKeyTupleKey(4)));
+
+        Tuple resTuple = view.get(null, compositeKeyTupleKey(3));
+        assertEquals("name3", resTuple.stringValue("name"));
+        assertEquals("data3", resTuple.stringValue("data"));
+        assertEquals("uniqueId3", resTuple.stringValue("uniqueId"));
+        assertEquals("foo3", resTuple.stringValue("foo"));
+    }
+
+    @Test
+    public void testBasicStreamingCompositeKeyRecordPojoView() {
+        RecordView<CompositeKeyPojo> view = 
compositeKeyTable().recordView(CompositeKeyPojo.class);
+        view.upsert(null, new CompositeKeyPojo(1, "data1", "foo1"));
+        view.upsert(null, new CompositeKeyPojo(2, "data2", "foo2"));
+
+        CompletableFuture<Void> streamerFut;
+
+        try (var publisher = new 
SubmissionPublisher<DataStreamerItem<CompositeKeyPojo>>()) {
+            streamerFut = view.streamData(publisher, null);
+
+            publisher.submit(DataStreamerItem.of(new CompositeKeyPojo(3, 
"data3", "foo3")));
+            publisher.submit(DataStreamerItem.of(new CompositeKeyPojo(4, 
"data4", "foo4")));
+
+            publisher.submit(DataStreamerItem.removed(new 
CompositeKeyPojo(1)));
+        }
+
+        streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+        assertNull(view.get(null, new CompositeKeyPojo(1)));
+        assertNotNull(view.get(null, new CompositeKeyPojo(2)));
+        assertNotNull(view.get(null, new CompositeKeyPojo(3)));
+        assertNotNull(view.get(null, new CompositeKeyPojo(4)));
+
+        CompositeKeyPojo resPojo = view.get(null, new CompositeKeyPojo(3));
+        assertEquals("name3", resPojo.name);
+        assertEquals("data3", resPojo.data);
+        assertEquals("uniqueId3", resPojo.uniqueId);
+        assertEquals("foo3", resPojo.foo);
+    }
+
+    @Test
+    public void testBasicStreamingCompositeKeyKvBinaryView() {
+        KeyValueView<Tuple, Tuple> view = compositeKeyTable().keyValueView();
+        view.put(null, compositeKeyTupleKey(1), compositeKeyTupleVal(1));
+        view.put(null, compositeKeyTupleKey(2), compositeKeyTupleVal(2));
+
+        CompletableFuture<Void> streamerFut;
+
+        try (var publisher = new 
SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>>()) {
+            streamerFut = view.streamData(publisher, null);
+
+            
publisher.submit(DataStreamerItem.of(Map.entry(compositeKeyTupleKey(3), 
compositeKeyTupleVal(3))));
+            
publisher.submit(DataStreamerItem.of(Map.entry(compositeKeyTupleKey(4), 
compositeKeyTupleVal(4))));
+
+            
publisher.submit(DataStreamerItem.removed(Map.entry(compositeKeyTupleKey(1), 
Tuple.create())));
+        }
+
+        streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+        assertNull(view.get(null, compositeKeyTupleKey(1)));
+        assertNotNull(view.get(null, compositeKeyTupleKey(2)));
+        assertNotNull(view.get(null, compositeKeyTupleKey(3)));
+        assertNotNull(view.get(null, compositeKeyTupleKey(4)));
+
+        Tuple resValue = view.get(null, compositeKeyTupleKey(3));
+        assertEquals("data3", resValue.stringValue("data"));
+        assertEquals("foo3", resValue.stringValue("foo"));
+    }
+
+    @Test
+    public void testBasicStreamingCompositeKeyKvPojoView() {
+        KeyValueView<CompositeKeyKeyPojo, CompositeKeyValPojo> view = 
compositeKeyTable().keyValueView(
+                Mapper.of(CompositeKeyKeyPojo.class), 
Mapper.of(CompositeKeyValPojo.class));
+        view.put(null, new CompositeKeyKeyPojo(1), new CompositeKeyValPojo(1));
+        view.put(null, new CompositeKeyKeyPojo(2), new CompositeKeyValPojo(2));
+
+        CompletableFuture<Void> streamerFut;
+
+        try (var publisher = new 
SubmissionPublisher<DataStreamerItem<Map.Entry<CompositeKeyKeyPojo, 
CompositeKeyValPojo>>>()) {
+            streamerFut = view.streamData(publisher, null);
+
+            publisher.submit(DataStreamerItem.of(Map.entry(new 
CompositeKeyKeyPojo(3), new CompositeKeyValPojo(3))));
+            publisher.submit(DataStreamerItem.of(Map.entry(new 
CompositeKeyKeyPojo(4), new CompositeKeyValPojo(4))));
+
+            publisher.submit(DataStreamerItem.removed(Map.entry(new 
CompositeKeyKeyPojo(1), new CompositeKeyValPojo(1))));
+        }
+
+        streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+        assertNull(view.get(null, new CompositeKeyKeyPojo(1)));
+        assertNotNull(view.get(null, new CompositeKeyKeyPojo(2)));
+        assertNotNull(view.get(null, new CompositeKeyKeyPojo(3)));
+        assertNotNull(view.get(null, new CompositeKeyKeyPojo(4)));
+
+        CompositeKeyValPojo resValue = view.get(null, new 
CompositeKeyKeyPojo(3));
+        assertEquals("data3", resValue.data);
+        assertEquals("foo3", resValue.foo);
+    }
+
     @Test
     public void testAutoFlushByTimer() throws InterruptedException {
         RecordView<Tuple> view = this.defaultTable().recordView();
@@ -890,6 +1020,10 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
         return ignite().tables().table(TABLE_NAME);
     }
 
+    private Table compositeKeyTable() {
+        return ignite().tables().table(TABLE_NAME_COMPOSITE_KEY);
+    }
+
     private static Tuple tuple(int id, String name) {
         return Tuple.create()
                 .set("id", id)
@@ -901,6 +1035,26 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
                 .set("id", id);
     }
 
+    private static Tuple compositeKeyTuple(int id) {
+        return Tuple.create()
+                .set("name", "name" + id)
+                .set("data", "data" + id)
+                .set("uniqueId", "uniqueId" + id)
+                .set("foo", "foo" + id);
+    }
+
+    private static Tuple compositeKeyTupleKey(int id) {
+        return Tuple.create()
+                .set("name", "name" + id)
+                .set("uniqueId", "uniqueId" + id);
+    }
+
+    private static Tuple compositeKeyTupleVal(int id) {
+        return Tuple.create()
+                .set("data", "data" + id)
+                .set("foo", "foo" + id);
+    }
+
     @SuppressWarnings("unused")
     private static class PersonPojo {
         int id;
@@ -937,6 +1091,63 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
         }
     }
 
+    @SuppressWarnings("unused")
+    private static class CompositeKeyPojo {
+        String name;
+        String data;
+        String uniqueId;
+        String foo;
+
+        @SuppressWarnings("unused") // Required by serializer.
+        private CompositeKeyPojo() {
+            // No-op.
+        }
+
+        CompositeKeyPojo(int id) {
+            this.name = "name" + id;
+            this.uniqueId = "uniqueId" + id;
+        }
+
+        CompositeKeyPojo(int id, String data, String foo) {
+            this.name = "name" + id;
+            this.data = data;
+            this.uniqueId = "uniqueId" + id;
+            this.foo = foo;
+        }
+    }
+
+    @SuppressWarnings("unused")
+    private static class CompositeKeyKeyPojo {
+        String name;
+        String uniqueId;
+
+        @SuppressWarnings("unused") // Required by serializer.
+        private CompositeKeyKeyPojo() {
+            // No-op.
+        }
+
+        CompositeKeyKeyPojo(int id) {
+            this.name = "name" + id;
+            this.uniqueId = "uniqueId" + id;
+        }
+    }
+
+    @SuppressWarnings("unused")
+    private static class CompositeKeyValPojo {
+        String data;
+        String foo;
+
+        @SuppressWarnings("unused") // Required by serializer.
+        private CompositeKeyValPojo() {
+            // No-op.
+        }
+
+        CompositeKeyValPojo(int id) {
+            this.data = "data" + id;
+            this.foo = "foo" + id;
+        }
+    }
+
     private static class TestReceiver implements DataStreamerReceiver<String, 
Object, String> {
         @Override
         public CompletableFuture<List<String>> receive(List<String> page, 
DataStreamerReceiverContext ctx, Object arg) {

Reply via email to