TaoZex commented on code in PR #3388:
URL: https://github.com/apache/incubator-seatunnel/pull/3388#discussion_r1019912762


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java:
##########
@@ -17,92 +17,109 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.config;
 
-public class Config {
-    /**
-     * The topic of kafka.
-     */
-    public static final String TOPIC = "topic";
-
-    /**
-     * The topic of kafka is java pattern or list.
-     */
-    public static final String PATTERN = "pattern";
-
-    /**
-     * The server address of kafka cluster.
-     */
-    public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
-
-    public static final String KAFKA_CONFIG_PREFIX = "kafka.";
-
-    /**
-     * consumer group of kafka client consume message.
-     */
-    public static final String CONSUMER_GROUP = "consumer.group";
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 
-    /**
-     * consumer offset will be periodically committed in the background.
-     */
-    public static final String COMMIT_ON_CHECKPOINT = "commit_on_checkpoint";
+import java.util.List;
 
-    /**
-     * The prefix of kafka's transactionId, make sure different job use different prefix.
-     */
-    public static final String TRANSACTION_PREFIX = "transaction_prefix";
-
-    /**
-     * User-defined schema
-     */
-    public static final String SCHEMA = "schema";
-
-    /**
-     * data format
-     */
-    public static final String FORMAT = "format";
+public class Config {
 
     /**
      * The default data format is JSON
      */
     public static final String DEFAULT_FORMAT = "json";
 
-    /**
-     * field delimiter
-     */
-    public static final String FIELD_DELIMITER = "field_delimiter";
-
     /**
      * The default field delimiter is “,”
      */
     public static final String DEFAULT_FIELD_DELIMITER = ",";
 
-    /**
-     * Send information according to the specified partition.
-     */
-    public static final String PARTITION = "partition";
-
-    /**
-     * Determine the partition to send based on the content of the message.
-     */
-    public static final String ASSIGN_PARTITIONS = "assign_partitions";
-
-    /**
-     * Determine the key of the kafka send partition
-     */
-    public static final String PARTITION_KEY = "partition_key";
-
-    /**
-     * The initial consumption pattern of consumers
-     */
-    public static final String  START_MODE = "start_mode";
-
-    /**
-     * The time required for consumption mode to be timestamp
-     */
-    public static final String  START_MODE_TIMESTAMP  = "start_mode.timestamp";
-
-    /**
-     * The offset required for consumption mode to be specific_offsets
-     */
-    public static final String  START_MODE_OFFSETS  = "start_mode.offsets";
+    public static final Option<String> KAFKA_CONFIG_PREFIX = Options.key("kafka.")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("In addition to the above parameters that must be specified by the Kafka producer or consumer client, " +
+                    "the user can also specify multiple non-mandatory parameters for the producer or consumer client, " +
+                    "covering all the producer parameters specified in the official Kafka document.");
+
+    public static final Option<String> TOPIC = Options.key("topic")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Kafka topic name. If there are multiple topics, use , to split, for example: \"tpc1,tpc2\".");
+
+    public static final Option<Boolean> PATTERN = Options.key("pattern")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("If pattern is set to true,the regular expression for a pattern of topic names to read from." +
+                    " All topics in clients with names that match the specified regular expression will be subscribed by the consumer.");
+
+    public static final Option<String> BOOTSTRAP_SERVERS = Options.key("bootstrap.servers")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Kafka cluster address, separated by \",\".");
+
+    public static final Option<String> CONSUMER_GROUP = Options.key("consumer.group")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Kafka consumer group id, used to distinguish different consumer groups.");
+
+    public static final Option<Boolean> COMMIT_ON_CHECKPOINT = Options.key("commit_on_checkpoint")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription("If true the consumer's offset will be periodically committed in the background.");
+
+    public static final Option<String> TRANSACTION_PREFIX = Options.key("transaction_prefix")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction. " +
+                    "Kafka distinguishes different transactions by different transactionId. " +
+                    "This parameter is prefix of kafka transactionId, make sure different job use different prefix.");
+
+    public static final Option<Config> SCHEMA = Options.key("schema")
+            .objectType(Config.class)
+            .noDefaultValue()
+            .withDescription("The structure of the data, including field names and field types.");
+
+    public static final Option<String> FORMAT = Options.key("format")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Data format. The default format is json. Optional text format. The default field separator is \", \". " +
+                    "If you customize the delimiter, add the \"field_delimiter\" option.");
+
+    public static final Option<String> FIELD_DELIMITER = Options.key("field_delimiter")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Customize the field delimiter for data format.");
+
+    public static final Option<Integer> PARTITION = Options.key("partition")
+            .intType()
+            .noDefaultValue()
+            .withDescription("We can specify the partition, all messages will be sent to this partition.");
+
+    public static final Option<List<String>> ASSIGN_PARTITIONS = Options.key("assign_partitions")
+            .listType()
+            .noDefaultValue()
+            .withDescription("We can decide which partition to send based on the content of the message. " +
+                    "The function of this parameter is to distribute information.");
+
+    public static final Option<String> PARTITION_KEY = Options.key("partition_key")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Configure which field is used as the key of the kafka message.");
+
+    public static final Option<String> START_MODE = Options.key("start_mode")
+            .stringType()

Review Comment:
   Thanks. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to