This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e65e583  [Fix] Fix some bugs in commit offset to kafka, intercepting non-schema changes, and intercepting truncate table (#16)
e65e583 is described below

commit e65e583e6329e6be6ffe75c35e06ec22f7cc2c6e
Author: wudongliang <[email protected]>
AuthorDate: Sat May 11 11:12:35 2024 +0800

    [Fix] Fix some bugs in commit offset to kafka, intercepting non-schema changes, and intercepting truncate table (#16)
---
 .../writer/schema/DebeziumSchemaChange.java        | 83 ++++++++++++++++------
 1 file changed, 60 insertions(+), 23 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
index fc0823b..1eeab0e 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
@@ -21,6 +21,7 @@ package org.apache.doris.kafka.connector.writer.schema;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.debezium.data.Envelope;
+import io.debezium.util.Strings;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -45,6 +46,9 @@ import org.slf4j.LoggerFactory;
 
 public class DebeziumSchemaChange extends DorisWriter {
     private static final Logger LOG = LoggerFactory.getLogger(DebeziumSchemaChange.class);
+    public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
+    public static final String TABLE_CHANGES = "tableChanges";
+    public static final String TABLE_CHANGES_TYPE = "type";
     private final Map<String, String> topic2TableMap;
     private SchemaChangeManager schemaChangeManager;
     private DorisSystemService dorisSystemService;
@@ -80,36 +84,67 @@ public class DebeziumSchemaChange extends DorisWriter {
 
     @Override
     public void insert(SinkRecord record) {
+        if (!validate(record)) {
+            processedOffset.set(record.kafkaOffset());
+            return;
+        }
         schemaChange(record);
     }
 
-    @Override
-    public void commit(int partition) {
-        // do nothing
-    }
+    private boolean validate(final SinkRecord record) {
+        if (!isSchemaChange(record)) {
+            LOG.warn(
+                    "Current topic={}, the message does not contain schema change change information, please check schema.topic",
+                    dorisOptions.getSchemaTopic());
+            throw new SchemaChangeException(
+                    "The message does not contain schema change change information, please check schema.topic");
+        }
 
-    private void schemaChange(final SinkRecord record) {
-        String tableName = resolveTableName(record);
+        tableName = resolveTableName(record);
         if (tableName == null) {
             LOG.warn(
                     "Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name",
                     record.topic(),
                     record.kafkaPartition(),
                     record.kafkaOffset());
-            processedOffset.set(record.kafkaOffset());
-            return;
+            return false;
         }
+
+        if (!sinkTableSet.contains(tableName)) {
+            LOG.warn(
+                    "The "
+                            + tableName
+                            + " is not defined and requires synchronized data. If you need to synchronize the table data, please configure it in 'doris.topic2table.map'");
+            return false;
+        }
+
         Struct recordStruct = (Struct) (record.value());
-        List<Object> tableChanges = recordStruct.getArray("tableChanges");
+        if (isTruncate(recordStruct)) {
+            LOG.warn("Truncate {} table is not supported", tableName);
+            return false;
+        }
+
+        List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
         Struct tableChange = (Struct) tableChanges.get(0);
-        if ("DROP".equalsIgnoreCase(tableChange.getString("type"))
-                || "CREATE".equalsIgnoreCase(tableChange.getString("type"))) {
+        if ("DROP".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))
+                || "CREATE".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))) {
             LOG.warn(
                     "CREATE and DROP {} tables are currently not supported. Please create or drop them manually.",
                     tableName);
-            processedOffset.set(record.kafkaOffset());
-            return;
+            return false;
         }
+        return true;
+    }
+
+    @Override
+    public void commit(int partition) {
+        // do nothing
+    }
+
+    private void schemaChange(final SinkRecord record) {
+        Struct recordStruct = (Struct) (record.value());
+        List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
+        Struct tableChange = (Struct) tableChanges.get(0);
         RecordDescriptor recordDescriptor =
                 RecordDescriptor.builder()
                         .withSinkRecord(record)
@@ -118,6 +153,17 @@ public class DebeziumSchemaChange extends DorisWriter {
         tableChange(tableName, recordDescriptor);
     }
 
+    private boolean isTruncate(final Struct record) {
+        // Generally the truncate corresponding tableChanges is empty
+        return record.getArray(TABLE_CHANGES).isEmpty();
+    }
+
+    private static boolean isSchemaChange(SinkRecord record) {
+        return record.valueSchema() != null
+                && !Strings.isNullOrEmpty(record.valueSchema().name())
+                && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
+    }
+
     private String resolveTableName(SinkRecord record) {
         if (isTombstone(record)) {
             LOG.warn(
@@ -170,15 +216,6 @@ public class DebeziumSchemaChange extends DorisWriter {
     }
 
     private void tableChange(String tableName, RecordDescriptor recordDescriptor) {
-        if (!sinkTableSet.contains(tableName)) {
-            processedOffset.set(recordDescriptor.getOffset());
-            LOG.warn(
-                    "The "
-                            + tableName
-                            + " is not defined and requires synchronized data. If you need to synchronize the table data, please configure it in 'doris.topic2table.map'");
-            return;
-        }
-
         if (!hasTable(tableName)) {
             // TODO Table does not exist, automatically created it.
             LOG.error("{} Table does not exist, please create manually.", tableName);
@@ -223,7 +260,7 @@ public class DebeziumSchemaChange extends DorisWriter {
 
     public long getOffset() {
         committedOffset.set(processedOffset.get());
-        return committedOffset.get();
+        return committedOffset.get() + 1;
     }
 
     private boolean isTombstone(SinkRecord record) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to