This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 997fdc795 [core] Move codes of cross partition to paimon-core (#2186)
997fdc795 is described below

commit 997fdc795d3705b174619214f0a0d48a48bd98c0
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Fri Oct 27 18:00:37 2023 +0800

    [core] Move codes of cross partition to paimon-core (#2186)
---
 .../org/apache/paimon/utils/KeyValueIterator.java  | 31 +++++++++++
 .../paimon/utils}/ListDelimitedSerializer.java     |  2 +-
 .../apache/paimon/utils/ProjectToRowFunction.java  |  8 +--
 paimon-core/pom.xml                                | 11 ++++
 .../crosspartition}/GlobalIndexAssigner.java       | 19 +++----
 .../paimon/crosspartition}/IndexBootstrap.java     |  2 +-
 .../KeyPartPartitionKeyExtractor.java              |  6 +-
 .../apache/paimon}/lookup/RocksDBListState.java    |  3 +-
 .../org/apache/paimon/lookup}/RocksDBOptions.java  |  2 +-
 .../org/apache/paimon}/lookup/RocksDBSetState.java |  2 +-
 .../org/apache/paimon}/lookup/RocksDBState.java    |  6 +-
 .../apache/paimon}/lookup/RocksDBStateFactory.java | 65 +++++++++++-----------
 .../apache/paimon}/lookup/RocksDBValueState.java   |  9 ++-
 .../crosspartition}/GlobalIndexAssignerTest.java   | 49 ++++++++--------
 .../paimon/crosspartition}/IndexBootstrapTest.java |  6 +-
 .../configuration/ConfigOptionsDocGenerator.java   |  1 +
 paimon-flink/paimon-flink-common/pom.xml           |  7 ---
 .../flink/lookup/FileStoreLookupFunction.java      |  5 +-
 .../apache/paimon/flink/lookup/LookupTable.java    |  1 +
 .../flink/lookup/NoPrimaryKeyLookupTable.java      |  2 +
 .../paimon/flink/lookup/PrimaryKeyLookupTable.java |  2 +
 .../flink/lookup/SecondaryIndexLookupTable.java    |  2 +
 .../flink/sink/index/GlobalDynamicBucketSink.java  |  3 +-
 .../sink/index/GlobalIndexAssignerOperator.java    |  7 +--
 .../flink/sink/index/IndexBootstrapOperator.java   |  1 +
 .../flink/lookup/FileStoreLookupFunctionTest.java  |  2 +-
 .../paimon/flink/lookup/LookupTableTest.java       |  1 +
 .../paimon/flink/lookup/RocksDBListStateTest.java  |  4 +-
 28 files changed, 149 insertions(+), 110 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/KeyValueIterator.java b/paimon-common/src/main/java/org/apache/paimon/utils/KeyValueIterator.java
