This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 885a0591a [FLINK-39204][pipeline-connector/fluss] Fluss yaml sink support add column at last
885a0591a is described below
commit 885a0591a23e3ccc20c45d1ad638bec577e0d2a5
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Mar 6 18:55:43 2026 +0800
    [FLINK-39204][pipeline-connector/fluss] Fluss yaml sink support add column at last
This closes #4305.
---
.../fluss/sink/FlussEventSerializationSchema.java | 55 ++++--
.../fluss/sink/FlussMetaDataApplier.java | 73 ++++---
.../fluss/sink/row/{ => row}/CdcAsFlussRow.java | 0
.../connectors/fluss/utils/FlussConversions.java | 30 ++-
.../cdc/connectors/fluss/FlussPipelineITCase.java | 211 ++++++++++++++++++---
.../sink/FlussEventSerializationSchemaTest.java | 76 +++++++-
.../fluss/sink/FlussMetadataApplierTest.java | 91 ++++++++-
.../connectors/fluss/sink/v2/FlussSinkITCase.java | 43 ++++-
.../fluss/utils/FlussConversionsTest.java | 12 +-
9 files changed, 488 insertions(+), 103 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
index 086da5894..c8ac822c2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
@@ -17,13 +17,16 @@
package org.apache.flink.cdc.connectors.fluss.sink;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer;
@@ -43,19 +46,19 @@ import java.util.Map;
import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.APPEND;
import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.DELETE;
import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.UPSERT;
-import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue;
+import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameSchemaIgnoreCommentAndDefaultValue;
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussSchema;
/** Serialization schema that converts a CDC data record to a Fluss event. */
public class FlussEventSerializationSchema implements FlussEventSerializer<Event> {
private static final long serialVersionUID = 1L;
- private transient Map<TableId, TableSchemaInfo> tableInfoMap;
+ private transient Map<TableId, TableSchemaInfo> schemaMaps;
private transient Connection connection;
@Override
public void open(Connection connection) {
- this.tableInfoMap = new HashMap<>();
+ this.schemaMaps = new HashMap<>();
this.connection = connection;
}
@@ -82,29 +85,45 @@ public class FlussEventSerializationSchema implements FlussEventSerializer<Event
org.apache.flink.cdc.common.schema.Schema newSchema =
((CreateTableEvent) event).getSchema();
            // if the table does not exist or the schema has changed, update the table info.
-            if (!tableInfoMap.containsKey(tableId)
-                    || !sameCdcColumnsIgnoreCommentAndDefaultValue(
-                            tableInfoMap.get(tableId).upstreamCdcSchema, newSchema)) {
+            if (!schemaMaps.containsKey(tableId)
+                    || !sameSchemaIgnoreCommentAndDefaultValue(
+                            schemaMaps.get(tableId).upstreamCdcSchema, newSchema)) {
                 Table table = connection.getTable(getTablePath(tableId));
                 TableSchemaInfo newSchemaInfo =
                         new TableSchemaInfo(newSchema, table.getTableInfo().getSchema());
-                tableInfoMap.put(tableId, newSchemaInfo);
+                schemaMaps.put(tableId, newSchemaInfo);
+            }
+        } else if (event instanceof AddColumnEvent) {
+            TableSchemaInfo schemaInfo = schemaMaps.get(event.tableId());
+            if (schemaInfo == null) {
+                throw new IllegalStateException(
+                        "Cannot apply AddColumnEvent for table "
+                                + event.tableId()
+                                + ": table schema not found. Ensure CreateTableEvent is processed before AddColumnEvent.");
+            }
+            Schema schema = schemaInfo.upstreamCdcSchema;
+            if (!SchemaUtils.isSchemaChangeEventRedundant(schema, event)) {
+                Table table = connection.getTable(getTablePath(tableId));
+                TableSchemaInfo newSchemaInfo =
+                        new TableSchemaInfo(
+                                SchemaUtils.applySchemaChangeEvent(schema, event),
+                                table.getTableInfo().getSchema());
+                schemaMaps.put(tableId, newSchemaInfo);
            }
        } else {
-            // TODO: Logics for altering tables are not supported yet.
-            // This is anticipated to be supported in Fluss version 0.8.0.
-            throw new RuntimeException(
-                    "Schema change type not supported. Only CreateTableEvent is allowed at the moment.");
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Schema change type %s not supported. Only CreateTableEvent and AddColumnEvent are allowed at the moment.",
+                            event.getClass()));
}
}
private FlussRowWithOp applyDataChangeEvent(DataChangeEvent record) {
OperationType op = record.op();
- TableSchemaInfo tableSchemaInfo = tableInfoMap.get(record.tableId());
+ TableSchemaInfo tableSchemaInfo = schemaMaps.get(record.tableId());
        Preconditions.checkNotNull(
                tableSchemaInfo, "Table schema not found for table " + record.tableId());
-        int flussFieldCount =
-                tableSchemaInfo.downStreamFlusstreamSchema.getRowType().getFieldCount();
+        int flussFieldCount = tableSchemaInfo.downstreamFlussSchema.getRowType().getFieldCount();
        boolean hasPrimaryKey = !tableSchemaInfo.upstreamCdcSchema.primaryKeys().isEmpty();
switch (op) {
case INSERT:
@@ -130,17 +149,17 @@ public class FlussEventSerializationSchema implements FlussEventSerializer<Event
private static class TableSchemaInfo {
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema;
- org.apache.fluss.metadata.Schema downStreamFlusstreamSchema;
+ org.apache.fluss.metadata.Schema downstreamFlussSchema;
Map<Integer, Integer> indexMapping;
private TableSchemaInfo(
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema,
- org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) {
+ org.apache.fluss.metadata.Schema downstreamFlussSchema) {
this.upstreamCdcSchema = upstreamCdcSchema;
- this.downStreamFlusstreamSchema = downStreamFlusstreamSchema;
+ this.downstreamFlussSchema = downstreamFlussSchema;
            this.indexMapping =
                    sanityCheckAndGenerateIndexMapping(
-                            toFlussSchema(upstreamCdcSchema), downStreamFlusstreamSchema);
+                            toFlussSchema(upstreamCdcSchema), downstreamFlussSchema);
}
}
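
The serializer above keeps one cached TableSchemaInfo per table and now evolves that cache in place when an AddColumnEvent arrives, instead of rejecting everything except CreateTableEvent. A minimal sketch of the new cache-update path, reusing only helpers visible in this diff (error handling elided; illustrative rather than verbatim):

    // Evolve the cached upstream CDC schema when the AddColumnEvent is not redundant.
    Schema cached = schemaMaps.get(tableId).upstreamCdcSchema;
    if (!SchemaUtils.isSchemaChangeEventRedundant(cached, addColumnEvent)) {
        Schema evolved = SchemaUtils.applySchemaChangeEvent(cached, addColumnEvent);
        Table table = connection.getTable(getTablePath(tableId));
        schemaMaps.put(tableId, new TableSchemaInfo(evolved, table.getTableInfo().getSchema()));
    }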
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
index 4e28623e2..e2cda4bd0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
@@ -17,12 +17,14 @@
package org.apache.flink.cdc.connectors.fluss.sink;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.table.api.ValidationException;
@@ -31,12 +33,14 @@ import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -47,6 +51,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussTable;
+import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussType;
/** {@link MetadataApplier} for fluss. */
public class FlussMetaDataApplier implements MetadataApplier {
@@ -58,9 +63,6 @@ public class FlussMetaDataApplier implements MetadataApplier {
private Set<SchemaChangeEventType> enabledEventTypes =
new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));
- private transient Connection connection;
- private transient Admin admin;
-
public FlussMetaDataApplier(
Configuration flussClientConfig,
Map<String, String> tableProperties,
@@ -92,22 +94,25 @@ public class FlussMetaDataApplier implements MetadataApplier {
@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
        LOG.info("fluss metadata applier receives schemaChangeEvent {}", schemaChangeEvent);
- Admin admin = getAdmin();
if (schemaChangeEvent instanceof CreateTableEvent) {
            CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
- applyCreateTable(admin, createTableEvent);
+ applyCreateTable(createTableEvent);
} else if (schemaChangeEvent instanceof DropTableEvent) {
DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
- applyDropTable(admin, dropTableEvent);
+ applyDropTable(dropTableEvent);
+ } else if (schemaChangeEvent instanceof AddColumnEvent) {
+ AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent;
+ applyAddColumnTable(addColumnEvent);
} else {
throw new IllegalArgumentException(
- "fluss metadata applier only support CreateTableEvent now
but receives "
+ "fluss metadata applier only supports CreateTableEvent and
AddColumnEvent now but receives "
+ schemaChangeEvent);
}
}
- private void applyCreateTable(Admin admin, CreateTableEvent event) {
- try {
+ private void applyCreateTable(CreateTableEvent event) {
+        try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
+                Admin admin = connection.getAdmin()) {
             TableId tableId = event.tableId();
             TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
             String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName();
@@ -129,8 +134,9 @@ public class FlussMetaDataApplier implements MetadataApplier {
}
}
- private void applyDropTable(Admin admin, DropTableEvent event) {
- try {
+ private void applyDropTable(DropTableEvent event) {
+        try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
+                Admin admin = connection.getAdmin()) {
             TableId tableId = event.tableId();
             TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
admin.dropTable(tablePath, true).get();
@@ -140,21 +146,36 @@ public class FlussMetaDataApplier implements MetadataApplier {
}
}
- private Admin getAdmin() {
- if (connection == null) {
- connection = ConnectionFactory.createConnection(flussClientConfig);
- admin = connection.getAdmin();
- }
- return admin;
- }
-
- @Override
- public void close() throws Exception {
- if (admin != null) {
- admin.close();
- }
- if (connection != null) {
- connection.close();
+    private void applyAddColumnTable(AddColumnEvent event) {
+        List<TableChange> tableChanges = new ArrayList<>();
+        event.getAddedColumns()
+                .forEach(
+                        columnWithPosition -> {
+                            if (columnWithPosition.getPosition()
+                                    != AddColumnEvent.ColumnPosition.LAST) {
+                                throw new IllegalArgumentException(
+                                        "Fluss metadata applier only supports LAST position for adding columns now but receives "
+                                                + columnWithPosition.getPosition()
+                                                + ". Consider using 'schema.change.behavior' configuration with 'LENIENT' mode to handle schema changes more flexibly.");
+                            }
+
+                            Column column = columnWithPosition.getAddColumn();
+                            tableChanges.add(
+                                    TableChange.addColumn(
+                                            column.getName(),
+                                            toFlussType(column.getType()),
+                                            column.getComment(),
+                                            TableChange.ColumnPosition.last()));
+                        });
+
+        try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
+                Admin admin = connection.getAdmin()) {
+            TableId tableId = event.tableId();
+            TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
+            admin.alterTable(tablePath, tableChanges, true).get();
+        } catch (Exception e) {
+            LOG.error("Failed to apply schema change {}", event, e);
+            throw new RuntimeException(e);
}
}
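
The applier now opens a short-lived Connection/Admin pair per schema change (the cached connection and the close() override are removed) and translates each added CDC column into a Fluss TableChange. A condensed sketch of that translation for a single column, assuming the toFlussType helper made public in this commit:

    // One CDC column becomes one Fluss TableChange appended at the end of the table.
    Column column = columnWithPosition.getAddColumn();
    TableChange change =
            TableChange.addColumn(
                    column.getName(),
                    toFlussType(column.getType()),
                    column.getComment(),
                    TableChange.ColumnPosition.last());
    // alterTable is asynchronous; get() waits for the change to be applied.
    admin.alterTable(tablePath, Collections.singletonList(change), true).get();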
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/row/CdcAsFlussRow.java
similarity index 100%
rename from flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java
rename to flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/row/CdcAsFlussRow.java
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
index 25a9f53b6..6858c3e65 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
@@ -41,7 +41,6 @@ import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.util.CollectionUtil;
-import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
@@ -94,28 +93,25 @@ public class FlussConversions {
schemBuilder.primaryKey(cdcSchema.primaryKeys());
}
-        Schema schema =
-                schemBuilder
-                        .fromColumns(
-                                cdcSchema.getColumns().stream()
-                                        .map(
-                                                column ->
-                                                        new Schema.Column(
-                                                                column.getName(),
-                                                                toFlussType(column.getType()),
-                                                                column.getComment()))
-                                        .collect(Collectors.toList()))
-                        .build();
-        return schema;
+        // use schemBuilder.column rather than schemBuilder.fromColumns to reassign nested row field ids.
+        cdcSchema
+                .getColumns()
+                .forEach(
+                        column ->
+                                schemBuilder
+                                        .column(
+                                                column.getName(),
+                                                column.getType().accept(TO_FLUSS_TYPE_INSTANCE))
+                                        .withComment(column.getComment()));
+        return schemBuilder.build();
}
- @VisibleForTesting
- private static org.apache.fluss.types.DataType toFlussType(
+ public static org.apache.fluss.types.DataType toFlussType(
org.apache.flink.cdc.common.types.DataType flinkDataType) {
return flinkDataType.accept(TO_FLUSS_TYPE_INSTANCE);
}
- public static Boolean sameCdcColumnsIgnoreCommentAndDefaultValue(
+ public static Boolean sameSchemaIgnoreCommentAndDefaultValue(
org.apache.flink.cdc.common.schema.Schema oldSchema,
org.apache.flink.cdc.common.schema.Schema newSchema) {
        List<org.apache.flink.cdc.common.schema.Column> upstreamColumns = oldSchema.getColumns();
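
The switch from fromColumns to per-column builder calls matters for nested ROW types: Schema.Builder.column(...) lets Fluss assign fresh field ids, while fromColumns(...) would carry over whatever ids the pre-built columns hold. A minimal usage sketch (column names are illustrative only):

    // A CDC schema with a nested row; toFlussSchema should hand out unique
    // field ids for the nested "name" and "age" fields.
    org.apache.flink.cdc.common.schema.Schema cdcSchema =
            org.apache.flink.cdc.common.schema.Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn(
                            "payload",
                            DataTypes.ROW(
                                    DataTypes.FIELD("name", DataTypes.STRING()),
                                    DataTypes.FIELD("age", DataTypes.INT())))
                    .build();
    org.apache.fluss.metadata.Schema flussSchema = FlussConversions.toFlussSchema(cdcSchema);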
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
index bfc71a1ad..602b32680 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
@@ -19,12 +19,13 @@ package org.apache.flink.cdc.connectors.fluss;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
-import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
@@ -64,6 +65,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
+import static org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior.EVOLVE;
+import static org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior.IGNORE;
+import static org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior.LENIENT;
import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
@@ -233,7 +238,7 @@ public class FlussPipelineITCase {
eventOfSplits.add(Collections.singletonList(updateEvent));
eventOfSplits.add(Collections.singletonList(deleteEvent));
- composeAndExecute(eventOfSplits);
+ composeAndExecuteInEvolveMode(eventOfSplits);
checkResult(TABLE_1, Arrays.asList("+I[2, b2]", "+I[3, c]"));
}
@@ -279,16 +284,114 @@ public class FlussPipelineITCase {
split1.add(insertEvent3);
eventOfSplits.add(split1);
- composeAndExecute(eventOfSplits);
+ composeAndExecuteInEvolveMode(eventOfSplits);
checkResult(TABLE_1, Arrays.asList("+I[1, a]", "+I[2, b]", "+I[3,
c]"));
}
@Test
-    void testSingleLogTableWithSchemaChange() {
-        assertThatThrownBy(() -> composeAndExecute(ValuesDataSourceHelper.singleSplitSingleTable()))
+    void testSingleLogTableWithNotSupportedSchemaChange() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                composeAndExecuteInEvolveMode(
+                                        ValuesDataSourceHelper.singleSplitSingleTable()))
                .rootCause()
                .hasMessageContaining(
-                        "fluss metadata applier only support CreateTableEvent now but receives AddColumnEvent");
+                        "fluss metadata applier only supports CreateTableEvent and AddColumnEvent now but receives RenameColumnEvent");
+ }
+
+ @Test
+ void testSingleLogTableInLenientMode() throws Exception {
+ // test add/drop/rename column in lenient mode
+        composeAndExecute(
+                ValuesDataSourceHelper.singleSplitSingleTable(),
+                new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, LENIENT));
+        checkResult(
+                TABLE_1, Arrays.asList("+I[2, null, null, null, x]", "+I[3, 3, null, null, null]"));
+ }
+
+ @Test
+ void testSingleLogTableInIgnoreMode() throws Exception {
+ // test add/drop/rename column in ignore mode
+        composeAndExecute(
+                ValuesDataSourceHelper.singleSplitSingleTable(),
+                new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, IGNORE));
+        checkResult(TABLE_1, Arrays.asList("+I[2, null]", "+I[3, 3]"));
+ }
+
+ @Test
+ void testSingleLogTableWithAddColumn() throws Exception {
+ List<List<Event>> eventOfSplits = new ArrayList<>();
+ List<Event> split1 = new ArrayList<>();
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+ split1.add(createTableEvent);
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ new BinaryRecordDataGenerator(
+                                RowType.of(DataTypes.STRING(), DataTypes.STRING()))
+ .generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("a")
+ }));
+ split1.add(insertEvent1);
+
+ // add column at last.
+ split1.add(
+ new AddColumnEvent(
+ TABLE_1,
+ Collections.singletonList(
+ AddColumnEvent.last(
+ Column.physicalColumn("newColumn",
DataTypes.STRING())))));
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ new BinaryRecordDataGenerator(
+ RowType.of(
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()))
+ .generate(
+ new Object[] {
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("b"),
+ BinaryStringData.fromString("bb")
+ }));
+ split1.add(insertEvent2);
+ split1.add(
+ new AddColumnEvent(
+ TABLE_1,
+ Collections.singletonList(
+ AddColumnEvent.last(
+ Column.physicalColumn("newColumn2",
DataTypes.STRING())))));
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ new BinaryRecordDataGenerator(
+ RowType.of(
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()))
+ .generate(
+ new Object[] {
+ BinaryStringData.fromString("3"),
+ BinaryStringData.fromString("c"),
+ BinaryStringData.fromString("cc"),
+ BinaryStringData.fromString("ccc")
+ }));
+ split1.add(insertEvent3);
+ eventOfSplits.add(split1);
+
+ composeAndExecuteInEvolveMode(eventOfSplits);
+        checkResult(
+                TABLE_1,
+                Arrays.asList("+I[1, a, null, null]", "+I[2, b, bb, null]", "+I[3, c, cc, ccc]"));
}
@Test
@@ -298,7 +401,8 @@ public class FlussPipelineITCase {
composeAndExecute(
ValuesDataSourceHelper.singleSplitSingleTable(),
                                        Collections.singletonMap(
-                                                BOOTSTRAP_SERVERS.key(), getBootstrapServers())))
+                                                BOOTSTRAP_SERVERS.key(), getBootstrapServers()),
+                                        new Configuration()))
.rootCause()
                .hasMessageContaining(
                        "The connection has not completed authentication yet. This may be caused by a missing or incorrect configuration of 'client.security.protocol' on the client side.");
@@ -318,7 +422,8 @@ public class FlussPipelineITCase {
() ->
composeAndExecute(
ValuesDataSourceHelper.singleSplitSingleTable(),
- sinkOption))
+ sinkOption,
+ new Configuration()))
.rootCause()
.hasMessageContaining("'table.non-key' is not a recognized
Fluss table property");
}
@@ -398,7 +503,7 @@ public class FlussPipelineITCase {
split1.add(insertTabl2Event3);
eventOfSplits.add(split1);
- composeAndExecute(eventOfSplits);
+ composeAndExecuteInEvolveMode(eventOfSplits);
checkResult(TABLE_1, Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[3,
3]"));
checkResult(TABLE_2, Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[3,
3]"));
}
@@ -438,7 +543,7 @@ public class FlussPipelineITCase {
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("a"),
- BinaryStringData.fromString("1")
+ BinaryStringData.fromString("aa")
}));
split1.add(insertEvent1);
DataChangeEvent insertEvent2 =
@@ -447,14 +552,39 @@ public class FlussPipelineITCase {
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
- BinaryStringData.fromString("2"),
- BinaryStringData.fromString("2")
+ BinaryStringData.fromString("b"),
+ BinaryStringData.fromString("bb")
}));
split1.add(insertEvent2);
eventOfSplits.add(split1);
- composeAndExecute(eventOfSplits);
- checkResult(TABLE_1, Arrays.asList("+I[a, 1]", "+I[2, 2]"));
+ // add column at last
+ split1.add(
+ new AddColumnEvent(
+ TABLE_1,
+ Collections.singletonList(
+ AddColumnEvent.last(
+                                        Column.physicalColumn("newColumn", DataTypes.STRING())))));
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ new BinaryRecordDataGenerator(
+ RowType.of(
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()))
+ .generate(
+ new Object[] {
+ BinaryStringData.fromString("3"),
+ BinaryStringData.fromString("c"),
+ BinaryStringData.fromString("cc"),
+ BinaryStringData.fromString("ccc")
+ }));
+ split1.add(insertEvent3);
+
+ composeAndExecuteInEvolveMode(eventOfSplits);
+        checkResult(TABLE_1, Arrays.asList("+I[a, 1, null]", "+I[b, 2, null]", "+I[c, 3, ccc]"));
}
@Test
@@ -499,27 +629,64 @@ public class FlussPipelineITCase {
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
- BinaryStringData.fromString("2")
+ BinaryStringData.fromString("b")
}));
split1.add(insertEvent2);
eventOfSplits.add(split1);
- composeAndExecute(eventOfSplits);
-        checkResult(TABLE_1, Arrays.asList("+I[1, a, null]", "+I[2, 2, null]"));
+ // add column at last
+ split1.add(
+ new AddColumnEvent(
+ TABLE_1,
+ Collections.singletonList(
+ AddColumnEvent.last(
+                                        Column.physicalColumn("newColumn", DataTypes.STRING())))));
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ new BinaryRecordDataGenerator(
+ RowType.of(
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()))
+ .generate(
+ new Object[] {
+ BinaryStringData.fromString("3"),
+ BinaryStringData.fromString("c"),
+ BinaryStringData.fromString("cc")
+ }));
+ split1.add(insertEvent3);
+
+ composeAndExecuteInEvolveMode(eventOfSplits);
+        checkResult(
+                TABLE_1,
+                Arrays.asList(
+                        "+I[1, a, null, null]", "+I[2, b, null, null]", "+I[3, c, null, cc]"));
}
-    private void composeAndExecute(List<List<Event>> customSourceEvents) throws Exception {
+    private void composeAndExecuteInEvolveMode(List<List<Event>> customSourceEvents)
+            throws Exception {
+        composeAndExecute(
+                customSourceEvents,
+                new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, EVOLVE));
+    }
+
+ private void composeAndExecute(
+            List<List<Event>> customSourceEvents, Configuration pipelineConfig) throws Exception {
Map<String, String> sinkOption = new HashMap<>();
sinkOption.put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
sinkOption.put("properties.client.security.protocol", "sasl");
sinkOption.put("properties.client.security.sasl.mechanism", "PLAIN");
sinkOption.put("properties.client.security.sasl.username", "guest");
sinkOption.put("properties.client.security.sasl.password",
"password2");
- composeAndExecute(customSourceEvents, sinkOption);
+ composeAndExecute(customSourceEvents, sinkOption, pipelineConfig);
}
private void composeAndExecute(
-            List<List<Event>> customSourceEvents, Map<String, String> sinkOption) throws Exception {
+ List<List<Event>> customSourceEvents,
+ Map<String, String> sinkOption,
+ Configuration pipelineConfig)
+ throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
@@ -535,10 +702,8 @@ public class FlussPipelineITCase {
        SinkDef sinkDef = new SinkDef("fluss", "Fluss Sink", Configuration.fromMap(sinkOption));
// Setup pipeline
-        Configuration pipelineConfig = new Configuration();
        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 4);
-        pipelineConfig.set(
-                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
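
With the harness refactored to accept a pipeline Configuration, each test now picks its schema-change behavior explicitly instead of being pinned to EVOLVE. A condensed sketch of the resulting pattern, using only options that appear in this diff:

    // Run the same source events under different schema-change behaviors.
    composeAndExecute(
            ValuesDataSourceHelper.singleSplitSingleTable(),
            new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, LENIENT));
    composeAndExecute(
            ValuesDataSourceHelper.singleSplitSingleTable(),
            new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, IGNORE));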
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
index 1af59aa97..663cb0830 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
@@ -23,12 +23,15 @@ import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.BooleanType;
import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DateType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.FloatType;
@@ -97,7 +100,7 @@ public class FlussEventSerializationSchemaTest {
}
@Test
- void testMixedSchemaAndDataChanges() throws Exception {
+ void testSchemaAndDataChangesInMultiTables() throws Exception {
// 1. create table1, and insert/delete/update data
TableId table1 = TableId.parse("test.tbl1");
Schema schema1 =
@@ -223,6 +226,77 @@ public class FlussEventSerializationSchemaTest {
Objects.requireNonNull(serializer.serialize(deleteEvent2)));
}
+ @Test
+ void testAddColumn() throws Exception {
+ // 1. create table1, and insert/delete/update data
+ TableId table1 = TableId.parse("test.tbl1");
+ Schema schemaBefore =
+ Schema.newBuilder()
+ .physicalColumn("col1", new IntType())
+ .physicalColumn("col2", new BooleanType())
+ .physicalColumn("col3", new TimestampType())
+ .primaryKey("col1")
+ .build();
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ table1,
+ Collections.singletonList(
+ AddColumnEvent.last(
+ Column.physicalColumn("newColumn",
DataTypes.STRING()))));
+ Schema schemaAfter =
+ Schema.newBuilder()
+ .physicalColumn("col1", new IntType())
+ .physicalColumn("col2", new BooleanType())
+ .physicalColumn("col3", new TimestampType())
+ .physicalColumn("newColumn", new VarCharType())
+ .primaryKey("col1")
+ .build();
+        CreateTableEvent createTableEvent1 = new CreateTableEvent(table1, schemaBefore);
+        flussMetaDataApplier.applySchemaChange(createTableEvent1);
+        verifySchemaChangeEvent(table1, serializer.serialize(createTableEvent1));
+
+        BinaryRecordDataGenerator generator1 =
+                new BinaryRecordDataGenerator(
+                        schemaBefore.getColumnDataTypes().toArray(new DataType[0]));
+
+ RecordData record =
+ generator1.generate(
+ new Object[] {
+ 1,
+ true,
+                            TimestampData.fromMillis(
+                                    Timestamp.valueOf("2023-11-27 18:00:00").getTime())
+ });
+        DataChangeEvent insertEvent1 = DataChangeEvent.insertEvent(table1, record);
+
+        verifyDataChangeEvent(
+                table1,
+                new FlussRowWithOp(CdcAsFlussRow.replace(record), FlussOperationType.UPSERT),
+                serializer.serialize(insertEvent1));
+
+ // 2. add column at last.
+ flussMetaDataApplier.applySchemaChange(addColumnEvent);
+ verifySchemaChangeEvent(table1, serializer.serialize(addColumnEvent));
+        BinaryRecordDataGenerator generator2 =
+                new BinaryRecordDataGenerator(
+                        schemaAfter.getColumnDataTypes().toArray(new DataType[0]));
+ RecordData newRecord =
+ generator2.generate(
+ new Object[] {
+ 3,
+ false,
+                            TimestampData.fromMillis(
+                                    Timestamp.valueOf("2023-11-27 20:00:00").getTime()),
+ new BinaryStringData("insert new column")
+ });
+        DataChangeEvent updateEvent1 = DataChangeEvent.updateEvent(table1, record, newRecord);
+
+        verifyDataChangeEvent(
+                table1,
+                new FlussRowWithOp(CdcAsFlussRow.replace(newRecord), FlussOperationType.UPSERT),
+                serializer.serialize(updateEvent1));
+ }
+
    private void verifySchemaChangeEvent(TableId tableId, FlussEvent flussEvent) throws Exception {
verifySerializeResult(
new FlussEvent(
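
Note that for a primary-key table the serializer maps an update to a single UPSERT carrying only the after-image, which is why testAddColumn verifies newRecord alone. A minimal sketch of that mapping, assuming the FlussRowWithOp and CdcAsFlussRow types from this module:

    // Updates on primary-key tables keep just the after-image of the row.
    RecordData after = updateEvent1.after();
    FlussRowWithOp expected =
            new FlussRowWithOp(CdcAsFlussRow.replace(after), FlussOperationType.UPSERT);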
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
index c95aa7868..2ff64765c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
@@ -17,9 +17,11 @@
package org.apache.flink.cdc.connectors.fluss.sink;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.IntType;
@@ -48,6 +50,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
+import static org.apache.fluss.types.DataTypeChecks.equalsWithFieldId;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -105,6 +108,9 @@ public class FlussMetadataApplierTest {
"time_col",
"timestamp_col",
"timestamp_ltz_col",
+ "array_col",
+ "map_col",
+ "row_col"
};
org.apache.flink.cdc.common.types.DataType[] cdcDataTypes =
@@ -126,7 +132,12 @@ public class FlussMetadataApplierTest {
DataTypes.DATE(),
DataTypes.TIME(),
DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP_LTZ(6)
+ DataTypes.TIMESTAMP_LTZ(6),
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT()))
};
org.apache.fluss.types.DataType[] flussDataTypes =
@@ -150,7 +161,16 @@ public class FlussMetadataApplierTest {
org.apache.fluss.types.DataTypes.DATE(),
org.apache.fluss.types.DataTypes.TIME(),
org.apache.fluss.types.DataTypes.TIMESTAMP(3),
- org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6)
+                    org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6),
+                    org.apache.fluss.types.DataTypes.ARRAY(org.apache.fluss.types.DataTypes.INT()),
+                    org.apache.fluss.types.DataTypes.MAP(
+                            org.apache.fluss.types.DataTypes.STRING(),
+                            org.apache.fluss.types.DataTypes.INT()),
+                    org.apache.fluss.types.DataTypes.ROW(
+                            org.apache.fluss.types.DataTypes.FIELD(
+                                    "name", org.apache.fluss.types.DataTypes.STRING()),
+                            org.apache.fluss.types.DataTypes.FIELD(
+                                    "age", org.apache.fluss.types.DataTypes.INT()))
};
try (FlussMetaDataApplier applier =
@@ -183,6 +203,21 @@ public class FlussMetadataApplierTest {
if (primaryKeyTable) {
assertThat(tableInfo.getPrimaryKeys()).containsExactly("int_col");
}
+
+            // check field of nested row.
+            assertThat(
+                            equalsWithFieldId(
+                                    flussRowType.getTypeAt(flussRowType.getFieldCount() - 1),
+                                    org.apache.fluss.types.DataTypes.ROW(
+                                            org.apache.fluss.types.DataTypes.FIELD(
+                                                    "name",
+                                                    org.apache.fluss.types.DataTypes.STRING(),
+                                                    flussRowType.getFieldCount()),
+                                            org.apache.fluss.types.DataTypes.FIELD(
+                                                    "age",
+                                                    org.apache.fluss.types.DataTypes.INT(),
+                                                    flussRowType.getFieldCount() + 1))))
+                    .isTrue();
}
}
@@ -457,6 +492,58 @@ public class FlussMetadataApplierTest {
}
}
+ @Test
+ void testAddColumnAtLast() throws Exception {
+        TablePath tablePath = new TablePath("fluss", "add_column_table");
+        TableId tableId = TableId.tableId("default_namespace", "fluss", "add_column_table");
+        admin.createTable(
+                        tablePath,
+                        TableDescriptor.builder()
+                                .schema(
+                                        org.apache.fluss.metadata.Schema.newBuilder()
+                                                .column("id", org.apache.fluss.types.DataTypes.INT())
+                                                .column("name", org.apache.fluss.types.DataTypes.INT())
+                                                .build())
+                                .build(),
+                        true)
+                .get();
+
+        Column oldColumn = Column.physicalColumn("name", DataTypes.STRING());
+        Column newColumn = Column.physicalColumn("newColumn", DataTypes.STRING());
+
+ try (FlussMetaDataApplier applier =
+ new FlussMetaDataApplier(
+ FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap())) {
+ assertThatThrownBy(
+ () ->
+ applier.applySchemaChange(
+ new AddColumnEvent(
+ tableId,
+                                                Collections.singletonList(
+                                                        AddColumnEvent.last(oldColumn)))))
+ .rootCause()
+ .hasMessageContaining("Column name already exists.");
+            applier.applySchemaChange(
+                    new AddColumnEvent(
+                            tableId, Collections.singletonList(AddColumnEvent.last(newColumn))));
+            TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+            assertThat(tableInfo.getSchema())
+                    .isEqualTo(
+                            org.apache.fluss.metadata.Schema.newBuilder()
+                                    .column("id", org.apache.fluss.types.DataTypes.INT())
+                                    .column("name", org.apache.fluss.types.DataTypes.INT())
+                                    .column("newColumn", org.apache.fluss.types.DataTypes.STRING())
+                                    .build());
+ }
+ }
+
@Test
void testRecreateTableWithDifferentSchema() throws Exception {
        TableId tableId = TableId.tableId("default_namespace", DATABASE_NAME, "table1");
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
index 1cd80c6a6..2e408e40a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
@@ -17,9 +17,11 @@
package org.apache.flink.cdc.connectors.fluss.sink.v2;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -31,6 +33,7 @@ import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.fluss.sink.FlussEventSerializationSchema;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
@@ -147,7 +150,10 @@ public class FlussSinkITCase extends AbstractTestBase {
+ " date_type DATE,\n"
+ " time_type TIME,\n"
+ " timestamp_type TIMESTAMP,\n"
- + " timestamp_ltz_type
TIMESTAMP_LTZ(8)\n"
+ + " timestamp_ltz_type
TIMESTAMP_LTZ(8),\n"
+ + " array_type ARRAY<INT>,\n"
+ + " map_type MAP<STRING, INT>,\n"
+ + " row_type ROW< f0 INT, f1 STRING
>\n"
+ (primaryKeyTable
? " ,PRIMARY KEY (int_type)
NOT ENFORCED \n"
: "")
@@ -171,7 +177,10 @@ public class FlussSinkITCase extends AbstractTestBase {
"date_type",
"time_type",
"timestamp_type",
- "timestamp_ltz_type"
+ "timestamp_ltz_type",
+ "array_type",
+ "map_type",
+ "row_type"
};
        String[] pkFieldNames = primaryKeyTable ? new String[] {"int_type"} : new String[0];
@@ -191,9 +200,20 @@ public class FlussSinkITCase extends AbstractTestBase {
DataTypes.DATE(),
DataTypes.TIME(),
DataTypes.TIMESTAMP(),
- DataTypes.TIMESTAMP_LTZ(8)
+ DataTypes.TIMESTAMP_LTZ(8),
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.STRING()))
};
+        // Currently, AbstractBinaryWriter.writeRecord only supports BinaryRecordData but not
+        // GenericRecordData.
+        RecordData nestedRowData =
+                new BinaryRecordDataGenerator(new DataType[] {DataTypes.INT(), DataTypes.STRING()})
+                        .generate(new Object[] {1, BinaryStringData.fromString("hello")});
+
Object[][] insertedValues =
new Object[][] {
new Object[] {
@@ -216,22 +236,26 @@ public class FlussSinkITCase extends AbstractTestBase {
LocalZonedTimestampData.fromInstant(
LocalDateTime.of(2023, 11, 11, 11, 11, 11, 11)
.atZone(ZoneId.of("GMT+05:00"))
- .toInstant())
+ .toInstant()),
+ new GenericArrayData(new Object[] {1, 2, 3}),
+                    new GenericMapData(
+                            Collections.singletonMap(BinaryStringData.fromString("key"), 123)),
+ nestedRowData
},
new Object[] {
                    null, null, null, null, null, null, null, 0, null, null, null, null, null,
-                    null, null
+                    null, null, null, null, null
                }
};
        // default timezone is Asia/Shanghai
List<String> expectedRows =
Arrays.asList(
                        String.format(
-                                "+I[a, test character, test text, false, 8119.21, 1, 32767, 32768, 652482, 20.2007, 8.58965, 2023-11-12, 08:30:15, %s, 2023-11-11T06:11:11.000000011Z]",
+                                "+I[a, test character, test text, false, 8119.21, 1, 32767, 32768, 652482, 20.2007, 8.58965, 2023-11-12, 08:30:15, %s, 2023-11-11T06:11:11.000000011Z, [1, 2, 3], {key=123}, +I[1, hello]]",
                                primaryKeyTable
                                        ? "2023-11-11T11:11:11.000000011"
                                        : "2023-11-11T11:11:11"),
-                        "+I[null, null, null, null, null, null, null, 0, null, null, null, null, null, null]");
+                        "+I[null, null, null, null, null, null, null, 0, null, null, null, null, null, null, null, null, null]");
testInsertSingleTable(
tableId,
@@ -576,8 +600,7 @@ public class FlussSinkITCase extends AbstractTestBase {
StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
-        DataStreamSource<Event> source =
-                environment.fromData(events, TypeInformation.of(Event.class));
+        DataStreamSource<Event> source = environment.fromData(events, new EventTypeInfo());
        FlussEventSerializationSchema flussRecordSerializer = new FlussEventSerializationSchema();
FlussSink<Event> flussSink =
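
Because AbstractBinaryWriter.writeRecord accepts only BinaryRecordData, nested row values in these tests are generated as binary records rather than built as GenericRecordData. A minimal sketch of producing such a nested value (the field values are illustrative):

    // Nested row values must be BinaryRecordData, so they are produced with
    // BinaryRecordDataGenerator instead of a generic record container.
    RecordData nested =
            new BinaryRecordDataGenerator(new DataType[] {DataTypes.INT(), DataTypes.STRING()})
                    .generate(new Object[] {42, BinaryStringData.fromString("nested")});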
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
index eddaeb6ff..d4e1e9a33 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
@@ -294,7 +294,7 @@ class FlussConversionsTest {
.physicalColumn("name", DataTypes.STRING())
.build();
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                .isTrue();
}
@@ -312,7 +312,7 @@ class FlussConversionsTest {
.physicalColumn("age", DataTypes.INT())
.build();
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                .isFalse();
}
@@ -330,7 +330,7 @@ class FlussConversionsTest {
.physicalColumn("name", DataTypes.STRING())
.build();
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                .isFalse();
}
@@ -344,7 +344,7 @@ class FlussConversionsTest {
Schema schema2 = Schema.newBuilder().physicalColumn("id",
DataTypes.INT()).build();
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                .isFalse();
}
@@ -363,7 +363,7 @@ class FlussConversionsTest {
.build();
// Should be true because comments are ignored
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                .isTrue();
}
@@ -382,7 +382,7 @@ class FlussConversionsTest {
.physicalColumn("name", DataTypes.STRING())
.build();
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                .isTrue();
}
}