This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 272f11818 [Bug] Fix the lookup changelog producer may produce wrong
result (#1555)
272f11818 is described below
commit 272f118189d09bb6a1de521d0cfa0d4938087886
Author: Aitozi <[email protected]>
AuthorDate: Thu Jul 13 11:24:19 2023 +0800
[Bug] Fix the lookup changelog producer may produce wrong result (#1555)
---
.../mergetree/compact/LookupMergeFunction.java | 24 ++++++++++++++++------
.../table/ChangelogWithKeyFileStoreTable.java | 7 +++++--
.../LookupChangelogMergeFunctionWrapperTest.java | 10 +++++++--
.../paimon/flink/LookupChangelogWithAggITCase.java | 17 +++++++++++++++
4 files changed, 48 insertions(+), 10 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 254e5cb25..7d3d8c050 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -19,6 +19,8 @@
package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -34,12 +36,17 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
private final MergeFunction<KeyValue> mergeFunction;
private final LinkedList<KeyValue> candidates = new LinkedList<>();
+ private final InternalRowSerializer keySerializer;
+ private final InternalRowSerializer valueSerializer;
KeyValue highLevel;
boolean containLevel0;
- public LookupMergeFunction(MergeFunction<KeyValue> mergeFunction) {
+ public LookupMergeFunction(
+ MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType
valueType) {
this.mergeFunction = mergeFunction;
+ this.keySerializer = new InternalRowSerializer(keyType);
+ this.valueSerializer = new InternalRowSerializer(valueType);
}
@Override
@@ -51,7 +58,7 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
@Override
public void add(KeyValue kv) {
- candidates.add(kv);
+ candidates.add(kv.copy(keySerializer, valueSerializer));
}
@Override
@@ -77,8 +84,9 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
return mergeFunction.getResult();
}
- public static MergeFunctionFactory<KeyValue>
wrap(MergeFunctionFactory<KeyValue> wrapped) {
- return new Factory(wrapped);
+ public static MergeFunctionFactory<KeyValue> wrap(
+ MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType
valueType) {
+ return new Factory(wrapped, keyType, valueType);
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -86,14 +94,18 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
private static final long serialVersionUID = 1L;
private final MergeFunctionFactory<KeyValue> wrapped;
+ private final RowType keyType;
+ private final RowType rowType;
- private Factory(MergeFunctionFactory<KeyValue> wrapped) {
+ private Factory(MergeFunctionFactory<KeyValue> wrapped, RowType
keyType, RowType rowType) {
this.wrapped = wrapped;
+ this.keyType = keyType;
+ this.rowType = rowType;
}
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- return new LookupMergeFunction(wrapped.create(projection));
+ return new LookupMergeFunction(wrapped.create(projection),
keyType, rowType);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 705e5c2a9..512354f0f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -96,6 +96,8 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
CoreOptions options = new CoreOptions(conf);
CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
MergeFunctionFactory<KeyValue> mfFactory;
+ KeyValueFieldsExtractor extractor =
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+
switch (mergeEngine) {
case DEDUPLICATE:
mfFactory = DeduplicateMergeFunction.factory();
@@ -117,10 +119,11 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
}
if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
- mfFactory = LookupMergeFunction.wrap(mfFactory);
+ mfFactory =
+ LookupMergeFunction.wrap(
+ mfFactory, new
RowType(extractor.keyFields(tableSchema)), rowType);
}
- KeyValueFieldsExtractor extractor =
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
lazyStore =
new KeyValueFileStore(
fileIO(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 56c62bf49..a468bbef5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -26,6 +26,7 @@ import
org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -53,7 +54,10 @@ public class LookupChangelogMergeFunctionWrapperTest {
Map<InternalRow, KeyValue> highLevel = new HashMap<>();
LookupChangelogMergeFunctionWrapper function =
new LookupChangelogMergeFunctionWrapper(
-
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(),
+ RowType.of(DataTypes.INT()),
+ RowType.of(DataTypes.INT())),
highLevel::get,
EQUALISER,
changelogRowDeduplicate);
@@ -207,7 +211,9 @@ public class LookupChangelogMergeFunctionWrapperTest {
},
new FieldAggregator[] {
new
FieldSumAgg(DataTypes.INT())
- })),
+ }),
+ RowType.of(DataTypes.INT()),
+ RowType.of(DataTypes.INT())),
key -> null,
EQUALISER,
changlogRowDeduplicate);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
index a0481eb9d..96f06860d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -76,4 +77,20 @@ public class LookupChangelogWithAggITCase extends
CatalogITCaseBase {
iterator.close();
}
+
+ @Test
+ public void testLookupChangelogProducerWithValueSwitch() throws Exception {
+ sql(
+ "CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ("
+ + "'bucket'='3', "
+ + "'changelog-producer'='lookup', "
+ + "'merge-engine'='aggregation', "
+ + "'fields.v.aggregate-function'='sum')");
+ BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT *
FROM T");
+
+ sql("INSERT INTO T VALUES (1, 1), (2, 2), (1, 3), (1, 4), (1, 5)");
+ assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1,
13), Row.of(2, 2));
+
+ iterator.close();
+ }
}