This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a39959f4b [FLINK-35072][doris] Support applying AlterColumnTypeEvent to Doris pipeline sink
a39959f4b is described below

commit a39959f4b3108d5337ff5f8d17f3e065f94b61b1
Author: yuxiqian <[email protected]>
AuthorDate: Tue Jul 30 17:46:49 2024 +0800

    [FLINK-35072][doris] Support applying AlterColumnTypeEvent to Doris pipeline sink
    
    This closes  #3473
---
 .../flink-cdc-pipeline-connector-doris/pom.xml     |  6 ++-
 .../doris/sink/DorisMetadataApplier.java           | 48 ++++++++++++++++------
 .../doris/sink/DorisMetadataApplierITCase.java     |  2 -
 3 files changed, 40 insertions(+), 16 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
index b57906b04..8aa2644b3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
@@ -26,6 +26,10 @@ limitations under the License.
     <artifactId>flink-cdc-pipeline-connector-doris</artifactId>
     <name>flink-cdc-pipeline-connector-doris</name>
 
+    <properties>
+        <doris.connector.version>1.6.2</doris.connector.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -44,7 +48,7 @@ limitations under the License.
         <dependency>
             <groupId>org.apache.doris</groupId>
             <artifactId>flink-doris-connector-${flink.major.version}</artifactId>
-            <version>1.6.0</version>
+            <version>${doris.connector.version}</version>
         </dependency>
 
         <dependency>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index 7e107eac1..ad025c629 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeChecks;
 import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.TimestampType;
@@ -79,6 +80,8 @@ public class DorisMetadataApplier implements MetadataApplier {
                 applyDropColumnEvent((DropColumnEvent) event);
             } else if (event instanceof RenameColumnEvent) {
                 applyRenameColumnEvent((RenameColumnEvent) event);
+            } else if (event instanceof AlterColumnTypeEvent) {
+                applyAlterColumnTypeEvent((AlterColumnTypeEvent) event);
             } else if (event instanceof AlterColumnTypeEvent) {
                 throw new RuntimeException("Unsupported schema change event, " + event);
             }
@@ -146,26 +149,28 @@ public class DorisMetadataApplier implements MetadataApplier {
         return new ArrayList<>();
     }
 
+    private String buildTypeString(DataType dataType) {
+        if (dataType instanceof LocalZonedTimestampType
+                || dataType instanceof TimestampType
+                || dataType instanceof ZonedTimestampType) {
+            int precision = DataTypeChecks.getPrecision(dataType);
+            return String.format("%s(%s)", "DATETIMEV2", Math.min(Math.max(precision, 0), 6));
+        } else {
+            return DorisTypeMapper.toDorisType(DataTypeUtils.toFlinkDataType(dataType));
+        }
+    }
+
     private void applyAddColumnEvent(AddColumnEvent event)
             throws IOException, IllegalArgumentException {
         TableId tableId = event.tableId();
         List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
         for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
             Column column = col.getAddColumn();
-            String typeString;
-            if (column.getType() instanceof LocalZonedTimestampType
-                    || column.getType() instanceof TimestampType
-                    || column.getType() instanceof ZonedTimestampType) {
-                int precision = DataTypeChecks.getPrecision(column.getType());
-                typeString =
-                        String.format("%s(%s)", "DATETIMEV2", Math.min(Math.max(precision, 0), 6));
-            } else {
-                typeString =
-                        DorisTypeMapper.toDorisType(
-                                DataTypeUtils.toFlinkDataType(column.getType()));
-            }
             FieldSchema addFieldSchema =
-                    new FieldSchema(column.getName(), typeString, column.getComment());
+                    new FieldSchema(
+                            column.getName(),
+                            buildTypeString(column.getType()),
+                            column.getComment());
             schemaChangeManager.addColumn(
                     tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
         }
@@ -192,4 +197,21 @@ public class DorisMetadataApplier implements MetadataApplier {
                     entry.getValue());
         }
     }
+
+    private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event)
+            throws IOException, IllegalArgumentException {
+        TableId tableId = event.tableId();
+        Map<String, DataType> typeMapping = event.getTypeMapping();
+
+        for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
+            schemaChangeManager.modifyColumnDataType(
+                    tableId.getSchemaName(),
+                    tableId.getTableName(),
+                    new FieldSchema(
+                            entry.getKey(),
+                            buildTypeString(entry.getValue()),
+                            null)); // Currently, AlterColumnTypeEvent carries no comment info. This
+            // will be fixed after FLINK-35243 got merged.
+        }
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
index 0d3c0d99c..fbaf26744 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -367,7 +366,6 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase {
     }
 
     @Test
-    @Ignore("AlterColumnType is yet to be supported until we close FLINK-35072.")
     public void testDorisAlterColumnType() throws Exception {
         TableId tableId =
                 TableId.tableId(

Reply via email to