new file mode 100644
index 000000000..5bb7d65fe
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/KeyValueIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+
+/** An iterator for Key and Value. */
+public interface KeyValueIterator<K, V> {
+
+    boolean advanceNext() throws IOException;
+
+    K getKey();
+
+    V getValue();
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java
similarity index 98%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java
rename to paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java
index 56595cf14..25d089b83 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.utils;
 
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.io.DataInputDeserializer;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectToRowFunction.java
similarity index 91%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
rename to paimon-common/src/main/java/org/apache/paimon/utils/ProjectToRowFunction.java
index f18116679..30e47b985 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectToRowFunction.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.utils;
+package org.apache.paimon.utils;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SerBiFunction;
 
 import java.util.List;
 import java.util.Map;
@@ -33,15 +32,14 @@ import java.util.stream.IntStream;
 import static org.apache.paimon.data.InternalRow.createFieldGetter;
 
 /** Project {@link BinaryRow} fields into {@link InternalRow}. */
-public class ProjectToRowDataFunction
-        implements SerBiFunction<InternalRow, BinaryRow, InternalRow> {
+public class ProjectToRowFunction implements SerBiFunction<InternalRow, BinaryRow, InternalRow> {
 
     private final InternalRow.FieldGetter[] fieldGetters;
 
     private final Map<Integer, Integer> projectMapping;
     private final InternalRow.FieldGetter[] projectGetters;
 
-    public ProjectToRowDataFunction(RowType rowType, List<String> projectFields) {
+    public ProjectToRowFunction(RowType rowType, List<String> projectFields) {
         DataType[] types = rowType.getFieldTypes().toArray(new DataType[0]);
         this.fieldGetters =
                 IntStream.range(0, types.length)
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 3190bf5a4..935ba024d 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -31,6 +31,10 @@ under the License.
     <artifactId>paimon-core</artifactId>
     <name>Paimon : Core</name>
 
+    <properties>
+        <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.paimon</groupId>
@@ -58,6 +62,13 @@ under the License.
             <version>${lz4.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>frocksdbjni</artifactId>
+            <version>${frocksdbjni.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test dependencies -->
 
         <dependency>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
rename to paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 9f168d79e..2dfbc7075 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.index;
+package org.apache.paimon.crosspartition;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.MergeEngine;
@@ -28,10 +28,9 @@ import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.RowBuffer;
-import org.apache.paimon.flink.RocksDBOptions;
-import org.apache.paimon.flink.lookup.RocksDBStateFactory;
-import org.apache.paimon.flink.lookup.RocksDBValueState;
-import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
+import org.apache.paimon.lookup.RocksDBOptions;
+import org.apache.paimon.lookup.RocksDBStateFactory;
+import org.apache.paimon.lookup.RocksDBValueState;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
@@ -46,14 +45,14 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.IDMapping;
+import org.apache.paimon.utils.KeyValueIterator;
 import org.apache.paimon.utils.MutableObjectIterator;
 import org.apache.paimon.utils.OffsetRow;
 import org.apache.paimon.utils.PositiveIntInt;
 import org.apache.paimon.utils.PositiveIntIntSerializer;
+import org.apache.paimon.utils.ProjectToRowFunction;
 import org.apache.paimon.utils.TypeUtils;
 
-import org.apache.flink.table.runtime.util.KeyValueIterator;
-
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -82,7 +81,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
     private transient IOManager ioManager;
 
     private transient int bucketIndex;
-    private transient ProjectToRowDataFunction setPartition;
+    private transient ProjectToRowFunction setPartition;
     private transient boolean bootstrap;
     private transient BinaryExternalSortBuffer bootstrapKeys;
     private transient RowBuffer bootstrapRecords;
@@ -120,7 +119,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
 
         RowType bootstrapType = IndexBootstrap.bootstrapType(table.schema());
         this.bucketIndex = bootstrapType.getFieldCount() - 1;
-        this.setPartition = new ProjectToRowDataFunction(table.rowType(), table.partitionKeys());
+        this.setPartition = new ProjectToRowFunction(table.rowType(), table.partitionKeys());
 
         CoreOptions coreOptions = table.coreOptions();
         this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum();
@@ -219,7 +218,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
                         }
                     };
 
-            stateFactory.bulkLoad(keyIndex.columnFamily(), kvIter);
+            stateFactory.bulkLoad(keyIndex, kvIter);
             isEmpty = false;
         }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
rename to paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index 392b29622..e99f47f36 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.index;
+package org.apache.paimon.crosspartition;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
similarity index 92%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
rename to paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
index 188f3fa61..ea86358ac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.index;
+package org.apache.paimon.crosspartition;
 
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
@@ -26,13 +26,11 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
 import org.apache.paimon.types.RowType;
 
-import org.apache.flink.table.data.RowData;
-
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-/** A {@link PartitionKeyExtractor} to {@link RowData} with only key and partiton fields. */
+/** A {@link PartitionKeyExtractor} to {@link InternalRow} with only key and partiton fields. */
 public class KeyPartPartitionKeyExtractor implements PartitionKeyExtractor<InternalRow> {
 
     private final Projection partitionProjection;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java
index 72ea7e9a0..8a47486af 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java
@@ -16,9 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;
 
 import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.utils.ListDelimitedSerializer;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java
rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java
index d54a5ef8a..74b92bbc3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
+package org.apache.paimon.lookup;
 
 import org.apache.paimon.annotation.Documentation;
 import org.apache.paimon.options.ConfigOption;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java
index a27ff44a4..5d247e426 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;
 
 import org.apache.paimon.data.serializer.Serializer;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
similarity index 97%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
index 3632069d7..729b38173 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;
 
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.io.DataInputDeserializer;
@@ -77,10 +77,6 @@ public abstract class RocksDBState<K, V, CacheV> {
                         .build();
     }
 
-    public ColumnFamilyHandle columnFamily() {
-        return columnFamily;
-    }
-
     public byte[] serializeKey(K key) throws IOException {
         keyOutView.clear();
         keySerializer.serialize(key, keyOutView);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
similarity index 76%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
index 757d08776..1bc1edbcd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
@@ -16,12 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;
 
 import org.apache.paimon.data.serializer.Serializer;
-import org.apache.paimon.flink.RocksDBOptions;
+import org.apache.paimon.utils.KeyValueIterator;
 
-import org.apache.flink.table.runtime.util.KeyValueIterator;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
@@ -82,39 +81,43 @@ public class RocksDBStateFactory implements Closeable {
         }
     }
 
-    public void bulkLoad(ColumnFamilyHandle columnFamily, KeyValueIterator<byte[], byte[]> iterator)
-            throws RocksDBException, IOException {
-        long targetFileSize = options.targetFileSizeBase();
-
-        List<String> files = new ArrayList<>();
-        SstFileWriter writer = null;
-        long recordNum = 0;
-        while (iterator.advanceNext()) {
-            byte[] key = iterator.getKey();
-            byte[] value = iterator.getValue();
-
-            if (writer == null) {
-                writer = new SstFileWriter(new EnvOptions(), options);
-                String path = new File(this.path, "sst-" + (sstIndex++)).getPath();
-                writer.open(path);
-                files.add(path);
+    public void bulkLoad(RocksDBState<?, ?, ?> state, KeyValueIterator<byte[], byte[]> iterator)
+            throws IOException {
+        try {
+            long targetFileSize = options.targetFileSizeBase();
+
+            List<String> files = new ArrayList<>();
+            SstFileWriter writer = null;
+            long recordNum = 0;
+            while (iterator.advanceNext()) {
+                byte[] key = iterator.getKey();
+                byte[] value = iterator.getValue();
+
+                if (writer == null) {
+                    writer = new SstFileWriter(new EnvOptions(), options);
+                    String path = new File(this.path, "sst-" + (sstIndex++)).getPath();
+                    writer.open(path);
+                    files.add(path);
+                }
+
+                writer.put(key, value);
+                recordNum++;
+                if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
+                    writer.finish();
+                    writer = null;
+                    recordNum = 0;
+                }
             }
 
-            writer.put(key, value);
-            recordNum++;
-            if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
+            if (writer != null) {
                 writer.finish();
-                writer = null;
-                recordNum = 0;
             }
-        }
 
-        if (writer != null) {
-            writer.finish();
-        }
-
-        if (files.size() > 0) {
-            db.ingestExternalFile(columnFamily, files, new IngestExternalFileOptions());
+            if (files.size() > 0) {
+                db.ingestExternalFile(state.columnFamily, files, new IngestExternalFileOptions());
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java
similarity index 93%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java
index a12855d88..8fadb509e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java
@@ -16,13 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;
 
 import org.apache.paimon.data.serializer.Serializer;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
 
 import javax.annotation.Nullable;
 
@@ -52,7 +51,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref
         }
     }
 
-    private Reference get(ByteArray keyBytes) throws RocksDBException {
+    private Reference get(ByteArray keyBytes) throws Exception {
         Reference valueRef = cache.getIfPresent(keyBytes);
         if (valueRef == null) {
             valueRef = ref(db.get(columnFamily, keyBytes.bytes));
@@ -70,7 +69,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref
             byte[] valueBytes = serializeValue(value);
             db.put(columnFamily, writeOptions, keyBytes, valueBytes);
             cache.put(wrap(keyBytes), ref(valueBytes));
-        } catch (RocksDBException e) {
+        } catch (Exception e) {
             throw new IOException(e);
         }
     }
@@ -83,7 +82,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref
                 db.delete(columnFamily, writeOptions, keyBytes);
                 cache.put(keyByteArray, ref(null));
             }
-        } catch (RocksDBException e) {
+        } catch (Exception e) {
             throw new IOException(e);
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
similarity index 84%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
rename to paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
index b2e8d0871..7a4ae602b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.index;
+package org.apache.paimon.crosspartition;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.MergeEngine;
@@ -29,8 +29,9 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.Pair;
 
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -40,7 +41,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link GlobalIndexAssigner}. */
@@ -73,7 +73,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
                         .options(options.toMap())
                         .build();
         catalog.createTable(identifier, schema, true);
-        return GlobalIndexAssignerOperator.createRowDataAssigner(catalog.getTable(identifier));
+        return new GlobalIndexAssigner(catalog.getTable(identifier));
     }
 
     @Test
@@ -126,38 +126,38 @@ public class GlobalIndexAssignerTest extends TableTestBase {
     @Test
     public void testUpsert() throws Exception {
         GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE);
-        List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+        List<Pair<InternalRow, Integer>> output = new ArrayList<>();
+        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket)));
         assigner.endBoostrap(false);
 
         // change partition
         assigner.processInput(GenericRow.of(1, 1, 1));
         assigner.processInput(GenericRow.of(2, 1, 2));
-        assertThat(output)
+        Assertions.assertThat(output)
                 .containsExactly(
-                        new Tuple2<>(GenericRow.of(1, 1, 1), 0),
-                        new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2), 0),
-                        new Tuple2<>(GenericRow.of(2, 1, 2), 0));
+                        Pair.of(GenericRow.of(1, 1, 1), 0),
+                        Pair.of(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2), 0),
+                        Pair.of(GenericRow.of(2, 1, 2), 0));
         output.clear();
 
         // test partition 1 deleted
         assigner.processInput(GenericRow.of(1, 2, 2));
         assigner.processInput(GenericRow.of(1, 3, 3));
         assigner.processInput(GenericRow.of(1, 4, 4));
