This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c4b68ed0 [core] Add hint in local lookup file tracking with remote 
file (#3814)
7c4b68ed0 is described below

commit 7c4b68ed06c441b3c6f7e9df896c4492ae8985e1
Author: Aitozi <[email protected]>
AuthorDate: Wed Jul 31 16:32:20 2024 +0800

    [core] Add hint in local lookup file tracking with remote file (#3814)
---
 .../paimon/utils/InternalRowPartitionComputer.java | 19 +++++++
 .../utils/InternalRowPartitionComputerTest.java    | 58 ++++++++++++++++++++++
 .../java/org/apache/paimon/KeyValueFileStore.java  |  1 +
 .../org/apache/paimon/disk/FileChannelManager.java |  3 ++
 .../apache/paimon/disk/FileChannelManagerImpl.java |  6 +++
 .../java/org/apache/paimon/disk/FileIOChannel.java |  5 ++
 .../java/org/apache/paimon/disk/IOManager.java     |  2 +
 .../java/org/apache/paimon/disk/IOManagerImpl.java |  5 ++
 .../org/apache/paimon/mergetree/LookupFile.java    | 12 +++++
 .../org/apache/paimon/mergetree/LookupLevels.java  |  7 ++-
 .../paimon/operation/KeyValueFileStoreWrite.java   | 20 ++++++--
 .../apache/paimon/table/query/LocalTableQuery.java | 17 ++++++-
 .../paimon/mergetree/ContainsLevelsTest.java       |  2 +-
 .../apache/paimon/mergetree/LookupFileTest.java    | 49 ++++++++++++++++++
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  2 +-
 .../flink/source/TestChangelogDataReadWrite.java   |  3 ++
 16 files changed, 200 insertions(+), 11 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index bdc5197fe..07746d58d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.types.RowType;
 
@@ -77,4 +78,22 @@ public class InternalRowPartitionComputer {
         }
         return partValues;
     }
+
+    public static String paritionToString(
+            RowType partitionType, BinaryRow partition, String delimiter) {
+        InternalRow.FieldGetter[] getters = partitionType.fieldGetters();
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < getters.length; i++) {
+            Object part = getters[i].getFieldOrNull(partition);
+            if (part != null) {
+                builder.append(part);
+            } else {
+                builder.append("null");
+            }
+            if (i != getters.length - 1) {
+                builder.append(delimiter);
+            }
+        }
+        return builder.toString();
+    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
new file mode 100644
index 000000000..fcd79fa4c
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link InternalRowPartitionComputer}. */
+public class InternalRowPartitionComputerTest {
+
+    @Test
+    public void testPartitionToString() {
+        RowType rowType = RowType.of();
+        BinaryRow binaryRow = new BinaryRow(0);
+        BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+        assertThat(InternalRowPartitionComputer.paritionToString(rowType, 
binaryRow, "-"))
+                .isEqualTo("");
+
+        rowType = RowType.of(DataTypes.STRING(), DataTypes.INT());
+        binaryRow = new BinaryRow(2);
+        writer = new BinaryRowWriter(binaryRow);
+        writer.writeString(0, BinaryString.fromString("20240731"));
+        writer.writeInt(1, 10);
+        assertThat(InternalRowPartitionComputer.paritionToString(rowType, 
binaryRow, "-"))
+                .isEqualTo("20240731-10");
+
+        rowType = RowType.of(DataTypes.STRING(), DataTypes.INT());
+        binaryRow = new BinaryRow(2);
+        writer = new BinaryRowWriter(binaryRow);
+        writer.setNullAt(0);
+        writer.writeInt(1, 10);
+        assertThat(InternalRowPartitionComputer.paritionToString(rowType, 
binaryRow, "-"))
+                .isEqualTo("null-10");
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index c17f1c252..26341d045 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -169,6 +169,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 schemaManager,
                 schema,
                 commitUser,
+                partitionType,
                 keyType,
                 valueType,
                 keyComparatorSupplier,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java
index 1c6a709cf..7349d4566 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java
@@ -26,6 +26,9 @@ public interface FileChannelManager extends AutoCloseable {
     /** Creates an ID identifying an underlying file channel and returns it. */
     FileIOChannel.ID createChannel();
 
+    /** Creates an ID identifying an underlying file channel and returns it. */
+    FileIOChannel.ID createChannel(String prefix);
+
     /** Creates an enumerator for channels that logically belong together and 
returns it. */
     FileIOChannel.Enumerator createChannelEnumerator();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
index d78a361c9..ce175e90b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
@@ -89,6 +89,12 @@ public class FileChannelManagerImpl implements 
FileChannelManager {
         return new ID(paths[num], num, random);
     }
 
+    @Override
+    public ID createChannel(String prefix) {
+        int num = (int) (nextPath.getAndIncrement() % paths.length);
+        return new ID(paths[num], num, prefix, random);
+    }
+
     @Override
     public Enumerator createChannelEnumerator() {
         return new Enumerator(paths, random);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java
index 5c9a262df..4b72029dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java
@@ -102,6 +102,11 @@ public interface FileIOChannel {
             this.bucketNum = bucketNum;
         }
 
+        public ID(File basePath, int bucketNum, String prefix, Random random) {
+            this.path = new File(basePath, prefix + "-" + randomString(random) 
+ ".channel");
+            this.bucketNum = bucketNum;
+        }
+
         /** Returns the path to the underlying temporary file. */
         public String getPath() {
             return path.getAbsolutePath();
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
index 8e6b67f83..510492583 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
@@ -34,6 +34,8 @@ public interface IOManager extends AutoCloseable {
 
     ID createChannel();
 
+    ID createChannel(String prefix);
+
     String[] tempDirs();
 
     Enumerator createChannelEnumerator();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index 50b8621ec..d39c8efb5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -77,6 +77,11 @@ public class IOManagerImpl implements IOManager {
         return fileChannelManager.createChannel();
     }
 
+    @Override
+    public ID createChannel(String prefix) {
+        return fileChannelManager.createChannel(prefix);
+    }
+
     @Override
     public String[] tempDirs() {
         return tempDirs;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index 429ede810..4a1b04f52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -105,4 +105,16 @@ public class LookupFile implements Closeable {
             }
         }
     }
+
+    public static String localFilePrefix(
+            String partition, int bucket, String remoteFileName, int length) {
+        String identifier;
+        if (partition.isEmpty()) {
+            identifier = String.format("%s-%s", bucket, remoteFileName);
+        } else {
+            identifier = String.format("%s-%s-%s", partition, bucket, 
remoteFileName);
+        }
+
+        return identifier.substring(0, Math.min(identifier.length(), length));
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index 6cb3bc61b..da1192a6e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -47,7 +47,6 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
 import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE;
 import static org.apache.paimon.utils.VarLengthIntUtils.decodeLong;
@@ -61,7 +60,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
     private final RowCompactedSerializer keySerializer;
     private final ValueProcessor<T> valueProcessor;
     private final IOFunction<DataFileMeta, RecordReader<KeyValue>> 
fileReaderFactory;
-    private final Supplier<File> localFileFactory;
+    private final Function<String, File> localFileFactory;
     private final LookupStoreFactory lookupStoreFactory;
     private final Function<Long, BloomFilter.Builder> bfGenerator;
 
@@ -74,7 +73,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
             RowType keyType,
             ValueProcessor<T> valueProcessor,
             IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
-            Supplier<File> localFileFactory,
+            Function<String, File> localFileFactory,
             LookupStoreFactory lookupStoreFactory,
             Function<Long, BloomFilter.Builder> bfGenerator,
             Cache<String, LookupFile> lookupFileCache) {
@@ -145,7 +144,7 @@ public class LookupLevels<T> implements 
Levels.DropFileCallback, Closeable {
     }
 
     private LookupFile createLookupFile(DataFileMeta file) throws IOException {
-        File localFile = localFileFactory.get();
+        File localFile = localFileFactory.apply(file.fileName());
         if (!localFile.createNewFile()) {
             throw new IOException("Can not create new file: " + localFile);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 85a796d2e..545f20f09 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -69,6 +69,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.UserDefinedSeqComparator;
 
@@ -104,6 +105,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     private final FileIO fileIO;
     private final RowType keyType;
     private final RowType valueType;
+    private final RowType partitionType;
     @Nullable private final RecordLevelExpire recordLevelExpire;
     @Nullable private Cache<String, LookupFile> lookupFileCache;
 
@@ -112,6 +114,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             SchemaManager schemaManager,
             TableSchema schema,
             String commitUser,
+            RowType partitionType,
             RowType keyType,
             RowType valueType,
             Supplier<Comparator<InternalRow>> keyComparatorSupplier,
@@ -136,6 +139,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 deletionVectorsMaintainerFactory,
                 tableName);
         this.fileIO = fileIO;
+        this.partitionType = partitionType;
         this.keyType = keyType;
         this.valueType = valueType;
         this.udsComparatorSupplier = udsComparatorSupplier;
@@ -325,7 +329,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             return new LookupMergeTreeCompactRewriter(
                     maxLevel,
                     mergeEngine,
-                    createLookupLevels(levels, processor, lookupReaderFactory),
+                    createLookupLevels(partition, bucket, levels, processor, 
lookupReaderFactory),
                     readerFactory,
                     writerFactory,
                     keyComparator,
@@ -347,6 +351,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     }
 
     private <T> LookupLevels<T> createLookupLevels(
+            BinaryRow partition,
+            int bucket,
             Levels levels,
             LookupLevels.ValueProcessor<T> valueProcessor,
             FileReaderFactory<KeyValue> readerFactory) {
@@ -366,14 +372,22 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                             
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
                             
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
         }
-
         return new LookupLevels<>(
                 levels,
                 keyComparatorSupplier.get(),
                 keyType,
                 valueProcessor,
                 readerFactory::createRecordReader,
-                () -> ioManager.createChannel().getPathFile(),
+                file ->
+                        ioManager
+                                .createChannel(
+                                        LookupFile.localFilePrefix(
+                                                
InternalRowPartitionComputer.paritionToString(
+                                                        partitionType, 
partition, "-"),
+                                                bucket,
+                                                file,
+                                                100))
+                                .getPathFile(),
                 lookupStoreFactory,
                 bfGenerator(options),
                 lookupFileCache);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index f0a32d101..5f5463a70 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -39,6 +39,7 @@ import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.Preconditions;
 
@@ -75,6 +76,8 @@ public class LocalTableQuery implements TableQuery {
 
     @Nullable private Cache<String, LookupFile> lookupFileCache;
 
+    private final RowType partitionType;
+
     public LocalTableQuery(FileStoreTable table) {
         this.options = table.coreOptions();
         this.tableView = new HashMap<>();
@@ -86,6 +89,7 @@ public class LocalTableQuery implements TableQuery {
         KeyValueFileStore store = (KeyValueFileStore) tableStore;
 
         this.readerFactoryBuilder = store.newReaderFactoryBuilder();
+        this.partitionType = table.schema().logicalPartitionType();
         RowType keyType = readerFactoryBuilder.keyType();
         this.keyComparatorSupplier = new 
KeyComparatorSupplier(readerFactoryBuilder.keyType());
         this.lookupStoreFactory =
@@ -155,9 +159,18 @@ public class LocalTableQuery implements TableQuery {
                                         file.fileName(),
                                         file.fileSize(),
                                         file.level()),
-                        () ->
+                        file ->
                                 Preconditions.checkNotNull(ioManager, 
"IOManager is required.")
-                                        .createChannel()
+                                        .createChannel(
+                                                LookupFile.localFilePrefix(
+                                                        
InternalRowPartitionComputer
+                                                                
.paritionToString(
+                                                                        
partitionType,
+                                                                        
partition,
+                                                                        "-"),
+                                                        bucket,
+                                                        file,
+                                                        100))
                                         .getPathFile(),
                         lookupStoreFactory,
                         bfGenerator(options),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 086548547..47a9dd902 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -192,7 +192,7 @@ public class ContainsLevelsTest {
                         createReaderFactory()
                                 .createRecordReader(
                                         0, file.fileName(), file.fileSize(), 
file.level()),
-                () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
+                file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 
0.75, "none"),
                 rowCount -> BloomFilter.builder(rowCount, 0.01),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java
new file mode 100644
index 000000000..1fcebb338
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LookupFile}. */
+public class LookupFileTest {
+
+    @Test
+    public void testLocalFilePrefix() {
+        assertThat(
+                        LookupFile.localFilePrefix(
+                                "2024073105",
+                                10,
+                                
"data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc",
+                                100))
+                
.isEqualTo("2024073105-10-data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc");
+        assertThat(
+                        LookupFile.localFilePrefix(
+                                "", 10, 
"data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc", 100))
+                
.isEqualTo("10-data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc");
+        assertThat(
+                        LookupFile.localFilePrefix(
+                                "2024073105",
+                                10,
+                                
"data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc",
+                                20))
+                .isEqualTo("2024073105-10-data-c");
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 9945b54b9..f9b4bf727 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -271,7 +271,7 @@ public class LookupLevelsTest {
                         createReaderFactory()
                                 .createRecordReader(
                                         0, file.fileName(), file.fileSize(), 
file.level()),
-                () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
+                file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 
0.75, "none"),
                 rowCount -> BloomFilter.builder(rowCount, 0.05),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index edd6688da..66566f873 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -70,6 +70,8 @@ public class TestChangelogDataReadWrite {
             new RowType(singletonList(new DataField(0, "k", new 
BigIntType())));
     private static final RowType VALUE_TYPE =
             new RowType(singletonList(new DataField(0, "v", new 
BigIntType())));
+    private static final RowType PARTITION_TYPE =
+            new RowType(singletonList(new DataField(0, "p", new IntType())));
     private static final Comparator<InternalRow> COMPARATOR =
             Comparator.comparingLong(o -> o.getLong(0));
     private static final RecordEqualiser EQUALISER =
@@ -171,6 +173,7 @@ public class TestChangelogDataReadWrite {
                                 schemaManager,
                                 schemaManager.schema(0),
                                 commitUser,
+                                PARTITION_TYPE,
                                 KEY_TYPE,
                                 VALUE_TYPE,
                                 () -> COMPARATOR,

Reply via email to