This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fb7efe59f [core] Sequence Auto padding should be based on incremental id
(#2895)
fb7efe59f is described below
commit fb7efe59f899dbbd881c375ea22a437a24e92c13
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Feb 23 16:21:47 2024 +0800
[core] Sequence Auto padding should be based on incremental id (#2895)
---
.../concepts/primary-key-table/sequence-rowkind.md | 2 +-
.../apache/paimon/mergetree/MergeTreeWriter.java | 12 +-
.../compact/PartialUpdateMergeFunction.java | 8 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 5 +
.../paimon/table/PrimaryKeyFileStoreTable.java | 12 +-
.../paimon/table/sink/SequenceGenerator.java | 29 ++--
.../apache/paimon/table/sink/TableWriteImpl.java | 8 --
.../apache/paimon/mergetree/MergeTreeTestBase.java | 1 +
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 38 ++++--
.../paimon/table/sink/SequenceGeneratorTest.java | 148 ++++++++++-----------
.../paimon/flink/sink/LocalMergeOperator.java | 4 +-
11 files changed, 129 insertions(+), 138 deletions(-)
diff --git a/docs/content/concepts/primary-key-table/sequence-rowkind.md
b/docs/content/concepts/primary-key-table/sequence-rowkind.md
index fd50155d5..80f20dce9 100644
--- a/docs/content/concepts/primary-key-table/sequence-rowkind.md
+++ b/docs/content/concepts/primary-key-table/sequence-rowkind.md
@@ -63,7 +63,7 @@ option to automatically pad the sequence field for you.
2. Insufficient precision: If the provided `sequence.field` doesn't meet the
precision, like a rough second or
millisecond, you can set `sequence.auto-padding` to `second-to-micro` or
`millis-to-micro` so that the precision
- of sequence number will be made up to microsecond by system.
+ of sequence number will be made up to microsecond by an incremental id
(calculated within a single bucket).
3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add
the micro to the second, and then
pad the row kind flag.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index d12796c74..de5cbf14b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -34,6 +34,7 @@ import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.operation.metrics.WriterMetrics;
+import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.RecordWriter;
@@ -66,6 +67,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
private final KeyValueFileWriterFactory writerFactory;
private final boolean commitForceCompact;
private final ChangelogProducer changelogProducer;
+ @Nullable private final SequenceGenerator sequenceGenerator;
private final LinkedHashSet<DataFileMeta> newFiles;
private final LinkedHashSet<DataFileMeta> newFilesChangelog;
@@ -91,6 +93,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@Nullable CommitIncrement increment,
+ @Nullable SequenceGenerator sequenceGenerator,
WriterMetrics writerMetrics) {
this.writeBufferSpillable = writeBufferSpillable;
this.sortMaxFan = sortMaxFan;
@@ -105,6 +108,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
this.writerFactory = writerFactory;
this.commitForceCompact = commitForceCompact;
this.changelogProducer = changelogProducer;
+ this.sequenceGenerator = sequenceGenerator;
this.newFiles = new LinkedHashSet<>();
this.newFilesChangelog = new LinkedHashSet<>();
@@ -148,10 +152,10 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
@Override
public void write(KeyValue kv) throws Exception {
- long sequenceNumber =
- kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE
- ? newSequenceNumber()
- : kv.sequenceNumber();
+ long sequenceNumber = newSequenceNumber();
+ if (sequenceGenerator != null) {
+ sequenceNumber = sequenceGenerator.generateWithPadding(kv.value(),
sequenceNumber);
+ }
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(),
kv.key(), kv.value());
if (!success) {
flushWriteBuffer(false, false);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index b850248da..6c7fa0567 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -144,9 +144,9 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
row.setField(i, field);
}
} else {
- Long currentSeq = sequenceGen.generateNullable(kv.value());
+ Long currentSeq =
sequenceGen.generateWithoutPadding(kv.value());
if (currentSeq != null) {
- Long previousSeq = sequenceGen.generateNullable(row);
+ Long previousSeq = sequenceGen.generateWithoutPadding(row);
if (previousSeq == null || currentSeq >= previousSeq) {
row.setField(
i, aggregator == null ? field :
aggregator.agg(accumulator, field));
@@ -162,9 +162,9 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
for (int i = 0; i < getters.length; i++) {
SequenceGenerator sequenceGen = fieldSequences.get(i);
if (sequenceGen != null) {
- Long currentSeq = sequenceGen.generateNullable(kv.value());
+ Long currentSeq =
sequenceGen.generateWithoutPadding(kv.value());
if (currentSeq != null) {
- Long previousSeq = sequenceGen.generateNullable(row);
+ Long previousSeq = sequenceGen.generateWithoutPadding(row);
FieldAggregator aggregator = fieldAggregators.get(i);
if (previousSeq == null || currentSeq >= previousSeq) {
if (sequenceGen.index() == i) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 2b2d3f927..3073dc8d9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -53,6 +53,8 @@ import
org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -82,6 +84,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
private final Supplier<RecordEqualiser> valueEqualiserSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
+ private final TableSchema schema;
private final CoreOptions options;
private final FileIO fileIO;
private final RowType keyType;
@@ -132,6 +135,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
this.keyComparatorSupplier = keyComparatorSupplier;
this.valueEqualiserSupplier = valueEqualiserSupplier;
this.mfFactory = mfFactory;
+ this.schema = schemaManager.schema(schemaId);
this.options = options;
}
@@ -180,6 +184,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.commitForceCompact(),
options.changelogProducer(),
restoreIncrement,
+ SequenceGenerator.create(schema, options),
getWriterMetrics(partition, bucket));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 2bacb1373..11e557f17 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -38,7 +38,6 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKindGenerator;
-import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.KeyValueTableRead;
@@ -187,23 +186,18 @@ class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
String commitUser, ManifestCacheFilter manifestFilter) {
TableSchema schema = schema();
CoreOptions options = store().options();
- final SequenceGenerator sequenceGenerator =
SequenceGenerator.create(schema, options);
- final RowKindGenerator rowKindGenerator =
RowKindGenerator.create(schema, options);
- final KeyValue kv = new KeyValue();
+ RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema,
options);
+ KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
record -> {
InternalRow row = record.row();
- long sequenceNumber =
- sequenceGenerator == null
- ? KeyValue.UNKNOWN_SEQUENCE
- : sequenceGenerator.generate(row);
RowKind rowKind =
rowKindGenerator == null
? row.getRowKind()
: rowKindGenerator.generate(row);
- return kv.replace(record.primaryKey(), sequenceNumber,
rowKind, row);
+ return kv.replace(record.primaryKey(),
KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
});
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
index d6aaa2864..9c6b81159 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
@@ -45,7 +45,6 @@ import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** Generate sequence number. */
@@ -108,11 +107,11 @@ public class SequenceGenerator {
}
@Nullable
- public Long generateNullable(InternalRow row) {
+ public Long generateWithoutPadding(InternalRow row) {
return generator.generateNullable(row, index);
}
- public long generate(InternalRow row) {
+ public long generateWithPadding(InternalRow row, long incrSeq) {
long sequence = generator.generate(row, index);
for (SequenceAutoPadding padding : paddings) {
switch (padding) {
@@ -120,10 +119,10 @@ public class SequenceGenerator {
sequence = addRowKindFlag(sequence, row.getRowKind());
break;
case SECOND_TO_MICRO:
- sequence = secondToMicro(sequence);
+ sequence = secondToMicro(sequence, incrSeq);
break;
case MILLIS_TO_MICRO:
- sequence = millisToMicro(sequence);
+ sequence = millisToMicro(sequence, incrSeq);
break;
default:
throw new UnsupportedOperationException(
@@ -137,27 +136,15 @@ public class SequenceGenerator {
return (sequence << 1) | (rowKind.isAdd() ? 1 : 0);
}
- private long millisToMicro(long sequence) {
+ private long millisToMicro(long sequence, long incrSeq) {
// Generated value is millis
- return sequence * 1_000 + getCurrentMicroOfMillis();
+ return sequence * 1_000 + (incrSeq % 1_000);
}
- private long secondToMicro(long sequence) {
+ private long secondToMicro(long sequence, long incrSeq) {
// timestamp returns millis
long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? sequence / 1000
: sequence;
- return second * 1_000_000 + getCurrentMicroOfSeconds();
- }
-
- private static long getCurrentMicroOfMillis() {
- long currentNanoTime = System.nanoTime();
- long mills = TimeUnit.MILLISECONDS.convert(currentNanoTime,
TimeUnit.NANOSECONDS);
- return (currentNanoTime - mills * 1_000_000) / 1000;
- }
-
- private static long getCurrentMicroOfSeconds() {
- long currentNanoTime = System.nanoTime();
- long seconds = TimeUnit.SECONDS.convert(currentNanoTime,
TimeUnit.NANOSECONDS);
- return (currentNanoTime - seconds * 1_000_000_000) / 1000;
+ return second * 1_000_000 + (incrSeq % 1_000_000);
}
private interface Generator {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 1c859bf03..b1cacc4c6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -133,14 +133,6 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
return record;
}
- @VisibleForTesting
- public T writeAndReturnData(InternalRow row) throws Exception {
- SinkRecord record = toSinkRecord(row);
- T data = recordExtractor.extract(record);
- write.write(record.partition(), record.bucket(), data);
- return data;
- }
-
private SinkRecord toSinkRecord(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
return new SinkRecord(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 1aa708306..d3cfc2af7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -426,6 +426,7 @@ public abstract class MergeTreeTestBase {
options.commitForceCompact(),
ChangelogProducer.NONE,
null,
+ null,
null);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 2bd3fe1a4..262320d18 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -46,7 +45,6 @@ import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
-import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.ReadBuilder;
@@ -74,7 +72,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -222,6 +219,7 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
new String[] {"pt", "a", "b", "sec", "non_time"});
GenericRow row1 = GenericRow.of(1, 10, 100, 1685530987,
BinaryString.fromString("a1"));
GenericRow row2 = GenericRow.of(1, 10, 101, 1685530987,
BinaryString.fromString("a2"));
+ GenericRow row3 = GenericRow.of(1, 10, 101, 1685530987,
BinaryString.fromString("a3"));
FileStoreTable table =
createFileStoreTable(
conf -> {
@@ -232,17 +230,33 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
},
rowType);
StreamTableWrite write = table.newWrite(commitUser);
- long sequenceNumber1 =
- ((TableWriteImpl<KeyValue>)
write).writeAndReturnData(row1).sequenceNumber();
- long sequenceNumber2 =
- ((TableWriteImpl<KeyValue>)
write).writeAndReturnData(row2).sequenceNumber();
- assertThat(TimeUnit.SECONDS.convert(sequenceNumber1,
TimeUnit.MICROSECONDS))
- .isEqualTo(1685530987);
- assertThat(TimeUnit.SECONDS.convert(sequenceNumber2,
TimeUnit.MICROSECONDS))
- .isEqualTo(1685530987);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ write.write(row1);
+ write.write(row2);
+ commit.commit(0, write.prepareCommit(false, 0));
+ write.write(row3);
+ commit.commit(1, write.prepareCommit(false, 1));
write.close();
+ commit.close();
- // Do not check results, they are unstable
+ ReadBuilder readBuilder = table.newReadBuilder();
+ Function<InternalRow, String> toString =
+ row ->
+ row.getInt(0)
+ + "|"
+ + row.getInt(1)
+ + "|"
+ + row.getInt(2)
+ + "|"
+ + row.getInt(3)
+ + "|"
+ + row.getString(4);
+ assertThat(
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ toString))
+
.isEqualTo(Collections.singletonList("1|10|101|1685530987|a3"));
}
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
index b8a5795dd..f15ed20c0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
@@ -37,7 +37,6 @@ import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static
org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO;
import static org.apache.paimon.CoreOptions.SequenceAutoPadding.ROW_KIND_FLAG;
@@ -141,23 +140,28 @@ public class SequenceGeneratorTest {
@Test
public void testGenerate() {
- assertThat(getGenerator("_id").generate(row)).isEqualTo(1);
- assertThat(getGenerator("pt").generate(row)).isEqualTo(1);
-
assertThat(getGenerator("_intsecond").generate(row)).isEqualTo(1685548953);
- assertThat(getGenerator("_tinyint").generate(row)).isEqualTo(2);
- assertThat(getGenerator("_smallint").generate(row)).isEqualTo(3);
-
assertThat(getGenerator("_bigint").generate(row)).isEqualTo(4000000000000L);
-
assertThat(getGenerator("_bigintmillis").generate(row)).isEqualTo(1685548953000L);
- assertThat(getGenerator("_float").generate(row)).isEqualTo(2);
- assertThat(getGenerator("_double").generate(row)).isEqualTo(3);
- assertThat(getGenerator("_string").generate(row)).isEqualTo(1);
- assertThat(getGenerator("_date").generate(row)).isEqualTo(375);
-
assertThat(getGenerator("_timestamp0").generate(row)).isEqualTo(1685548953000L);
-
assertThat(getGenerator("_timestamp3").generate(row)).isEqualTo(1685548953123L);
-
assertThat(getGenerator("_timestamp6").generate(row)).isEqualTo(1685548953123L);
- assertThat(getGenerator("_char").generate(row)).isEqualTo(3);
- assertThat(getGenerator("_varchar").generate(row)).isEqualTo(4);
-
assertThat(getGenerator("_localtimestamp").generate(row)).isEqualTo(1685548953123L);
+ assertThat(getGenerator("_id").generateWithPadding(row,
0)).isEqualTo(1);
+ assertThat(getGenerator("pt").generateWithPadding(row,
0)).isEqualTo(1);
+ assertThat(getGenerator("_intsecond").generateWithPadding(row,
0)).isEqualTo(1685548953);
+ assertThat(getGenerator("_tinyint").generateWithPadding(row,
0)).isEqualTo(2);
+ assertThat(getGenerator("_smallint").generateWithPadding(row,
0)).isEqualTo(3);
+ assertThat(getGenerator("_bigint").generateWithPadding(row,
0)).isEqualTo(4000000000000L);
+ assertThat(getGenerator("_bigintmillis").generateWithPadding(row, 0))
+ .isEqualTo(1685548953000L);
+ assertThat(getGenerator("_float").generateWithPadding(row,
0)).isEqualTo(2);
+ assertThat(getGenerator("_double").generateWithPadding(row,
0)).isEqualTo(3);
+ assertThat(getGenerator("_string").generateWithPadding(row,
0)).isEqualTo(1);
+ assertThat(getGenerator("_date").generateWithPadding(row,
0)).isEqualTo(375);
+ assertThat(getGenerator("_timestamp0").generateWithPadding(row, 0))
+ .isEqualTo(1685548953000L);
+ assertThat(getGenerator("_timestamp3").generateWithPadding(row, 0))
+ .isEqualTo(1685548953123L);
+ assertThat(getGenerator("_timestamp6").generateWithPadding(row, 0))
+ .isEqualTo(1685548953123L);
+ assertThat(getGenerator("_char").generateWithPadding(row,
0)).isEqualTo(3);
+ assertThat(getGenerator("_varchar").generateWithPadding(row,
0)).isEqualTo(4);
+ assertThat(getGenerator("_localtimestamp").generateWithPadding(row, 0))
+ .isEqualTo(1685548953123L);
assertUnsupportedDatatype("_boolean");
assertUnsupportedDatatype("_binary");
assertUnsupportedDatatype("_varbinary");
@@ -170,46 +174,37 @@ public class SequenceGeneratorTest {
@Test
public void testGenerateWithPadding() {
-
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_id")))
- .isEqualTo(1);
-
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("pt")))
- .isEqualTo(1);
-
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_intsecond")))
- .isEqualTo(1685548953);
-
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_tinyint")))
- .isEqualTo(2);
-
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_smallint")))
- .isEqualTo(3);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_bigint")))
- .isEqualTo(4000000000000L);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_bigintmillis")))
- .isEqualTo(1685548953000L);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_float")))
- .isEqualTo(2);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_double")))
- .isEqualTo(3);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_string")))
- .isEqualTo(1);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_date")))
- .isEqualTo(375);
-
assertThat(getSecondFromGeneratedWithPadding(generateWithPaddingOnSecond("_timestamp0")))
- .isEqualTo(1685548953L);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_timestamp3")))
- .isEqualTo(1685548953123L);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_timestamp6")))
- .isEqualTo(1685548953123L);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_char")))
- .isEqualTo(3);
-
assertThat(getMillisFromGeneratedWithPadding(generateWithPaddingOnMillis("_varchar")))
- .isEqualTo(4);
- assertThat(
- getSecondFromGeneratedWithPadding(
-
generateWithPaddingOnSecond("_localtimestamp")))
- .isEqualTo(1685548953L);
- assertThat(
- getMillisFromGeneratedWithPadding(
-
generateWithPaddingOnMillis("_localtimestamp")))
- .isEqualTo(1685548953123L);
+ assertThat(generateWithPaddingOnSecond("_id", 5)).isEqualTo(1000005L);
+ assertThat(generateWithPaddingOnSecond("pt", 5)).isEqualTo(1000005L);
+
+ assertThat(generateWithPaddingOnSecond("_intsecond",
5)).isEqualTo(1685548953000005L);
+ assertThat(generateWithPaddingOnSecond("_tinyint",
5)).isEqualTo(2000005L);
+
+ assertThat(generateWithPaddingOnSecond("_smallint",
5)).isEqualTo(3000005L);
+
+ assertThat(generateWithPaddingOnMillis("_bigint",
5)).isEqualTo(4000000000000005L);
+
+ assertThat(generateWithPaddingOnMillis("_bigintmillis",
5)).isEqualTo(1685548953000005L);
+
+ assertThat(generateWithPaddingOnMillis("_float", 5)).isEqualTo(2005);
+
+ assertThat(generateWithPaddingOnMillis("_double", 5)).isEqualTo(3005);
+
+ assertThat(generateWithPaddingOnMillis("_string", 5)).isEqualTo(1005);
+
+ assertThat(generateWithPaddingOnMillis("_date", 5)).isEqualTo(375005);
+
+ assertThat(generateWithPaddingOnSecond("_timestamp0",
5)).isEqualTo(1685548953000005L);
+
+ assertThat(generateWithPaddingOnMillis("_timestamp3",
5)).isEqualTo(1685548953123005L);
+
+ assertThat(generateWithPaddingOnMillis("_timestamp6",
5)).isEqualTo(1685548953123005L);
+
+ assertThat(generateWithPaddingOnMillis("_char", 5)).isEqualTo(3005);
+
+ assertThat(generateWithPaddingOnMillis("_varchar", 5)).isEqualTo(4005);
+ assertThat(generateWithPaddingOnSecond("_localtimestamp",
5)).isEqualTo(1685548953000005L);
+ assertThat(generateWithPaddingOnMillis("_localtimestamp",
5)).isEqualTo(1685548953123005L);
assertUnsupportedDatatype("_boolean");
assertUnsupportedDatatype("_binary");
assertUnsupportedDatatype("_varbinary");
@@ -232,10 +227,11 @@ public class SequenceGeneratorTest {
assertThat(generateWithPaddingOnRowKind(maxMicros, RowKind.INSERT))
.isEqualTo(191235168000000001L);
- assertThat(generateWithPaddingOnMicrosAndRowKind(1L, RowKind.INSERT))
- .isBetween(2001L, 3999L);
- assertThat(generateWithPaddingOnMicrosAndRowKind(1L,
RowKind.UPDATE_BEFORE))
- .isBetween(2000L, 3998L);
+ long sequence = generateWithPaddingOnMicrosAndRowKind(1L, 20,
RowKind.INSERT);
+ assertThat(sequence).isEqualTo(2041);
+ sequence = generateWithPaddingOnMicrosAndRowKind(1L, 30,
RowKind.UPDATE_BEFORE);
+ System.out.println(sequence);
+ assertThat(sequence).isEqualTo(2060);
}
private SequenceGenerator getGenerator(String field) {
@@ -248,33 +244,29 @@ public class SequenceGeneratorTest {
}
private void assertUnsupportedDatatype(String field) {
- assertThatThrownBy(() -> getGenerator(field).generate(row))
+ assertThatThrownBy(() -> getGenerator(field).generateWithPadding(row,
0))
.isInstanceOf(UnsupportedOperationException.class);
}
- private long generateWithPaddingOnSecond(String field) {
- return getGenerator(field,
Collections.singletonList(SECOND_TO_MICRO)).generate(row);
- }
-
- private long getSecondFromGeneratedWithPadding(long generated) {
- return TimeUnit.SECONDS.convert(generated, TimeUnit.MICROSECONDS);
+ private long generateWithPaddingOnSecond(String field, long incrSeq) {
+ return getGenerator(field, Collections.singletonList(SECOND_TO_MICRO))
+ .generateWithPadding(row, incrSeq);
}
- private long generateWithPaddingOnMillis(String field) {
- return getGenerator(field,
Collections.singletonList(MILLIS_TO_MICRO)).generate(row);
+ private long generateWithPaddingOnMillis(String field, long incrSeq) {
+ return getGenerator(field, Collections.singletonList(MILLIS_TO_MICRO))
+ .generateWithPadding(row, incrSeq);
}
private long generateWithPaddingOnRowKind(long sequence, RowKind rowKind) {
return getGenerator("_bigint",
Collections.singletonList(ROW_KIND_FLAG))
- .generate(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0,
sequence));
+ .generateWithPadding(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0,
0, sequence), 0);
}
- private long generateWithPaddingOnMicrosAndRowKind(long sequence, RowKind
rowKind) {
+ private long generateWithPaddingOnMicrosAndRowKind(
+ long sequence, long incrSeq, RowKind rowKind) {
return getGenerator("_bigint", Arrays.asList(MILLIS_TO_MICRO,
ROW_KIND_FLAG))
- .generate(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0,
sequence));
- }
-
- private long getMillisFromGeneratedWithPadding(long generated) {
- return TimeUnit.MILLISECONDS.convert(generated, TimeUnit.MICROSECONDS);
+ .generateWithPadding(
+ GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0,
sequence), incrSeq);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 6ce55a9bd..caba2daa1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -142,7 +142,9 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
InternalRow key = keyProjection.apply(row);
long sequenceNumber =
- sequenceGenerator == null ? recordCount :
sequenceGenerator.generate(row);
+ sequenceGenerator == null
+ ? recordCount
+ : sequenceGenerator.generateWithPadding(row,
recordCount);
if (!buffer.put(sequenceNumber, rowKind, key, row)) {
flushBuffer();
if (!buffer.put(sequenceNumber, rowKind, key, row)) {