This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fb7efe59f [core] Sequence Auto padding should based on incremental id 
(#2895)
fb7efe59f is described below

commit fb7efe59f899dbbd881c375ea22a437a24e92c13
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Feb 23 16:21:47 2024 +0800

    [core] Sequence Auto padding should based on incremental id (#2895)
---
 .../concepts/primary-key-table/sequence-rowkind.md |   2 +-
 .../apache/paimon/mergetree/MergeTreeWriter.java   |  12 +-
 .../compact/PartialUpdateMergeFunction.java        |   8 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |   5 +
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  12 +-
 .../paimon/table/sink/SequenceGenerator.java       |  29 ++--
 .../apache/paimon/table/sink/TableWriteImpl.java   |   8 --
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   1 +
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  38 ++++--
 .../paimon/table/sink/SequenceGeneratorTest.java   | 148 ++++++++++-----------
 .../paimon/flink/sink/LocalMergeOperator.java      |   4 +-
 11 files changed, 129 insertions(+), 138 deletions(-)

diff --git a/docs/content/concepts/primary-key-table/sequence-rowkind.md 
b/docs/content/concepts/primary-key-table/sequence-rowkind.md
index fd50155d5..80f20dce9 100644
--- a/docs/content/concepts/primary-key-table/sequence-rowkind.md
+++ b/docs/content/concepts/primary-key-table/sequence-rowkind.md
@@ -63,7 +63,7 @@ option to automatically pad the sequence field for you.
 
 2. Insufficient precision: If the provided `sequence.field` doesn't meet the 
precision, like a rough second or
    millisecond, you can set `sequence.auto-padding` to `second-to-micro` or 
`millis-to-micro` so that the precision
-   of sequence number will be made up to microsecond by system.
+   of sequence number will be made up to microsecond by incremental id 
(Calculate within a single bucket).
 
 3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add 
the micro to the second, and then
    pad the row kind flag.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index d12796c74..de5cbf14b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -34,6 +34,7 @@ import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.mergetree.compact.MergeFunction;
 import org.apache.paimon.operation.metrics.WriterMetrics;
+import org.apache.paimon.table.sink.SequenceGenerator;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.RecordWriter;
@@ -66,6 +67,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
     private final KeyValueFileWriterFactory writerFactory;
     private final boolean commitForceCompact;
     private final ChangelogProducer changelogProducer;
+    @Nullable private final SequenceGenerator sequenceGenerator;
 
     private final LinkedHashSet<DataFileMeta> newFiles;
     private final LinkedHashSet<DataFileMeta> newFilesChangelog;
@@ -91,6 +93,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
             boolean commitForceCompact,
             ChangelogProducer changelogProducer,
             @Nullable CommitIncrement increment,
+            @Nullable SequenceGenerator sequenceGenerator,
             WriterMetrics writerMetrics) {
         this.writeBufferSpillable = writeBufferSpillable;
         this.sortMaxFan = sortMaxFan;
@@ -105,6 +108,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         this.writerFactory = writerFactory;
         this.commitForceCompact = commitForceCompact;
         this.changelogProducer = changelogProducer;
+        this.sequenceGenerator = sequenceGenerator;
 
         this.newFiles = new LinkedHashSet<>();
         this.newFilesChangelog = new LinkedHashSet<>();
@@ -148,10 +152,10 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
 
     @Override
     public void write(KeyValue kv) throws Exception {
-        long sequenceNumber =
-                kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE
-                        ? newSequenceNumber()
-                        : kv.sequenceNumber();
+        long sequenceNumber = newSequenceNumber();
+        if (sequenceGenerator != null) {
+            sequenceNumber = sequenceGenerator.generateWithPadding(kv.value(), 
sequenceNumber);
+        }
         boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), 
kv.key(), kv.value());
         if (!success) {
             flushWriteBuffer(false, false);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index b850248da..6c7fa0567 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -144,9 +144,9 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                     row.setField(i, field);
                 }
             } else {
-                Long currentSeq = sequenceGen.generateNullable(kv.value());
+                Long currentSeq = 
sequenceGen.generateWithoutPadding(kv.value());
                 if (currentSeq != null) {
-                    Long previousSeq = sequenceGen.generateNullable(row);
+                    Long previousSeq = sequenceGen.generateWithoutPadding(row);
                     if (previousSeq == null || currentSeq >= previousSeq) {
                         row.setField(
                                 i, aggregator == null ? field : 
aggregator.agg(accumulator, field));
@@ -162,9 +162,9 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         for (int i = 0; i < getters.length; i++) {
             SequenceGenerator sequenceGen = fieldSequences.get(i);
             if (sequenceGen != null) {
-                Long currentSeq = sequenceGen.generateNullable(kv.value());
+                Long currentSeq = 
sequenceGen.generateWithoutPadding(kv.value());
                 if (currentSeq != null) {
-                    Long previousSeq = sequenceGen.generateNullable(row);
+                    Long previousSeq = sequenceGen.generateWithoutPadding(row);
                     FieldAggregator aggregator = fieldAggregators.get(i);
                     if (previousSeq == null || currentSeq >= previousSeq) {
                         if (sequenceGen.index() == i) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 2b2d3f927..3073dc8d9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -53,6 +53,8 @@ import 
org.apache.paimon.mergetree.compact.UniversalCompaction;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.SequenceGenerator;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -82,6 +84,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
     private final Supplier<RecordEqualiser> valueEqualiserSupplier;
     private final MergeFunctionFactory<KeyValue> mfFactory;
+    private final TableSchema schema;
     private final CoreOptions options;
     private final FileIO fileIO;
     private final RowType keyType;
@@ -132,6 +135,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
         this.keyComparatorSupplier = keyComparatorSupplier;
         this.valueEqualiserSupplier = valueEqualiserSupplier;
         this.mfFactory = mfFactory;
+        this.schema = schemaManager.schema(schemaId);
         this.options = options;
     }
 
@@ -180,6 +184,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 options.commitForceCompact(),
                 options.changelogProducer(),
                 restoreIncrement,
+                SequenceGenerator.create(schema, options),
                 getWriterMetrics(partition, bucket));
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 2bacb1373..11e557f17 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -38,7 +38,6 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.query.LocalTableQuery;
 import org.apache.paimon.table.sink.RowKindGenerator;
-import org.apache.paimon.table.sink.SequenceGenerator;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.KeyValueTableRead;
@@ -187,23 +186,18 @@ class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
             String commitUser, ManifestCacheFilter manifestFilter) {
         TableSchema schema = schema();
         CoreOptions options = store().options();
-        final SequenceGenerator sequenceGenerator = 
SequenceGenerator.create(schema, options);
-        final RowKindGenerator rowKindGenerator = 
RowKindGenerator.create(schema, options);
-        final KeyValue kv = new KeyValue();
+        RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, 
options);
+        KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 store().newWrite(commitUser, manifestFilter),
                 createRowKeyExtractor(),
                 record -> {
                     InternalRow row = record.row();
-                    long sequenceNumber =
-                            sequenceGenerator == null
-                                    ? KeyValue.UNKNOWN_SEQUENCE
-                                    : sequenceGenerator.generate(row);
                     RowKind rowKind =
                             rowKindGenerator == null
                                     ? row.getRowKind()
                                     : rowKindGenerator.generate(row);
-                    return kv.replace(record.primaryKey(), sequenceNumber, 
rowKind, row);
+                    return kv.replace(record.primaryKey(), 
KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
                 });
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
index d6aaa2864..9c6b81159 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
@@ -45,7 +45,6 @@ import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** Generate sequence number. */
@@ -108,11 +107,11 @@ public class SequenceGenerator {
     }
 
     @Nullable
-    public Long generateNullable(InternalRow row) {
+    public Long generateWithoutPadding(InternalRow row) {
         return generator.generateNullable(row, index);
     }
 
-    public long generate(InternalRow row) {
+    public long generateWithPadding(InternalRow row, long incrSeq) {
         long sequence = generator.generate(row, index);
         for (SequenceAutoPadding padding : paddings) {
             switch (padding) {
@@ -120,10 +119,10 @@ public class SequenceGenerator {
                     sequence = addRowKindFlag(sequence, row.getRowKind());
                     break;
                 case SECOND_TO_MICRO:
-                    sequence = secondToMicro(sequence);
+                    sequence = secondToMicro(sequence, incrSeq);
                     break;
                 case MILLIS_TO_MICRO:
-                    sequence = millisToMicro(sequence);
+                    sequence = millisToMicro(sequence, incrSeq);
                     break;
                 default:
                     throw new UnsupportedOperationException(
@@ -137,27 +136,15 @@ public class SequenceGenerator {
         return (sequence << 1) | (rowKind.isAdd() ? 1 : 0);
     }
 
-    private long millisToMicro(long sequence) {
+    private long millisToMicro(long sequence, long incrSeq) {
         // Generated value is millis
-        return sequence * 1_000 + getCurrentMicroOfMillis();
+        return sequence * 1_000 + (incrSeq % 1_000);
     }
 
-    private long secondToMicro(long sequence) {
+    private long secondToMicro(long sequence, long incrSeq) {
         // timestamp returns millis
         long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? sequence / 1000 
: sequence;
-        return second * 1_000_000 + getCurrentMicroOfSeconds();
-    }
-
-    private static long getCurrentMicroOfMillis() {
-        long currentNanoTime = System.nanoTime();
-        long mills = TimeUnit.MILLISECONDS.convert(currentNanoTime, 
TimeUnit.NANOSECONDS);
-        return (currentNanoTime - mills * 1_000_000) / 1000;
-    }
-
-    private static long getCurrentMicroOfSeconds() {
-        long currentNanoTime = System.nanoTime();
-        long seconds = TimeUnit.SECONDS.convert(currentNanoTime, 
TimeUnit.NANOSECONDS);
-        return (currentNanoTime - seconds * 1_000_000_000) / 1000;
+        return second * 1_000_000 + (incrSeq % 1_000_000);
     }
 
     private interface Generator {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 1c859bf03..b1cacc4c6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -133,14 +133,6 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
         return record;
     }
 
-    @VisibleForTesting
-    public T writeAndReturnData(InternalRow row) throws Exception {
-        SinkRecord record = toSinkRecord(row);
-        T data = recordExtractor.extract(record);
-        write.write(record.partition(), record.bucket(), data);
-        return data;
-    }
-
     private SinkRecord toSinkRecord(InternalRow row) {
         keyAndBucketExtractor.setRecord(row);
         return new SinkRecord(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 1aa708306..d3cfc2af7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -426,6 +426,7 @@ public abstract class MergeTreeTestBase {
                         options.commitForceCompact(),
                         ChangelogProducer.NONE,
                         null,
+                        null,
                         null);
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 2bd3fe1a4..262320d18 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.KeyValue;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -46,7 +45,6 @@ import org.apache.paimon.table.sink.InnerTableCommit;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
-import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -74,7 +72,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -222,6 +219,7 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                         new String[] {"pt", "a", "b", "sec", "non_time"});
         GenericRow row1 = GenericRow.of(1, 10, 100, 1685530987, 
BinaryString.fromString("a1"));
         GenericRow row2 = GenericRow.of(1, 10, 101, 1685530987, 
BinaryString.fromString("a2"));
+        GenericRow row3 = GenericRow.of(1, 10, 101, 1685530987, 
BinaryString.fromString("a3"));
         FileStoreTable table =
                 createFileStoreTable(
                         conf -> {
@@ -232,17 +230,33 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                         },
                         rowType);
         StreamTableWrite write = table.newWrite(commitUser);
-        long sequenceNumber1 =
-                ((TableWriteImpl<KeyValue>) 
write).writeAndReturnData(row1).sequenceNumber();
-        long sequenceNumber2 =
-                ((TableWriteImpl<KeyValue>) 
write).writeAndReturnData(row2).sequenceNumber();
-        assertThat(TimeUnit.SECONDS.convert(sequenceNumber1, 
TimeUnit.MICROSECONDS))
-                .isEqualTo(1685530987);
-        assertThat(TimeUnit.SECONDS.convert(sequenceNumber2, 
TimeUnit.MICROSECONDS))
-                .isEqualTo(1685530987);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        write.write(row1);
+        write.write(row2);
+        commit.commit(0, write.prepareCommit(false, 0));
+        write.write(row3);
+        commit.commit(1, write.prepareCommit(false, 1));
         write.close();
+        commit.close();
 
-        // Do not check results, they are unstable
+        ReadBuilder readBuilder = table.newReadBuilder();
+        Function<InternalRow, String> toString =
+                row ->
+                        row.getInt(0)
+                                + "|"
+                                + row.getInt(1)
+                                + "|"
+                                + row.getInt(2)
+                                + "|"
+                                + row.getInt(3)
+                                + "|"
+                                + row.getString(4);
+        assertThat(
+                        getResult(
+                                readBuilder.newRead(),
+                                readBuilder.newScan().plan().splits(),
+                                toString))
+                
.isEqualTo(Collections.singletonList("1|10|101|1685530987|a3"));
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
index b8a5795dd..f15ed20c0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
@@ -37,7 +37,6 @@ import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO;
 import static org.apache.paimon.CoreOptions.SequenceAutoPadding.ROW_KIND_FLAG;
@@ -141,23 +140,28 @@ public class SequenceGeneratorTest {
 
     @Test
     public void testGenerate() {
-        assertThat(getGenerator("_id").generate(row)).isEqualTo(1);
-        assertThat(getGenerator("pt").generate(row)).isEqualTo(1);
-        
assertThat(getGenerator("_intsecond").generate(row)).isEqualTo(1685548953);
-        assertThat(getGenerator("_tinyint").generate(row)).isEqualTo(2);
-        assertThat(getGenerator("_smallint").generate(row)).isEqualTo(3);
-        
assertThat(getGenerator("_bigint").generate(row)).isEqualTo(4000000000000L);
-        
assertThat(getGenerator("_bigintmillis").generate(row)).isEqualTo(1685548953000L);
-        assertThat(getGenerator("_float").generate(row)).isEqualTo(2);
-        assertThat(getGenerator("_double").generate(row)).isEqualTo(3);
-        assertThat(getGenerator("_string").generate(row)).isEqualTo(1);
-        assertThat(getGenerator("_date").generate(row)).isEqualTo(375);
-        
assertThat(getGenerator("_timestamp0").generate(row)).isEqualTo(1685548953000L);
-        
assertThat(getGenerator("_timestamp3").generate(row)).isEqualTo(1685548953123L);
-        
assertThat(getGenerator("_timestamp6").generate(row)).isEqualTo(1685548953123L);
-        assertThat(getGenerator("_char").generate(row)).isEqualTo(3);
-        assertThat(getGenerator("_varchar").generate(row)).isEqualTo(4);
-        
assertThat(getGenerator("_localtimestamp").generate(row)).isEqualTo(1685548953123L);
+        assertThat(getGenerator("_id").generateWithPadding(row, 
0)).isEqualTo(1);
+        assertThat(getGenerator("pt").generateWithPadding(row, 
0)).isEqualTo(1);
+        assertThat(getGenerator("_intsecond").generateWithPadding(row, 
0)).isEqualTo(1685548953);
+        assertThat(getGenerator("_tinyint").generateWithPadding(row, 
0)).isEqualTo(2);
+        assertThat(getGenerator("_smallint").generateWithPadding(row, 
0)).isEqualTo(3);
+        assertThat(getGenerator("_bigint").generateWithPadding(row, 
0)).isEqualTo(4000000000000L);
+        assertThat(getGenerator("_bigintmillis").generateWithPadding(row, 0))
+                .isEqualTo(1685548953000L);
+        assertThat(getGenerator("_float").generateWithPadding(row, 
0)).isEqualTo(2);
+        assertThat(getGenerator("_double").generateWithPadding(row, 
0)).isEqualTo(3);
+        assertThat(getGenerator("_string").generateWithPadding(row, 
0)).isEqualTo(1);
+        assertThat(getGenerator("_date").generateWithPadding(row, 
0)).isEqualTo(375);
+        assertThat(getGenerator("_timestamp0").generateWithPadding(row, 0))
+                .isEqualTo(1685548953000L);
+        assertThat(getGenerator("_timestamp3").generateWithPadding(row, 0))
+                .isEqualTo(1685548953123L);
+        assertThat(getGenerator("_timestamp6").generateWithPadding(row, 0))
+                .isEqualTo(1685548953123L);
+        assertThat(getGenerator("_char").generateWithPadding(row, 
0)).isEqualTo(3);
+        assertThat(getGenerator("_varchar").generateWithPadding(row, 
0)).isEqualTo(4);
+        assertThat(getGenerator("_localtimestamp").generateWithPadding(row, 0))
+                .isEqualTo(1685548953123L);
         assertUnsupportedDatatype("_boolean");
         assertUnsupportedDatatype("_binary");
         assertUnsupportedDatatype("_varbinary");
@@ -170,46 +174,37 @@ public class SequenceGeneratorTest {
 
     @Test
     public void testGenerateWithPadding() {
-        
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_id")))
-                .isEqualTo(1);
-        
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("pt")))
-                .isEqualTo(1);
-        
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_intsecond")))
-                .isEqualTo(1685548953);
-        
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_tinyint")))
-                .isEqualTo(2);
-        
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_smallint")))
-                .isEqualTo(3);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_bigint")))
-                .isEqualTo(4000000000000L);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_bigintmillis")))
-                .isEqualTo(1685548953000L);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_float")))
-                .isEqualTo(2);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_double")))
-                .isEqualTo(3);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_string")))
-                .isEqualTo(1);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_date")))
-                .isEqualTo(375);
-        
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_timestamp0")))
-                .isEqualTo(1685548953L);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_timestamp3")))
-                .isEqualTo(1685548953123L);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_timestamp6")))
-                .isEqualTo(1685548953123L);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_char")))
-                .isEqualTo(3);
-        
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_varchar")))
-                .isEqualTo(4);
-        assertThat(
-                        getSecondFromGeneratedWithPadding(
-                                
generateWithPaddingOnSecond("_localtimestamp")))
-                .isEqualTo(1685548953L);
-        assertThat(
-                        getMillisFromGeneratedWithPadding(
-                                
generateWithPaddingOnMillis("_localtimestamp")))
-                .isEqualTo(1685548953123L);
+        assertThat(generateWithPaddingOnSecond("_id", 5)).isEqualTo(1000005L);
+        assertThat(generateWithPaddingOnSecond("pt", 5)).isEqualTo(1000005L);
+
+        assertThat(generateWithPaddingOnSecond("_intsecond", 
5)).isEqualTo(1685548953000005L);
+        assertThat(generateWithPaddingOnSecond("_tinyint", 
5)).isEqualTo(2000005L);
+
+        assertThat(generateWithPaddingOnSecond("_smallint", 
5)).isEqualTo(3000005L);
+
+        assertThat(generateWithPaddingOnMillis("_bigint", 
5)).isEqualTo(4000000000000005L);
+
+        assertThat(generateWithPaddingOnMillis("_bigintmillis", 
5)).isEqualTo(1685548953000005L);
+
+        assertThat(generateWithPaddingOnMillis("_float", 5)).isEqualTo(2005);
+
+        assertThat(generateWithPaddingOnMillis("_double", 5)).isEqualTo(3005);
+
+        assertThat(generateWithPaddingOnMillis("_string", 5)).isEqualTo(1005);
+
+        assertThat(generateWithPaddingOnMillis("_date", 5)).isEqualTo(375005);
+
+        assertThat(generateWithPaddingOnSecond("_timestamp0", 
5)).isEqualTo(1685548953000005L);
+
+        assertThat(generateWithPaddingOnMillis("_timestamp3", 
5)).isEqualTo(1685548953123005L);
+
+        assertThat(generateWithPaddingOnMillis("_timestamp6", 
5)).isEqualTo(1685548953123005L);
+
+        assertThat(generateWithPaddingOnMillis("_char", 5)).isEqualTo(3005);
+
+        assertThat(generateWithPaddingOnMillis("_varchar", 5)).isEqualTo(4005);
+        assertThat(generateWithPaddingOnSecond("_localtimestamp", 
5)).isEqualTo(1685548953000005L);
+        assertThat(generateWithPaddingOnMillis("_localtimestamp", 
5)).isEqualTo(1685548953123005L);
         assertUnsupportedDatatype("_boolean");
         assertUnsupportedDatatype("_binary");
         assertUnsupportedDatatype("_varbinary");
@@ -232,10 +227,11 @@ public class SequenceGeneratorTest {
         assertThat(generateWithPaddingOnRowKind(maxMicros, RowKind.INSERT))
                 .isEqualTo(191235168000000001L);
 
-        assertThat(generateWithPaddingOnMicrosAndRowKind(1L, RowKind.INSERT))
-                .isBetween(2001L, 3999L);
-        assertThat(generateWithPaddingOnMicrosAndRowKind(1L, 
RowKind.UPDATE_BEFORE))
-                .isBetween(2000L, 3998L);
+        long sequence = generateWithPaddingOnMicrosAndRowKind(1L, 20, 
RowKind.INSERT);
+        assertThat(sequence).isEqualTo(2041);
+        sequence = generateWithPaddingOnMicrosAndRowKind(1L, 30, 
RowKind.UPDATE_BEFORE);
+        System.out.println(sequence);
+        assertThat(sequence).isEqualTo(2060);
     }
 
     private SequenceGenerator getGenerator(String field) {
@@ -248,33 +244,29 @@ public class SequenceGeneratorTest {
     }
 
     private void assertUnsupportedDatatype(String field) {
-        assertThatThrownBy(() -> getGenerator(field).generate(row))
+        assertThatThrownBy(() -> getGenerator(field).generateWithPadding(row, 
0))
                 .isInstanceOf(UnsupportedOperationException.class);
     }
 
-    private long generateWithPaddingOnSecond(String field) {
-        return getGenerator(field, 
Collections.singletonList(SECOND_TO_MICRO)).generate(row);
-    }
-
-    private long getSecondFromGeneratedWithPadding(long generated) {
-        return TimeUnit.SECONDS.convert(generated, TimeUnit.MICROSECONDS);
+    private long generateWithPaddingOnSecond(String field, long incrSeq) {
+        return getGenerator(field, Collections.singletonList(SECOND_TO_MICRO))
+                .generateWithPadding(row, incrSeq);
     }
 
-    private long generateWithPaddingOnMillis(String field) {
-        return getGenerator(field, 
Collections.singletonList(MILLIS_TO_MICRO)).generate(row);
+    private long generateWithPaddingOnMillis(String field, long incrSeq) {
+        return getGenerator(field, Collections.singletonList(MILLIS_TO_MICRO))
+                .generateWithPadding(row, incrSeq);
     }
 
     private long generateWithPaddingOnRowKind(long sequence, RowKind rowKind) {
         return getGenerator("_bigint", 
Collections.singletonList(ROW_KIND_FLAG))
-                .generate(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, 
sequence));
+                .generateWithPadding(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 
0, sequence), 0);
     }
 
-    private long generateWithPaddingOnMicrosAndRowKind(long sequence, RowKind 
rowKind) {
+    private long generateWithPaddingOnMicrosAndRowKind(
+            long sequence, long incrSeq, RowKind rowKind) {
         return getGenerator("_bigint", Arrays.asList(MILLIS_TO_MICRO, 
ROW_KIND_FLAG))
-                .generate(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, 
sequence));
-    }
-
-    private long getMillisFromGeneratedWithPadding(long generated) {
-        return TimeUnit.MILLISECONDS.convert(generated, TimeUnit.MICROSECONDS);
+                .generateWithPadding(
+                        GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, 
sequence), incrSeq);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 6ce55a9bd..caba2daa1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -142,7 +142,9 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
 
         InternalRow key = keyProjection.apply(row);
         long sequenceNumber =
-                sequenceGenerator == null ? recordCount : 
sequenceGenerator.generate(row);
+                sequenceGenerator == null
+                        ? recordCount
+                        : sequenceGenerator.generateWithPadding(row, 
recordCount);
         if (!buffer.put(sequenceNumber, rowKind, key, row)) {
             flushBuffer();
             if (!buffer.put(sequenceNumber, rowKind, key, row)) {

Reply via email to