-        assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
+        assertThat(output.stream().map(Pair::getRight)).containsExactly(0, 0, 0);
         output.clear();
 
         // move from full bucket
         assigner.processInput(GenericRow.of(2, 4, 4));
-        assertThat(output)
+        Assertions.assertThat(output)
                 .containsExactly(
-                        new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 4, 4), 0),
-                        new Tuple2<>(GenericRow.of(2, 4, 4), 0));
+                        Pair.of(GenericRow.ofKind(RowKind.DELETE, 1, 4, 4), 0),
+                        Pair.of(GenericRow.of(2, 4, 4), 0));
         output.clear();
 
         // test partition 1 deleted
         assigner.processInput(GenericRow.of(1, 5, 5));
-        assertThat(output.stream().map(t -> t.f1)).containsExactly(0);
+        assertThat(output.stream().map(Pair::getRight)).containsExactly(0);
         output.clear();
 
         assigner.close();
@@ -170,24 +170,23 @@ public class GlobalIndexAssignerTest extends TableTestBase {
                         ? MergeEngine.PARTIAL_UPDATE
                         : MergeEngine.AGGREGATE;
         GlobalIndexAssigner assigner = createAssigner(mergeEngine);
-        List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+        List<Pair<InternalRow, Integer>> output = new ArrayList<>();
+        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket)));
         assigner.endBoostrap(false);
 
         // change partition
         assigner.processInput(GenericRow.of(1, 1, 1));
         assigner.processInput(GenericRow.of(2, 1, 2));
