This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b39063241f [INLONG-8382][Sort] Provide unsupported operation for ddl 
that is not parseable (#8383)
b39063241f is described below

commit b39063241f4f94895e9ecb0a08ae00c515aa7f40
Author: Sting <[email protected]>
AuthorDate: Fri Jun 30 17:22:45 2023 +0800

    [INLONG-8382][Sort] Provide unsupported operation for ddl that is not 
parseable (#8383)
---
 .../sort/protocol/ddl/operations/Operation.java    |  3 +-
 .../{Operation.java => UnsupportedOperation.java}  | 35 ++++++++--------------
 .../sort/cdc/mysql/utils/OperationUtils.java       |  7 +++--
 .../org/apache/inlong/sort/cdc/TestOperation.java  | 11 +++++++
 4 files changed, 29 insertions(+), 27 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
index 6e6fe21715..1b7fc4d4c5 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
@@ -34,7 +34,8 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
         @JsonSubTypes.Type(value = CreateTableOperation.class, name = 
"createTableOperation"),
         @JsonSubTypes.Type(value = DropTableOperation.class, name = 
"dropTableOperation"),
         @JsonSubTypes.Type(value = TruncateTableOperation.class, name = 
"truncateTableOperation"),
-        @JsonSubTypes.Type(value = RenameTableOperation.class, name = 
"renameTableOperation")
+        @JsonSubTypes.Type(value = RenameTableOperation.class, name = 
"renameTableOperation"),
+        @JsonSubTypes.Type(value = UnsupportedOperation.class, name = 
"unsupportedOperation")
 })
 @Data
 @NoArgsConstructor
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/UnsupportedOperation.java
similarity index 54%
copy from 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/UnsupportedOperation.java
index 6e6fe21715..b320eb9e24 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/UnsupportedOperation.java
@@ -20,31 +20,20 @@ package org.apache.inlong.sort.protocol.ddl.operations;
 import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
 
 import lombok.Data;
-import lombok.NoArgsConstructor;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import lombok.EqualsAndHashCode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 
-/**
- * Operation represents a ddl operation.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
-@JsonSubTypes({
-        @JsonSubTypes.Type(value = AlterOperation.class, name = 
"alterOperation"),
-        @JsonSubTypes.Type(value = CreateTableOperation.class, name = 
"createTableOperation"),
-        @JsonSubTypes.Type(value = DropTableOperation.class, name = 
"dropTableOperation"),
-        @JsonSubTypes.Type(value = TruncateTableOperation.class, name = 
"truncateTableOperation"),
-        @JsonSubTypes.Type(value = RenameTableOperation.class, name = 
"renameTableOperation")
-})
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("unsupportedOperation")
+@JsonInclude(Include.NON_NULL)
 @Data
-@NoArgsConstructor
-public abstract class Operation {
-
-    @JsonProperty("operationType")
-    private OperationType operationType;
+public class UnsupportedOperation extends Operation {
 
-    public Operation(@JsonProperty("operationType") OperationType type) {
-        this.operationType = type;
+    @JsonCreator
+    public UnsupportedOperation() {
+        super(OperationType.OTHER);
     }
-
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
index e87201383e..028a98c912 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
@@ -29,6 +29,7 @@ import 
org.apache.inlong.sort.protocol.ddl.operations.DropTableOperation;
 import org.apache.inlong.sort.protocol.ddl.operations.Operation;
 import org.apache.inlong.sort.protocol.ddl.operations.RenameTableOperation;
 import org.apache.inlong.sort.protocol.ddl.operations.TruncateTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.UnsupportedOperation;
 
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.statement.Statement;
@@ -89,12 +90,12 @@ public class OperationUtils {
             } else if (statement instanceof RenameTableStatement) {
                 return new RenameTableOperation();
             } else {
-                LOG.warn("doesn't support sql {}, statement {}", sql, 
statement);
+                LOG.error("doesn't support sql {}, statement {}", sql, 
statement);
             }
         } catch (Exception e) {
-            LOG.error("parse ddl error: {}, set ddl to null", sql, e);
+            LOG.error("parse ddl in sql {} error", sql, e);
         }
-        return null;
+        return new UnsupportedOperation();
     }
 
     /**
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
index 158d464d1e..baf3de8f68 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
@@ -24,6 +24,7 @@ import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
 import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
 import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
 import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.ddl.operations.UnsupportedOperation;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -84,4 +85,14 @@ public class TestOperation {
         Assert.assertEquals(alterColumn.getOldColumn().getName(), "b");
     }
 
+    @Test
+    public void testUnsupportedOperation() {
+        String sql = "alter table a drop key 'b'";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        assert operation != null;
+        Assert.assertTrue(operation instanceof UnsupportedOperation);
+        Assert.assertEquals(operation.getOperationType(), OperationType.OTHER);
+    }
+
 }

Reply via email to