This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8dda186de4 [flink] Fix RemoteTableQuery key serializer to use trimmed
primary keys (#8145) (#8146)
8dda186de4 is described below
commit 8dda186de4612ec98c1dcc27da006a940e58e9ed
Author: Jordan Epstein <[email protected]>
AuthorDate: Sun Jun 7 20:58:50 2026 -0500
[flink] Fix RemoteTableQuery key serializer to use trimmed primary keys
(#8145) (#8146)
---
.../paimon/flink/query/RemoteTableQuery.java | 4 ++-
.../paimon/flink/RemoteLookupJoinITCase.java | 38 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
index 34d993eb3d..fca626de18 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
@@ -54,7 +54,9 @@ public class RemoteTableQuery implements TableQuery {
ServiceManager manager = this.table.store().newServiceManager();
this.client = new KvQueryClient(new QueryLocationImpl(manager), 1);
this.keySerializer =
- InternalSerializers.create(TypeUtils.project(table.rowType(),
table.primaryKeys()));
+ InternalSerializers.create(
+ TypeUtils.project(
+ this.table.rowType(),
this.table.schema().trimmedPrimaryKeys()));
}
public static boolean isRemoteServiceAvailable(FileStoreTable table) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
index df9a7ed59b..fca350da19 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
@@ -165,6 +166,43 @@ public class RemoteLookupJoinITCase extends
CatalogITCaseBase {
assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse();
}
+ @Test
+ public void testLookupPartitionedRemoteTable() throws Throwable {
+ // Partitioned primary-key table whose partition key and trimmed
primary key have different
+ // types: primaryKeys = (pt INT, k STRING), partitionKeys = (pt), so
the trimmed primary key
+ // handed to lookup is (k STRING). A key serializer built over the
full primary key reads
+ // field 0 as INT and breaks on the STRING key.
+ sql(
+ "CREATE TABLE DIMP (pt INT, k STRING, v INT, PRIMARY KEY (pt,
k) NOT ENFORCED) "
+ + "PARTITIONED BY (pt) WITH ('bucket' = '1')");
+ ServiceProxy proxy = launchQueryServer("DIMP");
+
+ proxy.write(GenericRow.of(1, BinaryString.fromString("a"), 100));
+ proxy.write(GenericRow.of(1, BinaryString.fromString("b"), 200));
+ proxy.write(GenericRow.of(2, BinaryString.fromString("a"), 300));
+
+ RemoteTableQuery query = new RemoteTableQuery(paimonTable("DIMP"));
+
+ // lookup is called with (partition, bucket, trimmedKey); trimmedKey =
(k STRING)
+ assertThat(query.lookup(row(1), 0,
GenericRow.of(BinaryString.fromString("a"))))
+ .isNotNull()
+ .extracting(r -> r.getInt(2))
+ .isEqualTo(100);
+ assertThat(query.lookup(row(1), 0,
GenericRow.of(BinaryString.fromString("b"))))
+ .isNotNull()
+ .extracting(r -> r.getInt(2))
+ .isEqualTo(200);
+ // same trimmed key ("a") in a different partition resolves through
the partition arg
+ assertThat(query.lookup(row(2), 0,
GenericRow.of(BinaryString.fromString("a"))))
+ .isNotNull()
+ .extracting(r -> r.getInt(2))
+ .isEqualTo(300);
+ assertThat(query.lookup(row(1), 0,
GenericRow.of(BinaryString.fromString("z")))).isNull();
+
+ query.close();
+ proxy.close();
+ }
+
private JobClient queryService(FileStoreTable table) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
QueryService.build(env, table, 2);