This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dda186de4 [flink] Fix RemoteTableQuery key serializer to use trimmed primary keys (#8145) (#8146)
8dda186de4 is described below

commit 8dda186de4612ec98c1dcc27da006a940e58e9ed
Author: Jordan Epstein <[email protected]>
AuthorDate: Sun Jun 7 20:58:50 2026 -0500

    [flink] Fix RemoteTableQuery key serializer to use trimmed primary keys (#8145) (#8146)
---
 .../paimon/flink/query/RemoteTableQuery.java       |  4 ++-
 .../paimon/flink/RemoteLookupJoinITCase.java       | 38 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
index 34d993eb3d..fca626de18 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
@@ -54,7 +54,9 @@ public class RemoteTableQuery implements TableQuery {
         ServiceManager manager = this.table.store().newServiceManager();
         this.client = new KvQueryClient(new QueryLocationImpl(manager), 1);
         this.keySerializer =
-                InternalSerializers.create(TypeUtils.project(table.rowType(), table.primaryKeys()));
+                InternalSerializers.create(
+                        TypeUtils.project(
+                                this.table.rowType(), this.table.schema().trimmedPrimaryKeys()));
     }
 
     public static boolean isRemoteServiceAvailable(FileStoreTable table) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
index df9a7ed59b..fca350da19 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
@@ -165,6 +166,43 @@ public class RemoteLookupJoinITCase extends CatalogITCaseBase {
         assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse();
     }
 
+    @Test
+    public void testLookupPartitionedRemoteTable() throws Throwable {
+        // Partitioned primary-key table whose partition key and trimmed primary key have different
+        // types: primaryKeys = (pt INT, k STRING), partitionKeys = (pt), so the trimmed primary key
+        // handed to lookup is (k STRING). A key serializer built over the full primary key reads
+        // field 0 as INT and breaks on the STRING key.
+        sql(
+                "CREATE TABLE DIMP (pt INT, k STRING, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) "
+                        + "PARTITIONED BY (pt) WITH ('bucket' = '1')");
+        ServiceProxy proxy = launchQueryServer("DIMP");
+
+        proxy.write(GenericRow.of(1, BinaryString.fromString("a"), 100));
+        proxy.write(GenericRow.of(1, BinaryString.fromString("b"), 200));
+        proxy.write(GenericRow.of(2, BinaryString.fromString("a"), 300));
+
+        RemoteTableQuery query = new RemoteTableQuery(paimonTable("DIMP"));
+
+        // lookup is called with (partition, bucket, trimmedKey); trimmedKey = (k STRING)
+        assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("a"))))
+                .isNotNull()
+                .extracting(r -> r.getInt(2))
+                .isEqualTo(100);
+        assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("b"))))
+                .isNotNull()
+                .extracting(r -> r.getInt(2))
+                .isEqualTo(200);
+        // same trimmed key ("a") in a different partition resolves through the partition arg
+        assertThat(query.lookup(row(2), 0, GenericRow.of(BinaryString.fromString("a"))))
+                .isNotNull()
+                .extracting(r -> r.getInt(2))
+                .isEqualTo(300);
+        assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("z")))).isNull();
+
+        query.close();
+        proxy.close();
+    }
+
     private JobClient queryService(FileStoreTable table) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         QueryService.build(env, table, 2);

Reply via email to