This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 711685673 [INLONG-7299][Sort] Optimize the options check for the Kafka connector (#7471)
711685673 is described below

commit 7116856732e142037fbb7056005c8513a6b2455f
Author: Charles Zhang <[email protected]>
AuthorDate: Wed Mar 1 14:34:29 2023 +0800

    [INLONG-7299][Sort] Optimize the options check for the Kafka connector (#7471)
---
 .../protocol/node/extract/KafkaExtractNode.java    | 72 ++++------------------
 .../sort/kafka/table/KafkaDynamicTableFactory.java |  7 ++-
 2 files changed, 16 insertions(+), 63 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 6a0501759..9713a83fc 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.sort.protocol.node.extract;
 
 import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Map.Entry;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
@@ -35,14 +33,7 @@ import org.apache.inlong.sort.protocol.Metadata;
 import org.apache.inlong.sort.protocol.constant.KafkaConstant;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.format.AvroFormat;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.CsvFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
-import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
-import org.apache.inlong.sort.protocol.node.format.JsonFormat;
-import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 
 import javax.annotation.Nonnull;
@@ -155,63 +146,22 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
         Map<String, String> options = super.tableOptions();
         options.put(KafkaConstant.TOPIC, topic);
         options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, 
bootstrapServers);
-
-        boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
-        Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) 
format).getInnerFormat() : format;
-        if (realFormat instanceof JsonFormat
-                || realFormat instanceof AvroFormat
-                || realFormat instanceof CsvFormat) {
-            if (StringUtils.isEmpty(this.primaryKey)) {
-                options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
-                options.put(KafkaConstant.SCAN_STARTUP_MODE, 
kafkaScanStartupMode.getValue());
-                if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
-                    options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, 
scanSpecificOffsets);
-                }
-                if (StringUtils.isNotBlank(scanTimestampMillis)) {
-                    options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, 
scanTimestampMillis);
-                }
-                
options.putAll(delegateInlongFormat(realFormat.generateOptions(false), 
wrapWithInlongMsg));
-            } else {
-                options.put(KafkaConstant.CONNECTOR, 
KafkaConstant.UPSERT_KAFKA);
-                
options.putAll(delegateInlongFormat(realFormat.generateOptions(true), 
wrapWithInlongMsg));
-            }
-        } else if (realFormat instanceof CanalJsonFormat
-                || realFormat instanceof DebeziumJsonFormat
-                || realFormat instanceof RawFormat) {
+        options.put(KafkaConstant.SCAN_STARTUP_MODE, 
kafkaScanStartupMode.getValue());
+        if (StringUtils.isEmpty(this.primaryKey)) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
-            options.put(KafkaConstant.SCAN_STARTUP_MODE, 
kafkaScanStartupMode.getValue());
-            if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
-                options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, 
scanSpecificOffsets);
-            }
-            if (StringUtils.isNotBlank(scanTimestampMillis)) {
-                options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, 
scanTimestampMillis);
-            }
-            
options.putAll(delegateInlongFormat(realFormat.generateOptions(false), 
wrapWithInlongMsg));
+            options.putAll(format.generateOptions(false));
         } else {
-            throw new IllegalArgumentException("kafka extract node format is 
IllegalArgument");
+            options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
+            options.putAll(format.generateOptions(true));
         }
-        if (StringUtils.isNotEmpty(groupId)) {
-            options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
+        if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+            options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, 
scanSpecificOffsets);
         }
-        return options;
-    }
-
-    private Map<String, String> delegateInlongFormat(
-            Map<String, String> realOptions,
-            boolean wrapWithInlongMsg) {
-        if (!wrapWithInlongMsg) {
-            return realOptions;
+        if (StringUtils.isNotBlank(scanTimestampMillis)) {
+            options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, 
scanTimestampMillis);
         }
-        Map<String, String> options = new HashMap<>();
-        for (Entry<String, String> entry : realOptions.entrySet()) {
-            String key = entry.getKey();
-            String value = entry.getValue();
-            if ("format".equals(key)) {
-                options.put("format", "inlong-msg");
-                options.put("inlong-msg.inner.format", value);
-            } else {
-                options.put("inlong-msg." + key, value);
-            }
+        if (StringUtils.isNotEmpty(groupId)) {
+            options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
         }
         return options;
     }
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 0db49fd17..d92d5cfed 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -326,10 +326,13 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
         final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat =
                 getValueDecodingFormat(helper);
 
-        helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
+        final String valueFormatPrefix = tableOptions.getOptional(FORMAT)
+                .orElse(tableOptions.get(VALUE_FORMAT));
 
+        // Validate the option data type.
+        helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX, 
valueFormatPrefix);
+        // Validate the option values.
         validateTableSourceOptions(tableOptions);
-
         validatePKConstraints(
                 context.getObjectIdentifier(), context.getCatalogTable(), 
valueDecodingFormat);
 

Reply via email to