-        assertThat(output)
+        Assertions.assertThat(output)
                 .containsExactly(
-                        new Tuple2<>(GenericRow.of(1, 1, 1), 0),
-                        new Tuple2<>(GenericRow.of(1, 1, 2), 0));
+                        Pair.of(GenericRow.of(1, 1, 1), 0), Pair.of(GenericRow.of(1, 1, 2), 0));
         output.clear();
 
         // test partition 2 no effect
         assigner.processInput(GenericRow.of(2, 2, 2));
         assigner.processInput(GenericRow.of(2, 3, 3));
         assigner.processInput(GenericRow.of(2, 4, 4));
-        assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
+        assertThat(output.stream().map(Pair::getRight)).containsExactly(0, 0, 0);
         output.clear();
         assigner.close();
     }
@@ -195,21 +194,21 @@ public class GlobalIndexAssignerTest extends TableTestBase {
     @Test
     public void testFirstRow() throws Exception {
         GlobalIndexAssigner assigner = createAssigner(MergeEngine.FIRST_ROW);
-        List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+        List<Pair<InternalRow, Integer>> output = new ArrayList<>();
+        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket)));
         assigner.endBoostrap(false);
 
         // change partition
         assigner.processInput(GenericRow.of(1, 1, 1));
         assigner.processInput(GenericRow.of(2, 1, 2));
-        assertThat(output).containsExactly(new Tuple2<>(GenericRow.of(1, 1, 1), 0));
+        Assertions.assertThat(output).containsExactly(Pair.of(GenericRow.of(1, 1, 1), 0));
         output.clear();
 
         // test partition 2 no effect
         assigner.processInput(GenericRow.of(2, 2, 2));
         assigner.processInput(GenericRow.of(2, 3, 3));
         assigner.processInput(GenericRow.of(2, 4, 4));
