This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 926c381024 [cdc] Support computed column referring to each other while sync_table (#5972) 926c381024 is described below commit 926c381024260c0f5bbcbf3d696d0362f661d23b Author: JackeyLee007 <jackeylee...@126.com> AuthorDate: Tue Aug 5 10:01:44 2025 +0800 [cdc] Support computed column referring to each other while sync_table (#5972) --- .../paimon/flink/action/cdc/ComputedColumn.java | 14 -- .../flink/action/cdc/ComputedColumnUtils.java | 145 ++++++------------- .../apache/paimon/flink/action/cdc/Expression.java | 154 ++++----------------- .../action/cdc/format/AbstractRecordParser.java | 14 +- .../flink/action/cdc/mysql/MySqlRecordParser.java | 35 ++++- .../action/cdc/postgres/PostgresRecordParser.java | 17 ++- .../paimon/flink/action/cdc/utils/DfsSort.java | 103 ++++++++++++++ .../apache/paimon/flink/sink/cdc/CdcSchema.java | 5 +- .../flink/action/cdc/ComputedColumnUtilsTest.java | 41 +++--- .../cdc/mysql/MySqlSyncTableActionITCase.java | 64 +++++++++ .../flink/action/cdc/utils/DfsSortTestTest.java | 55 ++++++++ .../src/test/resources/mysql/sync_table_setup.sql | 9 ++ 12 files changed, 366 insertions(+), 290 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java index dddb909cd7..5e9041a120 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java @@ -53,11 +53,6 @@ public class ComputedColumn implements Serializable { return expression.fieldReference(); } - @Nullable - public DataType fieldReferenceType() { - return expression.fieldReferenceType(); - } - /** Compute column's value from given argument. Return null if input is null. 
*/ @Nullable public String eval(@Nullable String input) { @@ -66,13 +61,4 @@ public class ComputedColumn implements Serializable { } return expression.eval(input); } - - /** Compute column's value from given argument. Return null if input is null. */ - @Nullable - public String eval(@Nullable String input, DataType inputType) { - if (fieldReference() != null && input == null) { - return null; - } - return expression.eval(input, inputType); - } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java index 28fbef9455..7bd9dd561d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java @@ -18,22 +18,19 @@ package org.apache.paimon.flink.action.cdc; -import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.flink.action.cdc.utils.DfsSort; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.utils.Preconditions; +import org.apache.flink.api.java.tuple.Tuple2; + import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** Utility methods for {@link ComputedColumn}, such as build. 
*/ public class ComputedColumnUtils { @@ -50,16 +47,44 @@ public class ComputedColumnUtils { .collect( Collectors.toMap(DataField::name, DataField::type, (v1, v2) -> v2)); + // sort computed column args by dependencies + LinkedHashMap<String, Tuple2<String, String[]>> sortedArgs = + sortComputedColumnArgs(computedColumnArgs, caseSensitive); + List<ComputedColumn> computedColumns = new ArrayList<>(); - for (String columnArg : computedColumnArgs) { - String[] kv = columnArg.split("="); + for (Map.Entry<String, Tuple2<String, String[]>> columnArg : sortedArgs.entrySet()) { + String columnName = columnArg.getKey().trim(); + String exprName = columnArg.getValue().f0.trim(); + String[] args = columnArg.getValue().f1; + + Expression expr = Expression.create(typeMapping, caseSensitive, exprName, args); + ComputedColumn cmpColumn = new ComputedColumn(columnName, expr); + computedColumns.add(new ComputedColumn(columnName, expr)); + + // remember the column type for later reference by other computed columns + typeMapping.put(columnName, cmpColumn.columnType()); + } + + return computedColumns; + } + + private static LinkedHashMap<String, Tuple2<String, String[]>> sortComputedColumnArgs( + List<String> computedColumnArgs, boolean caseSensitive) { + List<String> argList = + computedColumnArgs.stream() + .map(x -> caseSensitive ? x : x.toUpperCase()) + .collect(Collectors.toList()); + + LinkedHashMap<String, Tuple2<String, String[]>> eqMap = new LinkedHashMap<>(); + LinkedHashMap<String, String> refMap = new LinkedHashMap<>(); + for (String arg : argList) { + String[] kv = arg.split("="); if (kv.length != 2) { throw new IllegalArgumentException( String.format( "Invalid computed column argument: %s. 
Please use format 'column-name=expr-name(args, ...)'.", - columnArg)); + arg)); } - String columnName = kv[0].trim(); String expression = kv[1].trim(); // parse expression int left = expression.indexOf('('); @@ -69,101 +94,21 @@ public class ComputedColumnUtils { String.format( "Invalid expression: %s. Please use format 'expr-name(args, ...)'.", expression)); - String exprName = expression.substring(0, left); String[] args = expression.substring(left + 1, right).split(","); - checkArgument(args.length >= 1, "Computed column needs at least one argument."); - - computedColumns.add( - new ComputedColumn( - columnName, - Expression.create(typeMapping, caseSensitive, exprName, args))); - } - - return sortComputedColumns(computedColumns); - } - - @VisibleForTesting - public static List<ComputedColumn> sortComputedColumns(List<ComputedColumn> columns) { - Set<String> columnNames = new HashSet<>(); - for (ComputedColumn col : columns) { - columnNames.add(col.columnName()); - } - // For simple processing, no reference or referring to another computed column, means - // independent - List<ComputedColumn> independent = new ArrayList<>(); - List<ComputedColumn> dependent = new ArrayList<>(); - - for (ComputedColumn col : columns) { - if (col.fieldReference() == null || !columnNames.contains(col.fieldReference())) { - independent.add(col); - } else { - dependent.add(col); - } - } - - // Sort dependent columns with topological sort - Map<String, ComputedColumn> columnMap = new HashMap<>(); - Map<String, Set<String>> reverseDependencies = new HashMap<>(); - - for (ComputedColumn col : dependent) { - columnMap.put(col.columnName(), col); - reverseDependencies - .computeIfAbsent(col.fieldReference(), k -> new HashSet<>()) - .add(col.columnName()); - } - - List<ComputedColumn> sortedDependent = new ArrayList<>(); - Set<String> visited = new HashSet<>(); - Set<String> tempMark = new HashSet<>(); // For cycle detection - - for (ComputedColumn col : dependent) { - if 
(!visited.contains(col.columnName())) { - dfs( - col.columnName(), - reverseDependencies, - columnMap, - sortedDependent, - visited, - tempMark); - } + // args[0] may be empty string, eg. "cal_col=now()" + eqMap.put(kv[0].trim(), Tuple2.of(exprName, args)); + refMap.put(kv[0].trim(), args[0].trim()); } - Collections.reverse(sortedDependent); + List<String> sortedKeys = DfsSort.sortKeys(refMap); - // Independent should precede dependent - List<ComputedColumn> result = new ArrayList<>(); - result.addAll(independent); - result.addAll(sortedDependent); - - return result; - } - - private static void dfs( - String node, - Map<String, Set<String>> reverseDependencies, - Map<String, ComputedColumn> columnMap, - List<ComputedColumn> sorted, - Set<String> visited, - Set<String> tempMark) { - if (tempMark.contains(node)) { - throw new IllegalArgumentException("Cycle detected: " + node); - } - if (visited.contains(node)) { - return; + LinkedHashMap<String, Tuple2<String, String[]>> sortedMap = + new LinkedHashMap<>(refMap.size()); + for (String key : sortedKeys) { + sortedMap.put(key, eqMap.get(key)); } - - tempMark.add(node); - ComputedColumn current = columnMap.get(node); - - // Process the dependencies - for (String dependent : reverseDependencies.getOrDefault(node, Collections.emptySet())) { - dfs(dependent, reverseDependencies, columnMap, sorted, visited, tempMark); - } - - tempMark.remove(node); - visited.add(node); - sorted.add(current); + return sortedMap; } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java index 9fdc4606a0..087fe15e67 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java @@ -24,7 +24,6 @@ import org.apache.paimon.types.DataTypeFamily; import 
org.apache.paimon.types.DataTypeJsonParser; import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.SerializableSupplier; import org.apache.paimon.utils.StringUtils; @@ -51,18 +50,12 @@ public interface Expression extends Serializable { /** Return name of referenced field. */ String fieldReference(); - /** Return {@link DataType} of referenced field. */ - DataType fieldReferenceType(); - /** Return {@link DataType} of computed value. */ DataType outputType(); /** Compute value from given input. Input and output are serialized to string. */ String eval(String input); - /** Compute value from given input. Input and output are serialized to string. */ - String eval(String input, DataType inputType); - /** Return name of this expression. */ default String name() { return null; @@ -73,7 +66,7 @@ public interface Expression extends Serializable { YEAR( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return TemporalToIntConverter.create( referencedField.field(), referencedField.fieldType(), @@ -83,7 +76,7 @@ public interface Expression extends Serializable { MONTH( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return TemporalToIntConverter.create( referencedField.field(), referencedField.fieldType(), @@ -93,7 +86,7 @@ public interface Expression extends Serializable { DAY( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return TemporalToIntConverter.create( 
referencedField.field(), referencedField.fieldType(), @@ -103,7 +96,7 @@ public interface Expression extends Serializable { HOUR( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return TemporalToIntConverter.create( referencedField.field(), referencedField.fieldType(), @@ -113,7 +106,7 @@ public interface Expression extends Serializable { MINUTE( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return TemporalToIntConverter.create( referencedField.field(), referencedField.fieldType(), @@ -123,7 +116,7 @@ public interface Expression extends Serializable { SECOND( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return TemporalToIntConverter.create( referencedField.field(), referencedField.fieldType(), @@ -133,7 +126,7 @@ public interface Expression extends Serializable { DATE_FORMAT( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return DateFormat.create( referencedField.field(), referencedField.fieldType(), @@ -142,13 +135,13 @@ public interface Expression extends Serializable { SUBSTRING( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return substring(referencedField.field(), referencedField.literals()); }), TRUNCATE( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - 
ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return truncate( referencedField.field(), referencedField.fieldType(), @@ -159,7 +152,7 @@ public interface Expression extends Serializable { UPPER( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return new UpperExpression( referencedField.field(), referencedField.fieldType(), @@ -168,7 +161,7 @@ public interface Expression extends Serializable { LOWER( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return new LowerExpression( referencedField.field(), referencedField.fieldType(), @@ -177,7 +170,7 @@ public interface Expression extends Serializable { TRIM( (typeMapping, caseSensitive, args) -> { ReferencedField referencedField = - ReferencedField.create(typeMapping, caseSensitive, args); + ReferencedField.checkArgument(typeMapping, caseSensitive, args); return new TrimExpression( referencedField.field(), referencedField.fieldType(), @@ -215,16 +208,16 @@ public interface Expression extends Serializable { /** Referenced field in expression input parameters. */ class ReferencedField { private final String field; - @Nullable private final DataType fieldType; + private final DataType fieldType; private final String[] literals; - private ReferencedField(String field, @Nullable DataType fieldType, String[] literals) { + private ReferencedField(String field, DataType fieldType, String[] literals) { this.field = field; this.fieldType = fieldType; this.literals = literals; } - public static ReferencedField create( + public static ReferencedField checkArgument( Map<String, DataType> typeMapping, boolean caseSensitive, String... 
args) { String referencedField = args[0].trim(); String[] literals = @@ -233,13 +226,11 @@ public interface Expression extends Serializable { StringUtils.toLowerCaseIfNeed(referencedField, caseSensitive); DataType fieldType = - typeMapping.isEmpty() - ? null - : checkNotNull( - typeMapping.get(referencedFieldCheckForm), - String.format( - "Referenced field '%s' is not in given fields: %s.", - referencedFieldCheckForm, typeMapping.keySet())); + checkNotNull( + typeMapping.get(referencedFieldCheckForm), + String.format( + "Referenced field '%s' is not in given fields: %s.", + referencedFieldCheckForm, typeMapping.keySet())); return new ReferencedField(referencedField, fieldType, literals); } @@ -335,22 +326,16 @@ public interface Expression extends Serializable { private static final List<Integer> SUPPORTED_PRECISION = Arrays.asList(0, 3, 6, 9); private final String fieldReference; - @Nullable private DataType fieldReferenceType; - @Nullable private Integer precision; + @Nullable private final Integer precision; private transient Function<LocalDateTime, T> converter; private TemporalExpressionBase( - String fieldReference, @Nullable DataType fieldType, @Nullable Integer precision) { + String fieldReference, DataType fieldType, @Nullable Integer precision) { this.fieldReference = fieldReference; - this.fieldReferenceType = fieldType; // when the input is INTEGER_NUMERIC, the precision must be set - if (fieldType != null - && fieldType - .getTypeRoot() - .getFamilies() - .contains(DataTypeFamily.INTEGER_NUMERIC) + if (fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC) && precision == null) { precision = 0; } @@ -369,11 +354,6 @@ public interface Expression extends Serializable { return fieldReference; } - @Override - public DataType fieldReferenceType() { - return fieldReferenceType; - } - /** If not, this must be overridden! 
*/ @Override public DataType outputType() { @@ -390,21 +370,6 @@ public interface Expression extends Serializable { return String.valueOf(result); } - @Override - public String eval(String input, DataType inputType) { - if (this.fieldReferenceType == null) { - this.fieldReferenceType = inputType; - - // when the input is INTEGER_NUMERIC, the precision must be set - if (inputType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC) - && precision == null) { - this.precision = 0; - } - } - - return eval(input); - } - private LocalDateTime toLocalDateTime(String input) { if (precision == null) { return DateTimeUtils.toLocalDateTime(input, 9); @@ -460,7 +425,7 @@ public interface Expression extends Serializable { private static TemporalToIntConverter create( String fieldReference, - @Nullable DataType fieldType, + DataType fieldType, SerializableSupplier<Function<LocalDateTime, Integer>> converterSupplier, String... literals) { checkArgument( @@ -539,11 +504,6 @@ public interface Expression extends Serializable { return fieldReference; } - @Override - public DataType fieldReferenceType() { - return new VarCharType(); - } - @Override public DataType outputType() { return DataTypes.STRING(); @@ -564,11 +524,6 @@ public interface Expression extends Serializable { input, beginInclusive, endExclusive)); } } - - @Override - public String eval(String input, DataType inputType) { - return eval(input); - } } /** Truncate numeric/decimal/string value. 
*/ @@ -577,11 +532,11 @@ public interface Expression extends Serializable { private final String fieldReference; - @Nullable private DataType fieldType; + private final DataType fieldType; private final int width; - TruncateComputer(String fieldReference, @Nullable DataType fieldType, String literal) { + TruncateComputer(String fieldReference, DataType fieldType, String literal) { this.fieldReference = fieldReference; this.fieldType = fieldType; try { @@ -599,11 +554,6 @@ public interface Expression extends Serializable { return fieldReference; } - @Override - public DataType fieldReferenceType() { - return fieldType; - } - @Override public DataType outputType() { return fieldType; @@ -638,14 +588,6 @@ public interface Expression extends Serializable { } } - @Override - public String eval(String input, DataType inputType) { - if (this.fieldType == null) { - this.fieldType = inputType; - } - return eval(input); - } - private short truncateShort(int width, short value) { return (short) (value - (((value % width) + width) % width)); } @@ -690,11 +632,6 @@ public interface Expression extends Serializable { return null; } - @Override - public DataType fieldReferenceType() { - return null; - } - @Override public DataType outputType() { return dataType; @@ -704,11 +641,6 @@ public interface Expression extends Serializable { public String eval(String input) { return value; } - - @Override - public String eval(String input, DataType inputType) { - return value; - } } /** Get current timestamp. 
*/ @@ -718,11 +650,6 @@ public interface Expression extends Serializable { return null; } - @Override - public DataType fieldReferenceType() { - return null; - } - @Override public DataType outputType() { return DataTypes.TIMESTAMP(3); @@ -732,11 +659,6 @@ public interface Expression extends Serializable { public String eval(String input) { return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3); } - - @Override - public String eval(String input, DataType inputType) { - return eval(input); - } } /** Convert string to upper case. */ @@ -797,14 +719,12 @@ public interface Expression extends Serializable { abstract class NoLiteralsStringExpressionBase implements Expression { private final String fieldReference; - @Nullable protected DataType fieldReferenceType; public NoLiteralsStringExpressionBase( - String fieldReference, @Nullable DataType fieldType, String... literals) { + String fieldReference, DataType fieldType, String... literals) { this.fieldReference = fieldReference; - this.fieldReferenceType = fieldType; checkArgument( - fieldType == null || fieldType.getTypeRoot() == DataTypeRoot.VARCHAR, + fieldType.getTypeRoot() == DataTypeRoot.VARCHAR, String.format( "'%s' expression only supports type root of '%s', but found '%s'.", name(), DataTypeRoot.VARCHAR, fieldType.getTypeRoot())); @@ -824,23 +744,5 @@ public interface Expression extends Serializable { public String fieldReference() { return fieldReference; } - - @Override - public DataType fieldReferenceType() { - return fieldReferenceType; - } - - @Override - public String eval(String input, DataType inputType) { - if (this.fieldReferenceType == null) { - checkArgument( - inputType.getTypeRoot() == DataTypeRoot.VARCHAR, - String.format( - "'%s' expression only supports type root of '%s', but found '%s'.", - name(), DataTypeRoot.VARCHAR, inputType.getTypeRoot())); - this.fieldReferenceType = inputType; - } - return eval(input); - } } } diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java index 8008cd7ca9..85442067b9 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java @@ -25,7 +25,6 @@ import org.apache.paimon.flink.sink.cdc.CdcRecord; import org.apache.paimon.flink.sink.cdc.CdcSchema; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; import org.apache.paimon.schema.Schema; -import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -104,17 +103,8 @@ public abstract class AbstractRecordParser Map<String, String> rowData, CdcSchema.Builder schemaBuilder) { computedColumns.forEach( computedColumn -> { - String result; - if (computedColumn.fieldReference() != null - && computedColumn.fieldReferenceType() == null) { - DataType inputType = - schemaBuilder.getFieldType(computedColumn.fieldReference()); - result = - computedColumn.eval( - rowData.get(computedColumn.fieldReference()), inputType); - } else { - result = computedColumn.eval(rowData.get(computedColumn.fieldReference())); - } + String result = + computedColumn.eval(rowData.get(computedColumn.fieldReference())); rowData.put(computedColumn.columnName(), result); schemaBuilder.column(computedColumn.columnName(), computedColumn.columnType()); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java index 0fffdf0b98..6c8f2ae324 100644 --- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java @@ -32,6 +32,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -235,21 +236,40 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC Map<String, DebeziumEvent.Field> fields = schema.beforeAndAfterFields(); + ObjectMapper objectMapper = new ObjectMapper(); + + CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder(); + LinkedHashMap<String, String> resultMap = new LinkedHashMap<>(); + for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet()) { String fieldName = field.getKey(); - String mySqlType = field.getValue().type(); + String debeziumType = field.getValue().type(); + String className = field.getValue().name(); + + // record the field data type for computed columns reference + JsonNode parametersNode = field.getValue().parameters(); + Map<String, String> parametersMap = + isNull(parametersNode) + ? 
Collections.emptyMap() + : JsonSerdeUtil.convertValue( + parametersNode, + new TypeReference<HashMap<String, String>>() {}); + + DataType paimonDataType = + DebeziumSchemaUtils.toDataType(debeziumType, className, parametersMap); + schemaBuilder.column(fieldName, paimonDataType); + JsonNode objectValue = recordRow.get(fieldName); if (isNull(objectValue)) { continue; } - String className = field.getValue().name(); String oldValue = objectValue.asText(); String newValue = DebeziumSchemaUtils.transformRawValue( oldValue, - mySqlType, + debeziumType, className, typeMapping, objectValue, @@ -259,9 +279,12 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC // generate values of computed columns for (ComputedColumn computedColumn : computedColumns) { - resultMap.put( - computedColumn.columnName(), - computedColumn.eval(resultMap.get(computedColumn.fieldReference()))); + String refName = computedColumn.fieldReference(); + + resultMap.put(computedColumn.columnName(), computedColumn.eval(resultMap.get(refName))); + + // remember the computed column data type for later reference by other computed columns + schemaBuilder.column(computedColumn.columnName(), computedColumn.columnType()); } for (CdcMetadataConverter metadataConverter : metadataConverters) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java index af114cb920..c2565c1f2d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java @@ -121,7 +121,7 @@ public class PostgresRecordParser extractRecords().forEach(out::collect); } - private CdcSchema extractSchema(DebeziumEvent.Field schema) { + private CdcSchema 
extractSchema(DebeziumEvent.Field schema, CdcSchema.Builder schemaBuilder) { Map<String, DebeziumEvent.Field> afterFields = schema.afterFields(); Preconditions.checkArgument( !afterFields.isEmpty(), @@ -129,7 +129,6 @@ public class PostgresRecordParser + "Please make sure that `includeSchema` is true " + "in the JsonDebeziumDeserializationSchema you created"); - CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder(); afterFields.forEach( (key, value) -> { DataType dataType = extractFieldType(value); @@ -207,15 +206,16 @@ public class PostgresRecordParser private List<RichCdcMultiplexRecord> extractRecords() { List<RichCdcMultiplexRecord> records = new ArrayList<>(); + CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder(); + CdcSchema schema = extractSchema(root.schema(), schemaBuilder); - Map<String, String> before = extractRow(root.payload().before()); + Map<String, String> before = extractRow(root.payload().before(), schemaBuilder); if (!before.isEmpty()) { records.add(createRecord(RowKind.DELETE, before)); } - Map<String, String> after = extractRow(root.payload().after()); + Map<String, String> after = extractRow(root.payload().after(), schemaBuilder); if (!after.isEmpty()) { - CdcSchema schema = extractSchema(root.schema()); records.add( new RichCdcMultiplexRecord( databaseName, @@ -227,7 +227,7 @@ public class PostgresRecordParser return records; } - private Map<String, String> extractRow(JsonNode recordRow) { + private Map<String, String> extractRow(JsonNode recordRow, CdcSchema.Builder schemaBuilder) { if (isNull(recordRow)) { return new HashMap<>(); } @@ -346,9 +346,8 @@ public class PostgresRecordParser // generate values of computed columns for (ComputedColumn computedColumn : computedColumns) { - resultMap.put( - computedColumn.columnName(), - computedColumn.eval(resultMap.get(computedColumn.fieldReference()))); + String refName = computedColumn.fieldReference(); + resultMap.put(computedColumn.columnName(), 
computedColumn.eval(resultMap.get(refName))); } for (CdcMetadataConverter metadataConverter : metadataConverters) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/utils/DfsSort.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/utils/DfsSort.java new file mode 100644 index 0000000000..555c64ee2a --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/utils/DfsSort.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.utils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** DFS sort algorithm for topological sorting of a DAG (Directed Acyclic Graph). 
*/ +public class DfsSort { + public static <K> LinkedHashMap<K, K> sort(LinkedHashMap<K, K> depMap) { + List<K> sortedKeys = sortKeys(depMap); + LinkedHashMap<K, K> sortedMap = new LinkedHashMap<>(); + for (K key : sortedKeys) { + sortedMap.put(key, depMap.get(key)); + } + return sortedMap; + } + + public static <K> List<K> sortKeys(LinkedHashMap<K, K> depMap) { + + Map<K, Set<K>> revMap = new LinkedHashMap<>(); + + List<K> noDeps = new ArrayList<>(); + + for (Map.Entry<K, K> entry : depMap.entrySet()) { + K key = entry.getKey(); + K val = entry.getValue(); + + if (val == null || !depMap.containsKey(val)) { + noDeps.add(key); + } else { + revMap.computeIfAbsent(val, k -> new HashSet<>()).add(key); + } + } + + List<K> sorted = new ArrayList<>(); + + Set<K> visited = new HashSet<>(); + Set<K> tempMark = new HashSet<>(); // for cycle reference detection + + for (Map.Entry<K, K> entry : depMap.entrySet()) { + K key = entry.getKey(); + K val = entry.getValue(); + if (val == null || !depMap.containsKey(val)) { + continue; + } + + if (!visited.contains(key)) { + dfs(key, revMap, sorted, visited, tempMark); + } + } + + Collections.reverse(noDeps); + + sorted.addAll(noDeps); + + Collections.reverse(sorted); + + return sorted; + } + + private static <K> void dfs( + K node, Map<K, Set<K>> revMap, List<K> sorted, Set<K> visited, Set<K> tempMark) { + if (tempMark.contains(node)) { + throw new IllegalArgumentException("Cycle detected: " + node); + } + if (visited.contains(node)) { + return; + } + + tempMark.add(node); + + // Process the dependencies + for (K dependent : revMap.getOrDefault(node, Collections.emptySet())) { + dfs(dependent, revMap, sorted, visited, tempMark); + } + + tempMark.remove(node); + visited.add(node); + sorted.add(node); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java index 7a615fdbc8..229dcc69bb 
100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java @@ -184,10 +184,7 @@ public class CdcSchema implements Serializable { /** Returns the data type of the specified field. */ public DataType getFieldType(String fieldName) { DataField field = columns.get(fieldName); - if (field == null) { - throw new IllegalArgumentException("Field " + fieldName + " not found in schema."); - } - return field.type(); + return field == null ? null : field.type(); } /** Returns an instance of an unresolved {@link CdcSchema}. */ diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java index 5ab3f48817..d04f91053b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java @@ -18,44 +18,47 @@ package org.apache.paimon.flink.action.cdc; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; + import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.sortComputedColumns; +import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns; import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; /** Test for ComputedColumnUtils. 
*/ public class ComputedColumnUtilsTest { + @Test - public void testSortComputedColumns() { - List<ComputedColumn> columns = + public void testComputedColumns() { + List<String> calColArgs = Arrays.asList( - new ComputedColumn("A", Expression.substring("B", "1")), - new ComputedColumn("B", Expression.substring("ExistedColumn", "1")), - new ComputedColumn("C", Expression.cast("No Reference")), - new ComputedColumn("D", Expression.substring("A", "1")), - new ComputedColumn("E", Expression.substring("C", "1"))); + "A=substring(B, 1)", + "B=substring(ExistedColumn,1)", + "C=now()", + "D=substring(A, 1)", + "E=substring(C,1)"); + List<DataField> physicalFields = + Arrays.asList(new DataField(1, "ExistedColumn", DataTypes.STRING())); + List<ComputedColumn> columns = buildComputedColumns(calColArgs, physicalFields); - List<ComputedColumn> sortedColumns = sortComputedColumns(columns); assertEquals( Arrays.asList("B", "C", "E", "A", "D"), - sortedColumns.stream() - .map(ComputedColumn::columnName) - .collect(Collectors.toList())); + columns.stream().map(ComputedColumn::columnName).collect(Collectors.toList())); } @Test public void testCycleReference() { - List<ComputedColumn> columns = - Arrays.asList( - new ComputedColumn("A", Expression.substring("B", "1")), - new ComputedColumn("B", Expression.substring("C", "1")), - new ComputedColumn("C", Expression.substring("A", "1"))); - - assertThrows(IllegalArgumentException.class, () -> sortComputedColumns(columns)); + List<String> calColArgs = + Arrays.asList("A=substring(B, 1)", "B=substring(C, 1)", "C=substring(A, 1)"); + assertThrows( + IllegalArgumentException.class, + () -> buildComputedColumns(calColArgs, Collections.emptyList())); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index b48a1c79cf..1200629351 100644 --- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1131,6 +1131,70 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { epochSecond * 1_000_000_000 + nano)); } + @Test + @Timeout(60) + public void testComputedColumnsCrossReference() throws Exception { + Map<String, String> mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "test_computed_column2"); + + List<String> computedColumnDefs = + Arrays.asList( + "_lower_of_upper=lower(_upper)", + "_upper=upper(_value)", + "_trim_lower=trim(_lower_of_upper)", + "_constant=cast(11,INT)"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withPrimaryKeys("pk") + .withComputedColumnArgs(computedColumnDefs) + .build(); + runActionWithDefaultEnv(action); + + try (Statement statement = getStatement()) { + statement.execute("USE " + DATABASE_NAME); + statement.executeUpdate( + "INSERT INTO test_computed_column2 VALUES (1, '2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10', ' vaLUE ')"); + statement.executeUpdate( + "INSERT INTO test_computed_column2 VALUES (2, '2023-03-23', null, null, null)"); + } + + FileStoreTable table = getFileStoreTable(); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.DATE(), + DataTypes.TIMESTAMP(0), + DataTypes.TIMESTAMP(0), + DataTypes.VARCHAR(10), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] { + "pk", + "_date", + "_datetime", + "_timestamp", + "_value", + "_upper", + "_constant", + "_lower_of_upper", + "_trim_lower" + }); + List<String> expected = + Arrays.asList( + // sort according to reference + + "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, vaLUE , VALUE , 11, value , value]", 
+ "+I[2, 19439, NULL, NULL, NULL, NULL, 11, NULL, NULL]"); + + waitForResult(expected, table, rowType, Arrays.asList("pk")); + } + @Test @Timeout(60) public void testSyncShards() throws Exception { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/utils/DfsSortTestTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/utils/DfsSortTestTest.java new file mode 100644 index 0000000000..180474d2cf --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/utils/DfsSortTestTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.utils; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Test for {@link DfsSort}. 
*/ +public class DfsSortTestTest { + @Test + public void testSortKeys() { + LinkedHashMap<String, String> refs = new LinkedHashMap<>(); + refs.put("A", "B"); + refs.put("B", "O"); + refs.put("C", null); + refs.put("D", "A"); + refs.put("E", "C"); + refs.put("F", ""); + + List<String> sorted = DfsSort.sortKeys(refs); + assertEquals(Arrays.asList("B", "C", "F", "E", "A", "D"), sorted); + } + + @Test + public void testCycleReference() { + LinkedHashMap<String, String> refs = new LinkedHashMap<>(); + refs.put("A", "B"); + refs.put("B", "C"); + refs.put("C", "A"); + + assertThrows(IllegalArgumentException.class, () -> DfsSort.sort(refs)); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index ae0186cf70..a3b42f9440 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -296,6 +296,15 @@ CREATE TABLE test_computed_column ( PRIMARY KEY (pk) ); +CREATE TABLE test_computed_column2 ( + pk INT, + _date DATE, + _datetime DATETIME, + _timestamp TIMESTAMP, + _value VARCHAR(10), + PRIMARY KEY (pk) +); + CREATE TABLE test_time_to_int_epoch ( pk INT, _second_val0 INT,