This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a39959f4b [FLINK-35072][doris] Support applying AlterColumnTypeEvent
to Doris pipeline sink
a39959f4b is described below
commit a39959f4b3108d5337ff5f8d17f3e065f94b61b1
Author: yuxiqian <[email protected]>
AuthorDate: Tue Jul 30 17:46:49 2024 +0800
[FLINK-35072][doris] Support applying AlterColumnTypeEvent to Doris
pipeline sink
This closes #3473
---
.../flink-cdc-pipeline-connector-doris/pom.xml | 6 ++-
.../doris/sink/DorisMetadataApplier.java | 48 ++++++++++++++++------
.../doris/sink/DorisMetadataApplierITCase.java | 2 -
3 files changed, 40 insertions(+), 16 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
index b57906b04..8aa2644b3 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
@@ -26,6 +26,10 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
<name>flink-cdc-pipeline-connector-doris</name>
+ <properties>
+ <doris.connector.version>1.6.2</doris.connector.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -44,7 +48,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-${flink.major.version}</artifactId>
- <version>1.6.0</version>
+ <version>${doris.connector.version}</version>
</dependency>
<dependency>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index 7e107eac1..ad025c629 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeChecks;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
@@ -79,6 +80,8 @@ public class DorisMetadataApplier implements MetadataApplier {
applyDropColumnEvent((DropColumnEvent) event);
} else if (event instanceof RenameColumnEvent) {
applyRenameColumnEvent((RenameColumnEvent) event);
+ } else if (event instanceof AlterColumnTypeEvent) {
+ applyAlterColumnTypeEvent((AlterColumnTypeEvent) event);
} else if (event instanceof AlterColumnTypeEvent) {
throw new RuntimeException("Unsupported schema change event, "
+ event);
}
@@ -146,26 +149,28 @@ public class DorisMetadataApplier implements
MetadataApplier {
return new ArrayList<>();
}
+ private String buildTypeString(DataType dataType) {
+ if (dataType instanceof LocalZonedTimestampType
+ || dataType instanceof TimestampType
+ || dataType instanceof ZonedTimestampType) {
+ int precision = DataTypeChecks.getPrecision(dataType);
+ return String.format("%s(%s)", "DATETIMEV2",
Math.min(Math.max(precision, 0), 6));
+ } else {
+ return
DorisTypeMapper.toDorisType(DataTypeUtils.toFlinkDataType(dataType));
+ }
+ }
+
private void applyAddColumnEvent(AddColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
List<AddColumnEvent.ColumnWithPosition> addedColumns =
event.getAddedColumns();
for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
Column column = col.getAddColumn();
- String typeString;
- if (column.getType() instanceof LocalZonedTimestampType
- || column.getType() instanceof TimestampType
- || column.getType() instanceof ZonedTimestampType) {
- int precision = DataTypeChecks.getPrecision(column.getType());
- typeString =
- String.format("%s(%s)", "DATETIMEV2",
Math.min(Math.max(precision, 0), 6));
- } else {
- typeString =
- DorisTypeMapper.toDorisType(
-
DataTypeUtils.toFlinkDataType(column.getType()));
- }
FieldSchema addFieldSchema =
- new FieldSchema(column.getName(), typeString,
column.getComment());
+ new FieldSchema(
+ column.getName(),
+ buildTypeString(column.getType()),
+ column.getComment());
schemaChangeManager.addColumn(
tableId.getSchemaName(), tableId.getTableName(),
addFieldSchema);
}
@@ -192,4 +197,21 @@ public class DorisMetadataApplier implements
MetadataApplier {
entry.getValue());
}
}
+
+ private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event)
+ throws IOException, IllegalArgumentException {
+ TableId tableId = event.tableId();
+ Map<String, DataType> typeMapping = event.getTypeMapping();
+
+ for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
+ schemaChangeManager.modifyColumnDataType(
+ tableId.getSchemaName(),
+ tableId.getTableName(),
+ new FieldSchema(
+ entry.getKey(),
+ buildTypeString(entry.getValue()),
+ null)); // Currently, AlterColumnTypeEvent carries
no comment info. This
+ // will be fixed after FLINK-35243 got merged.
+ }
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
index 0d3c0d99c..fbaf26744 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
@@ -45,7 +45,6 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -367,7 +366,6 @@ public class DorisMetadataApplierITCase extends
DorisSinkTestBase {
}
@Test
- @Ignore("AlterColumnType is yet to be supported until we close
FLINK-35072.")
public void testDorisAlterColumnType() throws Exception {
TableId tableId =
TableId.tableId(