This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b39063241f [INLONG-8382][Sort] Provide unsupported operation for ddl that is not parseable (#8383)
b39063241f is described below
commit b39063241f4f94895e9ecb0a08ae00c515aa7f40
Author: Sting <[email protected]>
AuthorDate: Fri Jun 30 17:22:45 2023 +0800
[INLONG-8382][Sort] Provide unsupported operation for ddl that is not parseable (#8383)
---
.../sort/protocol/ddl/operations/Operation.java | 3 +-
.../{Operation.java => UnsupportedOperation.java} | 35 ++++++++--------------
.../sort/cdc/mysql/utils/OperationUtils.java | 7 +++--
.../org/apache/inlong/sort/cdc/TestOperation.java | 11 +++++++
4 files changed, 29 insertions(+), 27 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
index 6e6fe21715..1b7fc4d4c5 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
@@ -34,7 +34,8 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
@JsonSubTypes.Type(value = CreateTableOperation.class, name =
"createTableOperation"),
@JsonSubTypes.Type(value = DropTableOperation.class, name =
"dropTableOperation"),
@JsonSubTypes.Type(value = TruncateTableOperation.class, name =
"truncateTableOperation"),
- @JsonSubTypes.Type(value = RenameTableOperation.class, name =
"renameTableOperation")
+ @JsonSubTypes.Type(value = RenameTableOperation.class, name =
"renameTableOperation"),
+ @JsonSubTypes.Type(value = UnsupportedOperation.class, name =
"unsupportedOperation")
})
@Data
@NoArgsConstructor
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/UnsupportedOperation.java
similarity index 54%
copy from
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/UnsupportedOperation.java
index 6e6fe21715..b320eb9e24 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/UnsupportedOperation.java
@@ -20,31 +20,20 @@ package org.apache.inlong.sort.protocol.ddl.operations;
import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
import lombok.Data;
-import lombok.NoArgsConstructor;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import lombok.EqualsAndHashCode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-/**
- * Operation represents a ddl operation.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
-@JsonSubTypes({
- @JsonSubTypes.Type(value = AlterOperation.class, name =
"alterOperation"),
- @JsonSubTypes.Type(value = CreateTableOperation.class, name =
"createTableOperation"),
- @JsonSubTypes.Type(value = DropTableOperation.class, name =
"dropTableOperation"),
- @JsonSubTypes.Type(value = TruncateTableOperation.class, name =
"truncateTableOperation"),
- @JsonSubTypes.Type(value = RenameTableOperation.class, name =
"renameTableOperation")
-})
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("unsupportedOperation")
+@JsonInclude(Include.NON_NULL)
@Data
-@NoArgsConstructor
-public abstract class Operation {
-
- @JsonProperty("operationType")
- private OperationType operationType;
+public class UnsupportedOperation extends Operation {
- public Operation(@JsonProperty("operationType") OperationType type) {
- this.operationType = type;
+ @JsonCreator
+ public UnsupportedOperation() {
+ super(OperationType.OTHER);
}
-
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
index e87201383e..028a98c912 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
@@ -29,6 +29,7 @@ import
org.apache.inlong.sort.protocol.ddl.operations.DropTableOperation;
import org.apache.inlong.sort.protocol.ddl.operations.Operation;
import org.apache.inlong.sort.protocol.ddl.operations.RenameTableOperation;
import org.apache.inlong.sort.protocol.ddl.operations.TruncateTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.UnsupportedOperation;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
@@ -89,12 +90,12 @@ public class OperationUtils {
} else if (statement instanceof RenameTableStatement) {
return new RenameTableOperation();
} else {
- LOG.warn("doesn't support sql {}, statement {}", sql,
statement);
+ LOG.error("doesn't support sql {}, statement {}", sql,
statement);
}
} catch (Exception e) {
- LOG.error("parse ddl error: {}, set ddl to null", sql, e);
+ LOG.error("parse ddl in sql {} error", sql, e);
}
- return null;
+ return new UnsupportedOperation();
}
/**
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
index 158d464d1e..baf3de8f68 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
@@ -24,6 +24,7 @@ import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.ddl.operations.UnsupportedOperation;
import org.junit.Assert;
import org.junit.Test;
@@ -84,4 +85,14 @@ public class TestOperation {
Assert.assertEquals(alterColumn.getOldColumn().getName(), "b");
}
+ @Test
+ public void testUnsupportedOperation() {
+ String sql = "alter table a drop key 'b'";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertTrue(operation instanceof UnsupportedOperation);
+ Assert.assertEquals(operation.getOperationType(), OperationType.OTHER);
+ }
+
}