This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1502e83bf [core] Redefine user defined sequence fields (#2945)
1502e83bf is described below
commit 1502e83bfcbb529557dc9fa4adc336ec30aa961d
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 6 16:40:07 2024 +0800
[core] Redefine user defined sequence fields (#2945)
---
.../concepts/primary-key-table/sequence-rowkind.md | 24 +-
.../shortcodes/generated/core_configuration.html | 6 -
.../main/java/org/apache/paimon/CoreOptions.java | 32 +--
.../main/java/org/apache/paimon/types/RowType.java | 23 ++
.../java/org/apache/paimon/KeyValueFileStore.java | 11 +
.../apache/paimon/mergetree/MergeTreeReaders.java | 3 +-
.../apache/paimon/mergetree/MergeTreeWriter.java | 12 +-
.../paimon/mergetree/SortBufferWriteBuffer.java | 33 ++-
.../compact/ChangelogMergeTreeRewriter.java | 14 +-
.../FullChangelogMergeTreeCompactRewriter.java | 5 +
.../compact/LookupMergeTreeCompactRewriter.java | 5 +
.../compact/MergeTreeCompactRewriter.java | 7 +
.../compact/PartialUpdateMergeFunction.java | 156 +++++++++++-
.../paimon/operation/KeyValueFileStoreRead.java | 43 +---
.../paimon/operation/KeyValueFileStoreWrite.java | 32 ++-
.../org/apache/paimon/schema/SchemaValidation.java | 22 +-
.../apache/paimon/table/query/LocalTableQuery.java | 2 +-
.../paimon/table/sink/SequenceGenerator.java | 236 ------------------
.../paimon/utils/UserDefinedSeqComparator.java | 81 ++++++
.../apache/paimon/mergetree/MergeTreeTestBase.java | 2 +
.../mergetree/SortBufferWriteBufferTestBase.java | 1 +
.../apache/paimon/schema/SchemaManagerTest.java | 2 +-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 54 ----
.../paimon/table/sink/SequenceGeneratorTest.java | 272 ---------------------
.../apache/paimon/table/system/FilesTableTest.java | 10 +-
.../paimon/flink/lookup/FullCacheLookupTable.java | 53 ++--
.../paimon/flink/lookup/LookupStreamingReader.java | 12 +-
.../flink/lookup/NoPrimaryKeyLookupTable.java | 7 +-
.../paimon/flink/lookup/PrimaryKeyLookupTable.java | 8 +-
.../flink/lookup/SecondaryIndexLookupTable.java | 8 +-
.../paimon/flink/sink/LocalMergeOperator.java | 16 +-
.../paimon/flink/lookup/LookupTableTest.java | 70 ++----
.../flink/source/TestChangelogDataReadWrite.java | 24 +-
33 files changed, 483 insertions(+), 803 deletions(-)
diff --git a/docs/content/concepts/primary-key-table/sequence-rowkind.md
b/docs/content/concepts/primary-key-table/sequence-rowkind.md
index 80f20dce9..65df770bb 100644
--- a/docs/content/concepts/primary-key-table/sequence-rowkind.md
+++ b/docs/content/concepts/primary-key-table/sequence-rowkind.md
@@ -41,32 +41,18 @@ CREATE TABLE my_table (
pk BIGINT PRIMARY KEY NOT ENFORCED,
v1 DOUBLE,
v2 BIGINT,
- dt TIMESTAMP
+ update_time TIMESTAMP
) WITH (
- 'sequence.field' = 'dt'
+ 'sequence.field' = 'update_time'
);
```
{{< /tab >}}
{{< /tabs >}}
-The record with the largest `sequence.field` value will be the last to merge,
regardless of the input order.
+The record with the largest `sequence.field` value will be the last to merge;
if the values are the same, the input
+order will be used to determine which one is the last one.
-**Sequence Auto Padding**:
-
-When the record is updated or deleted, the `sequence.field` must become larger
and cannot remain unchanged.
-For -U and +U, their sequence-fields must be different. If you cannot meet
this requirement, Paimon provides
-option to automatically pad the sequence field for you.
-
-1. `'sequence.auto-padding' = 'row-kind-flag'`: If you are using same value
for -U and +U, just like "`op_ts`"
- (the time that the change was made in the database) in Mysql Binlog. It is
recommended to use the automatic
- padding for row kind flag, which will automatically distinguish between -U
(-D) and +U (+I).
-
-2. Insufficient precision: If the provided `sequence.field` doesn't meet the
precision, like a rough second or
- millisecond, you can set `sequence.auto-padding` to `second-to-micro` or
`millis-to-micro` so that the precision
- of sequence number will be made up to microsecond by incremental id
(Calculate within a single bucket).
-
-3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add
the micro to the second, and then
- pad the row kind flag.
+You can define multiple fields for `sequence.field`, for example
`'update_time,flag'`; multiple fields will be compared in order.
## Row Kind Field
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 2e25fe96b..a123d3a77 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -527,12 +527,6 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Long</td>
<td>Optional timestamp used in case of "from-timestamp" scan mode.
If there is no snapshot earlier than this time, the earliest snapshot will be
chosen.</td>
</tr>
- <tr>
- <td><h5>sequence.auto-padding</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>Specify the way of padding precision, if the provided sequence
field is used to indicate "time" but doesn't meet the precise.<ul><li>You can
specific:</li><li>1. "row-kind-flag": Pads a bit flag to indicate whether it is
retract (0) or add (1) message.</li><li>2. "second-to-micro": Pads the sequence
field that indicates time with precision of seconds to micro-second.</li><li>3.
"millis-to-micro": Pads the sequence field that indicates time with precision
of milli-second t [...]
- </tr>
<tr>
<td><h5>sequence.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 6430c919e..201525ab0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -486,26 +486,6 @@ public class CoreOptions implements Serializable {
"The field that generates the row kind for primary
key table,"
+ " the row kind determines which data is
'+I', '-U', '+U' or '-D'.");
- public static final ConfigOption<String> SEQUENCE_AUTO_PADDING =
- key("sequence.auto-padding")
- .stringType()
- .noDefaultValue()
- .withDescription(
- Description.builder()
- .text(
- "Specify the way of padding
precision, if the provided sequence field is used to indicate \"time\" but
doesn't meet the precise.")
- .list(
- text("You can specific:"),
- text(
- "1. \"row-kind-flag\":
Pads a bit flag to indicate whether it is retract (0) or add (1) message."),
- text(
- "2. \"second-to-micro\":
Pads the sequence field that indicates time with precision of seconds to
micro-second."),
- text(
- "3. \"millis-to-micro\":
Pads the sequence field that indicates time with precision of milli-second to
micro-second."),
- text(
- "4. Composite pattern: for
example, \"second-to-micro,row-kind-flag\"."))
- .build());
-
public static final ConfigOption<StartupMode> SCAN_MODE =
key("scan.mode")
.enumType(StartupMode.class)
@@ -1486,22 +1466,14 @@ public class CoreOptions implements Serializable {
return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
}
- public Optional<String> sequenceField() {
- return options.getOptional(SEQUENCE_FIELD);
+ public Optional<List<String>> sequenceField() {
+ return options.getOptional(SEQUENCE_FIELD).map(s ->
Arrays.asList(s.split(",")));
}
public Optional<String> rowkindField() {
return options.getOptional(ROWKIND_FIELD);
}
- public List<String> sequenceAutoPadding() {
- String padding = options.get(SEQUENCE_AUTO_PADDING);
- if (padding == null) {
- return Collections.emptyList();
- }
- return Arrays.asList(padding.split(","));
- }
-
public boolean writeOnly() {
return options.get(WRITE_ONLY);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index 1462d330e..fe11976de 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -99,6 +99,29 @@ public final class RowType extends DataType {
return -1;
}
+ public boolean containsField(String fieldName) {
+ for (DataField field : fields) {
+ if (field.name().equals(fieldName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean notContainsField(String fieldName) {
+ return !containsField(fieldName);
+ }
+
+ public DataField getField(String fieldName) {
+ for (DataField field : fields) {
+ if (field.name().equals(fieldName)) {
+ return field;
+ }
+ }
+
+ throw new RuntimeException("Cannot find field: " + fieldName);
+ }
+
@Override
public DataType copy(boolean isNullable) {
return new RowType(
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 373bce35c..727bf8f82 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -37,10 +37,14 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.KeyComparatorSupplier;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.apache.paimon.utils.ValueEqualiserSupplier;
+import javax.annotation.Nullable;
+
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -123,6 +127,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
keyType,
valueType,
newKeyComparator(),
+ userDefinedSeqComparator(),
mfFactory,
newReaderFactoryBuilder());
}
@@ -140,6 +145,11 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
options);
}
+ @Nullable
+ private FieldsComparator userDefinedSeqComparator() {
+ return UserDefinedSeqComparator.create(valueType, options);
+ }
+
@Override
public KeyValueFileStoreWrite newWrite(String commitUser) {
return newWrite(commitUser, null);
@@ -159,6 +169,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
keyType,
valueType,
keyComparatorSupplier,
+ this::userDefinedSeqComparator,
valueEqualiserSupplier,
mfFactory,
pathFactory(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index 186371b81..1485b72b2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -47,6 +47,7 @@ public class MergeTreeReaders {
boolean dropDelete,
KeyValueFileReaderFactory readerFactory,
Comparator<InternalRow> userKeyComparator,
+ @Nullable FieldsComparator userDefinedSeqComparator,
MergeFunction<KeyValue> mergeFunction,
MergeSorter mergeSorter)
throws IOException {
@@ -58,7 +59,7 @@ public class MergeTreeReaders {
section,
readerFactory,
userKeyComparator,
- null,
+ userDefinedSeqComparator,
new
ReducerMergeFunctionWrapper(mergeFunction),
mergeSorter));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 046fad893..537d838d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -33,9 +33,9 @@ import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
-import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.RecordWriter;
import javax.annotation.Nullable;
@@ -66,7 +66,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
private final KeyValueFileWriterFactory writerFactory;
private final boolean commitForceCompact;
private final ChangelogProducer changelogProducer;
- @Nullable private final SequenceGenerator sequenceGenerator;
+ @Nullable private final FieldsComparator userDefinedSeqComparator;
private final LinkedHashSet<DataFileMeta> newFiles;
private final LinkedHashSet<DataFileMeta> newFilesChangelog;
@@ -90,7 +90,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@Nullable CommitIncrement increment,
- @Nullable SequenceGenerator sequenceGenerator) {
+ @Nullable FieldsComparator userDefinedSeqComparator) {
this.writeBufferSpillable = writeBufferSpillable;
this.sortMaxFan = sortMaxFan;
this.sortCompression = sortCompression;
@@ -104,7 +104,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
this.writerFactory = writerFactory;
this.commitForceCompact = commitForceCompact;
this.changelogProducer = changelogProducer;
- this.sequenceGenerator = sequenceGenerator;
+ this.userDefinedSeqComparator = userDefinedSeqComparator;
this.newFiles = new LinkedHashSet<>();
this.newFilesChangelog = new LinkedHashSet<>();
@@ -138,6 +138,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
new SortBufferWriteBuffer(
keyType,
valueType,
+ userDefinedSeqComparator,
memoryPool,
writeBufferSpillable,
sortMaxFan,
@@ -148,9 +149,6 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
@Override
public void write(KeyValue kv) throws Exception {
long sequenceNumber = newSequenceNumber();
- if (sequenceGenerator != null) {
- sequenceNumber = sequenceGenerator.generateWithPadding(kv.value(),
sequenceNumber);
- }
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(),
kv.key(), kv.value());
if (!success) {
flushWriteBuffer(false, false);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
index ae877f5cb..f0e9fcf07 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
@@ -40,6 +40,8 @@ import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.MutableObjectIterator;
import javax.annotation.Nullable;
@@ -61,6 +63,7 @@ public class SortBufferWriteBuffer implements WriteBuffer {
public SortBufferWriteBuffer(
RowType keyType,
RowType valueType,
+ @Nullable FieldsComparator userDefinedSeqComparator,
MemorySegmentPool memoryPool,
boolean spillable,
int sortMaxFan,
@@ -70,17 +73,33 @@ public class SortBufferWriteBuffer implements WriteBuffer {
this.valueType = valueType;
this.serializer = new KeyValueSerializer(keyType, valueType);
- // user key + sequenceNumber
- List<DataType> sortKeyTypes = new ArrayList<>(keyType.getFieldTypes());
- sortKeyTypes.add(new BigIntType(false));
+ // key fields
+ IntStream sortFields = IntStream.range(0, keyType.getFieldCount());
+
+ // user define sequence fields
+ if (userDefinedSeqComparator != null) {
+ IntStream udsFields =
+ IntStream.of(userDefinedSeqComparator.compareFields())
+ .map(operand -> operand + keyType.getFieldCount()
+ 2);
+ sortFields = IntStream.concat(sortFields, udsFields);
+ }
+
+ // sequence field
+ sortFields = IntStream.concat(sortFields,
IntStream.of(keyType.getFieldCount()));
+
+ int[] sortFieldArray = sortFields.toArray();
+
+ // row type
+ List<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());
+ fieldTypes.add(new BigIntType(false));
+ fieldTypes.add(new TinyIntType(false));
+ fieldTypes.addAll(valueType.getFieldTypes());
- // for sort binary buffer
- int[] sortFields = IntStream.range(0, sortKeyTypes.size()).toArray();
NormalizedKeyComputer normalizedKeyComputer =
CodeGenUtils.newNormalizedKeyComputer(
- sortKeyTypes, sortFields, "MemTableKeyComputer");
+ fieldTypes, sortFieldArray, "MemTableKeyComputer");
RecordComparator keyComparator =
- CodeGenUtils.newRecordComparator(sortKeyTypes, sortFields,
"MemTableComparator");
+ CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray,
"MemTableComparator");
if (memoryPool.freePages() < 3) {
throw new IllegalArgumentException(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 99f5f052d..f338ee056 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -30,6 +30,9 @@ import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.utils.FieldsComparator;
+
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
@@ -51,9 +54,16 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
+ @Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter) {
- super(readerFactory, writerFactory, keyComparator, mfFactory,
mergeSorter);
+ super(
+ readerFactory,
+ writerFactory,
+ keyComparator,
+ userDefinedSeqComparator,
+ mfFactory,
+ mergeSorter);
this.maxLevel = maxLevel;
this.mergeEngine = mergeEngine;
}
@@ -112,7 +122,7 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
section,
readerFactory,
keyComparator,
- null,
+ userDefinedSeqComparator,
createMergeWrapper(outputLevel),
mergeSorter));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 553b0ff5d..3ed7a5d69 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -27,8 +27,11 @@ import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.Preconditions;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
@@ -48,6 +51,7 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
+ @Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
RecordEqualiser valueEqualiser,
@@ -58,6 +62,7 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
readerFactory,
writerFactory,
keyComparator,
+ userDefinedSeqComparator,
mfFactory,
mergeSorter);
this.valueEqualiser = valueEqualiser;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index b88c26b99..5335ba82a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -28,6 +28,9 @@ import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.utils.FieldsComparator;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -54,6 +57,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
+ @Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
MergeFunctionWrapperFactory<T> wrapperFactory) {
@@ -63,6 +67,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
readerFactory,
writerFactory,
keyComparator,
+ userDefinedSeqComparator,
mfFactory,
mergeSorter);
this.lookupLevels = lookupLevels;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 228a3aa11..bd3338bf7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -30,6 +30,9 @@ import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.utils.FieldsComparator;
+
+import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
@@ -40,6 +43,7 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
protected final KeyValueFileReaderFactory readerFactory;
protected final KeyValueFileWriterFactory writerFactory;
protected final Comparator<InternalRow> keyComparator;
+ @Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final MergeFunctionFactory<KeyValue> mfFactory;
protected final MergeSorter mergeSorter;
@@ -47,11 +51,13 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
+ @Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter) {
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.keyComparator = keyComparator;
+ this.userDefinedSeqComparator = userDefinedSeqComparator;
this.mfFactory = mfFactory;
this.mergeSorter = mergeSorter;
}
@@ -72,6 +78,7 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
dropDelete,
readerFactory,
keyComparator,
+ userDefinedSeqComparator,
mfFactory.create(),
mergeSorter);
writer.write(new RecordReaderIterator<>(sectionsReader));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 6c7fa0567..dbf6dfd75 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -24,10 +24,23 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.options.Options;
-import org.apache.paimon.table.sink.SequenceGenerator;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.Projection;
import javax.annotation.Nullable;
@@ -144,9 +157,9 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
row.setField(i, field);
}
} else {
- Long currentSeq =
sequenceGen.generateWithoutPadding(kv.value());
+ Long currentSeq = sequenceGen.generate(kv.value());
if (currentSeq != null) {
- Long previousSeq = sequenceGen.generateWithoutPadding(row);
+ Long previousSeq = sequenceGen.generate(row);
if (previousSeq == null || currentSeq >= previousSeq) {
row.setField(
i, aggregator == null ? field :
aggregator.agg(accumulator, field));
@@ -162,9 +175,9 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
for (int i = 0; i < getters.length; i++) {
SequenceGenerator sequenceGen = fieldSequences.get(i);
if (sequenceGen != null) {
- Long currentSeq =
sequenceGen.generateWithoutPadding(kv.value());
+ Long currentSeq = sequenceGen.generate(kv.value());
if (currentSeq != null) {
- Long previousSeq = sequenceGen.generateWithoutPadding(row);
+ Long previousSeq = sequenceGen.generate(row);
FieldAggregator aggregator = fieldAggregators.get(i);
if (previousSeq == null || currentSeq >= previousSeq) {
if (sequenceGen.index() == i) {
@@ -391,4 +404,137 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
return fieldAggregators;
}
}
+
+ private static class SequenceGenerator {
+
+ private final int index;
+
+ private final Generator generator;
+ private final DataType fieldType;
+
+ private SequenceGenerator(String field, RowType rowType) {
+ index = rowType.getFieldNames().indexOf(field);
+ if (index == -1) {
+ throw new RuntimeException(
+ String.format(
+ "Can not find sequence field %s in table
schema: %s",
+ field, rowType));
+ }
+ fieldType = rowType.getTypeAt(index);
+ generator = fieldType.accept(new SequenceGeneratorVisitor());
+ }
+
+ public SequenceGenerator(int index, DataType dataType) {
+ this.index = index;
+
+ this.fieldType = dataType;
+ if (index == -1) {
+ throw new RuntimeException(String.format("Index : %s is
invalid", index));
+ }
+ generator = fieldType.accept(new SequenceGeneratorVisitor());
+ }
+
+ public int index() {
+ return index;
+ }
+
+ public DataType fieldType() {
+ return fieldType;
+ }
+
+ @Nullable
+ public Long generate(InternalRow row) {
+ return generator.generateNullable(row, index);
+ }
+
+ private interface Generator {
+ long generate(InternalRow row, int i);
+
+ @Nullable
+ default Long generateNullable(InternalRow row, int i) {
+ if (row.isNullAt(i)) {
+ return null;
+ }
+ return generate(row, i);
+ }
+ }
+
+ private static class SequenceGeneratorVisitor extends
DataTypeDefaultVisitor<Generator> {
+
+ @Override
+ public Generator visit(CharType charType) {
+ return stringGenerator();
+ }
+
+ @Override
+ public Generator visit(VarCharType varCharType) {
+ return stringGenerator();
+ }
+
+ private Generator stringGenerator() {
+ return (row, i) -> Long.parseLong(row.getString(i).toString());
+ }
+
+ @Override
+ public Generator visit(DecimalType decimalType) {
+ return (row, i) ->
+ InternalRowUtils.castToIntegral(
+ row.getDecimal(
+ i, decimalType.getPrecision(),
decimalType.getScale()));
+ }
+
+ @Override
+ public Generator visit(TinyIntType tinyIntType) {
+ return InternalRow::getByte;
+ }
+
+ @Override
+ public Generator visit(SmallIntType smallIntType) {
+ return InternalRow::getShort;
+ }
+
+ @Override
+ public Generator visit(IntType intType) {
+ return InternalRow::getInt;
+ }
+
+ @Override
+ public Generator visit(BigIntType bigIntType) {
+ return InternalRow::getLong;
+ }
+
+ @Override
+ public Generator visit(FloatType floatType) {
+ return (row, i) -> (long) row.getFloat(i);
+ }
+
+ @Override
+ public Generator visit(DoubleType doubleType) {
+ return (row, i) -> (long) row.getDouble(i);
+ }
+
+ @Override
+ public Generator visit(DateType dateType) {
+ return InternalRow::getInt;
+ }
+
+ @Override
+ public Generator visit(TimestampType timestampType) {
+ return (row, i) ->
+ row.getTimestamp(i,
timestampType.getPrecision()).getMillisecond();
+ }
+
+ @Override
+ public Generator visit(LocalZonedTimestampType
localZonedTimestampType) {
+ return (row, i) ->
+ row.getTimestamp(i,
localZonedTimestampType.getPrecision())
+ .getMillisecond();
+ }
+
+ @Override
+ protected Generator defaultMethod(DataType dataType) {
+ throw new UnsupportedOperationException("Unsupported type: " +
dataType);
+ }
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index e8accda5b..8852e38a3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -24,8 +24,6 @@ import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.format.FileFormatDiscover;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.mergetree.DropDeleteReader;
@@ -41,12 +39,11 @@ import
org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.ProjectedRow;
import javax.annotation.Nullable;
@@ -72,6 +69,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
private final Comparator<InternalRow> keyComparator;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final MergeSorter mergeSorter;
+ @Nullable private final FieldsComparator userDefinedSeqComparator;
@Nullable private int[][] keyProjectedFields;
@@ -84,49 +82,20 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
private boolean forceKeepDelete = false;
- public KeyValueFileStoreRead(
- FileIO fileIO,
- SchemaManager schemaManager,
- long schemaId,
- RowType keyType,
- RowType valueType,
- Comparator<InternalRow> keyComparator,
- MergeFunctionFactory<KeyValue> mfFactory,
- FileFormatDiscover formatDiscover,
- FileStorePathFactory pathFactory,
- KeyValueFieldsExtractor extractor,
- CoreOptions options) {
- this(
- schemaManager,
- schemaId,
- keyType,
- valueType,
- keyComparator,
- mfFactory,
- KeyValueFileReaderFactory.builder(
- fileIO,
- schemaManager,
- schemaId,
- keyType,
- valueType,
- formatDiscover,
- pathFactory,
- extractor,
- options));
- }
-
public KeyValueFileStoreRead(
SchemaManager schemaManager,
long schemaId,
RowType keyType,
RowType valueType,
Comparator<InternalRow> keyComparator,
+ @Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
KeyValueFileReaderFactory.Builder readerFactoryBuilder) {
this.tableSchema = schemaManager.schema(schemaId);
this.readerFactoryBuilder = readerFactoryBuilder;
this.keyComparator = keyComparator;
this.mfFactory = mfFactory;
+ this.userDefinedSeqComparator = userDefinedSeqComparator;
this.mergeSorter =
new MergeSorter(
CoreOptions.fromMap(tableSchema.options()), keyType,
valueType, null);
@@ -227,7 +196,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
batchMergeRead(
split.partition(), split.bucket(),
split.dataFiles(), false),
keyComparator,
- null,
+ userDefinedSeqComparator,
mergeSorter,
forceKeepDelete);
}
@@ -256,7 +225,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
? overlappedSectionFactory
: nonOverlappedSectionFactory,
keyComparator,
- null,
+ userDefinedSeqComparator,
mergeFuncWrapper,
mergeSorter));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 269ff2dc5..8fb99645b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -55,12 +55,12 @@ import
org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,9 +84,9 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
+ private final Supplier<FieldsComparator> udsComparatorSupplier;
private final Supplier<RecordEqualiser> valueEqualiserSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
- private final TableSchema schema;
private final CoreOptions options;
private final FileIO fileIO;
private final RowType keyType;
@@ -100,6 +100,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
RowType keyType,
RowType valueType,
Supplier<Comparator<InternalRow>> keyComparatorSupplier,
+ Supplier<FieldsComparator> udsComparatorSupplier,
Supplier<RecordEqualiser> valueEqualiserSupplier,
MergeFunctionFactory<KeyValue> mfFactory,
FileStorePathFactory pathFactory,
@@ -114,6 +115,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
this.fileIO = fileIO;
this.keyType = keyType;
this.valueType = valueType;
+ this.udsComparatorSupplier = udsComparatorSupplier;
this.readerFactoryBuilder =
KeyValueFileReaderFactory.builder(
fileIO,
@@ -137,7 +139,6 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
this.keyComparatorSupplier = keyComparatorSupplier;
this.valueEqualiserSupplier = valueEqualiserSupplier;
this.mfFactory = mfFactory;
- this.schema = schemaManager.schema(schemaId);
this.options = options;
}
@@ -186,7 +187,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.commitForceCompact(),
options.changelogProducer(),
restoreIncrement,
- SequenceGenerator.create(schema, options));
+ UserDefinedSeqComparator.create(valueType, options));
}
@VisibleForTesting
@@ -204,7 +205,10 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
return new NoopCompactManager();
} else {
Comparator<InternalRow> keyComparator =
keyComparatorSupplier.get();
- CompactRewriter rewriter = createRewriter(partition, bucket,
keyComparator, levels);
+ @Nullable FieldsComparator userDefinedSeqComparator =
udsComparatorSupplier.get();
+ CompactRewriter rewriter =
+ createRewriter(
+ partition, bucket, keyComparator,
userDefinedSeqComparator, levels);
return new MergeTreeCompactManager(
compactExecutor,
levels,
@@ -220,7 +224,11 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
}
private MergeTreeCompactRewriter createRewriter(
- BinaryRow partition, int bucket, Comparator<InternalRow>
keyComparator, Levels levels) {
+ BinaryRow partition,
+ int bucket,
+ Comparator<InternalRow> keyComparator,
+ @Nullable FieldsComparator userDefinedSeqComparator,
+ Levels levels) {
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(partition, bucket);
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
@@ -235,6 +243,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
readerFactory,
writerFactory,
keyComparator,
+ userDefinedSeqComparator,
mfFactory,
mergeSorter,
valueEqualiserSupplier.get(),
@@ -253,6 +262,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
readerFactory,
writerFactory,
keyComparator,
+ userDefinedSeqComparator,
mfFactory,
mergeSorter,
new FirstRowMergeFunctionWrapperFactory());
@@ -265,6 +275,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
readerFactory,
writerFactory,
keyComparator,
+ userDefinedSeqComparator,
mfFactory,
mergeSorter,
new LookupMergeFunctionWrapperFactory(
@@ -273,7 +284,12 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
}
default:
return new MergeTreeCompactRewriter(
- readerFactory, writerFactory, keyComparator,
mfFactory, mergeSorter);
+ readerFactory,
+ writerFactory,
+ keyComparator,
+ userDefinedSeqComparator,
+ mfFactory,
+ mergeSorter);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 9fac38114..91ccf9c11 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -164,13 +164,13 @@ public class SchemaValidation {
}
}
- Optional<String> sequenceField = options.sequenceField();
+ Optional<List<String>> sequenceField = options.sequenceField();
sequenceField.ifPresent(
- field ->
+ fields ->
checkArgument(
- schema.fieldNames().contains(field),
- "Nonexistent sequence field: '%s'",
- field));
+ schema.fieldNames().containsAll(fields),
+ "Nonexistent sequence fields: '%s'",
+ fields));
Optional<String> rowkindField = options.rowkindField();
rowkindField.ifPresent(
@@ -181,11 +181,13 @@ public class SchemaValidation {
field));
sequenceField.ifPresent(
- field ->
- checkArgument(
- options.fieldAggFunc(field) == null,
- "Should not define aggregation on sequence
field: '%s'",
- field));
+ fields ->
+ fields.forEach(
+ field ->
+ checkArgument(
+ options.fieldAggFunc(field) ==
null,
+ "Should not define aggregation
on sequence field: '%s'",
+ field)));
CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 9f008eb39..3ed9a3ac7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -109,7 +109,7 @@ public class LocalTableQuery implements TableQuery {
int bucket,
List<DataFileMeta> beforeFiles,
List<DataFileMeta> dataFiles) {
- LookupLevels lookupLevels =
+ LookupLevels<KeyValue> lookupLevels =
tableView.computeIfAbsent(partition, k -> new
HashMap<>()).get(bucket);
if (lookupLevels == null) {
Preconditions.checkArgument(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
deleted file mode 100644
index 9c6b81159..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.SequenceAutoPadding;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.CharType;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypeDefaultVisitor;
-import org.apache.paimon.types.DataTypeFamily;
-import org.apache.paimon.types.DateType;
-import org.apache.paimon.types.DecimalType;
-import org.apache.paimon.types.DoubleType;
-import org.apache.paimon.types.FloatType;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.LocalZonedTimestampType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.SmallIntType;
-import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.TinyIntType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.InternalRowUtils;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** Generate sequence number. */
-public class SequenceGenerator {
-
- private final int index;
- private final List<SequenceAutoPadding> paddings;
-
- private final Generator generator;
- private final DataType fieldType;
-
- public SequenceGenerator(String field, RowType rowType) {
- this(field, rowType, Collections.emptyList());
- }
-
- public SequenceGenerator(String field, RowType rowType,
List<SequenceAutoPadding> paddings) {
- index = rowType.getFieldNames().indexOf(field);
- this.paddings = paddings;
-
- if (index == -1) {
- throw new RuntimeException(
- String.format(
- "Can not find sequence field %s in table schema:
%s", field, rowType));
- }
- fieldType = rowType.getTypeAt(index);
- generator = fieldType.accept(new SequenceGeneratorVisitor());
- }
-
- public SequenceGenerator(int index, DataType dataType) {
- this.index = index;
- this.paddings = Collections.emptyList();
-
- this.fieldType = dataType;
- if (index == -1) {
- throw new RuntimeException(String.format("Index : %s is invalid",
index));
- }
- generator = fieldType.accept(new SequenceGeneratorVisitor());
- }
-
- @Nullable
- public static SequenceGenerator create(TableSchema schema, CoreOptions
options) {
- List<SequenceAutoPadding> sequenceAutoPadding =
- options.sequenceAutoPadding().stream()
- .map(SequenceAutoPadding::fromString)
- .collect(Collectors.toList());
- return options.sequenceField()
- .map(
- field ->
- new SequenceGenerator(
- field, schema.logicalRowType(),
sequenceAutoPadding))
- .orElse(null);
- }
-
- public int index() {
- return index;
- }
-
- public DataType fieldType() {
- return fieldType;
- }
-
- @Nullable
- public Long generateWithoutPadding(InternalRow row) {
- return generator.generateNullable(row, index);
- }
-
- public long generateWithPadding(InternalRow row, long incrSeq) {
- long sequence = generator.generate(row, index);
- for (SequenceAutoPadding padding : paddings) {
- switch (padding) {
- case ROW_KIND_FLAG:
- sequence = addRowKindFlag(sequence, row.getRowKind());
- break;
- case SECOND_TO_MICRO:
- sequence = secondToMicro(sequence, incrSeq);
- break;
- case MILLIS_TO_MICRO:
- sequence = millisToMicro(sequence, incrSeq);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown sequence padding mode " + padding);
- }
- }
- return sequence;
- }
-
- private long addRowKindFlag(long sequence, RowKind rowKind) {
- return (sequence << 1) | (rowKind.isAdd() ? 1 : 0);
- }
-
- private long millisToMicro(long sequence, long incrSeq) {
- // Generated value is millis
- return sequence * 1_000 + (incrSeq % 1_000);
- }
-
- private long secondToMicro(long sequence, long incrSeq) {
- // timestamp returns millis
- long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? sequence / 1000
: sequence;
- return second * 1_000_000 + (incrSeq % 1_000_000);
- }
-
- private interface Generator {
- long generate(InternalRow row, int i);
-
- @Nullable
- default Long generateNullable(InternalRow row, int i) {
- if (row.isNullAt(i)) {
- return null;
- }
- return generate(row, i);
- }
- }
-
- private static class SequenceGeneratorVisitor extends
DataTypeDefaultVisitor<Generator> {
-
- @Override
- public Generator visit(CharType charType) {
- return stringGenerator();
- }
-
- @Override
- public Generator visit(VarCharType varCharType) {
- return stringGenerator();
- }
-
- private Generator stringGenerator() {
- return (row, i) -> Long.parseLong(row.getString(i).toString());
- }
-
- @Override
- public Generator visit(DecimalType decimalType) {
- return (row, i) ->
- InternalRowUtils.castToIntegral(
- row.getDecimal(i, decimalType.getPrecision(),
decimalType.getScale()));
- }
-
- @Override
- public Generator visit(TinyIntType tinyIntType) {
- return InternalRow::getByte;
- }
-
- @Override
- public Generator visit(SmallIntType smallIntType) {
- return InternalRow::getShort;
- }
-
- @Override
- public Generator visit(IntType intType) {
- return InternalRow::getInt;
- }
-
- @Override
- public Generator visit(BigIntType bigIntType) {
- return InternalRow::getLong;
- }
-
- @Override
- public Generator visit(FloatType floatType) {
- return (row, i) -> (long) row.getFloat(i);
- }
-
- @Override
- public Generator visit(DoubleType doubleType) {
- return (row, i) -> (long) row.getDouble(i);
- }
-
- @Override
- public Generator visit(DateType dateType) {
- return InternalRow::getInt;
- }
-
- @Override
- public Generator visit(TimestampType timestampType) {
- return (row, i) -> row.getTimestamp(i,
timestampType.getPrecision()).getMillisecond();
- }
-
- @Override
- public Generator visit(LocalZonedTimestampType
localZonedTimestampType) {
- return (row, i) ->
- row.getTimestamp(i,
localZonedTimestampType.getPrecision()).getMillisecond();
- }
-
- @Override
- protected Generator defaultMethod(DataType dataType) {
- throw new UnsupportedOperationException("Unsupported type: " +
dataType);
- }
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
new file mode 100644
index 000000000..b1ea75ac1
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link FieldsComparator} for user defined sequence fields. */
+public class UserDefinedSeqComparator implements FieldsComparator {
+
+ private final int[] fields;
+ private final RecordComparator comparator;
+
+ public UserDefinedSeqComparator(int[] fields, RecordComparator comparator)
{
+ this.fields = fields;
+ this.comparator = comparator;
+ }
+
+ @Override
+ public int[] compareFields() {
+ return fields;
+ }
+
+ @Override
+ public int compare(InternalRow o1, InternalRow o2) {
+ return comparator.compare(o1, o2);
+ }
+
+ @Nullable
+ public static UserDefinedSeqComparator create(RowType rowType, CoreOptions
options) {
+ Optional<List<String>> sequenceField = options.sequenceField();
+ if (!sequenceField.isPresent()) {
+ return null;
+ }
+
+ List<String> fieldNames = rowType.getFieldNames();
+ int[] fields =
sequenceField.get().stream().mapToInt(fieldNames::indexOf).toArray();
+ RecordComparator comparator =
+ CodeGenUtils.newRecordComparator(
+ rowType.getFieldTypes(), fields,
"UserDefinedSeqComparator");
+ return new UserDefinedSeqComparator(fields, comparator);
+ }
+
+ @Nullable
+ public static UserDefinedSeqComparator create(RowType rowType,
List<String> sequenceFields) {
+ if (sequenceFields.isEmpty()) {
+ return null;
+ }
+
+ List<String> fieldNames = rowType.getFieldNames();
+ int[] fields =
sequenceFields.stream().mapToInt(fieldNames::indexOf).toArray();
+ RecordComparator comparator =
+ CodeGenUtils.newRecordComparator(
+ rowType.getFieldTypes(), fields,
"UserDefinedSeqComparator");
+ return new UserDefinedSeqComparator(fields, comparator);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 1aa708306..6e718a085 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -554,6 +554,7 @@ public abstract class MergeTreeTestBase {
dropDelete,
readerFactory,
comparator,
+ null,
DeduplicateMergeFunction.factory().create(),
new MergeSorter(options, null, null, null));
List<TestRecord> records = new ArrayList<>();
@@ -600,6 +601,7 @@ public abstract class MergeTreeTestBase {
dropDelete,
compactReaderFactory,
comparator,
+ null,
DeduplicateMergeFunction.factory().create(),
new MergeSorter(options, null, null, null));
writer.write(new RecordReaderIterator<>(sectionsReader));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 170a66779..0d873507f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -66,6 +66,7 @@ public abstract class SortBufferWriteBufferTestBase {
new RowType(Collections.singletonList(new DataField(0,
"key", new IntType()))),
new RowType(
Collections.singletonList(new DataField(1,
"value", new BigIntType()))),
+ null,
new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024),
false,
128,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 9c731ffea..db37284be 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -131,7 +131,7 @@ public class SchemaManagerTest {
"f4"),
""))))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Nonexistent sequence field: 'f4'");
+ .hasMessageContaining("Nonexistent sequence fields: '[f4]'");
}
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index d561ce5ca..ff280c27e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -209,60 +209,6 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
"1|11|101|binary|varbinary|mapKey:mapVal|multiset"));
}
- @Test
- public void testPaddingSequenceNumber() throws Exception {
- RowType rowType =
- RowType.of(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.STRING()
- },
- new String[] {"pt", "a", "b", "sec", "non_time"});
- GenericRow row1 = GenericRow.of(1, 10, 100, 1685530987,
BinaryString.fromString("a1"));
- GenericRow row2 = GenericRow.of(1, 10, 101, 1685530987,
BinaryString.fromString("a2"));
- GenericRow row3 = GenericRow.of(1, 10, 101, 1685530987,
BinaryString.fromString("a3"));
- FileStoreTable table =
- createFileStoreTable(
- conf -> {
- conf.set(CoreOptions.SEQUENCE_FIELD, "sec");
- conf.set(
- CoreOptions.SEQUENCE_AUTO_PADDING,
-
CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO.toString());
- },
- rowType);
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
- write.write(row1);
- write.write(row2);
- commit.commit(0, write.prepareCommit(false, 0));
- write.write(row3);
- commit.commit(1, write.prepareCommit(false, 1));
- write.close();
- commit.close();
-
- ReadBuilder readBuilder = table.newReadBuilder();
- Function<InternalRow, String> toString =
- row ->
- row.getInt(0)
- + "|"
- + row.getInt(1)
- + "|"
- + row.getInt(2)
- + "|"
- + row.getInt(3)
- + "|"
- + row.getString(4);
- assertThat(
- getResult(
- readBuilder.newRead(),
- readBuilder.newScan().plan().splits(),
- toString))
-
.isEqualTo(Collections.singletonList("1|10|101|1685530987|a3"));
- }
-
@Test
public void testBatchReadWrite() throws Exception {
writeData();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
deleted file mode 100644
index f15ed20c0..000000000
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.GenericArray;
-import org.apache.paimon.data.GenericMap;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalDateTime;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static
org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO;
-import static org.apache.paimon.CoreOptions.SequenceAutoPadding.ROW_KIND_FLAG;
-import static
org.apache.paimon.CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Test for {@link SequenceGenerator}. */
-public class SequenceGeneratorTest {
-
- private static final RowType ALL_DATA_TYPE =
- RowType.of(
- new DataType[] {
- DataTypes.INT(), // _id
- DataTypes.DECIMAL(2, 1), // pt
- DataTypes.INT(), // second
- DataTypes.BOOLEAN(),
- DataTypes.TINYINT(),
- DataTypes.SMALLINT(),
- DataTypes.BIGINT(),
- DataTypes.BIGINT(), // millis
- DataTypes.FLOAT(),
- DataTypes.DOUBLE(),
- DataTypes.STRING(),
- DataTypes.DATE(),
- DataTypes.TIMESTAMP(0),
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(6),
- DataTypes.CHAR(10),
- DataTypes.VARCHAR(20),
- DataTypes.BINARY(10),
- DataTypes.VARBINARY(20),
- DataTypes.BYTES(),
- DataTypes.TIME(),
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
- DataTypes.MAP(DataTypes.INT(), DataTypes.INT()),
- DataTypes.ARRAY(DataTypes.STRING()),
- DataTypes.MULTISET(DataTypes.VARCHAR(8))
- },
- new String[] {
- "_id",
- "pt",
- "_intsecond",
- "_boolean",
- "_tinyint",
- "_smallint",
- "_bigint",
- "_bigintmillis",
- "_float",
- "_double",
- "_string",
- "_date",
- "_timestamp0",
- "_timestamp3",
- "_timestamp6",
- "_char",
- "_varchar",
- "_binary",
- "_varbinary",
- "_bytes",
- "_time",
- "_localtimestamp",
- "_map",
- "_array",
- "_multiset",
- });
- private static final InternalRow row =
- GenericRow.of(
- 1,
- Decimal.fromUnscaledLong(10, 2, 1),
- 1685548953,
- true,
- (byte) 2,
- (short) 3,
- 4000000000000L,
- 1685548953000L,
- 2.81f,
- 3.678008,
- BinaryString.fromString("1"),
- 375, /* 1971-01-11 */
- Timestamp.fromEpochMillis(1685548953000L),
- Timestamp.fromEpochMillis(1685548953123L),
- Timestamp.fromMicros(1685548953123456L),
- BinaryString.fromString("3"),
- BinaryString.fromString("4"),
- "5".getBytes(),
- "6".getBytes(),
- "7".getBytes(),
- 123,
- Timestamp.fromMicros(1685548953123456L),
- new GenericMap(
- Collections.singletonMap(
- BinaryString.fromString("mapKey"),
- BinaryString.fromString("mapVal"))),
- new GenericArray(
- new BinaryString[] {
- BinaryString.fromString("a"),
BinaryString.fromString("b")
- }),
- new GenericMap(
-
Collections.singletonMap(BinaryString.fromString("multiset"), 1)));
-
- @Test
- public void testGenerate() {
- assertThat(getGenerator("_id").generateWithPadding(row,
0)).isEqualTo(1);
- assertThat(getGenerator("pt").generateWithPadding(row,
0)).isEqualTo(1);
- assertThat(getGenerator("_intsecond").generateWithPadding(row,
0)).isEqualTo(1685548953);
- assertThat(getGenerator("_tinyint").generateWithPadding(row,
0)).isEqualTo(2);
- assertThat(getGenerator("_smallint").generateWithPadding(row,
0)).isEqualTo(3);
- assertThat(getGenerator("_bigint").generateWithPadding(row,
0)).isEqualTo(4000000000000L);
- assertThat(getGenerator("_bigintmillis").generateWithPadding(row, 0))
- .isEqualTo(1685548953000L);
- assertThat(getGenerator("_float").generateWithPadding(row,
0)).isEqualTo(2);
- assertThat(getGenerator("_double").generateWithPadding(row,
0)).isEqualTo(3);
- assertThat(getGenerator("_string").generateWithPadding(row,
0)).isEqualTo(1);
- assertThat(getGenerator("_date").generateWithPadding(row,
0)).isEqualTo(375);
- assertThat(getGenerator("_timestamp0").generateWithPadding(row, 0))
- .isEqualTo(1685548953000L);
- assertThat(getGenerator("_timestamp3").generateWithPadding(row, 0))
- .isEqualTo(1685548953123L);
- assertThat(getGenerator("_timestamp6").generateWithPadding(row, 0))
- .isEqualTo(1685548953123L);
- assertThat(getGenerator("_char").generateWithPadding(row,
0)).isEqualTo(3);
- assertThat(getGenerator("_varchar").generateWithPadding(row,
0)).isEqualTo(4);
- assertThat(getGenerator("_localtimestamp").generateWithPadding(row, 0))
- .isEqualTo(1685548953123L);
- assertUnsupportedDatatype("_boolean");
- assertUnsupportedDatatype("_binary");
- assertUnsupportedDatatype("_varbinary");
- assertUnsupportedDatatype("_bytes");
- assertUnsupportedDatatype("_time");
- assertUnsupportedDatatype("_map");
- assertUnsupportedDatatype("_array");
- assertUnsupportedDatatype("_multiset");
- }
-
- @Test
- public void testGenerateWithPadding() {
- assertThat(generateWithPaddingOnSecond("_id", 5)).isEqualTo(1000005L);
- assertThat(generateWithPaddingOnSecond("pt", 5)).isEqualTo(1000005L);
-
- assertThat(generateWithPaddingOnSecond("_intsecond",
5)).isEqualTo(1685548953000005L);
- assertThat(generateWithPaddingOnSecond("_tinyint",
5)).isEqualTo(2000005L);
-
- assertThat(generateWithPaddingOnSecond("_smallint",
5)).isEqualTo(3000005L);
-
- assertThat(generateWithPaddingOnMillis("_bigint",
5)).isEqualTo(4000000000000005L);
-
- assertThat(generateWithPaddingOnMillis("_bigintmillis",
5)).isEqualTo(1685548953000005L);
-
- assertThat(generateWithPaddingOnMillis("_float", 5)).isEqualTo(2005);
-
- assertThat(generateWithPaddingOnMillis("_double", 5)).isEqualTo(3005);
-
- assertThat(generateWithPaddingOnMillis("_string", 5)).isEqualTo(1005);
-
- assertThat(generateWithPaddingOnMillis("_date", 5)).isEqualTo(375005);
-
- assertThat(generateWithPaddingOnSecond("_timestamp0",
5)).isEqualTo(1685548953000005L);
-
- assertThat(generateWithPaddingOnMillis("_timestamp3",
5)).isEqualTo(1685548953123005L);
-
- assertThat(generateWithPaddingOnMillis("_timestamp6",
5)).isEqualTo(1685548953123005L);
-
- assertThat(generateWithPaddingOnMillis("_char", 5)).isEqualTo(3005);
-
- assertThat(generateWithPaddingOnMillis("_varchar", 5)).isEqualTo(4005);
- assertThat(generateWithPaddingOnSecond("_localtimestamp",
5)).isEqualTo(1685548953000005L);
- assertThat(generateWithPaddingOnMillis("_localtimestamp",
5)).isEqualTo(1685548953123005L);
- assertUnsupportedDatatype("_boolean");
- assertUnsupportedDatatype("_binary");
- assertUnsupportedDatatype("_varbinary");
- assertUnsupportedDatatype("_bytes");
- assertUnsupportedDatatype("_time");
- assertUnsupportedDatatype("_map");
- assertUnsupportedDatatype("_array");
- assertUnsupportedDatatype("_multiset");
- }
-
- @Test
- public void testGenerateWithPaddingRowKind() {
- assertThat(generateWithPaddingOnRowKind(1L,
RowKind.INSERT)).isEqualTo(3);
- assertThat(generateWithPaddingOnRowKind(1L,
RowKind.UPDATE_AFTER)).isEqualTo(3);
- assertThat(generateWithPaddingOnRowKind(1L,
RowKind.UPDATE_BEFORE)).isEqualTo(2);
- assertThat(generateWithPaddingOnRowKind(1L,
RowKind.DELETE)).isEqualTo(2);
-
- long maxMicros =
-
Timestamp.fromLocalDateTime(LocalDateTime.parse("5000-01-01T00:00:00")).toMicros();
- assertThat(generateWithPaddingOnRowKind(maxMicros, RowKind.INSERT))
- .isEqualTo(191235168000000001L);
-
- long sequence = generateWithPaddingOnMicrosAndRowKind(1L, 20,
RowKind.INSERT);
- assertThat(sequence).isEqualTo(2041);
- sequence = generateWithPaddingOnMicrosAndRowKind(1L, 30,
RowKind.UPDATE_BEFORE);
- System.out.println(sequence);
- assertThat(sequence).isEqualTo(2060);
- }
-
- private SequenceGenerator getGenerator(String field) {
- return getGenerator(field, Collections.emptyList());
- }
-
- private SequenceGenerator getGenerator(
- String field, List<CoreOptions.SequenceAutoPadding> paddings) {
- return new SequenceGenerator(field, ALL_DATA_TYPE, paddings);
- }
-
- private void assertUnsupportedDatatype(String field) {
- assertThatThrownBy(() -> getGenerator(field).generateWithPadding(row,
0))
- .isInstanceOf(UnsupportedOperationException.class);
- }
-
- private long generateWithPaddingOnSecond(String field, long incrSeq) {
- return getGenerator(field, Collections.singletonList(SECOND_TO_MICRO))
- .generateWithPadding(row, incrSeq);
- }
-
- private long generateWithPaddingOnMillis(String field, long incrSeq) {
- return getGenerator(field, Collections.singletonList(MILLIS_TO_MICRO))
- .generateWithPadding(row, incrSeq);
- }
-
- private long generateWithPaddingOnRowKind(long sequence, RowKind rowKind) {
- return getGenerator("_bigint",
Collections.singletonList(ROW_KIND_FLAG))
- .generateWithPadding(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0,
0, sequence), 0);
- }
-
- private long generateWithPaddingOnMicrosAndRowKind(
- long sequence, long incrSeq, RowKind rowKind) {
- return getGenerator("_bigint", Arrays.asList(MILLIS_TO_MICRO,
ROW_KIND_FLAG))
- .generateWithPadding(
- GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0,
sequence), incrSeq);
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 068d5d33a..afd660fd3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -173,8 +173,8 @@ public class FilesTableTest extends TableTestBase {
DataFileMeta file = fileEntry.file();
String minKey = String.valueOf(file.minKey().getInt(0));
String maxKey = String.valueOf(file.maxKey().getInt(0));
- String minSequenceNumber =
String.valueOf(file.minSequenceNumber());
- String maxSequenceNumber =
String.valueOf(file.maxSequenceNumber());
+ String minCol1 =
String.valueOf(file.valueStats().minValues().getInt(2));
+ String maxCol1 =
String.valueOf(file.valueStats().maxValues().getInt(2));
expectedRow.add(
GenericRow.of(
BinaryString.fromString(Arrays.toString(new
String[] {partition})),
@@ -191,12 +191,10 @@ public class FilesTableTest extends TableTestBase {
String.format("{col1=%s, pk=%s, pt=%s}",
0, 0, 0)),
BinaryString.fromString(
String.format(
- "{col1=%s, pk=%s, pt=%s}",
- minSequenceNumber, minKey,
partition)),
+ "{col1=%s, pk=%s, pt=%s}",
minCol1, minKey, partition)),
BinaryString.fromString(
String.format(
- "{col1=%s, pk=%s, pt=%s}",
- maxSequenceNumber, maxKey,
partition)),
+ "{col1=%s, pk=%s, pt=%s}",
maxCol1, maxKey, partition)),
file.minSequenceNumber(),
file.maxSequenceNumber(),
file.creationTime()));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 7f9b019bd..185cf554b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -31,12 +31,13 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.PartialRow;
import org.apache.paimon.utils.TypeUtils;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
import javax.annotation.Nullable;
@@ -46,8 +47,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-
-import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
+import java.util.concurrent.atomic.AtomicInteger;
/** Lookup table of full cache. */
public abstract class FullCacheLookupTable implements LookupTable {
@@ -55,7 +55,9 @@ public abstract class FullCacheLookupTable implements
LookupTable {
protected final Context context;
protected final RocksDBStateFactory stateFactory;
protected final RowType projectedType;
- private final boolean sequenceFieldEnabled;
+
+ @Nullable protected final FieldsComparator userDefinedSeqComparator;
+ protected final int appendUdsFieldNumber;
private LookupStreamingReader reader;
private Predicate specificPartition;
@@ -68,12 +70,31 @@ public abstract class FullCacheLookupTable implements
LookupTable {
context.table.coreOptions().toConfiguration(),
null);
FileStoreTable table = context.table;
- this.sequenceFieldEnabled =
- table.primaryKeys().size() > 0
- && new
CoreOptions(table.options()).sequenceField().isPresent();
+ List<String> sequenceFields = new ArrayList<>();
+ if (table.primaryKeys().size() > 0) {
+ new
CoreOptions(table.options()).sequenceField().ifPresent(sequenceFields::addAll);
+ }
RowType projectedType = TypeUtils.project(table.rowType(),
context.projection);
- if (sequenceFieldEnabled) {
- projectedType = projectedType.appendDataField(SEQUENCE_NUMBER,
DataTypes.BIGINT());
+ if (sequenceFields.size() > 0) {
+ RowType.Builder builder = RowType.builder();
+ projectedType.getFields().forEach(f -> builder.field(f.name(),
f.type()));
+ RowType rowType = table.rowType();
+ AtomicInteger appendUdsFieldNumber = new AtomicInteger(0);
+ sequenceFields.stream()
+ .filter(projectedType::notContainsField)
+ .map(rowType::getField)
+ .forEach(
+ f -> {
+ appendUdsFieldNumber.incrementAndGet();
+ builder.field(f.name(), f.type());
+ });
+ projectedType = builder.build();
+ this.userDefinedSeqComparator =
+ UserDefinedSeqComparator.create(projectedType,
sequenceFields);
+ this.appendUdsFieldNumber = appendUdsFieldNumber.get();
+ } else {
+ this.userDefinedSeqComparator = null;
+ this.appendUdsFieldNumber = 0;
}
this.projectedType = projectedType;
}
@@ -93,7 +114,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
IOManager.create(context.tempPath.toString()),
context.table.coreOptions());
Predicate predicate = projectedPredicate();
try (RecordReaderIterator<InternalRow> batch =
- new RecordReaderIterator<>(reader.nextBatch(true,
sequenceFieldEnabled))) {
+ new RecordReaderIterator<>(reader.nextBatch(true))) {
while (batch.hasNext()) {
InternalRow row = batch.next();
if (predicate == null || predicate.test(row)) {
@@ -124,11 +145,11 @@ public abstract class FullCacheLookupTable implements
LookupTable {
public void refresh() throws Exception {
while (true) {
try (RecordReaderIterator<InternalRow> batch =
- new RecordReaderIterator<>(reader.nextBatch(false,
sequenceFieldEnabled))) {
+ new RecordReaderIterator<>(reader.nextBatch(false))) {
if (!batch.hasNext()) {
return;
}
- refresh(batch, sequenceFieldEnabled);
+ refresh(batch);
}
}
}
@@ -136,21 +157,21 @@ public abstract class FullCacheLookupTable implements
LookupTable {
@Override
public final List<InternalRow> get(InternalRow key) throws IOException {
List<InternalRow> values = innerGet(key);
- if (!sequenceFieldEnabled) {
+ if (appendUdsFieldNumber == 0) {
return values;
}
List<InternalRow> dropSequence = new ArrayList<>(values.size());
for (InternalRow matchedRow : values) {
- dropSequence.add(new PartialRow(matchedRow.getFieldCount() - 1,
matchedRow));
+ dropSequence.add(
+ new PartialRow(matchedRow.getFieldCount() -
appendUdsFieldNumber, matchedRow));
}
return dropSequence;
}
public abstract List<InternalRow> innerGet(InternalRow key) throws
IOException;
- public abstract void refresh(Iterator<InternalRow> input, boolean
orderByLastField)
- throws IOException;
+ public abstract void refresh(Iterator<InternalRow> input) throws
IOException;
@Nullable
public Predicate projectedPredicate() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index ad6f72670..f12d78e80 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -36,7 +36,6 @@ import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FunctionWithIOException;
import org.apache.paimon.utils.TypeUtils;
@@ -55,7 +54,6 @@ import java.util.stream.IntStream;
import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM;
import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
-import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
/** A streaming reader to load data into {@link LookupTable}. */
public class LookupStreamingReader {
@@ -120,19 +118,13 @@ public class LookupStreamingReader {
return fileStoreTable.copy(newSchema);
}
- public RecordReader<InternalRow> nextBatch(boolean useParallelism, boolean
readSequenceNumber)
- throws Exception {
+ public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws
Exception {
List<Split> splits = scan.plan().splits();
CoreOptions options = CoreOptions.fromMap(table.options());
FunctionWithIOException<Split, RecordReader<InternalRow>>
readerSupplier =
- readSequenceNumber
- ? createReaderWithSequenceSupplier()
- : split -> readBuilder.newRead().createReader(split);
+ split -> readBuilder.newRead().createReader(split);
RowType readType = TypeUtils.project(table.rowType(), projection);
- if (readSequenceNumber) {
- readType = readType.appendDataField(SEQUENCE_NUMBER,
DataTypes.BIGINT());
- }
RecordReader<InternalRow> reader;
if (useParallelism) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index b931d18b4..7f5d036b1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -68,11 +68,10 @@ public class NoPrimaryKeyLookupTable extends
FullCacheLookupTable {
}
@Override
- public void refresh(Iterator<InternalRow> incremental, boolean
orderByLastField)
- throws IOException {
- if (orderByLastField) {
+ public void refresh(Iterator<InternalRow> incremental) throws IOException {
+ if (userDefinedSeqComparator != null) {
throw new IllegalArgumentException(
- "Append table does not support order by last field.");
+ "Append table does not support user defined sequence
fields.");
}
Predicate predicate = projectedPredicate();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 97034070c..889e1e35b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -91,16 +91,14 @@ public class PrimaryKeyLookupTable extends
FullCacheLookupTable {
}
@Override
- public void refresh(Iterator<InternalRow> incremental, boolean
orderByLastField)
- throws IOException {
+ public void refresh(Iterator<InternalRow> incremental) throws IOException {
Predicate predicate = projectedPredicate();
while (incremental.hasNext()) {
InternalRow row = incremental.next();
primaryKeyRow.replaceRow(row);
- if (orderByLastField) {
+ if (userDefinedSeqComparator != null) {
InternalRow previous = tableState.get(primaryKeyRow);
- int orderIndex = projectedType.getFieldCount() - 1;
- if (previous != null && previous.getLong(orderIndex) >
row.getLong(orderIndex)) {
+ if (previous != null &&
userDefinedSeqComparator.compare(previous, row) > 0) {
continue;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index d0731ebdf..d4fb22c4b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -72,8 +72,7 @@ public class SecondaryIndexLookupTable extends
PrimaryKeyLookupTable {
}
@Override
- public void refresh(Iterator<InternalRow> incremental, boolean
orderByLastField)
- throws IOException {
+ public void refresh(Iterator<InternalRow> incremental) throws IOException {
Predicate predicate = projectedPredicate();
while (incremental.hasNext()) {
InternalRow row = incremental.next();
@@ -81,11 +80,10 @@ public class SecondaryIndexLookupTable extends
PrimaryKeyLookupTable {
boolean previousFetched = false;
InternalRow previous = null;
- if (orderByLastField) {
+ if (userDefinedSeqComparator != null) {
previous = tableState.get(primaryKeyRow);
previousFetched = true;
- int orderIndex = projectedType.getFieldCount() - 1;
- if (previous != null && previous.getLong(orderIndex) >
row.getLong(orderIndex)) {
+ if (previous != null &&
userDefinedSeqComparator.compare(previous, row) > 0) {
continue;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index caba2daa1..a0529c575 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -31,12 +31,12 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.sink.RowKindGenerator;
-import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -62,7 +62,6 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
private transient RecordComparator keyComparator;
private transient long recordCount;
- private transient SequenceGenerator sequenceGenerator;
private transient RowKindGenerator rowKindGenerator;
private transient MergeFunction<KeyValue> mergeFunction;
@@ -88,12 +87,10 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
CoreOptions options = new CoreOptions(schema.options());
keyProjection =
- CodeGenUtils.newProjection(
- schema.logicalRowType(),
schema.projection(schema.primaryKeys()));
+ CodeGenUtils.newProjection(valueType,
schema.projection(schema.primaryKeys()));
keyComparator = new KeyComparatorSupplier(keyType).get();
recordCount = 0;
- sequenceGenerator = SequenceGenerator.create(schema, options);
rowKindGenerator = RowKindGenerator.create(schema, options);
mergeFunction =
PrimaryKeyTableUtils.createMergeFunctionFactory(
@@ -119,6 +116,7 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
new SortBufferWriteBuffer(
keyType,
valueType,
+ UserDefinedSeqComparator.create(valueType, options),
new HeapMemorySegmentPool(
options.localMergeBufferSize(),
options.pageSize()),
false,
@@ -141,13 +139,9 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
row.setRowKind(RowKind.INSERT);
InternalRow key = keyProjection.apply(row);
- long sequenceNumber =
- sequenceGenerator == null
- ? recordCount
- : sequenceGenerator.generateWithPadding(row,
recordCount);
- if (!buffer.put(sequenceNumber, rowKind, key, row)) {
+ if (!buffer.put(recordCount, rowKind, key, row)) {
flushBuffer();
- if (!buffer.put(sequenceNumber, rowKind, key, row)) {
+ if (!buffer.put(recordCount, rowKind, key, row)) {
// change row kind back
row.setRowKind(rowKind);
output.collect(record);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 3af6b722d..065ace85a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -130,7 +130,7 @@ public class LookupTableTest extends TableTestBase {
List<Pair<byte[], byte[]>> records = new ArrayList<>();
for (int i = 1; i <= 100_000; i++) {
InternalRow row = row(i, 11 * i, 111 * i);
- records.add(Pair.of(table.toKeyBytes(row),
table.toValueBytes(sequence(row, -1L))));
+ records.add(Pair.of(table.toKeyBytes(row),
table.toValueBytes(row)));
}
records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(),
o2.getKey()));
TableBulkLoader bulkLoader = table.createBulkLoader();
@@ -146,18 +146,16 @@ public class LookupTableTest extends TableTestBase {
}
// test refresh to update
- table.refresh(singletonList(sequence(row(1, 22, 222),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(1, 22, 222)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
// test refresh to delete
- table.refresh(
- singletonList(sequence(row(RowKind.DELETE, 1, 11, 111),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(RowKind.DELETE, 1, 11,
111)).iterator());
assertThat(table.get(row(1))).hasSize(0);
- table.refresh(
- singletonList(sequence(row(RowKind.DELETE, 3, 33, 333),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(RowKind.DELETE, 3, 33,
333)).iterator());
assertThat(table.get(row(3))).hasSize(0);
}
@@ -179,7 +177,7 @@ public class LookupTableTest extends TableTestBase {
List<Pair<byte[], byte[]>> records = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
- InternalRow row = sequence(row(i, 11 * i, 111 * i), -1L);
+ InternalRow row = row(i, 11 * i, 111 * i);
records.add(Pair.of(table.toKeyBytes(row),
table.toValueBytes(row)));
}
records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(),
o2.getKey()));
@@ -190,20 +188,19 @@ public class LookupTableTest extends TableTestBase {
bulkLoader.finish();
// test refresh to update
- table.refresh(singletonList(sequence(row(1, 22, 222), 1L)).iterator(),
true);
+ table.refresh(singletonList(row(1, 22, 222)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
// refresh with old sequence
- table.refresh(singletonList((sequence(row(1, 33, 333),
0L))).iterator(), true);
+ table.refresh(singletonList((row(1, 11, 333))).iterator());
result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
// test refresh delete data with old sequence
- table.refresh(
- singletonList(sequence(row(RowKind.DELETE, 1, 11, 111),
-1L)).iterator(), true);
+ table.refresh(singletonList(row(RowKind.DELETE, 1, 11,
111)).iterator());
assertThat(table.get(row(1))).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
}
@@ -222,21 +219,20 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
- table.refresh(singletonList(sequence(row(1, 11, 111),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(1, 11, 111)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 11, 111);
- table.refresh(singletonList(sequence(row(1, 22, 222),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(1, 22, 222)).iterator());
result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
- table.refresh(
- singletonList(sequence(row(RowKind.DELETE, 1, 11, 111),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(RowKind.DELETE, 1, 11,
111)).iterator());
assertThat(table.get(row(1))).hasSize(0);
- table.refresh(singletonList(sequence(row(3, 33, 333),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(3, 33, 333)).iterator());
assertThat(table.get(row(3))).hasSize(0);
}
@@ -254,12 +250,12 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
- table.refresh(singletonList(sequence(row(1, 11, 111),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(1, 11, 111)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 11, 111);
- table.refresh(singletonList(sequence(row(1, 22, 222),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(1, 22, 222)).iterator());
result = table.get(row(1));
assertThat(result).hasSize(0);
}
@@ -285,7 +281,7 @@ public class LookupTableTest extends TableTestBase {
for (int i = 1; i <= 100_000; i++) {
int secKey = rnd.nextInt(i);
InternalRow row = row(i, secKey, 111 * i);
- records.add(Pair.of(table.toKeyBytes(row),
table.toValueBytes(sequence(row, -1L))));
+ records.add(Pair.of(table.toKeyBytes(row),
table.toValueBytes(row)));
secKeyToPk.computeIfAbsent(secKey, k -> new HashSet<>()).add(i);
}
records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(),
o2.getKey()));
@@ -302,7 +298,7 @@ public class LookupTableTest extends TableTestBase {
}
// add new sec key to pk
- table.refresh(singletonList(sequence(row(1, 22, 222),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(1, 22, 222)).iterator());
List<InternalRow> result = table.get(row(22));
assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
}
@@ -345,21 +341,14 @@ public class LookupTableTest extends TableTestBase {
.containsExactlyInAnyOrderElementsOf(entry.getValue());
}
- JoinedRow joined = new JoinedRow();
// add new sec key to pk
- table.refresh(
- singletonList((InternalRow) joined.replace(row(1, 22, 222),
GenericRow.of(1L)))
- .iterator(),
- true);
+ table.refresh(singletonList(row(1, 22, 222)).iterator());
List<InternalRow> result = table.get(row(22));
assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
assertThat(result.stream().map(InternalRow::getFieldCount)).allMatch(n
-> n == 3);
// refresh with old value
- table.refresh(
- singletonList((InternalRow) joined.replace(row(1, 22, 333),
GenericRow.of(0L)))
- .iterator(),
- true);
+ table.refresh(singletonList(row(1, 11, 333)).iterator());
result = table.get(row(22));
assertThat(result.stream().map(row ->
row.getInt(2))).doesNotContain(333);
}
@@ -378,30 +367,29 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
- table.refresh(singletonList(sequence(row(1, 11, 111),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(1, 11, 111)).iterator());
List<InternalRow> result = table.get(row(11));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 11, 111);
- table.refresh(singletonList(sequence(row(1, 22, 222),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(1, 22, 222)).iterator());
assertThat(table.get(row(11))).hasSize(0);
result = table.get(row(22));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
- table.refresh(singletonList(sequence(row(2, 22, 222),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(2, 22, 222)).iterator());
result = table.get(row(22));
assertThat(result).hasSize(2);
assertRow(result.get(0), 1, 22, 222);
assertRow(result.get(1), 2, 22, 222);
- table.refresh(
- singletonList(sequence(row(RowKind.DELETE, 2, 22, 222),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(RowKind.DELETE, 2, 22,
222)).iterator());
result = table.get(row(22));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 22, 222);
- table.refresh(singletonList(sequence(row(3, 33, 333),
-1L)).iterator(), false);
+ table.refresh(singletonList(row(3, 33, 333)).iterator());
assertThat(table.get(row(33))).hasSize(0);
}
@@ -443,7 +431,7 @@ public class LookupTableTest extends TableTestBase {
}
// add new join key value
- table.refresh(singletonList(row(1, 22, 333)).iterator(), false);
+ table.refresh(singletonList(row(1, 22, 333)).iterator());
List<InternalRow> result = table.get(row(22));
assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
}
@@ -462,16 +450,16 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
- table.refresh(singletonList(row(1, 11, 333)).iterator(), false);
+ table.refresh(singletonList(row(1, 11, 333)).iterator());
List<InternalRow> result = table.get(row(11));
assertThat(result).hasSize(0);
- table.refresh(singletonList(row(1, 11, 111)).iterator(), false);
+ table.refresh(singletonList(row(1, 11, 111)).iterator());
result = table.get(row(11));
assertThat(result).hasSize(1);
assertRow(result.get(0), 1, 11, 111);
- table.refresh(singletonList(row(1, 11, 111)).iterator(), false);
+ table.refresh(singletonList(row(1, 11, 111)).iterator());
result = table.get(row(11));
assertThat(result).hasSize(2);
assertRow(result.get(0), 1, 11, 111);
@@ -596,10 +584,6 @@ public class LookupTableTest extends TableTestBase {
return row;
}
- private static InternalRow sequence(InternalRow row, long sequenceNumber) {
- return new JoinedRow(row.getRowKind(), row,
GenericRow.of(sequenceNumber));
- }
-
private static void assertRow(InternalRow resultRow, int... expected) {
int[] results = new int[expected.length];
for (int i = 0; i < results.length; i++) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 55a671879..745e92b68 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
@@ -117,19 +118,27 @@ public class TestChangelogDataReadWrite {
RecordReader.RecordIterator<KeyValue>,
RecordReader.RecordIterator<InternalRow>>
rowDataIteratorCreator) {
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
+ long schemaId = 0;
KeyValueFileStoreRead read =
new KeyValueFileStoreRead(
- LocalFileIO.create(),
- new SchemaManager(LocalFileIO.create(), tablePath),
- 0,
+ schemaManager,
+ schemaId,
KEY_TYPE,
VALUE_TYPE,
COMPARATOR,
+ null,
DeduplicateMergeFunction.factory(),
- ignore -> avro,
- pathFactory,
- EXTRACTOR,
- new CoreOptions(new HashMap<>()));
+ KeyValueFileReaderFactory.builder(
+ LocalFileIO.create(),
+ schemaManager,
+ schemaId,
+ KEY_TYPE,
+ VALUE_TYPE,
+ ignore -> avro,
+ pathFactory,
+ EXTRACTOR,
+ new CoreOptions(new HashMap<>())));
return new KeyValueTableRead(read, null) {
@Override
@@ -176,6 +185,7 @@ public class TestChangelogDataReadWrite {
KEY_TYPE,
VALUE_TYPE,
() -> COMPARATOR,
+ () -> null,
() -> EQUALISER,
DeduplicateMergeFunction.factory(),
pathFactory,