This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1502e83bf [core] Redefine user defined sequence fields (#2945)
1502e83bf is described below

commit 1502e83bfcbb529557dc9fa4adc336ec30aa961d
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 6 16:40:07 2024 +0800

    [core] Redefine user defined sequence fields (#2945)
---
 .../concepts/primary-key-table/sequence-rowkind.md |  24 +-
 .../shortcodes/generated/core_configuration.html   |   6 -
 .../main/java/org/apache/paimon/CoreOptions.java   |  32 +--
 .../main/java/org/apache/paimon/types/RowType.java |  23 ++
 .../java/org/apache/paimon/KeyValueFileStore.java  |  11 +
 .../apache/paimon/mergetree/MergeTreeReaders.java  |   3 +-
 .../apache/paimon/mergetree/MergeTreeWriter.java   |  12 +-
 .../paimon/mergetree/SortBufferWriteBuffer.java    |  33 ++-
 .../compact/ChangelogMergeTreeRewriter.java        |  14 +-
 .../FullChangelogMergeTreeCompactRewriter.java     |   5 +
 .../compact/LookupMergeTreeCompactRewriter.java    |   5 +
 .../compact/MergeTreeCompactRewriter.java          |   7 +
 .../compact/PartialUpdateMergeFunction.java        | 156 +++++++++++-
 .../paimon/operation/KeyValueFileStoreRead.java    |  43 +---
 .../paimon/operation/KeyValueFileStoreWrite.java   |  32 ++-
 .../org/apache/paimon/schema/SchemaValidation.java |  22 +-
 .../apache/paimon/table/query/LocalTableQuery.java |   2 +-
 .../paimon/table/sink/SequenceGenerator.java       | 236 ------------------
 .../paimon/utils/UserDefinedSeqComparator.java     |  81 ++++++
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   2 +
 .../mergetree/SortBufferWriteBufferTestBase.java   |   1 +
 .../apache/paimon/schema/SchemaManagerTest.java    |   2 +-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  54 ----
 .../paimon/table/sink/SequenceGeneratorTest.java   | 272 ---------------------
 .../apache/paimon/table/system/FilesTableTest.java |  10 +-
 .../paimon/flink/lookup/FullCacheLookupTable.java  |  53 ++--
 .../paimon/flink/lookup/LookupStreamingReader.java |  12 +-
 .../flink/lookup/NoPrimaryKeyLookupTable.java      |   7 +-
 .../paimon/flink/lookup/PrimaryKeyLookupTable.java |   8 +-
 .../flink/lookup/SecondaryIndexLookupTable.java    |   8 +-
 .../paimon/flink/sink/LocalMergeOperator.java      |  16 +-
 .../paimon/flink/lookup/LookupTableTest.java       |  70 ++----
 .../flink/source/TestChangelogDataReadWrite.java   |  24 +-
 33 files changed, 483 insertions(+), 803 deletions(-)

diff --git a/docs/content/concepts/primary-key-table/sequence-rowkind.md 
b/docs/content/concepts/primary-key-table/sequence-rowkind.md
index 80f20dce9..65df770bb 100644
--- a/docs/content/concepts/primary-key-table/sequence-rowkind.md
+++ b/docs/content/concepts/primary-key-table/sequence-rowkind.md
@@ -41,32 +41,18 @@ CREATE TABLE my_table (
     pk BIGINT PRIMARY KEY NOT ENFORCED,
     v1 DOUBLE,
     v2 BIGINT,
-    dt TIMESTAMP
+    update_time TIMESTAMP
 ) WITH (
-    'sequence.field' = 'dt'
+    'sequence.field' = 'update_time'
 );
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
-The record with the largest `sequence.field` value will be the last to merge, 
regardless of the input order.
+The record with the largest `sequence.field` value will be the last to merge, 
if the values are the same, the input
+order will be used to determine which one is the last one.
 
-**Sequence Auto Padding**:
-
-When the record is updated or deleted, the `sequence.field` must become larger 
and cannot remain unchanged.
-For -U and +U, their sequence-fields must be different. If you cannot meet 
this requirement, Paimon provides
-option to automatically pad the sequence field for you.
-
-1. `'sequence.auto-padding' = 'row-kind-flag'`: If you are using same value 
for -U and +U, just like "`op_ts`"
-   (the time that the change was made in the database) in Mysql Binlog. It is 
recommended to use the automatic
-   padding for row kind flag, which will automatically distinguish between -U 
(-D) and +U (+I).
-
-2. Insufficient precision: If the provided `sequence.field` doesn't meet the 
precision, like a rough second or
-   millisecond, you can set `sequence.auto-padding` to `second-to-micro` or 
`millis-to-micro` so that the precision
-   of sequence number will be made up to microsecond by incremental id 
(Calculate within a single bucket).
-
-3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add 
the micro to the second, and then
-   pad the row kind flag.
+You can define multiple fields for `sequence.field`, for example 
`'update_time,flag'`, multiple fields will be compared in order.
 
 ## Row Kind Field
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 2e25fe96b..a123d3a77 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -527,12 +527,6 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Long</td>
             <td>Optional timestamp used in case of "from-timestamp" scan mode. 
If there is no snapshot earlier than this time, the earliest snapshot will be 
chosen.</td>
         </tr>
-        <tr>
-            <td><h5>sequence.auto-padding</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Specify the way of padding precision, if the provided sequence 
field is used to indicate "time" but doesn't meet the precise.<ul><li>You can 
specific:</li><li>1. "row-kind-flag": Pads a bit flag to indicate whether it is 
retract (0) or add (1) message.</li><li>2. "second-to-micro": Pads the sequence 
field that indicates time with precision of seconds to micro-second.</li><li>3. 
"millis-to-micro": Pads the sequence field that indicates time with precision 
of milli-second t [...]
-        </tr>
         <tr>
             <td><h5>sequence.field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 6430c919e..201525ab0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -486,26 +486,6 @@ public class CoreOptions implements Serializable {
                             "The field that generates the row kind for primary 
key table,"
                                     + " the row kind determines which data is 
'+I', '-U', '+U' or '-D'.");
 
-    public static final ConfigOption<String> SEQUENCE_AUTO_PADDING =
-            key("sequence.auto-padding")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Specify the way of padding 
precision, if the provided sequence field is used to indicate \"time\" but 
doesn't meet the precise.")
-                                    .list(
-                                            text("You can specific:"),
-                                            text(
-                                                    "1. \"row-kind-flag\": 
Pads a bit flag to indicate whether it is retract (0) or add (1) message."),
-                                            text(
-                                                    "2. \"second-to-micro\": 
Pads the sequence field that indicates time with precision of seconds to 
micro-second."),
-                                            text(
-                                                    "3. \"millis-to-micro\": 
Pads the sequence field that indicates time with precision of milli-second to 
micro-second."),
-                                            text(
-                                                    "4. Composite pattern: for 
example, \"second-to-micro,row-kind-flag\"."))
-                                    .build());
-
     public static final ConfigOption<StartupMode> SCAN_MODE =
             key("scan.mode")
                     .enumType(StartupMode.class)
@@ -1486,22 +1466,14 @@ public class CoreOptions implements Serializable {
         return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
     }
 
-    public Optional<String> sequenceField() {
-        return options.getOptional(SEQUENCE_FIELD);
+    public Optional<List<String>> sequenceField() {
+        return options.getOptional(SEQUENCE_FIELD).map(s -> 
Arrays.asList(s.split(",")));
     }
 
     public Optional<String> rowkindField() {
         return options.getOptional(ROWKIND_FIELD);
     }
 
-    public List<String> sequenceAutoPadding() {
-        String padding = options.get(SEQUENCE_AUTO_PADDING);
-        if (padding == null) {
-            return Collections.emptyList();
-        }
-        return Arrays.asList(padding.split(","));
-    }
-
     public boolean writeOnly() {
         return options.get(WRITE_ONLY);
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java 
b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index 1462d330e..fe11976de 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -99,6 +99,29 @@ public final class RowType extends DataType {
         return -1;
     }
 
+    public boolean containsField(String fieldName) {
+        for (DataField field : fields) {
+            if (field.name().equals(fieldName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean notContainsField(String fieldName) {
+        return !containsField(fieldName);
+    }
+
+    public DataField getField(String fieldName) {
+        for (DataField field : fields) {
+            if (field.name().equals(fieldName)) {
+                return field;
+            }
+        }
+
+        throw new RuntimeException("Cannot find field: " + fieldName);
+    }
+
     @Override
     public DataType copy(boolean isNullable) {
         return new RowType(
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 373bce35c..727bf8f82 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -37,10 +37,14 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.KeyComparatorSupplier;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
 import org.apache.paimon.utils.ValueEqualiserSupplier;
 
+import javax.annotation.Nullable;
+
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -123,6 +127,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 keyType,
                 valueType,
                 newKeyComparator(),
+                userDefinedSeqComparator(),
                 mfFactory,
                 newReaderFactoryBuilder());
     }
@@ -140,6 +145,11 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 options);
     }
 
+    @Nullable
+    private FieldsComparator userDefinedSeqComparator() {
+        return UserDefinedSeqComparator.create(valueType, options);
+    }
+
     @Override
     public KeyValueFileStoreWrite newWrite(String commitUser) {
         return newWrite(commitUser, null);
@@ -159,6 +169,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 keyType,
                 valueType,
                 keyComparatorSupplier,
+                this::userDefinedSeqComparator,
                 valueEqualiserSupplier,
                 mfFactory,
                 pathFactory(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index 186371b81..1485b72b2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -47,6 +47,7 @@ public class MergeTreeReaders {
             boolean dropDelete,
             KeyValueFileReaderFactory readerFactory,
             Comparator<InternalRow> userKeyComparator,
+            @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunction<KeyValue> mergeFunction,
             MergeSorter mergeSorter)
             throws IOException {
@@ -58,7 +59,7 @@ public class MergeTreeReaders {
                                     section,
                                     readerFactory,
                                     userKeyComparator,
-                                    null,
+                                    userDefinedSeqComparator,
                                     new 
ReducerMergeFunctionWrapper(mergeFunction),
                                     mergeSorter));
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 046fad893..537d838d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -33,9 +33,9 @@ import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.mergetree.compact.MergeFunction;
-import org.apache.paimon.table.sink.SequenceGenerator;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.RecordWriter;
 
 import javax.annotation.Nullable;
@@ -66,7 +66,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
     private final KeyValueFileWriterFactory writerFactory;
     private final boolean commitForceCompact;
     private final ChangelogProducer changelogProducer;
-    @Nullable private final SequenceGenerator sequenceGenerator;
+    @Nullable private final FieldsComparator userDefinedSeqComparator;
 
     private final LinkedHashSet<DataFileMeta> newFiles;
     private final LinkedHashSet<DataFileMeta> newFilesChangelog;
@@ -90,7 +90,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
             boolean commitForceCompact,
             ChangelogProducer changelogProducer,
             @Nullable CommitIncrement increment,
-            @Nullable SequenceGenerator sequenceGenerator) {
+            @Nullable FieldsComparator userDefinedSeqComparator) {
         this.writeBufferSpillable = writeBufferSpillable;
         this.sortMaxFan = sortMaxFan;
         this.sortCompression = sortCompression;
@@ -104,7 +104,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         this.writerFactory = writerFactory;
         this.commitForceCompact = commitForceCompact;
         this.changelogProducer = changelogProducer;
-        this.sequenceGenerator = sequenceGenerator;
+        this.userDefinedSeqComparator = userDefinedSeqComparator;
 
         this.newFiles = new LinkedHashSet<>();
         this.newFilesChangelog = new LinkedHashSet<>();
@@ -138,6 +138,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
                 new SortBufferWriteBuffer(
                         keyType,
                         valueType,
+                        userDefinedSeqComparator,
                         memoryPool,
                         writeBufferSpillable,
                         sortMaxFan,
@@ -148,9 +149,6 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
     @Override
     public void write(KeyValue kv) throws Exception {
         long sequenceNumber = newSequenceNumber();
-        if (sequenceGenerator != null) {
-            sequenceNumber = sequenceGenerator.generateWithPadding(kv.value(), 
sequenceNumber);
-        }
         boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), 
kv.key(), kv.value());
         if (!success) {
             flushWriteBuffer(false, false);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
index ae877f5cb..f0e9fcf07 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
@@ -40,6 +40,8 @@ import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.MutableObjectIterator;
 
 import javax.annotation.Nullable;
@@ -61,6 +63,7 @@ public class SortBufferWriteBuffer implements WriteBuffer {
     public SortBufferWriteBuffer(
             RowType keyType,
             RowType valueType,
+            @Nullable FieldsComparator userDefinedSeqComparator,
             MemorySegmentPool memoryPool,
             boolean spillable,
             int sortMaxFan,
@@ -70,17 +73,33 @@ public class SortBufferWriteBuffer implements WriteBuffer {
         this.valueType = valueType;
         this.serializer = new KeyValueSerializer(keyType, valueType);
 
-        // user key + sequenceNumber
-        List<DataType> sortKeyTypes = new ArrayList<>(keyType.getFieldTypes());
-        sortKeyTypes.add(new BigIntType(false));
+        // key fields
+        IntStream sortFields = IntStream.range(0, keyType.getFieldCount());
+
+        // user define sequence fields
+        if (userDefinedSeqComparator != null) {
+            IntStream udsFields =
+                    IntStream.of(userDefinedSeqComparator.compareFields())
+                            .map(operand -> operand + keyType.getFieldCount() 
+ 2);
+            sortFields = IntStream.concat(sortFields, udsFields);
+        }
+
+        // sequence field
+        sortFields = IntStream.concat(sortFields, 
IntStream.of(keyType.getFieldCount()));
+
+        int[] sortFieldArray = sortFields.toArray();
+
+        // row type
+        List<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());
+        fieldTypes.add(new BigIntType(false));
+        fieldTypes.add(new TinyIntType(false));
+        fieldTypes.addAll(valueType.getFieldTypes());
 
-        // for sort binary buffer
-        int[] sortFields = IntStream.range(0, sortKeyTypes.size()).toArray();
         NormalizedKeyComputer normalizedKeyComputer =
                 CodeGenUtils.newNormalizedKeyComputer(
-                        sortKeyTypes, sortFields, "MemTableKeyComputer");
+                        fieldTypes, sortFieldArray, "MemTableKeyComputer");
         RecordComparator keyComparator =
-                CodeGenUtils.newRecordComparator(sortKeyTypes, sortFields, 
"MemTableComparator");
+                CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray, 
"MemTableComparator");
 
         if (memoryPool.freePages() < 3) {
             throw new IllegalArgumentException(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 99f5f052d..f338ee056 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -30,6 +30,9 @@ import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.utils.FieldsComparator;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -51,9 +54,16 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
+            @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             MergeSorter mergeSorter) {
-        super(readerFactory, writerFactory, keyComparator, mfFactory, 
mergeSorter);
+        super(
+                readerFactory,
+                writerFactory,
+                keyComparator,
+                userDefinedSeqComparator,
+                mfFactory,
+                mergeSorter);
         this.maxLevel = maxLevel;
         this.mergeEngine = mergeEngine;
     }
@@ -112,7 +122,7 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
                                     section,
                                     readerFactory,
                                     keyComparator,
-                                    null,
+                                    userDefinedSeqComparator,
                                     createMergeWrapper(outputLevel),
                                     mergeSorter));
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 553b0ff5d..3ed7a5d69 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -27,8 +27,11 @@ import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
@@ -48,6 +51,7 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
+            @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             MergeSorter mergeSorter,
             RecordEqualiser valueEqualiser,
@@ -58,6 +62,7 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
                 readerFactory,
                 writerFactory,
                 keyComparator,
+                userDefinedSeqComparator,
                 mfFactory,
                 mergeSorter);
         this.valueEqualiser = valueEqualiser;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index b88c26b99..5335ba82a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -28,6 +28,9 @@ import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.utils.FieldsComparator;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -54,6 +57,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
+            @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             MergeSorter mergeSorter,
             MergeFunctionWrapperFactory<T> wrapperFactory) {
@@ -63,6 +67,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
                 readerFactory,
                 writerFactory,
                 keyComparator,
+                userDefinedSeqComparator,
                 mfFactory,
                 mergeSorter);
         this.lookupLevels = lookupLevels;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 228a3aa11..bd3338bf7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -30,6 +30,9 @@ import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.utils.FieldsComparator;
+
+import javax.annotation.Nullable;
 
 import java.util.Comparator;
 import java.util.List;
@@ -40,6 +43,7 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
     protected final KeyValueFileReaderFactory readerFactory;
     protected final KeyValueFileWriterFactory writerFactory;
     protected final Comparator<InternalRow> keyComparator;
+    @Nullable protected final FieldsComparator userDefinedSeqComparator;
     protected final MergeFunctionFactory<KeyValue> mfFactory;
     protected final MergeSorter mergeSorter;
 
@@ -47,11 +51,13 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
+            @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             MergeSorter mergeSorter) {
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.keyComparator = keyComparator;
+        this.userDefinedSeqComparator = userDefinedSeqComparator;
         this.mfFactory = mfFactory;
         this.mergeSorter = mergeSorter;
     }
@@ -72,6 +78,7 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
                         dropDelete,
                         readerFactory,
                         keyComparator,
+                        userDefinedSeqComparator,
                         mfFactory.create(),
                         mergeSorter);
         writer.write(new RecordReaderIterator<>(sectionsReader));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 6c7fa0567..dbf6dfd75 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -24,10 +24,23 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.sink.SequenceGenerator;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.CharType;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.InternalRowUtils;
 import org.apache.paimon.utils.Projection;
 
 import javax.annotation.Nullable;
@@ -144,9 +157,9 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                     row.setField(i, field);
                 }
             } else {
-                Long currentSeq = 
sequenceGen.generateWithoutPadding(kv.value());
+                Long currentSeq = sequenceGen.generate(kv.value());
                 if (currentSeq != null) {
-                    Long previousSeq = sequenceGen.generateWithoutPadding(row);
+                    Long previousSeq = sequenceGen.generate(row);
                     if (previousSeq == null || currentSeq >= previousSeq) {
                         row.setField(
                                 i, aggregator == null ? field : 
aggregator.agg(accumulator, field));
@@ -162,9 +175,9 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         for (int i = 0; i < getters.length; i++) {
             SequenceGenerator sequenceGen = fieldSequences.get(i);
             if (sequenceGen != null) {
-                Long currentSeq = 
sequenceGen.generateWithoutPadding(kv.value());
+                Long currentSeq = sequenceGen.generate(kv.value());
                 if (currentSeq != null) {
-                    Long previousSeq = sequenceGen.generateWithoutPadding(row);
+                    Long previousSeq = sequenceGen.generate(row);
                     FieldAggregator aggregator = fieldAggregators.get(i);
                     if (previousSeq == null || currentSeq >= previousSeq) {
                         if (sequenceGen.index() == i) {
@@ -391,4 +404,137 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
             return fieldAggregators;
         }
     }
+
+    private static class SequenceGenerator {
+
+        private final int index;
+
+        private final Generator generator;
+        private final DataType fieldType;
+
+        private SequenceGenerator(String field, RowType rowType) {
+            index = rowType.getFieldNames().indexOf(field);
+            if (index == -1) {
+                throw new RuntimeException(
+                        String.format(
+                                "Can not find sequence field %s in table 
schema: %s",
+                                field, rowType));
+            }
+            fieldType = rowType.getTypeAt(index);
+            generator = fieldType.accept(new SequenceGeneratorVisitor());
+        }
+
+        public SequenceGenerator(int index, DataType dataType) {
+            this.index = index;
+
+            this.fieldType = dataType;
+            if (index == -1) {
+                throw new RuntimeException(String.format("Index : %s is 
invalid", index));
+            }
+            generator = fieldType.accept(new SequenceGeneratorVisitor());
+        }
+
+        public int index() {
+            return index;
+        }
+
+        public DataType fieldType() {
+            return fieldType;
+        }
+
+        @Nullable
+        public Long generate(InternalRow row) {
+            return generator.generateNullable(row, index);
+        }
+
+        private interface Generator {
+            long generate(InternalRow row, int i);
+
+            @Nullable
+            default Long generateNullable(InternalRow row, int i) {
+                if (row.isNullAt(i)) {
+                    return null;
+                }
+                return generate(row, i);
+            }
+        }
+
+        private static class SequenceGeneratorVisitor extends 
DataTypeDefaultVisitor<Generator> {
+
+            @Override
+            public Generator visit(CharType charType) {
+                return stringGenerator();
+            }
+
+            @Override
+            public Generator visit(VarCharType varCharType) {
+                return stringGenerator();
+            }
+
+            private Generator stringGenerator() {
+                return (row, i) -> Long.parseLong(row.getString(i).toString());
+            }
+
+            @Override
+            public Generator visit(DecimalType decimalType) {
+                return (row, i) ->
+                        InternalRowUtils.castToIntegral(
+                                row.getDecimal(
+                                        i, decimalType.getPrecision(), 
decimalType.getScale()));
+            }
+
+            @Override
+            public Generator visit(TinyIntType tinyIntType) {
+                return InternalRow::getByte;
+            }
+
+            @Override
+            public Generator visit(SmallIntType smallIntType) {
+                return InternalRow::getShort;
+            }
+
+            @Override
+            public Generator visit(IntType intType) {
+                return InternalRow::getInt;
+            }
+
+            @Override
+            public Generator visit(BigIntType bigIntType) {
+                return InternalRow::getLong;
+            }
+
+            @Override
+            public Generator visit(FloatType floatType) {
+                return (row, i) -> (long) row.getFloat(i);
+            }
+
+            @Override
+            public Generator visit(DoubleType doubleType) {
+                return (row, i) -> (long) row.getDouble(i);
+            }
+
+            @Override
+            public Generator visit(DateType dateType) {
+                return InternalRow::getInt;
+            }
+
+            @Override
+            public Generator visit(TimestampType timestampType) {
+                return (row, i) ->
+                        row.getTimestamp(i, 
timestampType.getPrecision()).getMillisecond();
+            }
+
+            @Override
+            public Generator visit(LocalZonedTimestampType 
localZonedTimestampType) {
+                return (row, i) ->
+                        row.getTimestamp(i, 
localZonedTimestampType.getPrecision())
+                                .getMillisecond();
+            }
+
+            @Override
+            protected Generator defaultMethod(DataType dataType) {
+                throw new UnsupportedOperationException("Unsupported type: " + 
dataType);
+            }
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index e8accda5b..8852e38a3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -24,8 +24,6 @@ import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.format.FileFormatDiscover;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.mergetree.DropDeleteReader;
@@ -41,12 +39,11 @@ import 
org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.ProjectedRow;
 
 import javax.annotation.Nullable;
@@ -72,6 +69,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
     private final Comparator<InternalRow> keyComparator;
     private final MergeFunctionFactory<KeyValue> mfFactory;
     private final MergeSorter mergeSorter;
+    @Nullable private final FieldsComparator userDefinedSeqComparator;
 
     @Nullable private int[][] keyProjectedFields;
 
@@ -84,49 +82,20 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
 
     private boolean forceKeepDelete = false;
 
-    public KeyValueFileStoreRead(
-            FileIO fileIO,
-            SchemaManager schemaManager,
-            long schemaId,
-            RowType keyType,
-            RowType valueType,
-            Comparator<InternalRow> keyComparator,
-            MergeFunctionFactory<KeyValue> mfFactory,
-            FileFormatDiscover formatDiscover,
-            FileStorePathFactory pathFactory,
-            KeyValueFieldsExtractor extractor,
-            CoreOptions options) {
-        this(
-                schemaManager,
-                schemaId,
-                keyType,
-                valueType,
-                keyComparator,
-                mfFactory,
-                KeyValueFileReaderFactory.builder(
-                        fileIO,
-                        schemaManager,
-                        schemaId,
-                        keyType,
-                        valueType,
-                        formatDiscover,
-                        pathFactory,
-                        extractor,
-                        options));
-    }
-
     public KeyValueFileStoreRead(
             SchemaManager schemaManager,
             long schemaId,
             RowType keyType,
             RowType valueType,
             Comparator<InternalRow> keyComparator,
+            @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             KeyValueFileReaderFactory.Builder readerFactoryBuilder) {
         this.tableSchema = schemaManager.schema(schemaId);
         this.readerFactoryBuilder = readerFactoryBuilder;
         this.keyComparator = keyComparator;
         this.mfFactory = mfFactory;
+        this.userDefinedSeqComparator = userDefinedSeqComparator;
         this.mergeSorter =
                 new MergeSorter(
                         CoreOptions.fromMap(tableSchema.options()), keyType, 
valueType, null);
@@ -227,7 +196,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                             batchMergeRead(
                                     split.partition(), split.bucket(), 
split.dataFiles(), false),
                             keyComparator,
-                            null,
+                            userDefinedSeqComparator,
                             mergeSorter,
                             forceKeepDelete);
         }
@@ -256,7 +225,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                                             ? overlappedSectionFactory
                                             : nonOverlappedSectionFactory,
                                     keyComparator,
-                                    null,
+                                    userDefinedSeqComparator,
                                     mergeFuncWrapper,
                                     mergeSorter));
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 269ff2dc5..8fb99645b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -55,12 +55,12 @@ import 
org.apache.paimon.mergetree.compact.UniversalCompaction;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.SequenceGenerator;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,9 +84,9 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
     private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
     private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
+    private final Supplier<FieldsComparator> udsComparatorSupplier;
     private final Supplier<RecordEqualiser> valueEqualiserSupplier;
     private final MergeFunctionFactory<KeyValue> mfFactory;
-    private final TableSchema schema;
     private final CoreOptions options;
     private final FileIO fileIO;
     private final RowType keyType;
@@ -100,6 +100,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             RowType keyType,
             RowType valueType,
             Supplier<Comparator<InternalRow>> keyComparatorSupplier,
+            Supplier<FieldsComparator> udsComparatorSupplier,
             Supplier<RecordEqualiser> valueEqualiserSupplier,
             MergeFunctionFactory<KeyValue> mfFactory,
             FileStorePathFactory pathFactory,
@@ -114,6 +115,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
         this.fileIO = fileIO;
         this.keyType = keyType;
         this.valueType = valueType;
+        this.udsComparatorSupplier = udsComparatorSupplier;
         this.readerFactoryBuilder =
                 KeyValueFileReaderFactory.builder(
                         fileIO,
@@ -137,7 +139,6 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
         this.keyComparatorSupplier = keyComparatorSupplier;
         this.valueEqualiserSupplier = valueEqualiserSupplier;
         this.mfFactory = mfFactory;
-        this.schema = schemaManager.schema(schemaId);
         this.options = options;
     }
 
@@ -186,7 +187,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 options.commitForceCompact(),
                 options.changelogProducer(),
                 restoreIncrement,
-                SequenceGenerator.create(schema, options));
+                UserDefinedSeqComparator.create(valueType, options));
     }
 
     @VisibleForTesting
@@ -204,7 +205,10 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             return new NoopCompactManager();
         } else {
             Comparator<InternalRow> keyComparator = 
keyComparatorSupplier.get();
-            CompactRewriter rewriter = createRewriter(partition, bucket, 
keyComparator, levels);
+            @Nullable FieldsComparator userDefinedSeqComparator = 
udsComparatorSupplier.get();
+            CompactRewriter rewriter =
+                    createRewriter(
+                            partition, bucket, keyComparator, 
userDefinedSeqComparator, levels);
             return new MergeTreeCompactManager(
                     compactExecutor,
                     levels,
@@ -220,7 +224,11 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     }
 
     private MergeTreeCompactRewriter createRewriter(
-            BinaryRow partition, int bucket, Comparator<InternalRow> 
keyComparator, Levels levels) {
+            BinaryRow partition,
+            int bucket,
+            Comparator<InternalRow> keyComparator,
+            @Nullable FieldsComparator userDefinedSeqComparator,
+            Levels levels) {
         KeyValueFileReaderFactory readerFactory = 
readerFactoryBuilder.build(partition, bucket);
         KeyValueFileWriterFactory writerFactory =
                 writerFactoryBuilder.build(partition, bucket, options);
@@ -235,6 +243,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         readerFactory,
                         writerFactory,
                         keyComparator,
+                        userDefinedSeqComparator,
                         mfFactory,
                         mergeSorter,
                         valueEqualiserSupplier.get(),
@@ -253,6 +262,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                             readerFactory,
                             writerFactory,
                             keyComparator,
+                            userDefinedSeqComparator,
                             mfFactory,
                             mergeSorter,
                             new FirstRowMergeFunctionWrapperFactory());
@@ -265,6 +275,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                             readerFactory,
                             writerFactory,
                             keyComparator,
+                            userDefinedSeqComparator,
                             mfFactory,
                             mergeSorter,
                             new LookupMergeFunctionWrapperFactory(
@@ -273,7 +284,12 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 }
             default:
                 return new MergeTreeCompactRewriter(
-                        readerFactory, writerFactory, keyComparator, 
mfFactory, mergeSorter);
+                        readerFactory,
+                        writerFactory,
+                        keyComparator,
+                        userDefinedSeqComparator,
+                        mfFactory,
+                        mergeSorter);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 9fac38114..91ccf9c11 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -164,13 +164,13 @@ public class SchemaValidation {
             }
         }
 
-        Optional<String> sequenceField = options.sequenceField();
+        Optional<List<String>> sequenceField = options.sequenceField();
         sequenceField.ifPresent(
-                field ->
+                fields ->
                         checkArgument(
-                                schema.fieldNames().contains(field),
-                                "Nonexistent sequence field: '%s'",
-                                field));
+                                schema.fieldNames().containsAll(fields),
+                                "Nonexistent sequence fields: '%s'",
+                                fields));
 
         Optional<String> rowkindField = options.rowkindField();
         rowkindField.ifPresent(
@@ -181,11 +181,13 @@ public class SchemaValidation {
                                 field));
 
         sequenceField.ifPresent(
-                field ->
-                        checkArgument(
-                                options.fieldAggFunc(field) == null,
-                                "Should not define aggregation on sequence 
field: '%s'",
-                                field));
+                fields ->
+                        fields.forEach(
+                                field ->
+                                        checkArgument(
+                                                options.fieldAggFunc(field) == 
null,
+                                                "Should not define aggregation 
on sequence field: '%s'",
+                                                field)));
 
         CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
         if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 9f008eb39..3ed9a3ac7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -109,7 +109,7 @@ public class LocalTableQuery implements TableQuery {
             int bucket,
             List<DataFileMeta> beforeFiles,
             List<DataFileMeta> dataFiles) {
-        LookupLevels lookupLevels =
+        LookupLevels<KeyValue> lookupLevels =
                 tableView.computeIfAbsent(partition, k -> new 
HashMap<>()).get(bucket);
         if (lookupLevels == null) {
             Preconditions.checkArgument(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
deleted file mode 100644
index 9c6b81159..000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.SequenceAutoPadding;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.CharType;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypeDefaultVisitor;
-import org.apache.paimon.types.DataTypeFamily;
-import org.apache.paimon.types.DateType;
-import org.apache.paimon.types.DecimalType;
-import org.apache.paimon.types.DoubleType;
-import org.apache.paimon.types.FloatType;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.LocalZonedTimestampType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.SmallIntType;
-import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.TinyIntType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.InternalRowUtils;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** Generate sequence number. */
-public class SequenceGenerator {
-
-    private final int index;
-    private final List<SequenceAutoPadding> paddings;
-
-    private final Generator generator;
-    private final DataType fieldType;
-
-    public SequenceGenerator(String field, RowType rowType) {
-        this(field, rowType, Collections.emptyList());
-    }
-
-    public SequenceGenerator(String field, RowType rowType, 
List<SequenceAutoPadding> paddings) {
-        index = rowType.getFieldNames().indexOf(field);
-        this.paddings = paddings;
-
-        if (index == -1) {
-            throw new RuntimeException(
-                    String.format(
-                            "Can not find sequence field %s in table schema: 
%s", field, rowType));
-        }
-        fieldType = rowType.getTypeAt(index);
-        generator = fieldType.accept(new SequenceGeneratorVisitor());
-    }
-
-    public SequenceGenerator(int index, DataType dataType) {
-        this.index = index;
-        this.paddings = Collections.emptyList();
-
-        this.fieldType = dataType;
-        if (index == -1) {
-            throw new RuntimeException(String.format("Index : %s is invalid", 
index));
-        }
-        generator = fieldType.accept(new SequenceGeneratorVisitor());
-    }
-
-    @Nullable
-    public static SequenceGenerator create(TableSchema schema, CoreOptions 
options) {
-        List<SequenceAutoPadding> sequenceAutoPadding =
-                options.sequenceAutoPadding().stream()
-                        .map(SequenceAutoPadding::fromString)
-                        .collect(Collectors.toList());
-        return options.sequenceField()
-                .map(
-                        field ->
-                                new SequenceGenerator(
-                                        field, schema.logicalRowType(), 
sequenceAutoPadding))
-                .orElse(null);
-    }
-
-    public int index() {
-        return index;
-    }
-
-    public DataType fieldType() {
-        return fieldType;
-    }
-
-    @Nullable
-    public Long generateWithoutPadding(InternalRow row) {
-        return generator.generateNullable(row, index);
-    }
-
-    public long generateWithPadding(InternalRow row, long incrSeq) {
-        long sequence = generator.generate(row, index);
-        for (SequenceAutoPadding padding : paddings) {
-            switch (padding) {
-                case ROW_KIND_FLAG:
-                    sequence = addRowKindFlag(sequence, row.getRowKind());
-                    break;
-                case SECOND_TO_MICRO:
-                    sequence = secondToMicro(sequence, incrSeq);
-                    break;
-                case MILLIS_TO_MICRO:
-                    sequence = millisToMicro(sequence, incrSeq);
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unknown sequence padding mode " + padding);
-            }
-        }
-        return sequence;
-    }
-
-    private long addRowKindFlag(long sequence, RowKind rowKind) {
-        return (sequence << 1) | (rowKind.isAdd() ? 1 : 0);
-    }
-
-    private long millisToMicro(long sequence, long incrSeq) {
-        // Generated value is millis
-        return sequence * 1_000 + (incrSeq % 1_000);
-    }
-
-    private long secondToMicro(long sequence, long incrSeq) {
-        // timestamp returns millis
-        long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? sequence / 1000 
: sequence;
-        return second * 1_000_000 + (incrSeq % 1_000_000);
-    }
-
-    private interface Generator {
-        long generate(InternalRow row, int i);
-
-        @Nullable
-        default Long generateNullable(InternalRow row, int i) {
-            if (row.isNullAt(i)) {
-                return null;
-            }
-            return generate(row, i);
-        }
-    }
-
-    private static class SequenceGeneratorVisitor extends 
DataTypeDefaultVisitor<Generator> {
-
-        @Override
-        public Generator visit(CharType charType) {
-            return stringGenerator();
-        }
-
-        @Override
-        public Generator visit(VarCharType varCharType) {
-            return stringGenerator();
-        }
-
-        private Generator stringGenerator() {
-            return (row, i) -> Long.parseLong(row.getString(i).toString());
-        }
-
-        @Override
-        public Generator visit(DecimalType decimalType) {
-            return (row, i) ->
-                    InternalRowUtils.castToIntegral(
-                            row.getDecimal(i, decimalType.getPrecision(), 
decimalType.getScale()));
-        }
-
-        @Override
-        public Generator visit(TinyIntType tinyIntType) {
-            return InternalRow::getByte;
-        }
-
-        @Override
-        public Generator visit(SmallIntType smallIntType) {
-            return InternalRow::getShort;
-        }
-
-        @Override
-        public Generator visit(IntType intType) {
-            return InternalRow::getInt;
-        }
-
-        @Override
-        public Generator visit(BigIntType bigIntType) {
-            return InternalRow::getLong;
-        }
-
-        @Override
-        public Generator visit(FloatType floatType) {
-            return (row, i) -> (long) row.getFloat(i);
-        }
-
-        @Override
-        public Generator visit(DoubleType doubleType) {
-            return (row, i) -> (long) row.getDouble(i);
-        }
-
-        @Override
-        public Generator visit(DateType dateType) {
-            return InternalRow::getInt;
-        }
-
-        @Override
-        public Generator visit(TimestampType timestampType) {
-            return (row, i) -> row.getTimestamp(i, 
timestampType.getPrecision()).getMillisecond();
-        }
-
-        @Override
-        public Generator visit(LocalZonedTimestampType 
localZonedTimestampType) {
-            return (row, i) ->
-                    row.getTimestamp(i, 
localZonedTimestampType.getPrecision()).getMillisecond();
-        }
-
-        @Override
-        protected Generator defaultMethod(DataType dataType) {
-            throw new UnsupportedOperationException("Unsupported type: " + 
dataType);
-        }
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
new file mode 100644
index 000000000..b1ea75ac1
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link FieldsComparator} for user defined sequence fields. */
+public class UserDefinedSeqComparator implements FieldsComparator {
+
+    private final int[] fields;
+    private final RecordComparator comparator;
+
+    public UserDefinedSeqComparator(int[] fields, RecordComparator comparator) 
{
+        this.fields = fields;
+        this.comparator = comparator;
+    }
+
+    @Override
+    public int[] compareFields() {
+        return fields;
+    }
+
+    @Override
+    public int compare(InternalRow o1, InternalRow o2) {
+        return comparator.compare(o1, o2);
+    }
+
+    @Nullable
+    public static UserDefinedSeqComparator create(RowType rowType, CoreOptions 
options) {
+        Optional<List<String>> sequenceField = options.sequenceField();
+        if (!sequenceField.isPresent()) {
+            return null;
+        }
+
+        List<String> fieldNames = rowType.getFieldNames();
+        int[] fields = 
sequenceField.get().stream().mapToInt(fieldNames::indexOf).toArray();
+        RecordComparator comparator =
+                CodeGenUtils.newRecordComparator(
+                        rowType.getFieldTypes(), fields, 
"UserDefinedSeqComparator");
+        return new UserDefinedSeqComparator(fields, comparator);
+    }
+
+    @Nullable
+    public static UserDefinedSeqComparator create(RowType rowType, 
List<String> sequenceFields) {
+        if (sequenceFields.isEmpty()) {
+            return null;
+        }
+
+        List<String> fieldNames = rowType.getFieldNames();
+        int[] fields = 
sequenceFields.stream().mapToInt(fieldNames::indexOf).toArray();
+        RecordComparator comparator =
+                CodeGenUtils.newRecordComparator(
+                        rowType.getFieldTypes(), fields, 
"UserDefinedSeqComparator");
+        return new UserDefinedSeqComparator(fields, comparator);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 1aa708306..6e718a085 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -554,6 +554,7 @@ public abstract class MergeTreeTestBase {
                         dropDelete,
                         readerFactory,
                         comparator,
+                        null,
                         DeduplicateMergeFunction.factory().create(),
                         new MergeSorter(options, null, null, null));
         List<TestRecord> records = new ArrayList<>();
@@ -600,6 +601,7 @@ public abstract class MergeTreeTestBase {
                             dropDelete,
                             compactReaderFactory,
                             comparator,
+                            null,
                             DeduplicateMergeFunction.factory().create(),
                             new MergeSorter(options, null, null, null));
             writer.write(new RecordReaderIterator<>(sectionsReader));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 170a66779..0d873507f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -66,6 +66,7 @@ public abstract class SortBufferWriteBufferTestBase {
                     new RowType(Collections.singletonList(new DataField(0, 
"key", new IntType()))),
                     new RowType(
                             Collections.singletonList(new DataField(1, 
"value", new BigIntType()))),
+                    null,
                     new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024),
                     false,
                     128,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 9c731ffea..db37284be 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -131,7 +131,7 @@ public class SchemaManagerTest {
                                                                         "f4"),
                                                                 ""))))
                 .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("Nonexistent sequence field: 'f4'");
+                .hasMessageContaining("Nonexistent sequence fields: '[f4]'");
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index d561ce5ca..ff280c27e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -209,60 +209,6 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 
"1|11|101|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
-    @Test
-    public void testPaddingSequenceNumber() throws Exception {
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT(),
-                            DataTypes.INT(),
-                            DataTypes.INT(),
-                            DataTypes.INT(),
-                            DataTypes.STRING()
-                        },
-                        new String[] {"pt", "a", "b", "sec", "non_time"});
-        GenericRow row1 = GenericRow.of(1, 10, 100, 1685530987, 
BinaryString.fromString("a1"));
-        GenericRow row2 = GenericRow.of(1, 10, 101, 1685530987, 
BinaryString.fromString("a2"));
-        GenericRow row3 = GenericRow.of(1, 10, 101, 1685530987, 
BinaryString.fromString("a3"));
-        FileStoreTable table =
-                createFileStoreTable(
-                        conf -> {
-                            conf.set(CoreOptions.SEQUENCE_FIELD, "sec");
-                            conf.set(
-                                    CoreOptions.SEQUENCE_AUTO_PADDING,
-                                    
CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO.toString());
-                        },
-                        rowType);
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
-        write.write(row1);
-        write.write(row2);
-        commit.commit(0, write.prepareCommit(false, 0));
-        write.write(row3);
-        commit.commit(1, write.prepareCommit(false, 1));
-        write.close();
-        commit.close();
-
-        ReadBuilder readBuilder = table.newReadBuilder();
-        Function<InternalRow, String> toString =
-                row ->
-                        row.getInt(0)
-                                + "|"
-                                + row.getInt(1)
-                                + "|"
-                                + row.getInt(2)
-                                + "|"
-                                + row.getInt(3)
-                                + "|"
-                                + row.getString(4);
-        assertThat(
-                        getResult(
-                                readBuilder.newRead(),
-                                readBuilder.newScan().plan().splits(),
-                                toString))
-                
.isEqualTo(Collections.singletonList("1|10|101|1685530987|a3"));
-    }
-
     @Test
     public void testBatchReadWrite() throws Exception {
         writeData();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
deleted file mode 100644
index f15ed20c0..000000000
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.GenericArray;
-import org.apache.paimon.data.GenericMap;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalDateTime;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static 
org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO;
-import static org.apache.paimon.CoreOptions.SequenceAutoPadding.ROW_KIND_FLAG;
-import static 
org.apache.paimon.CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Test for {@link SequenceGenerator}. */
-public class SequenceGeneratorTest {
-
-    private static final RowType ALL_DATA_TYPE =
-            RowType.of(
-                    new DataType[] {
-                        DataTypes.INT(), // _id
-                        DataTypes.DECIMAL(2, 1), // pt
-                        DataTypes.INT(), // second
-                        DataTypes.BOOLEAN(),
-                        DataTypes.TINYINT(),
-                        DataTypes.SMALLINT(),
-                        DataTypes.BIGINT(),
-                        DataTypes.BIGINT(), // millis
-                        DataTypes.FLOAT(),
-                        DataTypes.DOUBLE(),
-                        DataTypes.STRING(),
-                        DataTypes.DATE(),
-                        DataTypes.TIMESTAMP(0),
-                        DataTypes.TIMESTAMP(3),
-                        DataTypes.TIMESTAMP(6),
-                        DataTypes.CHAR(10),
-                        DataTypes.VARCHAR(20),
-                        DataTypes.BINARY(10),
-                        DataTypes.VARBINARY(20),
-                        DataTypes.BYTES(),
-                        DataTypes.TIME(),
-                        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
-                        DataTypes.MAP(DataTypes.INT(), DataTypes.INT()),
-                        DataTypes.ARRAY(DataTypes.STRING()),
-                        DataTypes.MULTISET(DataTypes.VARCHAR(8))
-                    },
-                    new String[] {
-                        "_id",
-                        "pt",
-                        "_intsecond",
-                        "_boolean",
-                        "_tinyint",
-                        "_smallint",
-                        "_bigint",
-                        "_bigintmillis",
-                        "_float",
-                        "_double",
-                        "_string",
-                        "_date",
-                        "_timestamp0",
-                        "_timestamp3",
-                        "_timestamp6",
-                        "_char",
-                        "_varchar",
-                        "_binary",
-                        "_varbinary",
-                        "_bytes",
-                        "_time",
-                        "_localtimestamp",
-                        "_map",
-                        "_array",
-                        "_multiset",
-                    });
-    private static final InternalRow row =
-            GenericRow.of(
-                    1,
-                    Decimal.fromUnscaledLong(10, 2, 1),
-                    1685548953,
-                    true,
-                    (byte) 2,
-                    (short) 3,
-                    4000000000000L,
-                    1685548953000L,
-                    2.81f,
-                    3.678008,
-                    BinaryString.fromString("1"),
-                    375, /* 1971-01-11 */
-                    Timestamp.fromEpochMillis(1685548953000L),
-                    Timestamp.fromEpochMillis(1685548953123L),
-                    Timestamp.fromMicros(1685548953123456L),
-                    BinaryString.fromString("3"),
-                    BinaryString.fromString("4"),
-                    "5".getBytes(),
-                    "6".getBytes(),
-                    "7".getBytes(),
-                    123,
-                    Timestamp.fromMicros(1685548953123456L),
-                    new GenericMap(
-                            Collections.singletonMap(
-                                    BinaryString.fromString("mapKey"),
-                                    BinaryString.fromString("mapVal"))),
-                    new GenericArray(
-                            new BinaryString[] {
-                                BinaryString.fromString("a"), 
BinaryString.fromString("b")
-                            }),
-                    new GenericMap(
-                            
Collections.singletonMap(BinaryString.fromString("multiset"), 1)));
-
-    @Test
-    public void testGenerate() {
-        assertThat(getGenerator("_id").generateWithPadding(row, 
0)).isEqualTo(1);
-        assertThat(getGenerator("pt").generateWithPadding(row, 
0)).isEqualTo(1);
-        assertThat(getGenerator("_intsecond").generateWithPadding(row, 
0)).isEqualTo(1685548953);
-        assertThat(getGenerator("_tinyint").generateWithPadding(row, 
0)).isEqualTo(2);
-        assertThat(getGenerator("_smallint").generateWithPadding(row, 
0)).isEqualTo(3);
-        assertThat(getGenerator("_bigint").generateWithPadding(row, 
0)).isEqualTo(4000000000000L);
-        assertThat(getGenerator("_bigintmillis").generateWithPadding(row, 0))
-                .isEqualTo(1685548953000L);
-        assertThat(getGenerator("_float").generateWithPadding(row, 
0)).isEqualTo(2);
-        assertThat(getGenerator("_double").generateWithPadding(row, 
0)).isEqualTo(3);
-        assertThat(getGenerator("_string").generateWithPadding(row, 
0)).isEqualTo(1);
-        assertThat(getGenerator("_date").generateWithPadding(row, 
0)).isEqualTo(375);
-        assertThat(getGenerator("_timestamp0").generateWithPadding(row, 0))
-                .isEqualTo(1685548953000L);
-        assertThat(getGenerator("_timestamp3").generateWithPadding(row, 0))
-                .isEqualTo(1685548953123L);
-        assertThat(getGenerator("_timestamp6").generateWithPadding(row, 0))
-                .isEqualTo(1685548953123L);
-        assertThat(getGenerator("_char").generateWithPadding(row, 
0)).isEqualTo(3);
-        assertThat(getGenerator("_varchar").generateWithPadding(row, 
0)).isEqualTo(4);
-        assertThat(getGenerator("_localtimestamp").generateWithPadding(row, 0))
-                .isEqualTo(1685548953123L);
-        assertUnsupportedDatatype("_boolean");
-        assertUnsupportedDatatype("_binary");
-        assertUnsupportedDatatype("_varbinary");
-        assertUnsupportedDatatype("_bytes");
-        assertUnsupportedDatatype("_time");
-        assertUnsupportedDatatype("_map");
-        assertUnsupportedDatatype("_array");
-        assertUnsupportedDatatype("_multiset");
-    }
-
-    @Test
-    public void testGenerateWithPadding() {
-        assertThat(generateWithPaddingOnSecond("_id", 5)).isEqualTo(1000005L);
-        assertThat(generateWithPaddingOnSecond("pt", 5)).isEqualTo(1000005L);
-
-        assertThat(generateWithPaddingOnSecond("_intsecond", 
5)).isEqualTo(1685548953000005L);
-        assertThat(generateWithPaddingOnSecond("_tinyint", 
5)).isEqualTo(2000005L);
-
-        assertThat(generateWithPaddingOnSecond("_smallint", 
5)).isEqualTo(3000005L);
-
-        assertThat(generateWithPaddingOnMillis("_bigint", 
5)).isEqualTo(4000000000000005L);
-
-        assertThat(generateWithPaddingOnMillis("_bigintmillis", 
5)).isEqualTo(1685548953000005L);
-
-        assertThat(generateWithPaddingOnMillis("_float", 5)).isEqualTo(2005);
-
-        assertThat(generateWithPaddingOnMillis("_double", 5)).isEqualTo(3005);
-
-        assertThat(generateWithPaddingOnMillis("_string", 5)).isEqualTo(1005);
-
-        assertThat(generateWithPaddingOnMillis("_date", 5)).isEqualTo(375005);
-
-        assertThat(generateWithPaddingOnSecond("_timestamp0", 
5)).isEqualTo(1685548953000005L);
-
-        assertThat(generateWithPaddingOnMillis("_timestamp3", 
5)).isEqualTo(1685548953123005L);
-
-        assertThat(generateWithPaddingOnMillis("_timestamp6", 
5)).isEqualTo(1685548953123005L);
-
-        assertThat(generateWithPaddingOnMillis("_char", 5)).isEqualTo(3005);
-
-        assertThat(generateWithPaddingOnMillis("_varchar", 5)).isEqualTo(4005);
-        assertThat(generateWithPaddingOnSecond("_localtimestamp", 
5)).isEqualTo(1685548953000005L);
-        assertThat(generateWithPaddingOnMillis("_localtimestamp", 
5)).isEqualTo(1685548953123005L);
-        assertUnsupportedDatatype("_boolean");
-        assertUnsupportedDatatype("_binary");
-        assertUnsupportedDatatype("_varbinary");
-        assertUnsupportedDatatype("_bytes");
-        assertUnsupportedDatatype("_time");
-        assertUnsupportedDatatype("_map");
-        assertUnsupportedDatatype("_array");
-        assertUnsupportedDatatype("_multiset");
-    }
-
-    @Test
-    public void testGenerateWithPaddingRowKind() {
-        assertThat(generateWithPaddingOnRowKind(1L, 
RowKind.INSERT)).isEqualTo(3);
-        assertThat(generateWithPaddingOnRowKind(1L, 
RowKind.UPDATE_AFTER)).isEqualTo(3);
-        assertThat(generateWithPaddingOnRowKind(1L, 
RowKind.UPDATE_BEFORE)).isEqualTo(2);
-        assertThat(generateWithPaddingOnRowKind(1L, 
RowKind.DELETE)).isEqualTo(2);
-
-        long maxMicros =
-                
Timestamp.fromLocalDateTime(LocalDateTime.parse("5000-01-01T00:00:00")).toMicros();
-        assertThat(generateWithPaddingOnRowKind(maxMicros, RowKind.INSERT))
-                .isEqualTo(191235168000000001L);
-
-        long sequence = generateWithPaddingOnMicrosAndRowKind(1L, 20, 
RowKind.INSERT);
-        assertThat(sequence).isEqualTo(2041);
-        sequence = generateWithPaddingOnMicrosAndRowKind(1L, 30, 
RowKind.UPDATE_BEFORE);
-        System.out.println(sequence);
-        assertThat(sequence).isEqualTo(2060);
-    }
-
-    private SequenceGenerator getGenerator(String field) {
-        return getGenerator(field, Collections.emptyList());
-    }
-
-    private SequenceGenerator getGenerator(
-            String field, List<CoreOptions.SequenceAutoPadding> paddings) {
-        return new SequenceGenerator(field, ALL_DATA_TYPE, paddings);
-    }
-
-    private void assertUnsupportedDatatype(String field) {
-        assertThatThrownBy(() -> getGenerator(field).generateWithPadding(row, 
0))
-                .isInstanceOf(UnsupportedOperationException.class);
-    }
-
-    private long generateWithPaddingOnSecond(String field, long incrSeq) {
-        return getGenerator(field, Collections.singletonList(SECOND_TO_MICRO))
-                .generateWithPadding(row, incrSeq);
-    }
-
-    private long generateWithPaddingOnMillis(String field, long incrSeq) {
-        return getGenerator(field, Collections.singletonList(MILLIS_TO_MICRO))
-                .generateWithPadding(row, incrSeq);
-    }
-
-    private long generateWithPaddingOnRowKind(long sequence, RowKind rowKind) {
-        return getGenerator("_bigint", 
Collections.singletonList(ROW_KIND_FLAG))
-                .generateWithPadding(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 
0, sequence), 0);
-    }
-
-    private long generateWithPaddingOnMicrosAndRowKind(
-            long sequence, long incrSeq, RowKind rowKind) {
-        return getGenerator("_bigint", Arrays.asList(MILLIS_TO_MICRO, 
ROW_KIND_FLAG))
-                .generateWithPadding(
-                        GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, 
sequence), incrSeq);
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 068d5d33a..afd660fd3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -173,8 +173,8 @@ public class FilesTableTest extends TableTestBase {
             DataFileMeta file = fileEntry.file();
             String minKey = String.valueOf(file.minKey().getInt(0));
             String maxKey = String.valueOf(file.maxKey().getInt(0));
-            String minSequenceNumber = 
String.valueOf(file.minSequenceNumber());
-            String maxSequenceNumber = 
String.valueOf(file.maxSequenceNumber());
+            String minCol1 = 
String.valueOf(file.valueStats().minValues().getInt(2));
+            String maxCol1 = 
String.valueOf(file.valueStats().maxValues().getInt(2));
             expectedRow.add(
                     GenericRow.of(
                             BinaryString.fromString(Arrays.toString(new 
String[] {partition})),
@@ -191,12 +191,10 @@ public class FilesTableTest extends TableTestBase {
                                     String.format("{col1=%s, pk=%s, pt=%s}", 
0, 0, 0)),
                             BinaryString.fromString(
                                     String.format(
-                                            "{col1=%s, pk=%s, pt=%s}",
-                                            minSequenceNumber, minKey, 
partition)),
+                                            "{col1=%s, pk=%s, pt=%s}", 
minCol1, minKey, partition)),
                             BinaryString.fromString(
                                     String.format(
-                                            "{col1=%s, pk=%s, pt=%s}",
-                                            maxSequenceNumber, maxKey, 
partition)),
+                                            "{col1=%s, pk=%s, pt=%s}", 
maxCol1, maxKey, partition)),
                             file.minSequenceNumber(),
                             file.maxSequenceNumber(),
                             file.creationTime()));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 7f9b019bd..185cf554b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -31,12 +31,13 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.MutableObjectIterator;
 import org.apache.paimon.utils.PartialRow;
 import org.apache.paimon.utils.TypeUtils;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import javax.annotation.Nullable;
 
@@ -46,8 +47,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-
-import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** Lookup table of full cache. */
 public abstract class FullCacheLookupTable implements LookupTable {
@@ -55,7 +55,9 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     protected final Context context;
     protected final RocksDBStateFactory stateFactory;
     protected final RowType projectedType;
-    private final boolean sequenceFieldEnabled;
+
+    @Nullable protected final FieldsComparator userDefinedSeqComparator;
+    protected final int appendUdsFieldNumber;
 
     private LookupStreamingReader reader;
     private Predicate specificPartition;
@@ -68,12 +70,31 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
                         context.table.coreOptions().toConfiguration(),
                         null);
         FileStoreTable table = context.table;
-        this.sequenceFieldEnabled =
-                table.primaryKeys().size() > 0
-                        && new 
CoreOptions(table.options()).sequenceField().isPresent();
+        List<String> sequenceFields = new ArrayList<>();
+        if (table.primaryKeys().size() > 0) {
+            new 
CoreOptions(table.options()).sequenceField().ifPresent(sequenceFields::addAll);
+        }
         RowType projectedType = TypeUtils.project(table.rowType(), 
context.projection);
-        if (sequenceFieldEnabled) {
-            projectedType = projectedType.appendDataField(SEQUENCE_NUMBER, 
DataTypes.BIGINT());
+        if (sequenceFields.size() > 0) {
+            RowType.Builder builder = RowType.builder();
+            projectedType.getFields().forEach(f -> builder.field(f.name(), 
f.type()));
+            RowType rowType = table.rowType();
+            AtomicInteger appendUdsFieldNumber = new AtomicInteger(0);
+            sequenceFields.stream()
+                    .filter(projectedType::notContainsField)
+                    .map(rowType::getField)
+                    .forEach(
+                            f -> {
+                                appendUdsFieldNumber.incrementAndGet();
+                                builder.field(f.name(), f.type());
+                            });
+            projectedType = builder.build();
+            this.userDefinedSeqComparator =
+                    UserDefinedSeqComparator.create(projectedType, 
sequenceFields);
+            this.appendUdsFieldNumber = appendUdsFieldNumber.get();
+        } else {
+            this.userDefinedSeqComparator = null;
+            this.appendUdsFieldNumber = 0;
         }
         this.projectedType = projectedType;
     }
@@ -93,7 +114,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
                         IOManager.create(context.tempPath.toString()), 
context.table.coreOptions());
         Predicate predicate = projectedPredicate();
         try (RecordReaderIterator<InternalRow> batch =
-                new RecordReaderIterator<>(reader.nextBatch(true, 
sequenceFieldEnabled))) {
+                new RecordReaderIterator<>(reader.nextBatch(true))) {
             while (batch.hasNext()) {
                 InternalRow row = batch.next();
                 if (predicate == null || predicate.test(row)) {
@@ -124,11 +145,11 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     public void refresh() throws Exception {
         while (true) {
             try (RecordReaderIterator<InternalRow> batch =
-                    new RecordReaderIterator<>(reader.nextBatch(false, 
sequenceFieldEnabled))) {
+                    new RecordReaderIterator<>(reader.nextBatch(false))) {
                 if (!batch.hasNext()) {
                     return;
                 }
-                refresh(batch, sequenceFieldEnabled);
+                refresh(batch);
             }
         }
     }
@@ -136,21 +157,21 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     @Override
     public final List<InternalRow> get(InternalRow key) throws IOException {
         List<InternalRow> values = innerGet(key);
-        if (!sequenceFieldEnabled) {
+        if (appendUdsFieldNumber == 0) {
             return values;
         }
 
         List<InternalRow> dropSequence = new ArrayList<>(values.size());
         for (InternalRow matchedRow : values) {
-            dropSequence.add(new PartialRow(matchedRow.getFieldCount() - 1, 
matchedRow));
+            dropSequence.add(
+                    new PartialRow(matchedRow.getFieldCount() - 
appendUdsFieldNumber, matchedRow));
         }
         return dropSequence;
     }
 
     public abstract List<InternalRow> innerGet(InternalRow key) throws 
IOException;
 
-    public abstract void refresh(Iterator<InternalRow> input, boolean 
orderByLastField)
-            throws IOException;
+    public abstract void refresh(Iterator<InternalRow> input) throws 
IOException;
 
     @Nullable
     public Predicate projectedPredicate() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index ad6f72670..f12d78e80 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -36,7 +36,6 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FunctionWithIOException;
 import org.apache.paimon.utils.TypeUtils;
@@ -55,7 +54,6 @@ import java.util.stream.IntStream;
 
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM;
 import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
-import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
 
 /** A streaming reader to load data into {@link LookupTable}. */
 public class LookupStreamingReader {
@@ -120,19 +118,13 @@ public class LookupStreamingReader {
         return fileStoreTable.copy(newSchema);
     }
 
-    public RecordReader<InternalRow> nextBatch(boolean useParallelism, boolean 
readSequenceNumber)
-            throws Exception {
+    public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws 
Exception {
         List<Split> splits = scan.plan().splits();
         CoreOptions options = CoreOptions.fromMap(table.options());
         FunctionWithIOException<Split, RecordReader<InternalRow>> 
readerSupplier =
-                readSequenceNumber
-                        ? createReaderWithSequenceSupplier()
-                        : split -> readBuilder.newRead().createReader(split);
+                split -> readBuilder.newRead().createReader(split);
 
         RowType readType = TypeUtils.project(table.rowType(), projection);
-        if (readSequenceNumber) {
-            readType = readType.appendDataField(SEQUENCE_NUMBER, 
DataTypes.BIGINT());
-        }
 
         RecordReader<InternalRow> reader;
         if (useParallelism) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index b931d18b4..7f5d036b1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -68,11 +68,10 @@ public class NoPrimaryKeyLookupTable extends 
FullCacheLookupTable {
     }
 
     @Override
-    public void refresh(Iterator<InternalRow> incremental, boolean 
orderByLastField)
-            throws IOException {
-        if (orderByLastField) {
+    public void refresh(Iterator<InternalRow> incremental) throws IOException {
+        if (userDefinedSeqComparator != null) {
             throw new IllegalArgumentException(
-                    "Append table does not support order by last field.");
+                    "Append table does not support user defined sequence 
fields.");
         }
 
         Predicate predicate = projectedPredicate();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 97034070c..889e1e35b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -91,16 +91,14 @@ public class PrimaryKeyLookupTable extends 
FullCacheLookupTable {
     }
 
     @Override
-    public void refresh(Iterator<InternalRow> incremental, boolean 
orderByLastField)
-            throws IOException {
+    public void refresh(Iterator<InternalRow> incremental) throws IOException {
         Predicate predicate = projectedPredicate();
         while (incremental.hasNext()) {
             InternalRow row = incremental.next();
             primaryKeyRow.replaceRow(row);
-            if (orderByLastField) {
+            if (userDefinedSeqComparator != null) {
                 InternalRow previous = tableState.get(primaryKeyRow);
-                int orderIndex = projectedType.getFieldCount() - 1;
-                if (previous != null && previous.getLong(orderIndex) > 
row.getLong(orderIndex)) {
+                if (previous != null && 
userDefinedSeqComparator.compare(previous, row) > 0) {
                     continue;
                 }
             }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index d0731ebdf..d4fb22c4b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -72,8 +72,7 @@ public class SecondaryIndexLookupTable extends 
PrimaryKeyLookupTable {
     }
 
     @Override
-    public void refresh(Iterator<InternalRow> incremental, boolean 
orderByLastField)
-            throws IOException {
+    public void refresh(Iterator<InternalRow> incremental) throws IOException {
         Predicate predicate = projectedPredicate();
         while (incremental.hasNext()) {
             InternalRow row = incremental.next();
@@ -81,11 +80,10 @@ public class SecondaryIndexLookupTable extends 
PrimaryKeyLookupTable {
 
             boolean previousFetched = false;
             InternalRow previous = null;
-            if (orderByLastField) {
+            if (userDefinedSeqComparator != null) {
                 previous = tableState.get(primaryKeyRow);
                 previousFetched = true;
-                int orderIndex = projectedType.getFieldCount() - 1;
-                if (previous != null && previous.getLong(orderIndex) > 
row.getLong(orderIndex)) {
+                if (previous != null && 
userDefinedSeqComparator.compare(previous, row) > 0) {
                     continue;
                 }
             }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index caba2daa1..a0529c575 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -31,12 +31,12 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.PrimaryKeyTableUtils;
 import org.apache.paimon.table.sink.RowKindGenerator;
-import org.apache.paimon.table.sink.SequenceGenerator;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -62,7 +62,6 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
     private transient RecordComparator keyComparator;
 
     private transient long recordCount;
-    private transient SequenceGenerator sequenceGenerator;
     private transient RowKindGenerator rowKindGenerator;
     private transient MergeFunction<KeyValue> mergeFunction;
 
@@ -88,12 +87,10 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
         CoreOptions options = new CoreOptions(schema.options());
 
         keyProjection =
-                CodeGenUtils.newProjection(
-                        schema.logicalRowType(), 
schema.projection(schema.primaryKeys()));
+                CodeGenUtils.newProjection(valueType, 
schema.projection(schema.primaryKeys()));
         keyComparator = new KeyComparatorSupplier(keyType).get();
 
         recordCount = 0;
-        sequenceGenerator = SequenceGenerator.create(schema, options);
         rowKindGenerator = RowKindGenerator.create(schema, options);
         mergeFunction =
                 PrimaryKeyTableUtils.createMergeFunctionFactory(
@@ -119,6 +116,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
                 new SortBufferWriteBuffer(
                         keyType,
                         valueType,
+                        UserDefinedSeqComparator.create(valueType, options),
                         new HeapMemorySegmentPool(
                                 options.localMergeBufferSize(), 
options.pageSize()),
                         false,
@@ -141,13 +139,9 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
         row.setRowKind(RowKind.INSERT);
 
         InternalRow key = keyProjection.apply(row);
-        long sequenceNumber =
-                sequenceGenerator == null
-                        ? recordCount
-                        : sequenceGenerator.generateWithPadding(row, 
recordCount);
-        if (!buffer.put(sequenceNumber, rowKind, key, row)) {
+        if (!buffer.put(recordCount, rowKind, key, row)) {
             flushBuffer();
-            if (!buffer.put(sequenceNumber, rowKind, key, row)) {
+            if (!buffer.put(recordCount, rowKind, key, row)) {
                 // change row kind back
                 row.setRowKind(rowKind);
                 output.collect(record);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 3af6b722d..065ace85a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -130,7 +130,7 @@ public class LookupTableTest extends TableTestBase {
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
         for (int i = 1; i <= 100_000; i++) {
             InternalRow row = row(i, 11 * i, 111 * i);
-            records.add(Pair.of(table.toKeyBytes(row), 
table.toValueBytes(sequence(row, -1L))));
+            records.add(Pair.of(table.toKeyBytes(row), 
table.toValueBytes(row)));
         }
         records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), 
o2.getKey()));
         TableBulkLoader bulkLoader = table.createBulkLoader();
@@ -146,18 +146,16 @@ public class LookupTableTest extends TableTestBase {
         }
 
         // test refresh to update
-        table.refresh(singletonList(sequence(row(1, 22, 222), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
         List<InternalRow> result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
         // test refresh to delete
-        table.refresh(
-                singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 
111)).iterator());
         assertThat(table.get(row(1))).hasSize(0);
 
-        table.refresh(
-                singletonList(sequence(row(RowKind.DELETE, 3, 33, 333), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(RowKind.DELETE, 3, 33, 
333)).iterator());
         assertThat(table.get(row(3))).hasSize(0);
     }
 
@@ -179,7 +177,7 @@ public class LookupTableTest extends TableTestBase {
 
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
         for (int i = 1; i <= 10; i++) {
-            InternalRow row = sequence(row(i, 11 * i, 111 * i), -1L);
+            InternalRow row = row(i, 11 * i, 111 * i);
             records.add(Pair.of(table.toKeyBytes(row), 
table.toValueBytes(row)));
         }
         records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), 
o2.getKey()));
@@ -190,20 +188,19 @@ public class LookupTableTest extends TableTestBase {
         bulkLoader.finish();
 
         // test refresh to update
-        table.refresh(singletonList(sequence(row(1, 22, 222), 1L)).iterator(), 
true);
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
         List<InternalRow> result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
         // refresh with old sequence
-        table.refresh(singletonList((sequence(row(1, 33, 333), 
0L))).iterator(), true);
+        table.refresh(singletonList((row(1, 11, 333))).iterator());
         result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
         // test refresh delete data with old sequence
-        table.refresh(
-                singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), 
-1L)).iterator(), true);
+        table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 
111)).iterator());
         assertThat(table.get(row(1))).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
     }
@@ -222,21 +219,20 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
-        table.refresh(singletonList(sequence(row(1, 11, 111), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
         List<InternalRow> result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 11, 111);
 
-        table.refresh(singletonList(sequence(row(1, 22, 222), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
         result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
-        table.refresh(
-                singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 
111)).iterator());
         assertThat(table.get(row(1))).hasSize(0);
 
-        table.refresh(singletonList(sequence(row(3, 33, 333), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(3, 33, 333)).iterator());
         assertThat(table.get(row(3))).hasSize(0);
     }
 
@@ -254,12 +250,12 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
-        table.refresh(singletonList(sequence(row(1, 11, 111), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
         List<InternalRow> result = table.get(row(1));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 11, 111);
 
-        table.refresh(singletonList(sequence(row(1, 22, 222), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
         result = table.get(row(1));
         assertThat(result).hasSize(0);
     }
@@ -285,7 +281,7 @@ public class LookupTableTest extends TableTestBase {
         for (int i = 1; i <= 100_000; i++) {
             int secKey = rnd.nextInt(i);
             InternalRow row = row(i, secKey, 111 * i);
-            records.add(Pair.of(table.toKeyBytes(row), 
table.toValueBytes(sequence(row, -1L))));
+            records.add(Pair.of(table.toKeyBytes(row), 
table.toValueBytes(row)));
             secKeyToPk.computeIfAbsent(secKey, k -> new HashSet<>()).add(i);
         }
         records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), 
o2.getKey()));
@@ -302,7 +298,7 @@ public class LookupTableTest extends TableTestBase {
         }
 
         // add new sec key to pk
-        table.refresh(singletonList(sequence(row(1, 22, 222), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
         List<InternalRow> result = table.get(row(22));
         assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
     }
@@ -345,21 +341,14 @@ public class LookupTableTest extends TableTestBase {
                     .containsExactlyInAnyOrderElementsOf(entry.getValue());
         }
 
-        JoinedRow joined = new JoinedRow();
         // add new sec key to pk
-        table.refresh(
-                singletonList((InternalRow) joined.replace(row(1, 22, 222), 
GenericRow.of(1L)))
-                        .iterator(),
-                true);
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
         List<InternalRow> result = table.get(row(22));
         assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
         assertThat(result.stream().map(InternalRow::getFieldCount)).allMatch(n 
-> n == 3);
 
         // refresh with old value
-        table.refresh(
-                singletonList((InternalRow) joined.replace(row(1, 22, 333), 
GenericRow.of(0L)))
-                        .iterator(),
-                true);
+        table.refresh(singletonList(row(1, 11, 333)).iterator());
         result = table.get(row(22));
         assertThat(result.stream().map(row -> 
row.getInt(2))).doesNotContain(333);
     }
@@ -378,30 +367,29 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
-        table.refresh(singletonList(sequence(row(1, 11, 111), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
         List<InternalRow> result = table.get(row(11));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 11, 111);
 
-        table.refresh(singletonList(sequence(row(1, 22, 222), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
         assertThat(table.get(row(11))).hasSize(0);
         result = table.get(row(22));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
-        table.refresh(singletonList(sequence(row(2, 22, 222), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(2, 22, 222)).iterator());
         result = table.get(row(22));
         assertThat(result).hasSize(2);
         assertRow(result.get(0), 1, 22, 222);
         assertRow(result.get(1), 2, 22, 222);
 
-        table.refresh(
-                singletonList(sequence(row(RowKind.DELETE, 2, 22, 222), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(RowKind.DELETE, 2, 22, 
222)).iterator());
         result = table.get(row(22));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 22, 222);
 
-        table.refresh(singletonList(sequence(row(3, 33, 333), 
-1L)).iterator(), false);
+        table.refresh(singletonList(row(3, 33, 333)).iterator());
         assertThat(table.get(row(33))).hasSize(0);
     }
 
@@ -443,7 +431,7 @@ public class LookupTableTest extends TableTestBase {
         }
 
         // add new join key value
-        table.refresh(singletonList(row(1, 22, 333)).iterator(), false);
+        table.refresh(singletonList(row(1, 22, 333)).iterator());
         List<InternalRow> result = table.get(row(22));
         assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
     }
@@ -462,16 +450,16 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
-        table.refresh(singletonList(row(1, 11, 333)).iterator(), false);
+        table.refresh(singletonList(row(1, 11, 333)).iterator());
         List<InternalRow> result = table.get(row(11));
         assertThat(result).hasSize(0);
 
-        table.refresh(singletonList(row(1, 11, 111)).iterator(), false);
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
         result = table.get(row(11));
         assertThat(result).hasSize(1);
         assertRow(result.get(0), 1, 11, 111);
 
-        table.refresh(singletonList(row(1, 11, 111)).iterator(), false);
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
         result = table.get(row(11));
         assertThat(result).hasSize(2);
         assertRow(result.get(0), 1, 11, 111);
@@ -596,10 +584,6 @@ public class LookupTableTest extends TableTestBase {
         return row;
     }
 
-    private static InternalRow sequence(InternalRow row, long sequenceNumber) {
-        return new JoinedRow(row.getRowKind(), row, 
GenericRow.of(sequenceNumber));
-    }
-
     private static void assertRow(InternalRow resultRow, int... expected) {
         int[] results = new int[expected.length];
         for (int i = 0; i < results.length; i++) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 55a671879..745e92b68 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
@@ -117,19 +118,27 @@ public class TestChangelogDataReadWrite {
                             RecordReader.RecordIterator<KeyValue>,
                             RecordReader.RecordIterator<InternalRow>>
                     rowDataIteratorCreator) {
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
+        long schemaId = 0;
         KeyValueFileStoreRead read =
                 new KeyValueFileStoreRead(
-                        LocalFileIO.create(),
-                        new SchemaManager(LocalFileIO.create(), tablePath),
-                        0,
+                        schemaManager,
+                        schemaId,
                         KEY_TYPE,
                         VALUE_TYPE,
                         COMPARATOR,
+                        null,
                         DeduplicateMergeFunction.factory(),
-                        ignore -> avro,
-                        pathFactory,
-                        EXTRACTOR,
-                        new CoreOptions(new HashMap<>()));
+                        KeyValueFileReaderFactory.builder(
+                                LocalFileIO.create(),
+                                schemaManager,
+                                schemaId,
+                                KEY_TYPE,
+                                VALUE_TYPE,
+                                ignore -> avro,
+                                pathFactory,
+                                EXTRACTOR,
+                                new CoreOptions(new HashMap<>())));
         return new KeyValueTableRead(read, null) {
 
             @Override
@@ -176,6 +185,7 @@ public class TestChangelogDataReadWrite {
                                 KEY_TYPE,
                                 VALUE_TYPE,
                                 () -> COMPARATOR,
+                                () -> null,
                                 () -> EQUALISER,
                                 DeduplicateMergeFunction.factory(),
                                 pathFactory,

Reply via email to