EricJoy2048 commented on code in PR #3388:
URL: 
https://github.com/apache/incubator-seatunnel/pull/3388#discussion_r1019766421


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class KafkaSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Kafka";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(Config.TOPIC, 
Config.BOOTSTRAP_SERVERS).build();

Review Comment:
   You may have lost the optional options.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java:
##########
@@ -17,92 +17,101 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.config;
 
-public class Config {
-    /**
-     * The topic of kafka.
-     */
-    public static final String TOPIC = "topic";
-
-    /**
-     * The topic of kafka is java pattern or list.
-     */
-    public static final String PATTERN = "pattern";
-
-    /**
-     * The server address of kafka cluster.
-     */
-    public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
-
-    public static final String KAFKA_CONFIG_PREFIX = "kafka.";
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 
-    /**
-     * consumer group of kafka client consume message.
-     */
-    public static final String CONSUMER_GROUP = "consumer.group";
-
-    /**
-     * consumer offset will be periodically committed in the background.
-     */
-    public static final String COMMIT_ON_CHECKPOINT = "commit_on_checkpoint";
-
-    /**
-     * The prefix of kafka's transactionId, make sure different job use 
different prefix.
-     */
-    public static final String TRANSACTION_PREFIX = "transaction_prefix";
-
-    /**
-     * User-defined schema
-     */
-    public static final String SCHEMA = "schema";
+import java.util.List;
 
-    /**
-     * data format
-     */
-    public static final String FORMAT = "format";
+public class Config {
 
     /**
      * The default data format is JSON
      */
     public static final String DEFAULT_FORMAT = "json";
 
-    /**
-     * field delimiter
-     */
-    public static final String FIELD_DELIMITER = "field_delimiter";
-
     /**
      * The default field delimiter is “,”
      */
     public static final String DEFAULT_FIELD_DELIMITER = ",";
 
     /**
-     * Send information according to the specified partition.
+     * Kafka config prefix
      */
-    public static final String PARTITION = "partition";
-
-    /**
-     * Determine the partition to send based on the content of the message.
-     */
-    public static final String ASSIGN_PARTITIONS = "assign_partitions";
-
-    /**
-     * Determine the key of the kafka send partition
-     */
-    public static final String PARTITION_KEY = "partition_key";
-
-    /**
-     * The initial consumption pattern of consumers
-     */
-    public static final String  START_MODE = "start_mode";
-
-    /**
-     * The time required for consumption mode to be timestamp
-     */
-    public static final String  START_MODE_TIMESTAMP  = "start_mode.timestamp";
+    public static final String KAFKA_CONFIG_PREFIX = "kafka.";
 
-    /**
-     * The offset required for consumption mode to be specific_offsets
-     */
-    public static final String  START_MODE_OFFSETS  = "start_mode.offsets";
+    public static final Option<String> TOPIC = Options.key("topic")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The topic of kafka");
+
+    public static final Option<Boolean> PATTERN = Options.key("pattern")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("The topic of kafka is java pattern or list");

Review Comment:
   Please add the full description of this option; for reference, see 
https://seatunnel.apache.org/docs/2.3.0-beta/connector-v2/source/kafka#pattern-boolean.
   
   The same applies to the other options.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class KafkaSourceFactory implements TableSourceFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "Kafka";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(Config.TOPIC, 
Config.BOOTSTRAP_SERVERS).build();

Review Comment:
   You lost the optional options.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to