This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e102abd771 [flink] optimize flink Data-Evolution-Merge-Into (#7324)
e102abd771 is described below

commit e102abd771a6a1aa03362cf11e19482fef5a3812
Author: Faiz <[email protected]>
AuthorDate: Tue Mar 3 22:28:22 2026 +0800

    [flink] optimize flink Data-Evolution-Merge-Into (#7324)
    
    This PR optimizes Data-Evolution-Merge-Into in several aspects:
    1. Introduce a MergeIntoUpdateChecker to check whether any global-indexed
    columns are updated (this is the same as Spark's implementation).
    2. Use Calcite to rename the target table (the current implementation is
    based on regex, which is very unstable).
    3. Use Calcite to find the _ROW_ID field (if it exists) in the source
    table, so that the join process can be eliminated.
---
 .../flink/action/DataEvolutionMergeIntoAction.java | 255 ++++++++++++++++-----
 .../DataEvolutionPartialWriteOperator.java         |   7 -
 .../flink/dataevolution/FirstRowIdAssigner.java    |  20 +-
 .../dataevolution/MergeIntoUpdateChecker.java      | 161 +++++++++++++
 .../paimon/flink/utils/FlinkCalciteClasses.java    |  48 +++-
 .../action/DataEvolutionMergeIntoActionITCase.java | 203 ++++++++++++++--
 6 files changed, 598 insertions(+), 96 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
index 8b0901ca88..57c56f6f50 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -25,13 +25,16 @@ import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.dataevolution.DataEvolutionPartialWriteOperator;
 import org.apache.paimon.flink.dataevolution.FirstRowIdAssigner;
+import org.apache.paimon.flink.dataevolution.MergeIntoUpdateChecker;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.CommittableTypeInfo;
 import org.apache.paimon.flink.sink.CommitterOperatorFactory;
 import org.apache.paimon.flink.sink.NoopCommittableStateManager;
 import org.apache.paimon.flink.sink.StoreCommitter;
 import org.apache.paimon.flink.sorter.SortOperator;
+import org.apache.paimon.flink.utils.FlinkCalciteClasses;
 import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.DataField;
@@ -41,7 +44,6 @@ import org.apache.paimon.types.DataTypeFamily;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.dag.Transformation;
@@ -49,8 +51,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamFlatMap;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableResult;
@@ -69,10 +70,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
@@ -95,7 +95,6 @@ import static 
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValue
 public class DataEvolutionMergeIntoAction extends TableActionBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DataEvolutionMergeIntoAction.class);
-    public static final String IDENTIFIER_QUOTE = "`";
 
     private final CoreOptions coreOptions;
 
@@ -120,6 +119,7 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
 
     // merge condition
     private String mergeCondition;
+    private MergeConditionParser mergeConditionParser;
 
     // set statement
     private String matchedUpdateSet;
@@ -137,6 +137,17 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
                             table.getClass().getName()));
         }
 