-        assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
+        assertThat(output.stream().map(Pair::getRight)).containsExactly(0, 0, 0);
         output.clear();
         assigner.close();
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
rename to paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index ca73dfebf..976466464 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.index;
+package org.apache.paimon.crosspartition;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
@@ -42,11 +42,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 
+import static org.apache.paimon.crosspartition.IndexBootstrap.filterSplit;
 import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
-import static org.apache.paimon.flink.sink.index.IndexBootstrap.filterSplit;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link IndexBootstrap}. */
+/** Test for {@link org.apache.paimon.crosspartition.IndexBootstrap}. */
 public class IndexBootstrapTest extends TableTestBase {
 
     @Test
diff --git a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index a8b9cc0bf..e069c967f 100644
--- a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -74,6 +74,7 @@ public class ConfigOptionsDocGenerator {
             new OptionsClassLocation[] {
                 new OptionsClassLocation("paimon-common", "org.apache.paimon.options"),
                 new OptionsClassLocation("paimon-common", "org.apache.paimon"),
+                new OptionsClassLocation("paimon-core", "org.apache.paimon.lookup"),
                 new OptionsClassLocation(
                         "paimon-flink/paimon-flink-common", "org.apache.paimon.flink"),
                 new OptionsClassLocation(
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index aa8bda419..b1dcb26ba 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -35,7 +35,6 @@ under the License.
 
     <properties>
         <flink.version>1.18.0</flink.version>
-        <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
     </properties>
 
     <dependencies>
@@ -67,12 +66,6 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>com.ververica</groupId>
-            <artifactId>frocksdbjni</artifactId>
-            <version>${frocksdbjni.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 36f393210..c26e373b2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.utils.TableScanUtils;
+import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
@@ -59,8 +60,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
-import static org.apache.paimon.flink.RocksDBOptions.LOOKUP_CACHE_ROWS;
-import static org.apache.paimon.flink.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
+import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** A lookup {@link TableFunction} for file store. */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index 29f319ee1..7aef1838a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.types.RowType;
 
 import java.io.IOException;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index caa809062..0af3e9e90 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -22,6 +22,8 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.lookup.RocksDBListState;
+import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyProjectedRow;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 0928bf047..b4c8bbe7f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.lookup.RocksDBStateFactory;
+import org.apache.paimon.lookup.RocksDBValueState;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyProjectedRow;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index 61452d681..4364036e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.lookup.RocksDBSetState;
+import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyProjectedRow;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index eb2f8d30f..589f86a86 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.index;
 
+import org.apache.paimon.crosspartition.IndexBootstrap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator;
@@ -43,8 +44,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.paimon.crosspartition.IndexBootstrap.bootstrapType;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
-import static org.apache.paimon.flink.sink.index.IndexBootstrap.bootstrapType;
 
 /** Sink for global dynamic bucket table. */
 public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow, Integer>> {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 8133177a9..4cb56435c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.index;
 
+import org.apache.paimon.crosspartition.GlobalIndexAssigner;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.table.Table;
@@ -103,10 +104,6 @@ public class GlobalIndexAssignerOperator
     }
 
     public static GlobalIndexAssignerOperator forRowData(Table table) {
-        return new GlobalIndexAssignerOperator(createRowDataAssigner(table));
-    }
-
-    public static GlobalIndexAssigner createRowDataAssigner(Table t) {
-        return new GlobalIndexAssigner(t);
+        return new GlobalIndexAssignerOperator(new GlobalIndexAssigner(table));
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
index d5f0683a7..a75274be9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.index;
 
+import org.apache.paimon.crosspartition.IndexBootstrap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.utils.SerializableFunction;
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index 8ff6c0089..5eacc7d7b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -22,7 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
-import org.apache.paimon.flink.RocksDBOptions;
+import org.apache.paimon.lookup.RocksDBOptions;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index dc1932b78..181eefe40 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowKind;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
index 09c2ea106..e0a2b516d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
@@ -26,6 +26,8 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.lookup.RocksDBListState;
+import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -40,7 +42,7 @@ import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link org.apache.paimon.flink.lookup.RocksDBListState}. */
+/** Test for {@link RocksDBListState}. */
 public class RocksDBListStateTest {
 
     @TempDir Path tempDir;


Reply via email to