This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2bf1e14b392 IGNITE-27311 Fix client KeyValueView DataStreamer column order mixup (#7205)
2bf1e14b392 is described below
commit 2bf1e14b3928629f7f7c28d934049ca940ad4d49
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Dec 10 16:58:51 2025 +0200
IGNITE-27311 Fix client KeyValueView DataStreamer column order mixup (#7205)
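Write columns in schema order: iterate over the schema columns and write each key or value field by its index, instead of writing the whole key object followed by the whole value object.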
---
.../internal/client/table/ClientKeyValueView.java | 14 +-
.../streamer/ItAbstractDataStreamerTest.java | 211 +++++++++++++++++++++
2 files changed, 219 insertions(+), 6 deletions(-)
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
index 15eee0da91d..8897c05c5ca 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
@@ -749,16 +749,18 @@ public class ClientKeyValueView<K, V> extends AbstractClientView<Entry<K, V>> im
for (Entry<K, V> e : items) {
boolean del = deleted != null && deleted.get(i++);
- int colCount = del ? s.keyColumns().length :
s.columns().length;
+ ClientColumn[] columns = del ? s.keyColumns() :
s.columns();
noValueSet.clear();
- var builder = new BinaryTupleBuilder(colCount);
+ var builder = new BinaryTupleBuilder(columns.length);
ClientMarshallerWriter writer = new
ClientMarshallerWriter(builder, noValueSet);
- keyMarsh.writeObject(e.getKey(), writer);
-
- if (!del) {
- valMarsh.writeObject(e.getValue(), writer);
+ for (var column : columns) {
+ if (column.key()) {
+ keyMarsh.writeField(e.getKey(), writer,
column.keyIndex());
+ } else {
+ valMarsh.writeField(e.getValue(), writer,
column.valIndex());
+ }
}
w.out().packBinaryTuple(builder, noValueSet);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index a631918fa74..8723b43bbb8 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -94,11 +94,20 @@ import org.junit.jupiter.params.provider.ValueSource;
public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrationTest {
public static final String TABLE_NAME = "test_table";
+ private static final String TABLE_NAME_COMPOSITE_KEY =
"test_table_composite_key";
+
abstract Ignite ignite();
@BeforeAll
public void createTable() {
createTable(TABLE_NAME, 2, 10);
+ sql("CREATE TABLE test_table_composite_key (\n"
+ + " name VARCHAR,\n"
+ + " data VARCHAR,\n"
+ + " uniqueId VARCHAR,\n"
+ + " foo VARCHAR,\n"
+ + " PRIMARY KEY (uniqueId, name)\n"
+ + ")");
}
@BeforeEach
@@ -202,6 +211,127 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
assertNull(view.get(null, 3));
}
+ @Test
+ public void testBasicStreamingCompositeKeyRecordBinaryView() {
+ RecordView<Tuple> view = compositeKeyTable().recordView();
+ view.upsert(null, compositeKeyTuple(1));
+ view.upsert(null, compositeKeyTuple(2));
+
+ CompletableFuture<Void> streamerFut;
+
+ try (var publisher = new
SubmissionPublisher<DataStreamerItem<Tuple>>()) {
+ streamerFut = view.streamData(publisher, null);
+
+ publisher.submit(DataStreamerItem.of(compositeKeyTuple(3)));
+ publisher.submit(DataStreamerItem.of(compositeKeyTuple(4)));
+
+
publisher.submit(DataStreamerItem.removed(compositeKeyTupleKey(1)));
+ }
+
+ streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+ assertNull(view.get(null, compositeKeyTupleKey(1)));
+ assertNotNull(view.get(null, compositeKeyTupleKey(2)));
+ assertNotNull(view.get(null, compositeKeyTupleKey(3)));
+ assertNotNull(view.get(null, compositeKeyTupleKey(4)));
+
+ Tuple resTuple = view.get(null, compositeKeyTupleKey(3));
+ assertEquals("name3", resTuple.stringValue("name"));
+ assertEquals("data3", resTuple.stringValue("data"));
+ assertEquals("uniqueId3", resTuple.stringValue("uniqueId"));
+ assertEquals("foo3", resTuple.stringValue("foo"));
+ }
+
+ @Test
+ public void testBasicStreamingCompositeKeyRecordPojoView() {
+ RecordView<CompositeKeyPojo> view =
compositeKeyTable().recordView(CompositeKeyPojo.class);
+ view.upsert(null, new CompositeKeyPojo(1, "data1", "foo1"));
+ view.upsert(null, new CompositeKeyPojo(2, "data2", "foo2"));
+
+ CompletableFuture<Void> streamerFut;
+
+ try (var publisher = new
SubmissionPublisher<DataStreamerItem<CompositeKeyPojo>>()) {
+ streamerFut = view.streamData(publisher, null);
+
+ publisher.submit(DataStreamerItem.of(new CompositeKeyPojo(3,
"data3", "foo3")));
+ publisher.submit(DataStreamerItem.of(new CompositeKeyPojo(4,
"data4", "foo4")));
+
+ publisher.submit(DataStreamerItem.removed(new
CompositeKeyPojo(1)));
+ }
+
+ streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+ assertNull(view.get(null, new CompositeKeyPojo(1)));
+ assertNotNull(view.get(null, new CompositeKeyPojo(2)));
+ assertNotNull(view.get(null, new CompositeKeyPojo(3)));
+ assertNotNull(view.get(null, new CompositeKeyPojo(4)));
+
+ CompositeKeyPojo resPojo = view.get(null, new CompositeKeyPojo(3));
+ assertEquals("name3", resPojo.name);
+ assertEquals("data3", resPojo.data);
+ assertEquals("uniqueId3", resPojo.uniqueId);
+ assertEquals("foo3", resPojo.foo);
+ }
+
+ @Test
+ public void testBasicStreamingCompositeKeyKvBinaryView() {
+ KeyValueView<Tuple, Tuple> view = compositeKeyTable().keyValueView();
+ view.put(null, compositeKeyTupleKey(1), compositeKeyTupleVal(1));
+ view.put(null, compositeKeyTupleKey(2), compositeKeyTupleVal(2));
+
+ CompletableFuture<Void> streamerFut;
+
+ try (var publisher = new
SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>>()) {
+ streamerFut = view.streamData(publisher, null);
+
+
publisher.submit(DataStreamerItem.of(Map.entry(compositeKeyTupleKey(3),
compositeKeyTupleVal(3))));
+
publisher.submit(DataStreamerItem.of(Map.entry(compositeKeyTupleKey(4),
compositeKeyTupleVal(4))));
+
+
publisher.submit(DataStreamerItem.removed(Map.entry(compositeKeyTupleKey(1),
Tuple.create())));
+ }
+
+ streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+ assertNull(view.get(null, compositeKeyTupleKey(1)));
+ assertNotNull(view.get(null, compositeKeyTupleKey(2)));
+ assertNotNull(view.get(null, compositeKeyTupleKey(3)));
+ assertNotNull(view.get(null, compositeKeyTupleKey(4)));
+
+ Tuple resValue = view.get(null, compositeKeyTupleKey(3));
+ assertEquals("data3", resValue.stringValue("data"));
+ assertEquals("foo3", resValue.stringValue("foo"));
+ }
+
+ @Test
+ public void testBasicStreamingCompositeKeyKvPojoView() {
+ KeyValueView<CompositeKeyKeyPojo, CompositeKeyValPojo> view =
compositeKeyTable().keyValueView(
+ Mapper.of(CompositeKeyKeyPojo.class),
Mapper.of(CompositeKeyValPojo.class));
+ view.put(null, new CompositeKeyKeyPojo(1), new CompositeKeyValPojo(1));
+ view.put(null, new CompositeKeyKeyPojo(2), new CompositeKeyValPojo(2));
+
+ CompletableFuture<Void> streamerFut;
+
+ try (var publisher = new
SubmissionPublisher<DataStreamerItem<Map.Entry<CompositeKeyKeyPojo,
CompositeKeyValPojo>>>()) {
+ streamerFut = view.streamData(publisher, null);
+
+ publisher.submit(DataStreamerItem.of(Map.entry(new
CompositeKeyKeyPojo(3), new CompositeKeyValPojo(3))));
+ publisher.submit(DataStreamerItem.of(Map.entry(new
CompositeKeyKeyPojo(4), new CompositeKeyValPojo(4))));
+
+ publisher.submit(DataStreamerItem.removed(Map.entry(new
CompositeKeyKeyPojo(1), new CompositeKeyValPojo(1))));
+ }
+
+ streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+ assertNull(view.get(null, new CompositeKeyKeyPojo(1)));
+ assertNotNull(view.get(null, new CompositeKeyKeyPojo(2)));
+ assertNotNull(view.get(null, new CompositeKeyKeyPojo(3)));
+ assertNotNull(view.get(null, new CompositeKeyKeyPojo(4)));
+
+ CompositeKeyValPojo resValue = view.get(null, new
CompositeKeyKeyPojo(3));
+ assertEquals("data3", resValue.data);
+ assertEquals("foo3", resValue.foo);
+ }
+
@Test
public void testAutoFlushByTimer() throws InterruptedException {
RecordView<Tuple> view = this.defaultTable().recordView();
@@ -890,6 +1020,10 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
return ignite().tables().table(TABLE_NAME);
}
+ private Table compositeKeyTable() {
+ return ignite().tables().table(TABLE_NAME_COMPOSITE_KEY);
+ }
+
private static Tuple tuple(int id, String name) {
return Tuple.create()
.set("id", id)
@@ -901,6 +1035,26 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
.set("id", id);
}
+ private static Tuple compositeKeyTuple(int id) {
+ return Tuple.create()
+ .set("name", "name" + id)
+ .set("data", "data" + id)
+ .set("uniqueId", "uniqueId" + id)
+ .set("foo", "foo" + id);
+ }
+
+ private static Tuple compositeKeyTupleKey(int id) {
+ return Tuple.create()
+ .set("name", "name" + id)
+ .set("uniqueId", "uniqueId" + id);
+ }
+
+ private static Tuple compositeKeyTupleVal(int id) {
+ return Tuple.create()
+ .set("data", "data" + id)
+ .set("foo", "foo" + id);
+ }
+
@SuppressWarnings("unused")
private static class PersonPojo {
int id;
@@ -937,6 +1091,63 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
}
}
+ @SuppressWarnings("unused")
+ private static class CompositeKeyPojo {
+ String name;
+ String data;
+ String uniqueId;
+ String foo;
+
+ @SuppressWarnings("unused") // Required by serializer.
+ private CompositeKeyPojo() {
+ // No-op.
+ }
+
+ CompositeKeyPojo(int id) {
+ this.name = "name" + id;
+ this.uniqueId = "uniqueId" + id;
+ }
+
+ CompositeKeyPojo(int id, String data, String foo) {
+ this.name = "name" + id;
+ this.data = data;
+ this.uniqueId = "uniqueId" + id;
+ this.foo = foo;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private static class CompositeKeyKeyPojo {
+ String name;
+ String uniqueId;
+
+ @SuppressWarnings("unused") // Required by serializer.
+ private CompositeKeyKeyPojo() {
+ // No-op.
+ }
+
+ CompositeKeyKeyPojo(int id) {
+ this.name = "name" + id;
+ this.uniqueId = "uniqueId" + id;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private static class CompositeKeyValPojo {
+ String data;
+ String foo;
+
+ @SuppressWarnings("unused") // Required by serializer.
+ private CompositeKeyValPojo() {
+ // No-op.
+ }
+
+ CompositeKeyValPojo(int id) {
+ this.data = "data" + id;
+ this.foo = "foo" + id;
+ }
+ }
+
private static class TestReceiver implements DataStreamerReceiver<String,
Object, String> {
@Override
public CompletableFuture<List<String>> receive(List<String> page,
DataStreamerReceiverContext ctx, Object arg) {