+        Long latestSnapshotId = ((FileStoreTable) 
table).snapshotManager().latestSnapshotId();
+        if (latestSnapshotId == null) {
+            throw new UnsupportedOperationException(
+                    "merge-into action doesn't support updating an empty 
table.");
+        }
+        table =
+                table.copy(
+                        Collections.singletonMap(
+                                
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(),
+                                latestSnapshotId.toString()));
+
         this.coreOptions = ((FileStoreTable) table).coreOptions();
 
         if (!coreOptions.dataEvolutionEnabled()) {
@@ -168,6 +179,12 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
 
     public DataEvolutionMergeIntoAction withMergeCondition(String 
mergeCondition) {
         this.mergeCondition = mergeCondition;
+        try {
+            this.mergeConditionParser = new 
MergeConditionParser(mergeCondition);
+        } catch (Exception e) {
+            LOG.error("Failed to parse merge condition: {}", mergeCondition, 
e);
+            throw new RuntimeException("Failed to parse merge condition " + 
mergeCondition, e);
+        }
         return this;
     }
 
@@ -196,7 +213,12 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
         DataStream<Committable> written =
                 writePartialColumns(shuffled, sourceWithType.f1, 
sinkParallelism);
         // 4. commit
-        DataStream<?> committed = commit(written);
+        Set<String> updatedColumns =
+                sourceWithType.f1.getFields().stream()
+                        .map(DataField::name)
+                        .filter(name -> 
!SpecialFields.ROW_ID.name().equals(name))
+                        .collect(Collectors.toSet());
+        DataStream<?> committed = commit(written, updatedColumns);
 
         // execute internal
         Transformation<?> transformations =
@@ -219,8 +241,7 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
         List<String> project;
         if (matchedUpdateSet.equals("*")) {
             // if sourceName is qualified like 'default.S', we should build a 
project like S.*
-            String[] splits = sourceTable.split("\\.");
-            project = Collections.singletonList(splits[splits.length - 1] + 
".*");
+            project = Collections.singletonList(sourceTableName() + ".*");
         } else {
             // validate upsert changes
             Map<String, String> changes = 
parseCommaSeparatedKeyValues(matchedUpdateSet);
@@ -245,16 +266,38 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
                             .collect(Collectors.toList());
         }
 
-        // use join to find matched rows and assign row id for each source row.
-        // _ROW_ID is the first field of joined table.
-        String query =
-                String.format(
-                        "SELECT %s, %s FROM %s INNER JOIN %s AS RT ON %s",
-                        "`RT`.`_ROW_ID` as `_ROW_ID`",
-                        String.join(",", project),
-                        escapedSourceName(),
-                        escapedRowTrackingTargetName(),
-                        rewriteMergeCondition(mergeCondition));
+        String query;
+        Optional<String> sourceRowIdField;
+        try {
+            sourceRowIdField = 
mergeConditionParser.extractRowIdFieldFromSource(targetTableName());
+        } catch (Exception e) {
+            LOG.error("Error happened when extract row id field from source 
table.", e);
+            throw new RuntimeException(
+                    "Error happened when extract row id field from source 
table.", e);
+        }
+
+        // if source table already contains _ROW_ID field, we could avoid join
+        if (sourceRowIdField.isPresent()) {
+            query =
+                    String.format(
+                            // cast _ROW_ID to BIGINT
+                            "SELECT CAST(`%s`.`%s` AS BIGINT) AS `_ROW_ID`, %s 
FROM %s",
+                            sourceTableName(),
+                            sourceRowIdField.get(),
+                            String.join(",", project),
+                            escapedSourceName());
+        } else {
+            // use join to find matched rows and assign row id for each source 
row.
+            // _ROW_ID is the first field of joined table.
+            query =
+                    String.format(
+                            "SELECT %s, %s FROM %s INNER JOIN %s AS RT ON %s",
+                            "`RT`.`_ROW_ID` as `_ROW_ID`",
+                            String.join(",", project),
+                            escapedSourceName(),
+                            escapedRowTrackingTargetName(),
+                            rewriteMergeCondition(mergeCondition));
+        }
 
         LOG.info("Source query: {}", query);
 
@@ -286,11 +329,15 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
         Preconditions.checkState(
                 !firstRowIds.isEmpty(), "Should not MERGE INTO an empty target 
table.");
 
+        // if firstRowIds is not empty, there must be a valid nextRowId
+        long maxRowId = table.latestSnapshot().get().nextRowId() - 1;
+
         OneInputTransformation<RowData, Tuple2<Long, RowData>> 
assignedFirstRowId =
                 new OneInputTransformation<>(
                         sourceTransformation,
                         "ASSIGN FIRST_ROW_ID",
-                        new StreamMap<>(new FirstRowIdAssigner(firstRowIds, 
sourceType)),
+                        new StreamFlatMap<>(
+                                new FirstRowIdAssigner(firstRowIds, maxRowId, 
sourceType)),
                         new TupleTypeInfo<>(
                                 BasicTypeInfo.LONG_TYPE_INFO, 
sourceTransformation.getOutputType()),
                         sourceTransformation.getParallelism(),
@@ -334,9 +381,20 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
                 .setParallelism(sinkParallelism);
     }
 
-    public DataStream<Committable> commit(DataStream<Committable> written) {
+    public DataStream<Committable> commit(
+            DataStream<Committable> written, Set<String> updatedColumns) {
         FileStoreTable storeTable = (FileStoreTable) table;
-        OneInputStreamOperatorFactory<Committable, Committable> 
committerOperator =
+
+        // Check if some global-indexed columns are updated
+        DataStream<Committable> checked =
+                written.transform(
+                                "Updated Column Check",
+                                new CommittableTypeInfo(),
+                                new MergeIntoUpdateChecker(storeTable, 
updatedColumns))
+                        .setParallelism(1)
+                        .setMaxParallelism(1);
+
+        CommitterOperatorFactory<Committable, ManifestCommittable> 
committerOperator =
                 new CommitterOperatorFactory<>(
                         false,
                         true,
@@ -348,7 +406,7 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
                                         context),
                         new NoopCommittableStateManager());
 
-        return written.transform("COMMIT OPERATOR", new CommittableTypeInfo(), 
committerOperator)
+        return checked.transform("COMMIT OPERATOR", new CommittableTypeInfo(), 
committerOperator)
                 .setParallelism(1)
                 .setMaxParallelism(1);
     }
@@ -382,28 +440,13 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
      */
     @VisibleForTesting
     public String rewriteMergeCondition(String mergeCondition) {
-        // skip single and double-quoted chunks
-        String skipQuoted = "'(?:''|[^'])*'" + "|\"(?:\"\"|[^\"])*\"";
-        String targetTableRegex =
-                "(?i)(?:\\b"
-                        + Pattern.quote(targetTableName())
-                        + "\\b|`"
-                        + Pattern.quote(targetTableName())
-                        + "`)\\s*\\.";
-
-        Pattern pattern = Pattern.compile(skipQuoted + "|(" + targetTableRegex 
+ ")");
-        Matcher matcher = pattern.matcher(mergeCondition);
-
-        StringBuffer sb = new StringBuffer();
-        while (matcher.find()) {
-            if (matcher.group(1) != null) {
-                matcher.appendReplacement(sb, 
Matcher.quoteReplacement("`RT`."));
-            } else {
-                matcher.appendReplacement(sb, 
Matcher.quoteReplacement(matcher.group(0)));
-            }
+        try {
+            Object rewrittenNode = 
mergeConditionParser.rewriteSqlNode(targetTableName(), "RT");
+            return rewrittenNode.toString();
+        } catch (Exception e) {
+            LOG.error("Failed to rewrite merge condition: {}", mergeCondition, 
e);
+            throw new RuntimeException("Failed to rewrite merge condition " + 
mergeCondition, e);
         }
-        matcher.appendTail(sb);
-        return sb.toString();
     }
 
     /**
@@ -432,7 +475,8 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
                 foundRowIdColumn = true;
                 Preconditions.checkState(
                         
flinkColumn.getDataType().getLogicalType().getTypeRoot()
-                                == LogicalTypeRoot.BIGINT);
+                                == LogicalTypeRoot.BIGINT,
+                        "_ROW_ID field should be BIGINT type.");
             } else {
                 DataField targetField = 
targetFields.get(flinkColumn.getName());
                 if (targetField == null) {
@@ -497,6 +541,11 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
         return targetAlias == null ? identifier.getObjectName() : targetAlias;
     }
 
+    private String sourceTableName() {
+        String[] splits = sourceTable.split("\\.");
+        return splits[splits.length - 1];
+    }
+
     private String escapedSourceName() {
         return Arrays.stream(sourceTable.split("\\."))
                 .map(s -> String.format("`%s`", s))
@@ -514,28 +563,108 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
                 catalogName, identifier.getDatabaseName(), 
identifier.getObjectName());
     }
 
-    private List<String> normalizeFieldName(List<String> fieldNames) {
-        return 
fieldNames.stream().map(this::normalizeFieldName).collect(Collectors.toList());
-    }
+    /** The parser to parse merge condition through calcite sql parser. */
+    static class MergeConditionParser {
+
+        private final FlinkCalciteClasses calciteClasses;
+        private final Object sqlNode;
+
+        MergeConditionParser(String mergeCondition) throws Exception {
+            this.calciteClasses = new FlinkCalciteClasses();
+            this.sqlNode = initializeSqlNode(mergeCondition);
+        }
 
-    private String normalizeFieldName(String fieldName) {
-        if (StringUtils.isNullOrWhitespaceOnly(fieldName) || 
fieldName.endsWith(IDENTIFIER_QUOTE)) {
-            return fieldName;
+        private Object initializeSqlNode(String mergeCondition) throws 
Exception {
+            Object config =
+                    calciteClasses
+                            .configDelegate()
+                            .withLex(
+                                    
calciteClasses.sqlParserDelegate().config(),
+                                    calciteClasses.lexDelegate().java());
+            Object sqlParser = 
calciteClasses.sqlParserDelegate().create(mergeCondition, config);
+            return 
calciteClasses.sqlParserDelegate().parseExpression(sqlParser);
         }
 
-        String[] splitFieldNames = fieldName.split("\\.");
-        if (!targetFieldNames.contains(splitFieldNames[splitFieldNames.length 
- 1])) {
-            return fieldName;
+        /**
+         * Rewrite the SQL node, replacing all references from the 'from' 
table to the 'to' table.
+         */
+        public Object rewriteSqlNode(String from, String to) throws Exception {
+            return rewriteNode(sqlNode, from, to);
         }
 
-        return String.join(
-                ".",
-                Arrays.stream(splitFieldNames)
-                        .map(
-                                part ->
-                                        part.endsWith(IDENTIFIER_QUOTE)
-                                                ? part
-                                                : IDENTIFIER_QUOTE + part + 
IDENTIFIER_QUOTE)
-                        .toArray(String[]::new));
+        private Object rewriteNode(Object node, String from, String to) throws 
Exception {
+            // It's a SqlBasicCall, recursively rewrite children operands
+            if 
(calciteClasses.sqlBasicCallDelegate().instanceOfSqlBasicCall(node)) {
+                List<?> operandList = 
calciteClasses.sqlBasicCallDelegate().getOperandList(node);
+                List<Object> newNodes = new java.util.ArrayList<>();
+                for (Object operand : operandList) {
+                    newNodes.add(rewriteNode(operand, from, to));
+                }
+
+                Object operator = 
calciteClasses.sqlBasicCallDelegate().getOperator(node);
+                Object parserPos = 
calciteClasses.sqlBasicCallDelegate().getParserPosition(node);
+                Object functionQuantifier =
+                        
calciteClasses.sqlBasicCallDelegate().getFunctionQuantifier(node);
+                return calciteClasses
+                        .sqlBasicCallDelegate()
+                        .create(operator, newNodes, parserPos, 
functionQuantifier);
+            } else if 
(calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(node)) {
+                // It's a sql identifier, try to replace the table name
+                List<String> names = 
calciteClasses.sqlIndentifierDelegate().getNames(node);
+                Preconditions.checkState(
+                        names.size() >= 2, "Please specify the table name for 
the column: " + node);
+                int nameLen = names.size();
+                if (names.get(nameLen - 2).equals(from)) {
+                    return 
calciteClasses.sqlIndentifierDelegate().setName(node, nameLen - 2, to);
+                }
+                return node;
+            } else {
+                return node;
+            }
+        }
+
+        /**
+         * Find the row id field in source table. This method looks for an 
equality condition like
+         * `target_table._ROW_ID = source_table.some_field` or 
`source_table.some_field =
+         * target_table._ROW_ID`, and returns the field name that is paired 
with _ROW_ID.
+         */
+        public Optional<String> extractRowIdFieldFromSource(String 
targetTable) throws Exception {
+            Object operator = 
calciteClasses.sqlBasicCallDelegate().getOperator(sqlNode);
+            Object kind = 
calciteClasses.sqlOperatorDelegate().getKind(operator);
+
+            if (kind == calciteClasses.sqlKindDelegate().equals()) {
+                List<?> operandList = 
calciteClasses.sqlBasicCallDelegate().getOperandList(sqlNode);
+
+                Object left = operandList.get(0);
+                Object right = operandList.get(1);
+
+                if 
(calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(left)
+                        && 
calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(right)) {
+
+                    List<String> leftNames = 
calciteClasses.sqlIndentifierDelegate().getNames(left);
+                    List<String> rightNames =
+                            
calciteClasses.sqlIndentifierDelegate().getNames(right);
+                    Preconditions.checkState(
+                            leftNames.size() >= 2,
+                            "Please specify the table name for the column: " + 
left);
+                    Preconditions.checkState(
+                            rightNames.size() >= 2,
+                            "Please specify the table name for the column: " + 
right);
+
+                    if (leftNames.get(leftNames.size() - 
1).equals(SpecialFields.ROW_ID.name())
+                            && leftNames.get(leftNames.size() - 
2).equals(targetTable)) {
+                        return Optional.of(rightNames.get(rightNames.size() - 
1));
+                    } else if (rightNames
+                                    .get(rightNames.size() - 1)
+                                    .equals(SpecialFields.ROW_ID.name())
+                            && rightNames.get(rightNames.size() - 
2).equals(targetTable)) {
+                        return Optional.of(leftNames.get(leftNames.size() - 
1));
+                    }
+                    return Optional.empty();
+                }
+            }
+
+            return Optional.empty();
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
index 4f9ec8c643..f09bdefe28 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
@@ -35,7 +35,6 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.CommitIncrement;
@@ -72,7 +71,6 @@ public class DataEvolutionPartialWriteOperator
 
     // dataType
     private final RowType dataType;
-    private final InternalRow.FieldGetter[] fieldGetters;
     private final int rowIdIndex;
 
     // data type excludes of _ROW_ID field.
@@ -101,11 +99,6 @@ public class DataEvolutionPartialWriteOperator
         this.dataType =
                 
SpecialFields.rowTypeWithRowId(table.rowType()).project(dataType.getFieldNames());
         this.rowIdIndex = 
this.dataType.getFieldIndex(SpecialFields.ROW_ID.name());
-        this.fieldGetters = new 
InternalRow.FieldGetter[dataType.getFieldCount()];
-        List<DataField> fields = this.dataType.getFields();
-        for (int i = 0; i < fields.size(); i++) {
-            this.fieldGetters[i] = 
InternalRow.createFieldGetter(fields.get(i).type(), i);
-        }
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
index ad87c5b016..bf1c41a871 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
@@ -24,30 +24,38 @@ import org.apache.paimon.utils.MurmurHashUtils;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
 
 import java.util.List;
 
-/** Assign first row id for each row through binary search. */
-public class FirstRowIdAssigner extends RichMapFunction<RowData, Tuple2<Long, 
RowData>> {
+/**
+ * Assign first row id for each row through binary search. Rows with invalid 
row ids are filtered
+ * out.
+ */
+public class FirstRowIdAssigner extends RichFlatMapFunction<RowData, 
Tuple2<Long, RowData>> {
 
     private final FirstRowIdLookup firstRowIdLookup;
+    private final long maxRowId;
 
     private final int rowIdFieldIndex;
 
-    public FirstRowIdAssigner(List<Long> firstRowIds, RowType rowType) {
+    public FirstRowIdAssigner(List<Long> firstRowIds, long maxRowId, RowType 
rowType) {
         this.firstRowIdLookup = new FirstRowIdLookup(firstRowIds);
+        this.maxRowId = maxRowId;
         this.rowIdFieldIndex = 
rowType.getFieldNames().indexOf(SpecialFields.ROW_ID.name());
         Preconditions.checkState(this.rowIdFieldIndex >= 0, "Do not found 
_ROW_ID column.");
     }
 
     @Override
-    public Tuple2<Long, RowData> map(RowData value) throws Exception {
+    public void flatMap(RowData value, Collector<Tuple2<Long, RowData>> out) 
throws Exception {
         long rowId = value.getLong(rowIdFieldIndex);
-        return new Tuple2<>(firstRowIdLookup.lookup(rowId), value);
+        if (rowId >= 0 && rowId <= maxRowId) {
+            out.collect(new Tuple2<>(firstRowIdLookup.lookup(rowId), value));
+        }
     }
 
     /** The Key Selector to get firstRowId from tuple2. */
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java
new file mode 100644
index 0000000000..94392a84ec
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The checker for merge into update result. It will check each committable to 
see if some
+ * global-indexed columns are updated. It will take some actions according to 
{@link
+ * CoreOptions#GLOBAL_INDEX_COLUMN_UPDATE_ACTION}.
+ */
+public class MergeIntoUpdateChecker extends 
BoundedOneInputOperator<Committable, Committable> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MergeIntoUpdateChecker.class);
+
+    private final FileStoreTable table;
+    private final Set<String> updatedColumns;
+
+    private transient Set<BinaryRow> affectedPartitions;
+
+    public MergeIntoUpdateChecker(FileStoreTable table, Set<String> 
updatedColumns) {
+        this.table = table;
+        this.updatedColumns = updatedColumns;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        affectedPartitions = new HashSet<>();
+
+        Preconditions.checkState(
+                
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks() == 1,
+                "Parallelism of MergeIntoUpdateChecker must be 1.");
+    }
+
+    @Override
+    public void processElement(StreamRecord<Committable> element) throws 
Exception {
+        affectedPartitions.add(element.getValue().commitMessage().partition());
+        output.collect(element);
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        checkUpdatedColumns();
+    }
+
+    private void checkUpdatedColumns() {
+        Optional<Snapshot> latestSnapshot = table.latestSnapshot();
+        RowType rowType = table.rowType();
+        Preconditions.checkState(latestSnapshot.isPresent());
+
+        List<IndexManifestEntry> affectedEntries =
+                table.store()
+                        .newIndexFileHandler()
+                        .scan(
+                                latestSnapshot.get(),
+                                entry -> {
+                                    GlobalIndexMeta globalIndexMeta =
+                                            
entry.indexFile().globalIndexMeta();
+                                    if (globalIndexMeta != null) {
+                                        String fieldName =
+                                                
rowType.getField(globalIndexMeta.indexFieldId())
+                                                        .name();
+                                        return 
updatedColumns.contains(fieldName)
+                                                && 
affectedPartitions.contains(entry.partition());
+                                    }
+                                    return false;
+                                });
+
+        if (!affectedEntries.isEmpty()) {
+            CoreOptions.GlobalIndexColumnUpdateAction updateAction =
+                    table.coreOptions().globalIndexColumnUpdateAction();
+            switch (updateAction) {
+                case THROW_ERROR:
+                    Set<String> conflictedColumns =
+                            affectedEntries.stream()
+                                    .map(file -> 
file.indexFile().globalIndexMeta().indexFieldId())
+                                    .map(id -> rowType.getField(id).name())
+                                    .collect(Collectors.toSet());
+
+                    throw new RuntimeException(
+                            String.format(
+                                    "MergeInto: update columns contain 
globally indexed columns, not supported now.\n"
+                                            + "Updated columns: %s\nConflicted 
columns: %s\n",
+                                    updatedColumns, conflictedColumns));
+                case DROP_PARTITION_INDEX:
+                    Map<BinaryRow, List<IndexFileMeta>> entriesByParts =
+                            affectedEntries.stream()
+                                    .collect(
+                                            Collectors.groupingBy(
+                                                    
IndexManifestEntry::partition,
+                                                    Collectors.mapping(
+                                                            
IndexManifestEntry::indexFile,
+                                                            
Collectors.toList())));
+
+                    for (Map.Entry<BinaryRow, List<IndexFileMeta>> entry :
+                            entriesByParts.entrySet()) {
+                        LOG.debug(
+                                "Dropping index files {} due to indexed fields 
update.",
+                                entry.getValue());
+
+                        CommitMessage commitMessage =
+                                new CommitMessageImpl(
+                                        entry.getKey(),
+                                        0,
+                                        null,
+                                        
DataIncrement.deleteIndexIncrement(entry.getValue()),
+                                        CompactIncrement.emptyIncrement());
+
+                        Committable committable = new 
Committable(Long.MAX_VALUE, commitMessage);
+
+                        output.collect(new StreamRecord<>(committable));
+                    }
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unsupported 
option: " + updateAction);
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
index 5410dc3533..0ae93fdde6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
@@ -74,7 +74,7 @@ public class FlinkCalciteClasses {
         }
     }
 
-    public FlinkCalciteClasses() throws ClassNotFoundException {
+    public FlinkCalciteClasses() throws ClassNotFoundException, 
NoSuchFieldException {
         sqlNodeListDelegate = new SqlNodeListDelegate();
         sqlLiteralDelegate = new SqlLiteralDelegate();
         sqlBasicCallDelegate = new SqlBasicCallDelegate();
@@ -232,8 +232,11 @@ public class FlinkCalciteClasses {
         static final String CLASS_NAME = "org.apache.calcite.sql.SqlBasicCall";
         private final Class<?> clazz;
 
+        private final Class<?> sqlParserPosClass;
+
         public SqlBasicCallDelegate() throws ClassNotFoundException {
             this.clazz = loadCalciteClass(CLASS_NAME);
+            this.sqlParserPosClass = 
loadCalciteClass("org.apache.calcite.sql.parser.SqlParserPos");
         }
 
         public Object getOperator(Object basicCall) throws Exception {
@@ -244,6 +247,32 @@ public class FlinkCalciteClasses {
             return (List<?>)
                     invokeMethod(clazz, basicCall, "getOperandList", new 
Class[0], new Object[0]);
         }
+
+        public boolean instanceOfSqlBasicCall(Object sqlNode) throws Exception 
{
+            return clazz.isAssignableFrom(sqlNode.getClass());
+        }
+
+        public Object getParserPosition(Object basicCall) throws Exception {
+            return invokeMethod(clazz, basicCall, "getParserPosition", new 
Class[0], new Object[0]);
+        }
+
+        public Object getFunctionQuantifier(Object basicCall) throws Exception 
{
+            return invokeMethod(
+                    clazz, basicCall, "getFunctionQuantifier", new Class[0], 
new Object[0]);
+        }
+
+        public Object create(
+                Object operator, List<?> operands, Object parserPos, Object 
functionQuantifier)
+                throws Exception {
+            java.lang.reflect.Constructor<?> constructor =
+                    clazz.getConstructor(
+                            loadCalciteClass(SqlOperatorDelegate.SQL_OPERATOR),
+                            List.class,
+                            sqlParserPosClass,
+                            
loadCalciteClass("org.apache.calcite.sql.SqlLiteral"));
+            constructor.setAccessible(true);
+            return constructor.newInstance(operator, operands, parserPos, 
functionQuantifier);
+        }
     }
 
     /** Accessing org.apache.calcite.sql.SqlOperator by Reflection. */
@@ -288,14 +317,29 @@ public class FlinkCalciteClasses {
     public static class SqlIdentifierDelegate {
         private static final String SQL_IDENTIFIER = 
"org.apache.calcite.sql.SqlIdentifier";
         private final Class<?> identifierClazz;
+        private final Field namesField;
 
-        public SqlIdentifierDelegate() throws ClassNotFoundException {
+        public SqlIdentifierDelegate() throws ClassNotFoundException, 
NoSuchFieldException {
             this.identifierClazz = loadCalciteClass(SQL_IDENTIFIER);
+            this.namesField = identifierClazz.getField("names");
         }
 
         public boolean instanceOfSqlIdentifier(Object sqlNode) throws 
Exception {
             return identifierClazz.isAssignableFrom(sqlNode.getClass());
         }
+
+        public List<String> getNames(Object sqlIdentifier) throws 
IllegalAccessException {
+            return (List<String>) namesField.get(sqlIdentifier);
+        }
+
+        public Object setName(Object sqlIdentifier, int i, String name) throws 
Exception {
+            return invokeMethod(
+                    identifierClazz,
+                    sqlIdentifier,
+                    "setName",
+                    new Class[] {int.class, String.class},
+                    new Object[] {i, name});
+        }
     }
 
     /** Accessing org.apache.calcite.sql.SqlNodeList by Reflection. */
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
index 18fa729787..e5fac565c6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Assertions;
@@ -36,7 +38,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Stream;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
@@ -48,7 +50,11 @@ import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.sEnv;
 import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** ITCase for {@link DataEvolutionMergeIntoAction}. */
 public class DataEvolutionMergeIntoActionITCase extends ActionITCaseBase {
@@ -146,7 +152,7 @@ public class DataEvolutionMergeIntoActionITCase extends 
ActionITCaseBase {
             DataEvolutionMergeIntoActionBuilder builder =
                     builder(warehouse, targetDb, "T")
                             .withMergeCondition("T.id=S.id")
-                            
.withMatchedUpdateSet("T.name=S.name,T.value=S.`value`")
+                            
.withMatchedUpdateSet("T.value=S.`value`,T.name=S.name")
                             .withSourceTable("S")
                             .withSinkParallelism(2);
 
@@ -418,30 +424,191 @@ public class DataEvolutionMergeIntoActionITCase extends 
ActionITCaseBase {
                 expected);
     }
 
+    @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+    @MethodSource("testArguments")
+    public void testRowIdColumnContainedInSource(boolean inDefault, String 
invoker)
+            throws Exception {
+        String targetDb = inDefault ? database : "test_db";
+        if (!inDefault) {
+            // create target table in a new database
+            sEnv.executeSql("DROP TABLE T");
+            sEnv.executeSql("CREATE DATABASE test_db");
+            sEnv.executeSql("USE test_db");
+            bEnv.executeSql("USE test_db");
+            prepareTargetTable();
+        }
+
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 2, "new_name1", 100.1),
+                        changelogRow("+I", 3, "name3", 0.3),
+                        changelogRow("+I", 4, "name4", 0.4),
+                        changelogRow("+I", 8, "new_name7", null),
+                        changelogRow("+I", 9, "name9", 0.9),
+                        changelogRow("+I", 12, "new_name11", 101.1),
+                        changelogRow("+I", 16, null, 101.1),
+                        changelogRow("+I", 19, "new_name18", 101.8));
+
+        if (invoker.equals("action")) {
+            DataEvolutionMergeIntoActionBuilder builder =
+                    builder(warehouse, targetDb, "T")
+                            .withMergeCondition("T._ROW_ID=S.id")
+                            
.withMatchedUpdateSet("T.name=S.name,T.value=S.`value`")
+                            .withSourceTable("S")
+                            .withSinkParallelism(2);
+
+            builder.build().run();
+        } else {
+            String procedureStatement =
+                    String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', '', 
'', 'S', 'T._ROW_ID=S.id', 'name=S.name,value=S.`value`', 2)",
+                            targetDb);
+
+            executeSQL(procedureStatement, false, true);
+        }
+
+        testBatchRead(
+                "SELECT id, name, `value` FROM T$row_tracking where _ROW_ID in 
(1, 2, 3, 7, 8, 11, 15, 18)",
+                expected);
+    }
+
     @Test
-    public void testRewriteMergeCondition() throws Exception {
-        Map<String, String> config = new HashMap<>();
-        config.put("warehouse", warehouse);
-        DataEvolutionMergeIntoAction action =
-                new DataEvolutionMergeIntoAction(database, "T", config);
+    public void testUpdateAction() throws Exception {
+
+        // create index on 01-22 partition
+        executeSQL(
+                "CALL sys.create_global_index(`table` => 'default.T', 
index_column => 'id', index_type => 'btree')",
+                false,
+                true);
+
+        assertTrue(indexFileExists("T"));
+
+        // 1. update indexed columns should throw an error by default
+        assertThatThrownBy(
+                        () ->
+                                executeSQL(
+                                        String.format(
+                                                "CALL 
sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T._ROW_ID=S.id', 
'name=S.name,id=1', 2)",
+                                                database),
+                                        false,
+                                        true))
+                .rootCause()
+                .hasMessageContaining(
+                        "MergeInto: update columns contain globally indexed 
columns, not supported now.");
+
+        insertInto(
+                "T",
+                "(31, 'name31', 3.1, '01-23')",
+                "(32, 'name32', 3.2, '01-23')",
+                "(33, 'name33', 3.3, '01-23')",
+                "(34, 'name34', 3.4, '01-23')",
+                "(35, 'name35', 3.5, '01-23')",
+                "(36, 'name36', 3.6, '01-23')",
+                "(37, 'name37', 3.7, '01-23')",
+                "(38, 'name38', 3.8, '01-23')",
+                "(39, 'name39', 3.9, '01-23')",
+                "(40, 'name30', 3.0, '01-23')");
+
+        insertInto("S", "(35, 'new_name25', 125.1)");
+
+        // 2. updating unindexed partitions is not affected
+        assertDoesNotThrow(
+                () ->
+                        executeSQL(
+                                String.format(
+                                        "CALL 
sys.data_evolution_merge_into('%s.T', 'TempT', "
+                                                + "'CREATE TEMPORARY VIEW SS 
AS SELECT id, name, `value` FROM S WHERE id > 20',"
+                                                + " 'SS', 'TempT.id=SS.id', 
'id=SS.id,value=SS.`value`', 2)",
+                                        database),
+                                false,
+                                true));
+
+        // 3. alter table's UpdateAction option to DROP_PARTITION_INDEX
+        executeSQL(
+                "ALTER TABLE T SET ('global-index.column-update-action' = 
'DROP_PARTITION_INDEX')",
+                false,
+                true);
+
+        assertDoesNotThrow(
+                () ->
+                        executeSQL(
+                                String.format(
+                                        "CALL 
sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T._ROW_ID=S.id', 
'name=S.name,id=1', 2)",
+                                        database),
+                                false,
+                                true));
+
+        assertFalse(indexFileExists("T"));
+    }
+
+    private boolean indexFileExists(String tableName) throws Exception {
+        FileStoreTable table = getFileStoreTable(tableName);
 
-        String mergeCondition = "T.id=S.id";
-        assertEquals("`RT`.id=S.id", 
action.rewriteMergeCondition(mergeCondition));
+        List<IndexManifestEntry> entries = 
table.store().newIndexFileHandler().scan("btree");
 
-        mergeCondition = "`T`.id=S.id";
-        assertEquals("`RT`.id=S.id", 
action.rewriteMergeCondition(mergeCondition));
+        return !entries.isEmpty();
+    }
 
-        mergeCondition = "t.id = s.id AND T.pt = s.pt";
+    @Test
+    public void mergeConditionParserTest() throws Exception {
+        // 1. test rewrite table names
+        // basic rewrite
+        String mergeCondition = "T.id = S.id";
+        DataEvolutionMergeIntoAction.MergeConditionParser parser = 
createParser(mergeCondition);
+        assertEquals("`RT`.`id` = `S`.`id`", parser.rewriteSqlNode("T", 
"RT").toString());
+
+        // should rewrite every occurrence in a compound condition
+        mergeCondition = "T.id = s.id AND T.pt = s.pt";
+        parser = createParser(mergeCondition);
         assertEquals(
-                "`RT`.id = s.id AND `RT`.pt = s.pt", 
action.rewriteMergeCondition(mergeCondition));
+                "`RT`.`id` = `s`.`id` AND `RT`.`pt` = `s`.`pt`",
+                parser.rewriteSqlNode("T", "RT").toString());
 
-        mergeCondition = "TT.id = 1 AND T.id = 2";
-        assertEquals("TT.id = 1 AND `RT`.id = 2", 
action.rewriteMergeCondition(mergeCondition));
+        // should not rewrite column names
+        mergeCondition = "`T`.id = `T1`.`T` and T.`T.T` = S.a";
+        parser = createParser(mergeCondition);
+        assertEquals(
+                "`RT`.`id` = `T1`.`T` AND `RT`.`T.T` = `S`.`a`",
+                parser.rewriteSqlNode("T", "RT").toString());
 
-        mergeCondition = "TT.id = 'T.id' AND T.id = \"T.id\"";
+        // should not rewrite literals
+        mergeCondition = "T.id = S.id AND S.str_col = 'T.id' AND T.`value` = 
1";
+        parser = createParser(mergeCondition);
         assertEquals(
-                "TT.id = 'T.id' AND `RT`.id = \"T.id\"",
-                action.rewriteMergeCondition(mergeCondition));
+                "`RT`.`id` = `S`.`id` AND `S`.`str_col` = 'T.id' AND 
`RT`.`value` = 1",
+                parser.rewriteSqlNode("T", "RT").toString());
+
+        // 2. test extract row id condition
+        Optional<String> rowIdColumn;
+
+        mergeCondition = "T._ROW_ID = S.id";
+        parser = createParser(mergeCondition);
+        rowIdColumn = parser.extractRowIdFieldFromSource("T");
+        assertTrue(rowIdColumn.isPresent());
+        assertEquals("id", rowIdColumn.get());
+
+        mergeCondition = "S.id = T._ROW_ID";
+        parser = createParser(mergeCondition);
+        rowIdColumn = parser.extractRowIdFieldFromSource("T");
+        assertTrue(rowIdColumn.isPresent());
+        assertEquals("id", rowIdColumn.get());
+
+        // target table does not match
+        mergeCondition = "S.id = T._ROW_ID";
+        parser = createParser(mergeCondition);
+        rowIdColumn = parser.extractRowIdFieldFromSource("S");
+        assertFalse(rowIdColumn.isPresent());
+
+        // for simplicity, compound conditions are not considered for now.
+        mergeCondition = "S.id = T._ROW_ID AND T.id = 1";
+        parser = createParser(mergeCondition);
+        rowIdColumn = parser.extractRowIdFieldFromSource("T");
+        assertFalse(rowIdColumn.isPresent());
+    }
+
+    private DataEvolutionMergeIntoAction.MergeConditionParser 
createParser(String mergeCondition)
+            throws Exception {
+        return new 
DataEvolutionMergeIntoAction.MergeConditionParser(mergeCondition);
     }
 
     @Test

Reply via email to