This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e102abd771 [flink] optimize flink Data-Evolution-Merge-Into (#7324)
e102abd771 is described below
commit e102abd771a6a1aa03362cf11e19482fef5a3812
Author: Faiz <[email protected]>
AuthorDate: Tue Mar 3 22:28:22 2026 +0800
[flink] optimize flink Data-Evolution-Merge-Into (#7324)
This PR optimizes Data-Evolution-Merge-Into in several aspects:
1. Introduce a MergeIntoUpdateChecker to check if some global-indexed
columns are updated (this is the same as Spark's implementation)
2. Use Calcite to rename the target table (the current implementation is
based on regex, which is very unstable)
3. Use Calcite to find the _row_id field (if it exists) in the source
table, so we can eliminate the join process.
---
.../flink/action/DataEvolutionMergeIntoAction.java | 255 ++++++++++++++++-----
.../DataEvolutionPartialWriteOperator.java | 7 -
.../flink/dataevolution/FirstRowIdAssigner.java | 20 +-
.../dataevolution/MergeIntoUpdateChecker.java | 161 +++++++++++++
.../paimon/flink/utils/FlinkCalciteClasses.java | 48 +++-
.../action/DataEvolutionMergeIntoActionITCase.java | 203 ++++++++++++++--
6 files changed, 598 insertions(+), 96 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
index 8b0901ca88..57c56f6f50 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -25,13 +25,16 @@ import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.dataevolution.DataEvolutionPartialWriteOperator;
import org.apache.paimon.flink.dataevolution.FirstRowIdAssigner;
+import org.apache.paimon.flink.dataevolution.MergeIntoUpdateChecker;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
import org.apache.paimon.flink.sink.NoopCommittableStateManager;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.sorter.SortOperator;
+import org.apache.paimon.flink.utils.FlinkCalciteClasses;
import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataField;
@@ -41,7 +44,6 @@ import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.StringUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.dag.Transformation;
@@ -49,8 +51,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
@@ -69,10 +70,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
@@ -95,7 +95,6 @@ import static
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValue
public class DataEvolutionMergeIntoAction extends TableActionBase {
private static final Logger LOG =
LoggerFactory.getLogger(DataEvolutionMergeIntoAction.class);
- public static final String IDENTIFIER_QUOTE = "`";
private final CoreOptions coreOptions;
@@ -120,6 +119,7 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
// merge condition
private String mergeCondition;
+ private MergeConditionParser mergeConditionParser;
// set statement
private String matchedUpdateSet;
@@ -137,6 +137,17 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
table.getClass().getName()));
}
+ Long latestSnapshotId = ((FileStoreTable)
table).snapshotManager().latestSnapshotId();
+ if (latestSnapshotId == null) {
+ throw new UnsupportedOperationException(
+ "merge-into action doesn't support updating an empty
table.");
+ }
+ table =
+ table.copy(
+ Collections.singletonMap(
+
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(),
+ latestSnapshotId.toString()));
+
this.coreOptions = ((FileStoreTable) table).coreOptions();
if (!coreOptions.dataEvolutionEnabled()) {
@@ -168,6 +179,12 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
public DataEvolutionMergeIntoAction withMergeCondition(String
mergeCondition) {
this.mergeCondition = mergeCondition;
+ try {
+ this.mergeConditionParser = new
MergeConditionParser(mergeCondition);
+ } catch (Exception e) {
+ LOG.error("Failed to parse merge condition: {}", mergeCondition,
e);
+ throw new RuntimeException("Failed to parse merge condition " +
mergeCondition, e);
+ }
return this;
}
@@ -196,7 +213,12 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
DataStream<Committable> written =
writePartialColumns(shuffled, sourceWithType.f1,
sinkParallelism);
// 4. commit
- DataStream<?> committed = commit(written);
+ Set<String> updatedColumns =
+ sourceWithType.f1.getFields().stream()
+ .map(DataField::name)
+ .filter(name ->
!SpecialFields.ROW_ID.name().equals(name))
+ .collect(Collectors.toSet());
+ DataStream<?> committed = commit(written, updatedColumns);
// execute internal
Transformation<?> transformations =
@@ -219,8 +241,7 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
List<String> project;
if (matchedUpdateSet.equals("*")) {
// if sourceName is qualified like 'default.S', we should build a
project like S.*
- String[] splits = sourceTable.split("\\.");
- project = Collections.singletonList(splits[splits.length - 1] +
".*");
+ project = Collections.singletonList(sourceTableName() + ".*");
} else {
// validate upsert changes
Map<String, String> changes =
parseCommaSeparatedKeyValues(matchedUpdateSet);
@@ -245,16 +266,38 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
.collect(Collectors.toList());
}
- // use join to find matched rows and assign row id for each source row.
- // _ROW_ID is the first field of joined table.
- String query =
- String.format(
- "SELECT %s, %s FROM %s INNER JOIN %s AS RT ON %s",
- "`RT`.`_ROW_ID` as `_ROW_ID`",
- String.join(",", project),
- escapedSourceName(),
- escapedRowTrackingTargetName(),
- rewriteMergeCondition(mergeCondition));
+ String query;
+ Optional<String> sourceRowIdField;
+ try {
+ sourceRowIdField =
mergeConditionParser.extractRowIdFieldFromSource(targetTableName());
+ } catch (Exception e) {
+ LOG.error("Error happened when extract row id field from source
table.", e);
+ throw new RuntimeException(
+ "Error happened when extract row id field from source
table.", e);
+ }
+
+ // if source table already contains _ROW_ID field, we could avoid join
+ if (sourceRowIdField.isPresent()) {
+ query =
+ String.format(
+ // cast _ROW_ID to BIGINT
+ "SELECT CAST(`%s`.`%s` AS BIGINT) AS `_ROW_ID`, %s
FROM %s",
+ sourceTableName(),
+ sourceRowIdField.get(),
+ String.join(",", project),
+ escapedSourceName());
+ } else {
+ // use join to find matched rows and assign row id for each source
row.
+ // _ROW_ID is the first field of joined table.
+ query =
+ String.format(
+ "SELECT %s, %s FROM %s INNER JOIN %s AS RT ON %s",
+ "`RT`.`_ROW_ID` as `_ROW_ID`",
+ String.join(",", project),
+ escapedSourceName(),
+ escapedRowTrackingTargetName(),
+ rewriteMergeCondition(mergeCondition));
+ }
LOG.info("Source query: {}", query);
@@ -286,11 +329,15 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
Preconditions.checkState(
!firstRowIds.isEmpty(), "Should not MERGE INTO an empty target
table.");
+ // if firstRowIds is not empty, there must be a valid nextRowId
+ long maxRowId = table.latestSnapshot().get().nextRowId() - 1;
+
OneInputTransformation<RowData, Tuple2<Long, RowData>>
assignedFirstRowId =
new OneInputTransformation<>(
sourceTransformation,
"ASSIGN FIRST_ROW_ID",
- new StreamMap<>(new FirstRowIdAssigner(firstRowIds,
sourceType)),
+ new StreamFlatMap<>(
+ new FirstRowIdAssigner(firstRowIds, maxRowId,
sourceType)),
new TupleTypeInfo<>(
BasicTypeInfo.LONG_TYPE_INFO,
sourceTransformation.getOutputType()),
sourceTransformation.getParallelism(),
@@ -334,9 +381,20 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
.setParallelism(sinkParallelism);
}
- public DataStream<Committable> commit(DataStream<Committable> written) {
+ public DataStream<Committable> commit(
+ DataStream<Committable> written, Set<String> updatedColumns) {
FileStoreTable storeTable = (FileStoreTable) table;
- OneInputStreamOperatorFactory<Committable, Committable>
committerOperator =
+
+ // Check if some global-indexed columns are updated
+ DataStream<Committable> checked =
+ written.transform(
+ "Updated Column Check",
+ new CommittableTypeInfo(),
+ new MergeIntoUpdateChecker(storeTable,
updatedColumns))
+ .setParallelism(1)
+ .setMaxParallelism(1);
+
+ CommitterOperatorFactory<Committable, ManifestCommittable>
committerOperator =
new CommitterOperatorFactory<>(
false,
true,
@@ -348,7 +406,7 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
context),
new NoopCommittableStateManager());
- return written.transform("COMMIT OPERATOR", new CommittableTypeInfo(),
committerOperator)
+ return checked.transform("COMMIT OPERATOR", new CommittableTypeInfo(),
committerOperator)
.setParallelism(1)
.setMaxParallelism(1);
}
@@ -382,28 +440,13 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
*/
@VisibleForTesting
public String rewriteMergeCondition(String mergeCondition) {
- // skip single and double-quoted chunks
- String skipQuoted = "'(?:''|[^'])*'" + "|\"(?:\"\"|[^\"])*\"";
- String targetTableRegex =
- "(?i)(?:\\b"
- + Pattern.quote(targetTableName())
- + "\\b|`"
- + Pattern.quote(targetTableName())
- + "`)\\s*\\.";
-
- Pattern pattern = Pattern.compile(skipQuoted + "|(" + targetTableRegex
+ ")");
- Matcher matcher = pattern.matcher(mergeCondition);
-
- StringBuffer sb = new StringBuffer();
- while (matcher.find()) {
- if (matcher.group(1) != null) {
- matcher.appendReplacement(sb,
Matcher.quoteReplacement("`RT`."));
- } else {
- matcher.appendReplacement(sb,
Matcher.quoteReplacement(matcher.group(0)));
- }
+ try {
+ Object rewrittenNode =
mergeConditionParser.rewriteSqlNode(targetTableName(), "RT");
+ return rewrittenNode.toString();
+ } catch (Exception e) {
+ LOG.error("Failed to rewrite merge condition: {}", mergeCondition,
e);
+ throw new RuntimeException("Failed to rewrite merge condition " +
mergeCondition, e);
}
- matcher.appendTail(sb);
- return sb.toString();
}
/**
@@ -432,7 +475,8 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
foundRowIdColumn = true;
Preconditions.checkState(
flinkColumn.getDataType().getLogicalType().getTypeRoot()
- == LogicalTypeRoot.BIGINT);
+ == LogicalTypeRoot.BIGINT,
+ "_ROW_ID field should be BIGINT type.");
} else {
DataField targetField =
targetFields.get(flinkColumn.getName());
if (targetField == null) {
@@ -497,6 +541,11 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
return targetAlias == null ? identifier.getObjectName() : targetAlias;
}
+ private String sourceTableName() {
+ String[] splits = sourceTable.split("\\.");
+ return splits[splits.length - 1];
+ }
+
private String escapedSourceName() {
return Arrays.stream(sourceTable.split("\\."))
.map(s -> String.format("`%s`", s))
@@ -514,28 +563,108 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
catalogName, identifier.getDatabaseName(),
identifier.getObjectName());
}
- private List<String> normalizeFieldName(List<String> fieldNames) {
- return
fieldNames.stream().map(this::normalizeFieldName).collect(Collectors.toList());
- }
+ /** The parser to parse merge condition through calcite sql parser. */
+ static class MergeConditionParser {
+
+ private final FlinkCalciteClasses calciteClasses;
+ private final Object sqlNode;
+
+ MergeConditionParser(String mergeCondition) throws Exception {
+ this.calciteClasses = new FlinkCalciteClasses();
+ this.sqlNode = initializeSqlNode(mergeCondition);
+ }
- private String normalizeFieldName(String fieldName) {
- if (StringUtils.isNullOrWhitespaceOnly(fieldName) ||
fieldName.endsWith(IDENTIFIER_QUOTE)) {
- return fieldName;
+ private Object initializeSqlNode(String mergeCondition) throws
Exception {
+ Object config =
+ calciteClasses
+ .configDelegate()
+ .withLex(
+
calciteClasses.sqlParserDelegate().config(),
+ calciteClasses.lexDelegate().java());
+ Object sqlParser =
calciteClasses.sqlParserDelegate().create(mergeCondition, config);
+ return
calciteClasses.sqlParserDelegate().parseExpression(sqlParser);
}
- String[] splitFieldNames = fieldName.split("\\.");
- if (!targetFieldNames.contains(splitFieldNames[splitFieldNames.length
- 1])) {
- return fieldName;
+ /**
+ * Rewrite the SQL node, replacing all references from the 'from'
table to the 'to' table.
+ */
+ public Object rewriteSqlNode(String from, String to) throws Exception {
+ return rewriteNode(sqlNode, from, to);
}
- return String.join(
- ".",
- Arrays.stream(splitFieldNames)
- .map(
- part ->
- part.endsWith(IDENTIFIER_QUOTE)
- ? part
- : IDENTIFIER_QUOTE + part +
IDENTIFIER_QUOTE)
- .toArray(String[]::new));
+ private Object rewriteNode(Object node, String from, String to) throws
Exception {
+ // It's a SqlBasicCall, recursively rewrite children operands
+ if
(calciteClasses.sqlBasicCallDelegate().instanceOfSqlBasicCall(node)) {
+ List<?> operandList =
calciteClasses.sqlBasicCallDelegate().getOperandList(node);
+ List<Object> newNodes = new java.util.ArrayList<>();
+ for (Object operand : operandList) {
+ newNodes.add(rewriteNode(operand, from, to));
+ }
+
+ Object operator =
calciteClasses.sqlBasicCallDelegate().getOperator(node);
+ Object parserPos =
calciteClasses.sqlBasicCallDelegate().getParserPosition(node);
+ Object functionQuantifier =
+
calciteClasses.sqlBasicCallDelegate().getFunctionQuantifier(node);
+ return calciteClasses
+ .sqlBasicCallDelegate()
+ .create(operator, newNodes, parserPos,
functionQuantifier);
+ } else if
(calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(node)) {
+ // It's a sql identifier, try to replace the table name
+ List<String> names =
calciteClasses.sqlIndentifierDelegate().getNames(node);
+ Preconditions.checkState(
+ names.size() >= 2, "Please specify the table name for
the column: " + node);
+ int nameLen = names.size();
+ if (names.get(nameLen - 2).equals(from)) {
+ return
calciteClasses.sqlIndentifierDelegate().setName(node, nameLen - 2, to);
+ }
+ return node;
+ } else {
+ return node;
+ }
+ }
+
+ /**
+ * Find the row id field in source table. This method looks for an
equality condition like
+ * `target_table._ROW_ID = source_table.some_field` or
`source_table.some_field =
+ * target_table._ROW_ID`, and returns the field name that is paired
with _ROW_ID.
+ */
+ public Optional<String> extractRowIdFieldFromSource(String
targetTable) throws Exception {
+ Object operator =
calciteClasses.sqlBasicCallDelegate().getOperator(sqlNode);
+ Object kind =
calciteClasses.sqlOperatorDelegate().getKind(operator);
+
+ if (kind == calciteClasses.sqlKindDelegate().equals()) {
+ List<?> operandList =
calciteClasses.sqlBasicCallDelegate().getOperandList(sqlNode);
+
+ Object left = operandList.get(0);
+ Object right = operandList.get(1);
+
+ if
(calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(left)
+ &&
calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(right)) {
+
+ List<String> leftNames =
calciteClasses.sqlIndentifierDelegate().getNames(left);
+ List<String> rightNames =
+
calciteClasses.sqlIndentifierDelegate().getNames(right);
+ Preconditions.checkState(
+ leftNames.size() >= 2,
+ "Please specify the table name for the column: " +
left);
+ Preconditions.checkState(
+ rightNames.size() >= 2,
+ "Please specify the table name for the column: " +
right);
+
+ if (leftNames.get(leftNames.size() -
1).equals(SpecialFields.ROW_ID.name())
+ && leftNames.get(leftNames.size() -
2).equals(targetTable)) {
+ return Optional.of(rightNames.get(rightNames.size() -
1));
+ } else if (rightNames
+ .get(rightNames.size() - 1)
+ .equals(SpecialFields.ROW_ID.name())
+ && rightNames.get(rightNames.size() -
2).equals(targetTable)) {
+ return Optional.of(leftNames.get(leftNames.size() -
1));
+ }
+ return Optional.empty();
+ }
+ }
+
+ return Optional.empty();
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
index 4f9ec8c643..f09bdefe28 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
@@ -35,7 +35,6 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.CommitIncrement;
@@ -72,7 +71,6 @@ public class DataEvolutionPartialWriteOperator
// dataType
private final RowType dataType;
- private final InternalRow.FieldGetter[] fieldGetters;
private final int rowIdIndex;
// data type excludes of _ROW_ID field.
@@ -101,11 +99,6 @@ public class DataEvolutionPartialWriteOperator
this.dataType =
SpecialFields.rowTypeWithRowId(table.rowType()).project(dataType.getFieldNames());
this.rowIdIndex =
this.dataType.getFieldIndex(SpecialFields.ROW_ID.name());
- this.fieldGetters = new
InternalRow.FieldGetter[dataType.getFieldCount()];
- List<DataField> fields = this.dataType.getFields();
- for (int i = 0; i < fields.size(); i++) {
- this.fieldGetters[i] =
InternalRow.createFieldGetter(fields.get(i).type(), i);
- }
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
index ad87c5b016..bf1c41a871 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
@@ -24,30 +24,38 @@ import org.apache.paimon.utils.MurmurHashUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
import java.util.List;
-/** Assign first row id for each row through binary search. */
-public class FirstRowIdAssigner extends RichMapFunction<RowData, Tuple2<Long,
RowData>> {
+/**
+ * Assign first row id for each row through binary search. Rows with invalid
row ids are filtered
+ * out.
+ */
+public class FirstRowIdAssigner extends RichFlatMapFunction<RowData,
Tuple2<Long, RowData>> {
private final FirstRowIdLookup firstRowIdLookup;
+ private final long maxRowId;
private final int rowIdFieldIndex;
- public FirstRowIdAssigner(List<Long> firstRowIds, RowType rowType) {
+ public FirstRowIdAssigner(List<Long> firstRowIds, long maxRowId, RowType
rowType) {
this.firstRowIdLookup = new FirstRowIdLookup(firstRowIds);
+ this.maxRowId = maxRowId;
this.rowIdFieldIndex =
rowType.getFieldNames().indexOf(SpecialFields.ROW_ID.name());
Preconditions.checkState(this.rowIdFieldIndex >= 0, "Do not found
_ROW_ID column.");
}
@Override
- public Tuple2<Long, RowData> map(RowData value) throws Exception {
+ public void flatMap(RowData value, Collector<Tuple2<Long, RowData>> out)
throws Exception {
long rowId = value.getLong(rowIdFieldIndex);
- return new Tuple2<>(firstRowIdLookup.lookup(rowId), value);
+ if (rowId >= 0 && rowId <= maxRowId) {
+ out.collect(new Tuple2<>(firstRowIdLookup.lookup(rowId), value));
+ }
}
/** The Key Selector to get firstRowId from tuple2. */
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java
new file mode 100644
index 0000000000..94392a84ec
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The checker for merge into update result. It will check each committable to
see if some
+ * global-indexed columns are updated. It will take some actions according to
{@link
+ * CoreOptions#GLOBAL_INDEX_COLUMN_UPDATE_ACTION}.
+ */
+public class MergeIntoUpdateChecker extends
BoundedOneInputOperator<Committable, Committable> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MergeIntoUpdateChecker.class);
+
+ private final FileStoreTable table;
+ private final Set<String> updatedColumns;
+
+ private transient Set<BinaryRow> affectedPartitions;
+
+ public MergeIntoUpdateChecker(FileStoreTable table, Set<String>
updatedColumns) {
+ this.table = table;
+ this.updatedColumns = updatedColumns;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ affectedPartitions = new HashSet<>();
+
+ Preconditions.checkState(
+
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks() == 1,
+ "Parallelism of MergeIntoUpdateChecker must be 1.");
+ }
+
+ @Override
+ public void processElement(StreamRecord<Committable> element) throws
Exception {
+ affectedPartitions.add(element.getValue().commitMessage().partition());
+ output.collect(element);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ checkUpdatedColumns();
+ }
+
+ private void checkUpdatedColumns() {
+ Optional<Snapshot> latestSnapshot = table.latestSnapshot();
+ RowType rowType = table.rowType();
+ Preconditions.checkState(latestSnapshot.isPresent());
+
+ List<IndexManifestEntry> affectedEntries =
+ table.store()
+ .newIndexFileHandler()
+ .scan(
+ latestSnapshot.get(),
+ entry -> {
+ GlobalIndexMeta globalIndexMeta =
+
entry.indexFile().globalIndexMeta();
+ if (globalIndexMeta != null) {
+ String fieldName =
+
rowType.getField(globalIndexMeta.indexFieldId())
+ .name();
+ return
updatedColumns.contains(fieldName)
+ &&
affectedPartitions.contains(entry.partition());
+ }
+ return false;
+ });
+
+ if (!affectedEntries.isEmpty()) {
+ CoreOptions.GlobalIndexColumnUpdateAction updateAction =
+ table.coreOptions().globalIndexColumnUpdateAction();
+ switch (updateAction) {
+ case THROW_ERROR:
+ Set<String> conflictedColumns =
+ affectedEntries.stream()
+ .map(file ->
file.indexFile().globalIndexMeta().indexFieldId())
+ .map(id -> rowType.getField(id).name())
+ .collect(Collectors.toSet());
+
+ throw new RuntimeException(
+ String.format(
+ "MergeInto: update columns contain
globally indexed columns, not supported now.\n"
+ + "Updated columns: %s\nConflicted
columns: %s\n",
+ updatedColumns, conflictedColumns));
+ case DROP_PARTITION_INDEX:
+ Map<BinaryRow, List<IndexFileMeta>> entriesByParts =
+ affectedEntries.stream()
+ .collect(
+ Collectors.groupingBy(
+
IndexManifestEntry::partition,
+ Collectors.mapping(
+
IndexManifestEntry::indexFile,
+
Collectors.toList())));
+
+ for (Map.Entry<BinaryRow, List<IndexFileMeta>> entry :
+ entriesByParts.entrySet()) {
+ LOG.debug(
+ "Dropping index files {} due to indexed fields
update.",
+ entry.getValue());
+
+ CommitMessage commitMessage =
+ new CommitMessageImpl(
+ entry.getKey(),
+ 0,
+ null,
+
DataIncrement.deleteIndexIncrement(entry.getValue()),
+ CompactIncrement.emptyIncrement());
+
+ Committable committable = new
Committable(Long.MAX_VALUE, commitMessage);
+
+ output.collect(new StreamRecord<>(committable));
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported
option: " + updateAction);
+ }
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
index 5410dc3533..0ae93fdde6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
@@ -74,7 +74,7 @@ public class FlinkCalciteClasses {
}
}
- public FlinkCalciteClasses() throws ClassNotFoundException {
+ public FlinkCalciteClasses() throws ClassNotFoundException,
NoSuchFieldException {
sqlNodeListDelegate = new SqlNodeListDelegate();
sqlLiteralDelegate = new SqlLiteralDelegate();
sqlBasicCallDelegate = new SqlBasicCallDelegate();
@@ -232,8 +232,11 @@ public class FlinkCalciteClasses {
static final String CLASS_NAME = "org.apache.calcite.sql.SqlBasicCall";
private final Class<?> clazz;
+ private final Class<?> sqlParserPosClass;
+
public SqlBasicCallDelegate() throws ClassNotFoundException {
this.clazz = loadCalciteClass(CLASS_NAME);
+ this.sqlParserPosClass =
loadCalciteClass("org.apache.calcite.sql.parser.SqlParserPos");
}
public Object getOperator(Object basicCall) throws Exception {
@@ -244,6 +247,32 @@ public class FlinkCalciteClasses {
return (List<?>)
invokeMethod(clazz, basicCall, "getOperandList", new
Class[0], new Object[0]);
}
+
+ public boolean instanceOfSqlBasicCall(Object sqlNode) throws Exception
{
+ return clazz.isAssignableFrom(sqlNode.getClass());
+ }
+
+ public Object getParserPosition(Object basicCall) throws Exception {
+ return invokeMethod(clazz, basicCall, "getParserPosition", new
Class[0], new Object[0]);
+ }
+
+ public Object getFunctionQuantifier(Object basicCall) throws Exception
{
+ return invokeMethod(
+ clazz, basicCall, "getFunctionQuantifier", new Class[0],
new Object[0]);
+ }
+
+ public Object create(
+ Object operator, List<?> operands, Object parserPos, Object
functionQuantifier)
+ throws Exception {
+ java.lang.reflect.Constructor<?> constructor =
+ clazz.getConstructor(
+ loadCalciteClass(SqlOperatorDelegate.SQL_OPERATOR),
+ List.class,
+ sqlParserPosClass,
+
loadCalciteClass("org.apache.calcite.sql.SqlLiteral"));
+ constructor.setAccessible(true);
+ return constructor.newInstance(operator, operands, parserPos,
functionQuantifier);
+ }
}
/** Accessing org.apache.calcite.sql.SqlOperator by Reflection. */
@@ -288,14 +317,29 @@ public class FlinkCalciteClasses {
public static class SqlIdentifierDelegate {
private static final String SQL_IDENTIFIER =
"org.apache.calcite.sql.SqlIdentifier";
private final Class<?> identifierClazz;
+ private final Field namesField;
- public SqlIdentifierDelegate() throws ClassNotFoundException {
+ public SqlIdentifierDelegate() throws ClassNotFoundException,
NoSuchFieldException {
this.identifierClazz = loadCalciteClass(SQL_IDENTIFIER);
+ this.namesField = identifierClazz.getField("names");
}
public boolean instanceOfSqlIdentifier(Object sqlNode) throws
Exception {
return identifierClazz.isAssignableFrom(sqlNode.getClass());
}
+
+ public List<String> getNames(Object sqlIdentifier) throws
IllegalAccessException {
+ return (List<String>) namesField.get(sqlIdentifier);
+ }
+
+ public Object setName(Object sqlIdentifier, int i, String name) throws
Exception {
+ return invokeMethod(
+ identifierClazz,
+ sqlIdentifier,
+ "setName",
+ new Class[] {int.class, String.class},
+ new Object[] {i, name});
+ }
}
/** Accessing org.apache.calcite.sql.SqlNodeList by Reflection. */
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
index 18fa729787..e5fac565c6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Assertions;
@@ -36,7 +38,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Optional;
import java.util.stream.Stream;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
@@ -48,7 +50,11 @@ import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.sEnv;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** ITCase for {@link DataEvolutionMergeIntoAction}. */
public class DataEvolutionMergeIntoActionITCase extends ActionITCaseBase {
@@ -146,7 +152,7 @@ public class DataEvolutionMergeIntoActionITCase extends
ActionITCaseBase {
DataEvolutionMergeIntoActionBuilder builder =
builder(warehouse, targetDb, "T")
.withMergeCondition("T.id=S.id")
-
.withMatchedUpdateSet("T.name=S.name,T.value=S.`value`")
+
.withMatchedUpdateSet("T.value=S.`value`,T.name=S.name")
.withSourceTable("S")
.withSinkParallelism(2);
@@ -418,30 +424,191 @@ public class DataEvolutionMergeIntoActionITCase extends
ActionITCaseBase {
expected);
}
    /**
     * Merge condition of the form {@code T._ROW_ID = S.id}: the target's row id is equated with a
     * source column, so the action can take the row id directly from the source (per this change,
     * the join with the target can presumably be skipped — confirm in the action implementation).
     * Exercised both through the action builder and through the
     * {@code sys.data_evolution_merge_into} procedure.
     */
    @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
    @MethodSource("testArguments")
    public void testRowIdColumnContainedInSource(boolean inDefault, String invoker)
            throws Exception {
        String targetDb = inDefault ? database : "test_db";
        if (!inDefault) {
            // create target table in a new database
            sEnv.executeSql("DROP TABLE T");
            sEnv.executeSql("CREATE DATABASE test_db");
            sEnv.executeSql("USE test_db");
            bEnv.executeSql("USE test_db");
            prepareTargetTable();
        }

        // Expected rows after the merge; null name/value entries come from partially updated rows.
        List<Row> expected =
                Arrays.asList(
                        changelogRow("+I", 2, "new_name1", 100.1),
                        changelogRow("+I", 3, "name3", 0.3),
                        changelogRow("+I", 4, "name4", 0.4),
                        changelogRow("+I", 8, "new_name7", null),
                        changelogRow("+I", 9, "name9", 0.9),
                        changelogRow("+I", 12, "new_name11", 101.1),
                        changelogRow("+I", 16, null, 101.1),
                        changelogRow("+I", 19, "new_name18", 101.8));

        if (invoker.equals("action")) {
            // Invoke via the Java action builder.
            DataEvolutionMergeIntoActionBuilder builder =
                    builder(warehouse, targetDb, "T")
                            .withMergeCondition("T._ROW_ID=S.id")
                            .withMatchedUpdateSet("T.name=S.name,T.value=S.`value`")
                            .withSourceTable("S")
                            .withSinkParallelism(2);

            builder.build().run();
        } else {
            // Invoke via the SQL procedure; args: table, target alias, setup DDL, source,
            // merge condition, update set, parallelism.
            String procedureStatement =
                    String.format(
                            "CALL sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T._ROW_ID=S.id', 'name=S.name,value=S.`value`', 2)",
                            targetDb);

            executeSQL(procedureStatement, false, true);
        }

        // Read back through the row-tracking system table to verify updates by _ROW_ID.
        testBatchRead(
                "SELECT id, name, `value` FROM T$row_tracking where _ROW_ID in (1, 2, 3, 7, 8, 11, 15, 18)",
                expected);
    }
+
@Test
- public void testRewriteMergeCondition() throws Exception {
- Map<String, String> config = new HashMap<>();
- config.put("warehouse", warehouse);
- DataEvolutionMergeIntoAction action =
- new DataEvolutionMergeIntoAction(database, "T", config);
    // Verifies the 'global-index.column-update-action' behavior of Data-Evolution merge-into:
    // updating a globally indexed column fails by default, does not affect un-indexed
    // partitions, and drops the partition index once the option is set to DROP_PARTITION_INDEX.
    public void testUpdateAction() throws Exception {

        // create index on 01-22 partition
        executeSQL(
                "CALL sys.create_global_index(`table` => 'default.T', index_column => 'id', index_type => 'btree')",
                false,
                true);

        assertTrue(indexFileExists("T"));

        // 1. update indexed columns should throw an error by default
        assertThatThrownBy(
                        () ->
                                executeSQL(
                                        String.format(
                                                "CALL sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T._ROW_ID=S.id', 'name=S.name,id=1', 2)",
                                                database),
                                        false,
                                        true))
                .rootCause()
                .hasMessageContaining(
                        "MergeInto: update columns contain globally indexed columns, not supported now.");

        // Seed a second, un-indexed partition ('01-23') in the target table.
        insertInto(
                "T",
                "(31, 'name31', 3.1, '01-23')",
                "(32, 'name32', 3.2, '01-23')",
                "(33, 'name33', 3.3, '01-23')",
                "(34, 'name34', 3.4, '01-23')",
                "(35, 'name35', 3.5, '01-23')",
                "(36, 'name36', 3.6, '01-23')",
                "(37, 'name37', 3.7, '01-23')",
                "(38, 'name38', 3.8, '01-23')",
                "(39, 'name39', 3.9, '01-23')",
                "(40, 'name30', 3.0, '01-23')");

        insertInto("S", "(35, 'new_name25', 125.1)");

        // 2. updating unindexed partitions is not affected
        assertDoesNotThrow(
                () ->
                        executeSQL(
                                String.format(
                                        "CALL sys.data_evolution_merge_into('%s.T', 'TempT', "
                                                + "'CREATE TEMPORARY VIEW SS AS SELECT id, name, `value` FROM S WHERE id > 20',"
                                                + " 'SS', 'TempT.id=SS.id', 'id=SS.id,value=SS.`value`', 2)",
                                        database),
                                false,
                                true));

        // 3. alter table's UpdateAction option to DROP_INDEX
        executeSQL(
                "ALTER TABLE T SET ('global-index.column-update-action' = 'DROP_PARTITION_INDEX')",
                false,
                true);

        // With DROP_PARTITION_INDEX set, the same merge that failed in step 1 now succeeds ...
        assertDoesNotThrow(
                () ->
                        executeSQL(
                                String.format(
                                        "CALL sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T._ROW_ID=S.id', 'name=S.name,id=1', 2)",
                                        database),
                                false,
                                true));

        // ... and the index files for the touched partition are gone.
        assertFalse(indexFileExists("T"));
    }
+
+ private boolean indexFileExists(String tableName) throws Exception {
+ FileStoreTable table = getFileStoreTable(tableName);
- String mergeCondition = "T.id=S.id";
- assertEquals("`RT`.id=S.id",
action.rewriteMergeCondition(mergeCondition));
+ List<IndexManifestEntry> entries =
table.store().newIndexFileHandler().scan("btree");
- mergeCondition = "`T`.id=S.id";
- assertEquals("`RT`.id=S.id",
action.rewriteMergeCondition(mergeCondition));
+ return !entries.isEmpty();
+ }
- mergeCondition = "t.id = s.id AND T.pt = s.pt";
    /**
     * Unit test for {@code DataEvolutionMergeIntoAction.MergeConditionParser}: (1) rewriting
     * target-table references in a merge condition, (2) extracting the source column equated with
     * the target's {@code _ROW_ID}.
     */
    @Test
    public void mergeConditionParserTest() throws Exception {
        // 1. test rewrite table names
        // basic rewrite
        String mergeCondition = "T.id = S.id";
        DataEvolutionMergeIntoAction.MergeConditionParser parser = createParser(mergeCondition);
        assertEquals("`RT`.`id` = `S`.`id`", parser.rewriteSqlNode("T", "RT").toString());

        // should recognize quotes; lower-case `s` is a distinct identifier and stays untouched
        mergeCondition = "T.id = s.id AND T.pt = s.pt";
        parser = createParser(mergeCondition);
        assertEquals(
                "`RT`.`id` = `s`.`id` AND `RT`.`pt` = `s`.`pt`",
                parser.rewriteSqlNode("T", "RT").toString());

        // should not rewrite column names (columns named `T` / `T.T` must survive)
        mergeCondition = "`T`.id = `T1`.`T` and T.`T.T` = S.a";
        parser = createParser(mergeCondition);
        assertEquals(
                "`RT`.`id` = `T1`.`T` AND `RT`.`T.T` = `S`.`a`",
                parser.rewriteSqlNode("T", "RT").toString());

        // should not rewrite literals (string literal 'T.id' must survive)
        mergeCondition = "T.id = S.id AND S.str_col = 'T.id' AND T.`value` = 1";
        parser = createParser(mergeCondition);
        assertEquals(
                "`RT`.`id` = `S`.`id` AND `S`.`str_col` = 'T.id' AND `RT`.`value` = 1",
                parser.rewriteSqlNode("T", "RT").toString());

        // 2. test extract row id condition
        Optional<String> rowIdColumn;

        // target._ROW_ID on the left
        mergeCondition = "T._ROW_ID = S.id";
        parser = createParser(mergeCondition);
        rowIdColumn = parser.extractRowIdFieldFromSource("T");
        assertTrue(rowIdColumn.isPresent());
        assertEquals("id", rowIdColumn.get());

        // target._ROW_ID on the right — extraction is symmetric
        mergeCondition = "S.id = T._ROW_ID";
        parser = createParser(mergeCondition);
        rowIdColumn = parser.extractRowIdFieldFromSource("T");
        assertTrue(rowIdColumn.isPresent());
        assertEquals("id", rowIdColumn.get());

        // target table not matches
        mergeCondition = "S.id = T._ROW_ID";
        parser = createParser(mergeCondition);
        rowIdColumn = parser.extractRowIdFieldFromSource("S");
        assertFalse(rowIdColumn.isPresent());

        // for simplicity, compounded condition is not considered now.
        mergeCondition = "S.id = T._ROW_ID AND T.id = 1";
        parser = createParser(mergeCondition);
        rowIdColumn = parser.extractRowIdFieldFromSource("T");
        assertFalse(rowIdColumn.isPresent());
    }
+
+ private DataEvolutionMergeIntoAction.MergeConditionParser
createParser(String mergeCondition)
+ throws Exception {
+ return new
DataEvolutionMergeIntoAction.MergeConditionParser(mergeCondition);
}
@Test