This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ef9e214468 [cdc] Support computed columns when sync_database (#5816)
ef9e214468 is described below
commit ef9e214468d55b98dbe95d8e2e9c569b3e32ccfe
Author: JackeyLee007 <[email protected]>
AuthorDate: Tue Jul 1 16:21:35 2025 +0800
[cdc] Support computed columns when sync_database (#5816)
---
docs/content/cdc-ingestion/kafka-cdc.md | 4 +-
.../shortcodes/generated/kafka_sync_database.html | 2 +-
.../paimon/flink/action/cdc/ComputedColumn.java | 14 ++
.../flink/action/cdc/ComputedColumnUtils.java | 91 +++++++++++-
.../apache/paimon/flink/action/cdc/Expression.java | 154 +++++++++++++++++----
.../action/cdc/format/AbstractRecordParser.java | 17 ++-
.../apache/paimon/flink/sink/cdc/CdcSchema.java | 20 ++-
.../flink/action/cdc/ComputedColumnUtilsTest.java | 61 ++++++++
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 26 +++-
9 files changed, 345 insertions(+), 44 deletions(-)
diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md
index dfad34fad3..7ca3b2728c 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -214,6 +214,7 @@ To use this feature through `flink run`, run the following shell command.
[--type_mapping to-string] \
[--partition_keys <partition_keys>] \
[--primary_keys <primary-keys>] \
+ [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
@@ -244,7 +245,8 @@ Synchronization from one Kafka topic to Paimon database.
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
- --table_conf sink.parallelism=4
+ --table_conf sink.parallelism=4 \
+ --computed_column 'pt=date_format(event_tm, yyyyMMdd)'
```
Synchronization from multiple Kafka topics to Paimon database.
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index d24b7cf25a..4b6ee3e38d 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -92,7 +92,7 @@ under the License.
</tr>
<tr>
<td><h5>--computed_column</h5></td>
- <td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. </td>
+ <td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. NOTICE: It returns null if the referenced column does not exist in the source table.</td>
</tr>
<tr>
<td><h5>--eager_init</h5></td>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
index 5e9041a120..dddb909cd7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
@@ -53,6 +53,11 @@ public class ComputedColumn implements Serializable {
return expression.fieldReference();
}
+ @Nullable
+ public DataType fieldReferenceType() {
+ return expression.fieldReferenceType();
+ }
+
/** Compute column's value from given argument. Return null if input is null. */
@Nullable
public String eval(@Nullable String input) {
@@ -61,4 +66,13 @@ public class ComputedColumn implements Serializable {
}
return expression.eval(input);
}
+
+ /** Compute column's value from given argument and its type. Return null if input is null. */
+ @Nullable
+ public String eval(@Nullable String input, DataType inputType) {
+ if (fieldReference() != null && input == null) {
+ return null;
+ }
+ return expression.eval(input, inputType);
+ }
}
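For context on the new overload: the null guard preserves the NOTICE added to the docs above, i.e. a computed column whose referenced source field is absent (so the lookup yields null) evaluates to null instead of failing. A minimal sketch of that contract, using a hypothetical `ComputedColumnSketch` with `String` standing in for Paimon's `DataType` (not the real class):

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for ComputedColumn; String replaces Paimon's DataType.
public class ComputedColumnSketch {
    private final String fieldReference; // null for reference-free expressions like now()
    private final BiFunction<String, String, String> expression; // (input, inputType) -> value

    ComputedColumnSketch(String fieldReference, BiFunction<String, String, String> expression) {
        this.fieldReference = fieldReference;
        this.expression = expression;
    }

    // Mirrors eval(input, inputType): a referenced but missing/null input short-circuits to null.
    String eval(String input, String inputType) {
        if (fieldReference != null && input == null) {
            return null;
        }
        return expression.apply(input, inputType);
    }

    public static void main(String[] args) {
        ComputedColumnSketch pt =
                new ComputedColumnSketch("event_tm", (in, type) -> in.substring(0, 10));
        System.out.println(pt.eval("2025-07-01 16:21:35", "STRING")); // 2025-07-01
        System.out.println(pt.eval(null, "STRING"));                  // null, not an exception
    }
}
```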
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index 24ca0599bf..28fbef9455 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -18,13 +18,18 @@
package org.apache.paimon.flink.action.cdc;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -75,6 +80,90 @@ public class ComputedColumnUtils {
Expression.create(typeMapping, caseSensitive, exprName, args)));
}
- return computedColumns;
+ return sortComputedColumns(computedColumns);
+ }
+
+ @VisibleForTesting
+ public static List<ComputedColumn> sortComputedColumns(List<ComputedColumn> columns) {
+ Set<String> columnNames = new HashSet<>();
+ for (ComputedColumn col : columns) {
+ columnNames.add(col.columnName());
+ }
+
+ // For simple processing: a column is independent if it has no field reference,
+ // or its reference is not another computed column
+ List<ComputedColumn> independent = new ArrayList<>();
+ List<ComputedColumn> dependent = new ArrayList<>();
+
+ for (ComputedColumn col : columns) {
+ if (col.fieldReference() == null || !columnNames.contains(col.fieldReference())) {
+ independent.add(col);
+ } else {
+ dependent.add(col);
+ }
+ }
+
+ // Sort dependent columns with topological sort
+ Map<String, ComputedColumn> columnMap = new HashMap<>();
+ Map<String, Set<String>> reverseDependencies = new HashMap<>();
+
+ for (ComputedColumn col : dependent) {
+ columnMap.put(col.columnName(), col);
+ reverseDependencies
+ .computeIfAbsent(col.fieldReference(), k -> new HashSet<>())
+ .add(col.columnName());
+ }
+
+ List<ComputedColumn> sortedDependent = new ArrayList<>();
+ Set<String> visited = new HashSet<>();
+ Set<String> tempMark = new HashSet<>(); // For cycle detection
+
+ for (ComputedColumn col : dependent) {
+ if (!visited.contains(col.columnName())) {
+ dfs(
+ col.columnName(),
+ reverseDependencies,
+ columnMap,
+ sortedDependent,
+ visited,
+ tempMark);
+ }
+ }
+
+ Collections.reverse(sortedDependent);
+
+ // Independent should precede dependent
+ List<ComputedColumn> result = new ArrayList<>();
+ result.addAll(independent);
+ result.addAll(sortedDependent);
+
+ return result;
+ }
+
+ private static void dfs(
+ String node,
+ Map<String, Set<String>> reverseDependencies,
+ Map<String, ComputedColumn> columnMap,
+ List<ComputedColumn> sorted,
+ Set<String> visited,
+ Set<String> tempMark) {
+ if (tempMark.contains(node)) {
+ throw new IllegalArgumentException("Cycle detected: " + node);
+ }
+ if (visited.contains(node)) {
+ return;
+ }
+
+ tempMark.add(node);
+ ComputedColumn current = columnMap.get(node);
+
+ // Process the dependencies
+ for (String dependent : reverseDependencies.getOrDefault(node, Collections.emptySet())) {
+ dfs(dependent, reverseDependencies, columnMap, sorted, visited, tempMark);
+ }
+
+ tempMark.remove(node);
+ visited.add(node);
+ sorted.add(current);
}
}
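The ordering step exists because a computed column may now reference another computed column, so dependencies must be evaluated first. A minimal, self-contained sketch of the same reverse-dependency DFS, with plain strings standing in for `ComputedColumn` (hypothetical class, not Paimon API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: each computed column is reduced to (name -> referenced column).
public class ComputedColumnOrdering {

    static List<String> sort(Map<String, String> references) {
        List<String> independent = new ArrayList<>();
        List<String> dependent = new ArrayList<>();
        for (Map.Entry<String, String> e : references.entrySet()) {
            // Independent: no reference, or the reference is not itself a computed column.
            if (e.getValue() == null || !references.containsKey(e.getValue())) {
                independent.add(e.getKey());
            } else {
                dependent.add(e.getKey());
            }
        }

        // Reverse dependencies: referenced column -> columns that read it.
        Map<String, Set<String>> reverse = new HashMap<>();
        for (String col : dependent) {
            reverse.computeIfAbsent(references.get(col), k -> new HashSet<>()).add(col);
        }

        List<String> sorted = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        Set<String> tempMark = new HashSet<>(); // cycle detection
        for (String col : dependent) {
            dfs(col, reverse, sorted, visited, tempMark);
        }
        Collections.reverse(sorted); // reversed post-order: dependencies before readers

        List<String> result = new ArrayList<>(independent);
        result.addAll(sorted);
        return result;
    }

    static void dfs(String node, Map<String, Set<String>> reverse, List<String> sorted,
            Set<String> visited, Set<String> tempMark) {
        if (tempMark.contains(node)) {
            throw new IllegalArgumentException("Cycle detected: " + node);
        }
        if (visited.contains(node)) {
            return;
        }
        tempMark.add(node);
        for (String reader : reverse.getOrDefault(node, Collections.emptySet())) {
            dfs(reader, reverse, sorted, visited, tempMark);
        }
        tempMark.remove(node);
        visited.add(node);
        sorted.add(node);
    }

    public static void main(String[] args) {
        Map<String, String> refs = new LinkedHashMap<>();
        refs.put("A", "B");             // A reads computed column B
        refs.put("B", "ExistedColumn"); // B reads a source column -> independent
        refs.put("D", "A");             // D reads computed column A
        System.out.println(sort(refs)); // [B, A, D]
    }
}
```

The DFS walks from each column to the columns that read it and records nodes in post-order; reversing that order places every dependency before its readers, and the `tempMark` set rejects cyclic definitions just as `ComputedColumnUtilsTest#testCycleReference` below expects.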
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 087fe15e67..9fdc4606a0 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -24,6 +24,7 @@ import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeJsonParser;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.StringUtils;
@@ -50,12 +51,18 @@ public interface Expression extends Serializable {
/** Return name of referenced field. */
String fieldReference();
+ /** Return {@link DataType} of referenced field. */
+ DataType fieldReferenceType();
+
/** Return {@link DataType} of computed value. */
DataType outputType();
/** Compute value from given input. Input and output are serialized to string. */
String eval(String input);
+ /** Compute value from given input with its {@link DataType}. Input and output are serialized to string. */
+ String eval(String input, DataType inputType);
+
/** Return name of this expression. */
default String name() {
return null;
@@ -66,7 +73,7 @@ public interface Expression extends Serializable {
YEAR(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -76,7 +83,7 @@ public interface Expression extends Serializable {
MONTH(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -86,7 +93,7 @@ public interface Expression extends Serializable {
DAY(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -96,7 +103,7 @@ public interface Expression extends Serializable {
HOUR(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -106,7 +113,7 @@ public interface Expression extends Serializable {
MINUTE(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -116,7 +123,7 @@ public interface Expression extends Serializable {
SECOND(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -126,7 +133,7 @@ public interface Expression extends Serializable {
DATE_FORMAT(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return DateFormat.create(
referencedField.field(),
referencedField.fieldType(),
@@ -135,13 +142,13 @@ public interface Expression extends Serializable {
SUBSTRING(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return substring(referencedField.field(), referencedField.literals());
}),
TRUNCATE(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return truncate(
referencedField.field(),
referencedField.fieldType(),
@@ -152,7 +159,7 @@ public interface Expression extends Serializable {
UPPER(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return new UpperExpression(
referencedField.field(),
referencedField.fieldType(),
@@ -161,7 +168,7 @@ public interface Expression extends Serializable {
LOWER(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return new LowerExpression(
referencedField.field(),
referencedField.fieldType(),
@@ -170,7 +177,7 @@ public interface Expression extends Serializable {
TRIM(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
- ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+ ReferencedField.create(typeMapping, caseSensitive, args);
return new TrimExpression(
referencedField.field(),
referencedField.fieldType(),
@@ -208,16 +215,16 @@ public interface Expression extends Serializable {
/** Referenced field in expression input parameters. */
class ReferencedField {
private final String field;
- private final DataType fieldType;
+ @Nullable private final DataType fieldType;
private final String[] literals;
- private ReferencedField(String field, DataType fieldType, String[] literals) {
+ private ReferencedField(String field, @Nullable DataType fieldType, String[] literals) {
this.field = field;
this.fieldType = fieldType;
this.literals = literals;
}
- public static ReferencedField checkArgument(
+ public static ReferencedField create(
Map<String, DataType> typeMapping, boolean caseSensitive, String... args) {
String referencedField = args[0].trim();
String[] literals =
@@ -226,11 +233,13 @@ public interface Expression extends Serializable {
StringUtils.toLowerCaseIfNeed(referencedField, caseSensitive);
DataType fieldType =
- checkNotNull(
- typeMapping.get(referencedFieldCheckForm),
- String.format(
- "Referenced field '%s' is not in given fields: %s.",
- referencedFieldCheckForm, typeMapping.keySet()));
+ typeMapping.isEmpty()
+ ? null
+ : checkNotNull(
+ typeMapping.get(referencedFieldCheckForm),
+ String.format(
+ "Referenced field '%s' is not in given fields: %s.",
+ referencedFieldCheckForm, typeMapping.keySet()));
return new ReferencedField(referencedField, fieldType, literals);
}
@@ -326,16 +335,22 @@ public interface Expression extends Serializable {
private static final List<Integer> SUPPORTED_PRECISION = Arrays.asList(0, 3, 6, 9);
private final String fieldReference;
- @Nullable private final Integer precision;
+ @Nullable private DataType fieldReferenceType;
+ @Nullable private Integer precision;
private transient Function<LocalDateTime, T> converter;
private TemporalExpressionBase(
- String fieldReference, DataType fieldType, @Nullable Integer precision) {
+ String fieldReference, @Nullable DataType fieldType, @Nullable Integer precision) {
this.fieldReference = fieldReference;
+ this.fieldReferenceType = fieldType;
// when the input is INTEGER_NUMERIC, the precision must be set
- if (fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
+ if (fieldType != null
+ && fieldType
+ .getTypeRoot()
+ .getFamilies()
+ .contains(DataTypeFamily.INTEGER_NUMERIC)
&& precision == null) {
precision = 0;
}
@@ -354,6 +369,11 @@ public interface Expression extends Serializable {
return fieldReference;
}
+ @Override
+ public DataType fieldReferenceType() {
+ return fieldReferenceType;
+ }
+
/** If not, this must be overridden! */
@Override
public DataType outputType() {
@@ -370,6 +390,21 @@ public interface Expression extends Serializable {
return String.valueOf(result);
}
+ @Override
+ public String eval(String input, DataType inputType) {
+ if (this.fieldReferenceType == null) {
+ this.fieldReferenceType = inputType;
+
+ // when the input is INTEGER_NUMERIC, the precision must be set
+ if (inputType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
+ && precision == null) {
+ this.precision = 0;
+ }
+ }
+
+ return eval(input);
+ }
+
private LocalDateTime toLocalDateTime(String input) {
if (precision == null) {
return DateTimeUtils.toLocalDateTime(input, 9);
@@ -425,7 +460,7 @@ public interface Expression extends Serializable {
private static TemporalToIntConverter create(
String fieldReference,
- DataType fieldType,
+ @Nullable DataType fieldType,
SerializableSupplier<Function<LocalDateTime, Integer>> converterSupplier,
String... literals) {
checkArgument(
@@ -504,6 +539,11 @@ public interface Expression extends Serializable {
return fieldReference;
}
+ @Override
+ public DataType fieldReferenceType() {
+ return new VarCharType();
+ }
+
@Override
public DataType outputType() {
return DataTypes.STRING();
@@ -524,6 +564,11 @@ public interface Expression extends Serializable {
input, beginInclusive, endExclusive));
}
}
+
+ @Override
+ public String eval(String input, DataType inputType) {
+ return eval(input);
+ }
}
/** Truncate numeric/decimal/string value. */
@@ -532,11 +577,11 @@ public interface Expression extends Serializable {
private final String fieldReference;
- private final DataType fieldType;
+ @Nullable private DataType fieldType;
private final int width;
- TruncateComputer(String fieldReference, DataType fieldType, String literal) {
+ TruncateComputer(String fieldReference, @Nullable DataType fieldType, String literal) {
this.fieldReference = fieldReference;
this.fieldType = fieldType;
try {
@@ -554,6 +599,11 @@ public interface Expression extends Serializable {
return fieldReference;
}
+ @Override
+ public DataType fieldReferenceType() {
+ return fieldType;
+ }
+
@Override
public DataType outputType() {
return fieldType;
@@ -588,6 +638,14 @@ public interface Expression extends Serializable {
}
}
+ @Override
+ public String eval(String input, DataType inputType) {
+ if (this.fieldType == null) {
+ this.fieldType = inputType;
+ }
+ return eval(input);
+ }
+
private short truncateShort(int width, short value) {
return (short) (value - (((value % width) + width) % width));
}
@@ -632,6 +690,11 @@ public interface Expression extends Serializable {
return null;
}
+ @Override
+ public DataType fieldReferenceType() {
+ return null;
+ }
+
@Override
public DataType outputType() {
return dataType;
@@ -641,6 +704,11 @@ public interface Expression extends Serializable {
public String eval(String input) {
return value;
}
+
+ @Override
+ public String eval(String input, DataType inputType) {
+ return value;
+ }
}
/** Get current timestamp. */
@@ -650,6 +718,11 @@ public interface Expression extends Serializable {
return null;
}
+ @Override
+ public DataType fieldReferenceType() {
+ return null;
+ }
+
@Override
public DataType outputType() {
return DataTypes.TIMESTAMP(3);
@@ -659,6 +732,11 @@ public interface Expression extends Serializable {
public String eval(String input) {
return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3);
}
+
+ @Override
+ public String eval(String input, DataType inputType) {
+ return eval(input);
+ }
}
/** Convert string to upper case. */
@@ -719,12 +797,14 @@ public interface Expression extends Serializable {
abstract class NoLiteralsStringExpressionBase implements Expression {
private final String fieldReference;
+ @Nullable protected DataType fieldReferenceType;
public NoLiteralsStringExpressionBase(
- String fieldReference, DataType fieldType, String... literals) {
+ String fieldReference, @Nullable DataType fieldType, String... literals) {
this.fieldReference = fieldReference;
+ this.fieldReferenceType = fieldType;
checkArgument(
- fieldType.getTypeRoot() == DataTypeRoot.VARCHAR,
+ fieldType == null || fieldType.getTypeRoot() == DataTypeRoot.VARCHAR,
String.format(
"'%s' expression only supports type root of '%s', but found '%s'.",
name(), DataTypeRoot.VARCHAR, fieldType.getTypeRoot()));
@@ -744,5 +824,23 @@ public interface Expression extends Serializable {
public String fieldReference() {
return fieldReference;
}
+
+ @Override
+ public DataType fieldReferenceType() {
+ return fieldReferenceType;
+ }
+
+ @Override
+ public String eval(String input, DataType inputType) {
+ if (this.fieldReferenceType == null) {
+ checkArgument(
+ inputType.getTypeRoot() == DataTypeRoot.VARCHAR,
+ String.format(
+ "'%s' expression only supports type root of
'%s', but found '%s'.",
+ name(), DataTypeRoot.VARCHAR,
inputType.getTypeRoot()));
+ this.fieldReferenceType = inputType;
+ }
+ return eval(input);
+ }
}
}
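The recurring pattern in these `eval(String, DataType)` overrides is lazy type binding: under `sync_database` the expressions are created with an empty type mapping, so the referenced field's type is unknown until the first record arrives; the typed `eval` binds it once (and fills in type-derived defaults such as the temporal precision) before delegating to the one-argument `eval`. A hedged sketch of the pattern, with `String` in place of Paimon's `DataType` (hypothetical class):

```java
// Hypothetical sketch, not Paimon API: binds the field type on the first typed eval.
public class LazyTypedExpression {
    private String fieldType;  // stands in for DataType; null until bound
    private Integer precision; // type-derived default, as in TemporalExpressionBase

    LazyTypedExpression(String fieldType, Integer precision) {
        this.fieldType = fieldType;
        this.precision = precision;
    }

    String eval(String input) {
        // ... type-dependent computation would use fieldType/precision here ...
        return input + " (type=" + fieldType + ", precision=" + precision + ")";
    }

    String eval(String input, String inputType) {
        if (fieldType == null) {
            fieldType = inputType; // one-time binding on first call
            if (inputType.startsWith("INT") && precision == null) {
                precision = 0; // mirrors the INTEGER_NUMERIC default above
            }
        }
        return eval(input);
    }

    public static void main(String[] args) {
        LazyTypedExpression expr = new LazyTypedExpression(null, null);
        System.out.println(expr.eval("1719822095", "INT")); // binds INT, precision=0
        System.out.println(expr.eval("1719822095"));        // type already bound
    }
}
```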
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
index 1834444afa..8008cd7ca9 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -103,9 +104,19 @@ public abstract class AbstractRecordParser
Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
computedColumns.forEach(
computedColumn -> {
- rowData.put(
- computedColumn.columnName(),
- computedColumn.eval(rowData.get(computedColumn.fieldReference())));
+ String result;
+ if (computedColumn.fieldReference() != null
+ && computedColumn.fieldReferenceType() == null) {
+ DataType inputType =
+ schemaBuilder.getFieldType(computedColumn.fieldReference());
+ result =
+ computedColumn.eval(
+ rowData.get(computedColumn.fieldReference()), inputType);
+ } else {
+ result = computedColumn.eval(rowData.get(computedColumn.fieldReference()));
+ }
+
+ rowData.put(computedColumn.columnName(), result);
schemaBuilder.column(computedColumn.columnName(), computedColumn.columnType());
});
}
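The parser only pays for the schema lookup when it is needed: expressions that already know their field type (or reference no field) keep the original one-argument path. A hedged sketch of that dispatch, with hypothetical names and `String` in place of `DataType`:

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the eval dispatch above; Column is a stand-in interface.
public class ComputedColumnDispatch {
    interface Column {
        String name();
        String fieldReference();     // null for now()/cast-style expressions
        String fieldReferenceType(); // null until the type has been bound
        String eval(String input);
        String eval(String input, String inputType);
    }

    static void apply(Map<String, String> rowData, Function<String, String> fieldTypeLookup, Column col) {
        String input = rowData.get(col.fieldReference());
        String result;
        if (col.fieldReference() != null && col.fieldReferenceType() == null) {
            // Type unknown at creation time: resolve it from the evolving schema.
            result = col.eval(input, fieldTypeLookup.apply(col.fieldReference()));
        } else {
            result = col.eval(input);
        }
        rowData.put(col.name(), result);
    }
}
```

Here `fieldTypeLookup` plays the role of `schemaBuilder::getFieldType` from the `CdcSchema` change below.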
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
index b5199d3a0b..7a615fdbc8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
@@ -28,7 +28,9 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
@@ -100,7 +102,7 @@ public class CdcSchema implements Serializable {
/** A builder for constructing an immutable but still unresolved {@link CdcSchema}. */
public static final class Builder {
- private final List<DataField> columns = new ArrayList<>();
+ private final Map<String, DataField> columns = new LinkedHashMap<>();
private List<String> primaryKeys = new ArrayList<>();
@@ -121,7 +123,7 @@ public class CdcSchema implements Serializable {
Preconditions.checkNotNull(dataField, "Data field must not be null.");
Preconditions.checkNotNull(dataField.name(), "Column name must not be null.");
Preconditions.checkNotNull(dataField.type(), "Data type must not be null.");
- columns.add(dataField);
+ columns.put(dataField.name(), dataField);
return this;
}
@@ -148,7 +150,7 @@ public class CdcSchema implements Serializable {
int id = highestFieldId.incrementAndGet();
DataType reassignDataType = ReassignFieldId.reassign(dataType, highestFieldId);
- columns.add(new DataField(id, columnName, reassignDataType, description));
+ columns.put(columnName, new DataField(id, columnName, reassignDataType, description));
return this;
}
@@ -179,9 +181,19 @@ public class CdcSchema implements Serializable {
return this;
}
+ /** Returns the data type of the specified field. */
+ public DataType getFieldType(String fieldName) {
+ DataField field = columns.get(fieldName);
+ if (field == null) {
+ throw new IllegalArgumentException("Field " + fieldName + "
not found in schema.");
+ }
+ return field.type();
+ }
+
/** Returns an instance of an unresolved {@link CdcSchema}. */
public CdcSchema build() {
- return new CdcSchema(columns, primaryKeys, comment);
+ List<DataField> fields = new ArrayList<>(columns.values());
+ return new CdcSchema(fields, primaryKeys, comment);
}
}
}
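A sketch of why the builder switched from `List<DataField>` to `LinkedHashMap`: insertion order survives for `build()`, re-adding a column by name overwrites instead of duplicating (computed columns are appended per record), and `getFieldType` becomes a constant-time lookup. Hypothetical `ColumnRegistry`, with `String` standing in for `DataField`/`DataType`:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the LinkedHashMap-backed builder above.
public class ColumnRegistry {
    private final Map<String, String> columns = new LinkedHashMap<>(); // name -> type

    ColumnRegistry column(String name, String type) {
        columns.put(name, type); // last definition wins; first-insertion order is kept
        return this;
    }

    String getFieldType(String name) {
        String type = columns.get(name);
        if (type == null) {
            throw new IllegalArgumentException("Field " + name + " not found in schema.");
        }
        return type;
    }

    List<String> build() {
        return new ArrayList<>(columns.keySet()); // insertion order preserved
    }

    public static void main(String[] args) {
        ColumnRegistry r =
                new ColumnRegistry().column("k", "INT").column("pt", "STRING").column("pt", "STRING");
        System.out.println(r.build());            // [k, pt], no duplicate pt
        System.out.println(r.getFieldType("pt")); // STRING
    }
}
```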
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
new file mode 100644
index 0000000000..5ab3f48817
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.sortComputedColumns;
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for ComputedColumnUtils. */
+public class ComputedColumnUtilsTest {
+ @Test
+ public void testSortComputedColumns() {
+ List<ComputedColumn> columns =
+ Arrays.asList(
+ new ComputedColumn("A", Expression.substring("B",
"1")),
+ new ComputedColumn("B",
Expression.substring("ExistedColumn", "1")),
+ new ComputedColumn("C", Expression.cast("No
Reference")),
+ new ComputedColumn("D", Expression.substring("A",
"1")),
+ new ComputedColumn("E", Expression.substring("C",
"1")));
+
+ List<ComputedColumn> sortedColumns = sortComputedColumns(columns);
+ assertEquals(
+ Arrays.asList("B", "C", "E", "A", "D"),
+ sortedColumns.stream()
+ .map(ComputedColumn::columnName)
+ .collect(Collectors.toList()));
+ }
+
+ @Test
+ public void testCycleReference() {
+ List<ComputedColumn> columns =
+ Arrays.asList(
+ new ComputedColumn("A", Expression.substring("B",
"1")),
+ new ComputedColumn("B", Expression.substring("C",
"1")),
+ new ComputedColumn("C", Expression.substring("A",
"1")));
+
+ assertThrows(IllegalArgumentException.class, () -> sortComputedColumns(columns));
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 53daeebc25..5bd1ffbfa7 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.options.CatalogOptions;
@@ -31,6 +32,7 @@ import org.junit.jupiter.api.Timeout;
import javax.annotation.Nullable;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -660,7 +662,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
.withTableConfig(getBasicTableConfig())
.withPrimaryKeys("k")
.withComputedColumnArgs(
- Arrays.asList("etl_create_time=now()",
"etl_update_time=now()"))
+ Arrays.asList(
+ "etl_create_time=now()",
+ "etl_update_time=now()",
+
"pt=date_format(etl_update_time,yyyy-MM-dd)"))
.build();
runActionWithDefaultEnv(action);
@@ -675,15 +680,16 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
DataTypes.INT().notNull(),
DataTypes.VARCHAR(10),
DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(3)
+ DataTypes.TIMESTAMP(3),
+ DataTypes.STRING()
},
- new String[] {"k", "v1", "etl_create_time",
"etl_update_time"});
+ new String[] {"k", "v1", "etl_create_time",
"etl_update_time", "pt"});
// INSERT
waitForResult(
true,
Collections.singletonList(
- "\\+I\\[1, A,
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3},
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+ "\\+I\\[1, A,
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3},
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, \\d{4}-\\d{2}-\\d{2}\\]"),
table1,
rowType1,
Collections.singletonList("k"));
@@ -691,9 +697,13 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
List<InternalRow> data = getData("t1");
Timestamp createTime1 = data.get(0).getTimestamp(2, 3);
Timestamp updateTime1 = data.get(0).getTimestamp(3, 3);
+ BinaryString pt1 = data.get(0).getString(4);
+
+ DateTimeFormatter ptFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
assertThat(createTime1.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
assertThat(updateTime1.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+ assertThat(updateTime1.toLocalDateTime().format(ptFormatter)).isEqualTo(pt1.toString());
Thread.sleep(1000);
@@ -702,7 +712,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
waitForResult(
true,
Collections.singletonList(
- "\\+I\\[1, B,
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3},
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+ "\\+I\\[1, B,
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3},
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, \\d{4}-\\d{2}-\\d{2}\\]"),
table1,
rowType1,
Collections.singletonList("k"));
@@ -710,10 +720,12 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
data = getData("t1");
Timestamp createTime2 = data.get(0).getTimestamp(2, 3);
Timestamp updateTime2 = data.get(0).getTimestamp(3, 3);
+ BinaryString pt2 = data.get(0).getString(4);
assertThat(createTime2.toLocalDateTime()).isAfter(createTime1.toLocalDateTime());
assertThat(updateTime2.toLocalDateTime()).isAfter(updateTime1.toLocalDateTime());
assertThat(updateTime2.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+ assertThat(updateTime2.toLocalDateTime().format(ptFormatter)).isEqualTo(pt2.toString());
Thread.sleep(1000);
@@ -722,7 +734,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
waitForResult(
true,
Collections.singletonList(
- "\\+I\\[1, C,
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3},
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+ "\\+I\\[1, C,
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3},
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, \\d{4}-\\d{2}-\\d{2}\\]"),
table1,
rowType1,
Collections.singletonList("k"));
@@ -730,10 +742,12 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
data = getData("t1");
Timestamp createTime3 = data.get(0).getTimestamp(2, 3);
Timestamp updateTime3 = data.get(0).getTimestamp(3, 3);
+ BinaryString pt3 = data.get(0).getString(4);
assertThat(createTime3.toLocalDateTime()).isAfter(createTime1.toLocalDateTime());
assertThat(updateTime3.toLocalDateTime()).isAfter(updateTime2.toLocalDateTime());
assertThat(updateTime3.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+ assertThat(updateTime3.toLocalDateTime().format(ptFormatter)).isEqualTo(pt3.toString());
}
@Test