EricJoy2048 commented on code in PR #5437:
URL: https://github.com/apache/seatunnel/pull/5437#discussion_r1337138396


##########
docs/en/connector-v2/sink/Kudu.md:
##########
@@ -2,51 +2,122 @@
 
 > Kudu sink connector
 
-## Description
+## Support Those Engines
 
-Write data to Kudu.
-
-The tested kudu version is 1.11.1.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
 
 ## Key features
 
+- [x] [batch](../../concept/connector-v2-features.md)

Review Comment:
   It seems that you have added the Source's features to the Sink's documentation



##########
docs/en/connector-v2/sink/Kudu.md:
##########
@@ -2,51 +2,122 @@
 
 > Kudu sink connector
 
-## Description
+## Support Those Engines
 
-Write data to Kudu.
-
-The tested kudu version is 1.11.1.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
 
 ## Key features
 
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
-
-## Options
-
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| kudu_master    | string | yes      | -             |
-| kudu_table     | string | yes      | -             |
-| save_mode      | string | yes      | -             |
-| common-options |        | no       | -             |
-
-### kudu_master [string]
-
-`kudu_master`  The address of kudu master,such as '192.168.88.110:7051'.
-
-### kudu_table [string]
-
-`kudu_table` The name of kudu table..
-
-### save_mode [string]
-
-Storage mode, we need support `overwrite` and `append`. `append` is now supported.
-
-### common options
-
-Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
-
-## Example
-
-```bash
-
- kudu {
-      kudu_master = "192.168.88.110:7051"
-      kudu_table = "studentlyhresultflink"
-      save_mode="append"
-   }
-
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Data Type Mapping
+
+| SeaTunnel Data type |      kudu Data type      |
+|---------------------|--------------------------|
+| BOOLEAN             | BOOL                     |
+| INT                 | INT8<br/>INT16<br/>INT32 |
+| BIGINT              | INT64                    |
+| DECIMAL             | DECIMAL                  |
+| FLOAT               | FLOAT                    |
+| DOUBLE              | DOUBLE                   |
+| STRING              | STRING                   |
+| TIMESTAMP           | UNIXTIME_MICROS          |
+| BYTES               | BINARY                   |
+
+## Sink Options

Review Comment:
   The format has some errors:
   



##########
docs/en/connector-v2/sink/Kudu.md:
##########
@@ -2,51 +2,122 @@
 
 > Kudu sink connector
 
-## Description
+## Support Those Engines
 
-Write data to Kudu.
-
-The tested kudu version is 1.11.1.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
 
 ## Key features
 
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
-
-## Options
-
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| kudu_master    | string | yes      | -             |
-| kudu_table     | string | yes      | -             |
-| save_mode      | string | yes      | -             |
-| common-options |        | no       | -             |
-
-### kudu_master [string]
-
-`kudu_master`  The address of kudu master,such as '192.168.88.110:7051'.
-
-### kudu_table [string]
-
-`kudu_table` The name of kudu table..
-
-### save_mode [string]
-
-Storage mode, we need support `overwrite` and `append`. `append` is now supported.
-
-### common options
-
-Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
-
-## Example
-
-```bash
-
- kudu {
-      kudu_master = "192.168.88.110:7051"
-      kudu_table = "studentlyhresultflink"
-      save_mode="append"
-   }
-
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Data Type Mapping
+
+| SeaTunnel Data type |      kudu Data type      |
+|---------------------|--------------------------|
+| BOOLEAN             | BOOL                     |
+| INT                 | INT8<br/>INT16<br/>INT32 |
+| BIGINT              | INT64                    |
+| DECIMAL             | DECIMAL                  |
+| FLOAT               | FLOAT                    |
+| DOUBLE              | DOUBLE                   |
+| STRING              | STRING                   |
+| TIMESTAMP           | UNIXTIME_MICROS          |
+| BYTES               | BINARY                   |
+
+## Sink Options
+
+|                   Name                    |  Type  | Required |                    Default                     | Description |
+|-------------------------------------------|--------|----------|------------------------------------------------|-------------|---|
+| kudu_masters                              | String | Yes      | -                                              | Kudu master address. Separated by ',',such as '192.168.88.110:7051'. |
+| table_name                                | String | Yes      | -                                              | The name of kudu table. |   |
+| client_worker_count                       | Int    | No       | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Default value is twice the current number of cpu cores. |
+| client_default_operation_timeout_ms       | Long   | No       | 30000                                          | Kudu normal operation time out. |
+| client_default_admin_operation_timeout_ms | Long   | No       | 30000                                          | Kudu admin operation time out. |
+| kerberos_principal                        | String | No       | -                                              | Kerberos principal. |
+| kerberos_keytab                           | String | No       | -                                              | Kerberos keytab. |
+| kerberos_krb5conf                         | String | No       | -                                              | Kerberos krb5 conf. |
+| save_mode                                 | String | No       | -                                              | Storage mode, support `overwrite` and `append`. |
+| session_flush_mode                        | String | No       | AUTO_FLUSH_SYNC                                | Kudu flush mode. Default AUTO_FLUSH_SYNC. |
+| session_mutation_buffer_space             | Int    | No       | 1024                                           | The max size of Kudu buffer which buffed data. |
+| batch_size                                | Int    | No       | 1024                                           | The flush max size (includes all append, upsert and delete records), over this number of records, will flush data. The default value is 100 |
+| buffer_flush_interval                     | Int    | No       | 10000                                          | The flush interval mills, over this time, asynchronous threads will flush data. |
+| ignore_not_found                          | Bool   | No       | false                                          | If true, ignore all not found rows. |
+| ignore_not_duplicate                      | Bool   | No       | false                                          | If true, ignore all dulicate rows. |
+| common-options                            |        | No       | -                                              | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
+
+## Task Example
+
+### Simple:
+
+> The following example refers to a FakeSource named "kudu" cdc write kudu table "kudu_sink_table"
+
+```hocon
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+    source {
+      FakeSource {
+       result_table_name = "kudu"
+        schema = {
+          fields {
+                    id = int
+                    val_bool = boolean
+                    val_int8 = tinyint
+                    val_int16 = smallint
+                    val_int32 = int
+                    val_int64 = bigint
+                    val_float = float
+                    val_double = double
+                    val_decimal = "decimal(16, 1)"
+                    val_string = string
+                    val_unixtime_micros = timestamp
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+          },
+          {
+            kind = INSERT
+            fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+          },
+          {
+            kind = INSERT
+            fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+          },
+          {
+            kind = UPDATE_BEFORE
+            fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+          },
+          {
+            kind = UPDATE_AFTER
+            fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+          },
+          {
+            kind = DELETE
+            fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+          }
+        ]
+      }
+    }
+
+sink {
+   kudu{
+    source_table_name = "kudu"
+    kudu_masters = "kudu-master-cdc:7051"
+    table_name = "kudu_sink_table"
+ }
+}
 ```

Review Comment:
   Please add examples with Kerberos enabled.
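   For instance, a hedged sketch of what such an example could look like — the `kerberos_*` option names come from the Sink Options table above, while the principal and paths are illustrative placeholders:

```hocon
sink {
  kudu {
    source_table_name = "kudu"
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "kudu_sink_table"
    # Kerberos settings below are illustrative placeholders
    kerberos_principal = "kudu/example.com@EXAMPLE.COM"
    kerberos_keytab = "/path/to/kudu.keytab"
    kerberos_krb5conf = "/path/to/krb5.conf"
  }
}
```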



##########
docs/en/connector-v2/source/Kudu.md:
##########
@@ -2,58 +2,95 @@
 
 > Kudu source connector
 
-## Description
+## Support Those Engines
 
-Used to read data from Kudu.
-
-The tested kudu version is 1.11.1.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
 
 ## Key features
 
 - [x] [batch](../../concept/connector-v2-features.md)
-- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [column projection](../../concept/connector-v2-features.md)
-- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 
-## Options
-
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| kudu_master    | string | yes      | -             |
-| kudu_table     | string | yes      | -             |
-| columnsList    | string | yes      | -             |
-| common-options |        | no       | -             |
-
-### kudu_master [string]
-
-`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.
-
-### kudu_table [string]
-
-`kudu_table` The name of kudu table..
-
-### columnsList [string]
-
-`columnsList` Specifies the column names of the table.
+## Description
 
-### common options
+Used to read data from Kudu.
 
-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+The tested kudu version is 1.11.1.
 
-## Examples
+## Data Type Mapping
+
+|      kudu Data type      | SeaTunnel Data type |
+|--------------------------|---------------------|
+| BOOL                     | BOOLEAN             |
+| INT8<br/>INT16<br/>INT32 | INT                 |
+| INT64                    | BIGINT              |
+| DECIMAL                  | DECIMAL             |
+| FLOAT                    | FLOAT               |
+| DOUBLE                   | DOUBLE              |
+| STRING                   | STRING              |
+| UNIXTIME_MICROS          | TIMESTAMP           |
+| BINARY                   | BYTES               |
+
+## Source Options
+
+|                   Name                    |  Type  | Required |                    Default                     | Description |
+|-------------------------------------------|--------|----------|------------------------------------------------|-------------|---|
+| kudu_masters                              | String | Yes      | -                                              | Kudu master address. Separated by ',',such as '192.168.88.110:7051'. |
+| table_name                                | String | Yes      | -                                              | The name of kudu table. |   |
+| client_worker_count                       | Int    | No       | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Default value is twice the current number of cpu cores. |
+| client_default_operation_timeout_ms       | Long   | No       | 30000                                          | Kudu normal operation time out. |
+| client_default_admin_operation_timeout_ms | Long   | No       | 30000                                          | Kudu admin operation time out. |
+| kerberos_principal                        | String | No       | -                                              | Kerberos principal. |
+| kerberos_keytab                           | String | No       | -                                              | Kerberos keytab. |
+| kerberos_krb5conf                         | String | No       | -                                              | Kerberos krb5 conf. |
+| scan_token_query_timeout                  | Long   | No       | 30000                                          | The timeout for connecting scan token. If not set, it will be the same as operationTimeout. |
+| scan_token_batch_size_bytes               | Int    | No       | 1024 * 1024                                    | Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB. |
+| filter                                    | Int    | No       | 1024 * 1024                                    | Kudu scan filter expressions,Not supported yet. |
+| schema                                    | Map    | No       | 1024 * 1024                                    | SeaTunnel Schema. |
+| common-options                            |        | No       | -                                              | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
+
+## Task Example
+
+### Simple:
+
+> The following example is for a Kudu table named "kudu_source_table", The goal is to print the data from this table on the console and write kudu table "kudu_sink_table"
 
 ```hocon
+# Defining the runtime environment
+env {
+  # You can set flink configuration here
+  execution.parallelism = 2
+  job.mode = "BATCH"
+}
+
 source {
-   Kudu {
-      result_table_name = "studentlyh2"
-      kudu_master = "192.168.88.110:7051"
-      kudu_table = "studentlyh2"
-      columnsList = "id,name,age,sex"
-    }
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+  kudu{
+   kudu_masters = "kudu-master:7051"
+   table_name = "kudu_source_table"
+   result_table_name = "kudu"
+}
+}
 
+transform {
 }
+
+sink {
+  console {
+    source_table_name = "kudu"
+  }
+
+   kudu{
+    source_table_name = "kudu"
+    kudu_masters = "kudu-master:7051"
+    table_name = "kudu_sink_table"
+ }
 ```

Review Comment:
   Please add Kerberos-enabled examples.
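   A hedged sketch of a possible Kerberos-enabled source example — the `kerberos_*` option names come from the Source Options table above, and the principal and paths are illustrative placeholders:

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    result_table_name = "kudu"
    # Kerberos settings below are illustrative placeholders
    kerberos_principal = "kudu/example.com@EXAMPLE.COM"
    kerberos_keytab = "/path/to/kudu.keytab"
    kerberos_krb5conf = "/path/to/krb5.conf"
  }
}
```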



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java:
##########
@@ -41,29 +39,28 @@
 import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
 import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
+import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.auto.service.AutoService;
-import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.List;
 
-@Slf4j

Review Comment:
   Why replace `@Slf4j` with `private static final Logger log = LoggerFactory.getLogger(SeaTunnelSource.class);`?
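   For reference, a sketch of the equivalence (class name `Foo` is a placeholder): Lombok's `@Slf4j` generates a logger field keyed to the annotated class, so a manual replacement should pass the enclosing class literal:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// With Lombok, annotating class Foo with @Slf4j generates this field automatically.
class Foo {
    // Manual equivalent; the class literal should be the enclosing class —
    // passing a different class would mislabel the log records.
    private static final Logger log = LoggerFactory.getLogger(Foo.class);
}
```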



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java:
##########
@@ -18,109 +18,159 @@
 package org.apache.seatunnel.connectors.seatunnel.kudu.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class KuduSourceSplitEnumerator
         implements SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> {
 
+    private static final Logger log = LoggerFactory.getLogger(KuduSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext;
-    private PartitionParameter partitionParameter;
-    List<KuduSourceSplit> allSplit = new ArrayList<>();
-    private Long maxVal;
-    private Long minVal;
-    private Long batchSize;
-    private Integer batchNum;
+    private KuduSourceState checkpointState;
+    private KuduSourceConfig kuduSourceConfig;
+    private final Map<Integer, List<KuduSourceSplit>> pendingSplits;
+
+    private final KuduInputFormat kuduInputFormat;
+
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
+
+    public KuduSourceSplitEnumerator(
+            Context<KuduSourceSplit> enumeratorContext,
+            KuduSourceConfig kuduSourceConfig,
+            KuduInputFormat kuduInputFormat) {
+        this(enumeratorContext, kuduSourceConfig, kuduInputFormat, null);
+    }
 
     public KuduSourceSplitEnumerator(
             SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext,
-            PartitionParameter partitionParameter) {
+            KuduSourceConfig kuduSourceConfig,
+            KuduInputFormat kuduInputFormat,
+            KuduSourceState checkpointState) {
         this.enumeratorContext = enumeratorContext;
-        this.partitionParameter = partitionParameter;
+        this.kuduSourceConfig = kuduSourceConfig;
+        this.pendingSplits = new HashMap<>();
+        this.kuduInputFormat = kuduInputFormat;
+        this.shouldEnumerate = checkpointState == null;
+        this.checkpointState = checkpointState;
     }
 
     @Override
-    public void open() {}
+    public void open() {
+        kuduInputFormat.openInputFormat();
+    }
 
     @Override
-    public void run() {}
+    public void run() throws IOException {
+        Set<Integer> readers = enumeratorContext.registeredReaders();
+        if (shouldEnumerate) {
+            Set<KuduSourceSplit> newSplits = discoverySplits();
+
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
 
-    @Override
-    public void close() throws IOException {}
+            assignSplit(readers);
+        }
 
-    @Override
-    public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {}
+        log.debug(
+                "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(enumeratorContext::signalNoMoreSplits);
+    }
 
-    @Override
-    public int currentUnassignedSplitSize() {
-        return 0;
+    private Set<KuduSourceSplit> discoverySplits() throws IOException {
+        return kuduInputFormat.createInputSplits();
     }
 
     @Override
-    public void handleSplitRequest(int subtaskId) {}
+    public void close() throws IOException {
+        kuduInputFormat.closeInputFormat();
+    }
 
     @Override
-    public void registerReader(int subtaskId) {
-        int parallelism = enumeratorContext.currentParallelism();
-        if (allSplit.isEmpty()) {
-            if (null != partitionParameter) {
-                Serializable[][] parameterValues =
-                        getParameterValues(
-                                partitionParameter.minValue,
-                                partitionParameter.maxValue,
-                                parallelism);
-                for (int i = 0; i < parameterValues.length; i++) {
-                    allSplit.add(new KuduSourceSplit(parameterValues[i], i));
+    public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
+        log.debug("Add back splits {} to KuduSourceSplitEnumerator.", splits);
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> readers) {
+        log.debug("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<KuduSourceSplit> assignmentForReader = pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+                log.info("Assign splits {} to reader {}", assignmentForReader, reader);
+                try {
+                    enumeratorContext.assignSplit(reader, assignmentForReader);
+                } catch (Exception e) {
+                    log.error(
+                            "Failed to assign splits {} to reader {}",
+                            assignmentForReader,
+                            reader,
+                            e);
+                    pendingSplits.put(reader, assignmentForReader);
                 }
-            } else {
-                allSplit.add(new KuduSourceSplit(null, 0));
             }
         }
-        // Filter the split that the current task needs to run
-        List<KuduSourceSplit> splits =
-                allSplit.stream()
-                        .filter(p -> p.splitId % parallelism == subtaskId)
-                        .collect(Collectors.toList());
-        enumeratorContext.assignSplit(subtaskId, splits);
-        enumeratorContext.signalNoMoreSplits(subtaskId);
     }
 
-    private Serializable[][] getParameterValues(Long minVal, Long maxVal, int parallelism) {
-        this.maxVal = maxVal;
-        this.minVal = minVal;
-        long maxElemCount = (maxVal - minVal) + 1;
-        batchNum = parallelism;
-        getBatchSizeAndBatchNum(parallelism);
-        long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
-
-        Serializable[][] parameters = new Serializable[batchNum][2];
-        long start = minVal;
-        for (int i = 0; i < batchNum; i++) {
-            long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
-            parameters[i] = new Long[] {start, end};
-            start = end + 1;
+    private void addPendingSplit(Collection<KuduSourceSplit> splits) {
+        int readerCount = enumeratorContext.currentParallelism();

Review Comment:
   `addPendingSplit` needs `synchronized (stateLock)` too.
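   A minimal, self-contained sketch of the pattern being asked for (hypothetical names, not the connector code): every reader and writer of the shared pending-splits map goes through the same lock, so `addPendingSplit` cannot race with split assignment:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingSplitRegistry {
    private final Map<Integer, List<String>> pendingSplits = new HashMap<>();
    private final Object stateLock = new Object();

    // All mutations synchronize on stateLock, matching the review request.
    void addPendingSplit(List<String> splits, int readerCount) {
        synchronized (stateLock) {
            for (int i = 0; i < splits.size(); i++) {
                int owner = i % readerCount; // round-robin split ownership
                pendingSplits.computeIfAbsent(owner, k -> new ArrayList<>()).add(splits.get(i));
            }
        }
    }

    // Removal happens under the same lock, so no split is lost or assigned twice.
    List<String> takeSplitsFor(int reader) {
        synchronized (stateLock) {
            List<String> assigned = pendingSplits.remove(reader);
            return assigned == null ? new ArrayList<>() : assigned;
        }
    }
}
```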



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.util;
+
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduTable;
+
+import lombok.extern.slf4j.Slf4j;
+import sun.security.krb5.Config;
+import sun.security.krb5.KrbException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.List;
+
+@Slf4j
+public class KuduUtil {
+
+    private static final String ERROR_MESSAGE =
+            "principal and keytab can not be null current principal %s keytab %s";
+
+    public static final String KRB5_CONF_KEY = "java.security.krb5.conf";
+
+    public static final String HADOOP_AUTH_KEY = "hadoop.security.authentication";
+
+    public static final String KRB = "kerberos";
+
+    public static KuduClient getKuduClient(CommonConfig config) {
+        try {
+            if (config.getKeytab() != null && config.getPrincipal() != null) {
+
+                UserGroupInformation ugi = loginAndReturnUgi(config);
+                return ugi.doAs(
+                        (PrivilegedExceptionAction<KuduClient>)
+                                () -> getKuduClientInternal(config));
+            }
+
+            return getKuduClientInternal(config);
+
+        } catch (IOException | InterruptedException e) {
+            throw new KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
+        }
+    }
+
+    private static UserGroupInformation loginAndReturnUgi(CommonConfig config) throws IOException {
+        if (StringUtils.isBlank(config.getPrincipal()) || StringUtils.isBlank(config.getKeytab())) {
+            throw new KuduConnectorException(
+                    CommonErrorCode.ILLEGAL_ARGUMENT,
+                    String.format(ERROR_MESSAGE, config.getPrincipal(), config.getKeytab()));
+        }
+        if (StringUtils.isNotBlank(config.getKrb5conf())) {
+            reloadKrb5conf(config.getKrb5conf());
+        }
+        Configuration conf = new Configuration();
+        conf.set(HADOOP_AUTH_KEY, KRB);
+        UserGroupInformation.setConfiguration(conf);

Review Comment:
   `UserGroupInformation` holds static state. If more than one place in the same JVM calls `UserGroupInformation.setConfiguration(conf);`, the calls will overwrite each other. So creating a new `UserGroupInformation` object is the suggested approach.
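   For illustration, one hedged sketch of that direction — `loginUserFromKeytabAndReturnUGI` is an existing Hadoop API that returns a per-call UGI for these credentials instead of relying only on the process-wide login; the surrounding wiring (`config`, `getKuduClientInternal`) mirrors the code under review:

```java
// Sketch only: obtain an isolated UGI for this connector's credentials,
// rather than mutating JVM-wide state via UserGroupInformation static setters.
UserGroupInformation ugi =
        UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                config.getPrincipal(), config.getKeytab());
return ugi.doAs(
        (PrivilegedExceptionAction<KuduClient>) () -> getKuduClientInternal(config));
```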



##########
docs/en/connector-v2/source/Kudu.md:
##########
@@ -2,58 +2,95 @@
 
 > Kudu source connector
 
-## Description
+## Support Those Engines
 
-Used to read data from Kudu.
-
-The tested kudu version is 1.11.1.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
 
 ## Key features
 
 - [x] [batch](../../concept/connector-v2-features.md)
-- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [column projection](../../concept/connector-v2-features.md)
-- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 
-## Options
-
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| kudu_master    | string | yes      | -             |
-| kudu_table     | string | yes      | -             |
-| columnsList    | string | yes      | -             |
-| common-options |        | no       | -             |
-
-### kudu_master [string]
-
-`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.
-
-### kudu_table [string]
-
-`kudu_table` The name of kudu table..
-
-### columnsList [string]
-
-`columnsList` Specifies the column names of the table.
+## Description
 
-### common options
+Used to read data from Kudu.
 
-Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
+The tested kudu version is 1.11.1.
 
-## Examples
+## Data Type Mapping
+
+|      kudu Data type      | SeaTunnel Data type |
+|--------------------------|---------------------|
+| BOOL                     | BOOLEAN             |
+| INT8<br/>INT16<br/>INT32 | INT                 |
+| INT64                    | BIGINT              |
+| DECIMAL                  | DECIMAL             |
+| FLOAT                    | FLOAT               |
+| DOUBLE                   | DOUBLE              |
+| STRING                   | STRING              |
+| UNIXTIME_MICROS          | TIMESTAMP           |
+| BINARY                   | BYTES               |
+
+## Source Options

Review Comment:
   The format has some errors here.



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/CommonConfig.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.kudu.client.AsyncKuduClient;
+
+import lombok.Getter;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+@Getter
+@ToString
+public class CommonConfig implements Serializable {
+
+    public static final Option<String> MASTER =
+            Options.key("kudu_masters")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kudu master address. Separated by ','");
+
+    public static final Option<String> TABLE_NAME =
+            Options.key("table_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kudu table name");
+
+    public static final Option<Integer> WORKER_COUNT =
+            Options.key("client_worker_count")
+                    .intType()
+                    .defaultValue(2 * 
Runtime.getRuntime().availableProcessors())
+                    .withDescription(
+                            "Kudu worker count. Default value is twice the 
current number of cpu cores");
+
+    public static final Option<Long> OPERATION_TIMEOUT =
+            Options.key("client_default_operation_timeout_ms")
+                    .longType()
+                    .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
+                    .withDescription("Kudu normal operation time out");
+
+    public static final Option<Long> ADMIN_OPERATION_TIMEOUT =
+            Options.key("client_default_admin_operation_timeout_ms")
+                    .longType()
+                    .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
+                    .withDescription("Kudu admin operation time out");
+
+    public static final Option<String> KERBEROS_PRINCIPAL =
+            Options.key("kerberos_principal")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kerberos principal");
+
+    public static final Option<String> KERBEROS_KEYTAB =
+            Options.key("kerberos_keytab")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kerberos keytab");

Review Comment:
   ```suggestion
                       .withDescription("Kerberos keytab. Note that all zeta nodes require this file.");
   ```



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/CommonConfig.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.kudu.client.AsyncKuduClient;
+
+import lombok.Getter;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+@Getter
+@ToString
+public class CommonConfig implements Serializable {
+
+    public static final Option<String> MASTER =
+            Options.key("kudu_masters")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kudu master address. Separated by ','");
+
+    public static final Option<String> TABLE_NAME =
+            Options.key("table_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kudu table name");
+
+    public static final Option<Integer> WORKER_COUNT =
+            Options.key("client_worker_count")
+                    .intType()
+                    .defaultValue(2 * 
Runtime.getRuntime().availableProcessors())
+                    .withDescription(
+                            "Kudu worker count. Default value is twice the 
current number of cpu cores");
+
+    public static final Option<Long> OPERATION_TIMEOUT =
+            Options.key("client_default_operation_timeout_ms")
+                    .longType()
+                    .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
+                    .withDescription("Kudu normal operation time out");
+
+    public static final Option<Long> ADMIN_OPERATION_TIMEOUT =
+            Options.key("client_default_admin_operation_timeout_ms")
+                    .longType()
+                    .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
+                    .withDescription("Kudu admin operation time out");
+
+    public static final Option<String> KERBEROS_PRINCIPAL =
+            Options.key("kerberos_principal")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kerberos principal");

Review Comment:
   ```suggestion
                       .withDescription("Kerberos principal. Note that all zeta nodes require this file.");
   ```



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/CommonConfig.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.kudu.client.AsyncKuduClient;
+
+import lombok.Getter;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+@Getter
+@ToString
+public class CommonConfig implements Serializable {
+
+    public static final Option<String> MASTER =
+            Options.key("kudu_masters")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kudu master address. Separated by ','");
+
+    public static final Option<String> TABLE_NAME =
+            Options.key("table_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kudu table name");
+
+    public static final Option<Integer> WORKER_COUNT =
+            Options.key("client_worker_count")
+                    .intType()
+                    .defaultValue(2 * 
Runtime.getRuntime().availableProcessors())
+                    .withDescription(
+                            "Kudu worker count. Default value is twice the 
current number of cpu cores");
+
+    public static final Option<Long> OPERATION_TIMEOUT =
+            Options.key("client_default_operation_timeout_ms")
+                    .longType()
+                    .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
+                    .withDescription("Kudu normal operation time out");
+
+    public static final Option<Long> ADMIN_OPERATION_TIMEOUT =
+            Options.key("client_default_admin_operation_timeout_ms")
+                    .longType()
+                    .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
+                    .withDescription("Kudu admin operation time out");
+
+    public static final Option<String> KERBEROS_PRINCIPAL =
+            Options.key("kerberos_principal")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kerberos principal");
+
+    public static final Option<String> KERBEROS_KEYTAB =
+            Options.key("kerberos_keytab")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kerberos keytab");
+
+    public static final Option<String> KERBEROS_KRB5_CONF =
+            Options.key("kerberos_krb5conf")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kerberos krb5 conf");
+
+    protected String masters;
+
+    protected String table;
+
+    protected Integer workerCount;
+
+    protected Long operationTimeout;
+
+    protected Long adminOperationTimeout;
+    protected String principal;
+    protected String keytab;
+    protected String krb5conf;
+
+    public CommonConfig(ReadonlyConfig config) {
+        this.masters = checkArgumentNotNull(config.get(MASTER));

Review Comment:
   You can use `ConfigValidator.of(config).validate(optionRule)` to replace `checkArgumentNotNull`.
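   For illustration, a hedged sketch of what the constructor could look like with that validator (assuming SeaTunnel's `ConfigValidator`/`OptionRule` APIs and the option names defined in this PR; not compiled against the branch):

   ```java
   // Sketch only: validate required options up front with a uniform error
   // message instead of null-checking each field by hand.
   public CommonConfig(ReadonlyConfig config) {
       ConfigValidator.of(config)
               .validate(OptionRule.builder().required(MASTER, TABLE_NAME).build());
       this.masters = config.get(MASTER);
       this.table = config.get(TABLE_NAME);
   }
   ```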



##########
docs/en/connector-v2/source/Kudu.md:
##########
@@ -2,58 +2,95 @@
 
 > Kudu source connector
 
-## Description
+## Support Those Engines
 
-Used to read data from Kudu.
-
-The tested kudu version is 1.11.1.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
 
 ## Key features
 
 - [x] [batch](../../concept/connector-v2-features.md)
-- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [column projection](../../concept/connector-v2-features.md)
-- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 
-## Options
-
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| kudu_master    | string | yes      | -             |
-| kudu_table     | string | yes      | -             |
-| columnsList    | string | yes      | -             |
-| common-options |        | no       | -             |
-
-### kudu_master [string]
-
-`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.
-
-### kudu_table [string]
-
-`kudu_table` The name of kudu table..
-
-### columnsList [string]
-
-`columnsList` Specifies the column names of the table.
+## Description
 
-### common options
+Used to read data from Kudu.
 
-Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
+The tested kudu version is 1.11.1.
 
-## Examples
+## Data Type Mapping
+
+|      kudu Data type      | SeaTunnel Data type |
+|--------------------------|---------------------|
+| BOOL                     | BOOLEAN             |
+| INT8<br/>INT16<br/>INT32 | INT                 |
+| INT64                    | BIGINT              |
+| DECIMAL                  | DECIMAL             |
+| FLOAT                    | FLOAT               |
+| DOUBLE                   | DOUBLE              |
+| STRING                   | STRING              |
+| UNIXTIME_MICROS          | TIMESTAMP           |
+| BINARY                   | BYTES               |
+
+## Source Options

Review Comment:
   Format have some error.
   
   ![Uploading image.png…]()
   



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/CommonConfig.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.kudu.client.AsyncKuduClient;
+
+import lombok.Getter;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+@Getter
+@ToString
+public class CommonConfig implements Serializable {
+
+    public static final Option<String> MASTER =
+            Options.key("kudu_masters")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kudu master address. Separated by ','");
+
+    public static final Option<String> TABLE_NAME =
+            Options.key("table_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kudu table name");
+
+    public static final Option<Integer> WORKER_COUNT =
+            Options.key("client_worker_count")
+                    .intType()
+                    .defaultValue(2 * 
Runtime.getRuntime().availableProcessors())
+                    .withDescription(
+                            "Kudu worker count. Default value is twice the 
current number of cpu cores");
+
+    public static final Option<Long> OPERATION_TIMEOUT =
+            Options.key("client_default_operation_timeout_ms")
+                    .longType()
+                    .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
+                    .withDescription("Kudu normal operation time out");
+
+    public static final Option<Long> ADMIN_OPERATION_TIMEOUT =
+            Options.key("client_default_admin_operation_timeout_ms")
+                    .longType()
+                    .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
+                    .withDescription("Kudu admin operation time out");
+
+    public static final Option<String> KERBEROS_PRINCIPAL =
+            Options.key("kerberos_principal")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kerberos principal");
+
+    public static final Option<String> KERBEROS_KEYTAB =
+            Options.key("kerberos_keytab")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kerberos keytab");
+
+    public static final Option<String> KERBEROS_KRB5_CONF =
+            Options.key("kerberos_krb5conf")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kerberos krb5 conf");

Review Comment:
   ```suggestion
                       .withDescription("Kerberos krb5 conf. Note that all zeta nodes require this file.");
   ```



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java:
##########
@@ -34,10 +34,19 @@ public String factoryIdentifier() {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(
-                        KuduSinkConfig.KUDU_MASTER,
-                        KuduSinkConfig.KUDU_SAVE_MODE,
-                        KuduSinkConfig.KUDU_TABLE_NAME)
+                .required(KuduSinkConfig.MASTER, KuduSinkConfig.TABLE_NAME)
+                .optional(KuduSinkConfig.WORKER_COUNT)
+                .optional(KuduSinkConfig.OPERATION_TIMEOUT)
+                .optional(KuduSinkConfig.ADMIN_OPERATION_TIMEOUT)
+                .optional(KuduSinkConfig.SAVE_MODE)
+                .optional(KuduSinkConfig.FLUSH_MODE)
+                .optional(KuduSinkConfig.MUTATION_BUFFER_SPACE)
+                .optional(KuduSinkConfig.BATCH_SIZE)
+                .optional(KuduSinkConfig.BUFFER_FLUSH_INTERVAL)

Review Comment:
   `BUFFER_FLUSH_INTERVAL` only applies when `FLUSH_MODE` is not `AUTO_FLUSH_SYNC`, so it should be defined with `conditional`.
   For reference, see https://github.com/apache/seatunnel/blob/dev/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
   
   Please check the other options too.
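   A hedged sketch of the shape this could take (assuming `OptionRule.Builder.conditional` accepts a list of trigger values, as in other connector factories; option names taken from this PR, flush-mode values assumed from Kudu's session flush modes):

   ```java
   // Sketch only: tie buffer-related options to the flush modes that use them.
   return OptionRule.builder()
           .required(KuduSinkConfig.MASTER, KuduSinkConfig.TABLE_NAME)
           .optional(KuduSinkConfig.SAVE_MODE, KuduSinkConfig.FLUSH_MODE)
           .conditional(
                   KuduSinkConfig.FLUSH_MODE,
                   Arrays.asList("AUTO_FLUSH_BACKGROUND", "MANUAL_FLUSH"),
                   KuduSinkConfig.BUFFER_FLUSH_INTERVAL,
                   KuduSinkConfig.MUTATION_BUFFER_SPACE,
                   KuduSinkConfig.BATCH_SIZE)
           .build();
   ```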



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java:
##########
@@ -17,41 +17,46 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduOutputFormat;
+import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.Optional;
 
 @Slf4j
-public class KuduSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class KuduSinkWriter implements SinkWriter<SeaTunnelRow, 
KuduCommitInfo, KuduSinkState> {
 
     private SeaTunnelRowType seaTunnelRowType;
-    private Config pluginConfig;
     private KuduOutputFormat fileWriter;
-    private KuduSinkConfig kuduSinkConfig;
 
     public KuduSinkWriter(
-            @NonNull SeaTunnelRowType seaTunnelRowType, @NonNull Config 
pluginConfig) {
+            @NonNull SeaTunnelRowType seaTunnelRowType, KuduSinkConfig 
kuduSinkConfig) {

Review Comment:
   Why was `@NonNull` removed before `KuduSinkConfig kuduSinkConfig`? `kuduSinkConfig` must not be null, because `KuduOutputFormat` dereferences it immediately:
   
   ```java
   public KuduOutputFormat(KuduSinkConfig kuduSinkConfig, SeaTunnelRowType seaTunnelRowType) {
           this.kuduTableName = kuduSinkConfig.getTable();
           ...
   }
   ```
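   For context, Lombok's `@NonNull` on a parameter roughly expands to an explicit guard like the following (plain-Java sketch with a placeholder parameter type, no Lombok dependency):

   ```java
   // Roughly what Lombok generates for a @NonNull constructor parameter:
   // fail fast with a clear message instead of an NPE deep inside the class.
   public class NonNullGuardDemo {
       private final String table;

       NonNullGuardDemo(Object kuduSinkConfig) {
           if (kuduSinkConfig == null) {
               throw new NullPointerException(
                       "kuduSinkConfig is marked non-null but is null");
           }
           this.table = kuduSinkConfig.toString();
       }

       String table() { return table; }
   }
   ```

   Dropping the annotation silently moves the failure from the constructor call site to the first dereference.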



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java:
##########
@@ -17,181 +17,135 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
 
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.serialize.KuduRowSerializer;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;
 
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.SessionConfiguration;
-import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.OperationResponse;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.IOException;
 import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /** A Kudu outputFormat */
 @Slf4j
 public class KuduOutputFormat implements Serializable {
 
-    public static final long TIMEOUTMS = 18000;
-    public static final long SESSIONTIMEOUTMS = 100000;
-
-    private final String kuduMaster;
     private final String kuduTableName;
     private final KuduSinkConfig.SaveMode saveMode;
+    private final KuduSinkConfig kuduSinkConfig;
     private KuduClient kuduClient;
     private KuduSession kuduSession;
     private KuduTable kuduTable;
 
-    public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
-        this.kuduMaster = kuduSinkConfig.getKuduMaster();
-        this.kuduTableName = kuduSinkConfig.getKuduTableName();
-        this.saveMode = kuduSinkConfig.getSaveMode();
-        init();
-    }
+    private SeaTunnelRowSerializer seaTunnelRowSerializer;
 
-    private void transform(PartialRow row, SeaTunnelRow element, Schema 
schema) {
-        int columnCount = schema.getColumnCount();
-        for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
-            ColumnSchema col = schema.getColumnByIndex(columnIndex);
-            try {
-                switch (col.getType()) {
-                    case BOOL:
-                        row.addBoolean(columnIndex, (Boolean) 
element.getField(columnIndex));
-                        break;
-                    case INT8:
-                        row.addByte(columnIndex, (Byte) 
element.getField(columnIndex));
-                        break;
-                    case INT16:
-                        row.addShort(columnIndex, (Short) 
element.getField(columnIndex));
-                        break;
-                    case INT32:
-                        row.addInt(columnIndex, (Integer) 
element.getField(columnIndex));
-                        break;
-                    case INT64:
-                        row.addLong(columnIndex, (Long) 
element.getField(columnIndex));
-                        break;
-                    case UNIXTIME_MICROS:
-                        if (element.getField(columnIndex) instanceof 
Timestamp) {
-                            row.addTimestamp(
-                                    columnIndex, (Timestamp) 
element.getField(columnIndex));
-                        } else {
-                            row.addLong(columnIndex, (Long) 
element.getField(columnIndex));
-                        }
-                        break;
-                    case FLOAT:
-                        row.addFloat(columnIndex, (Float) 
element.getField(columnIndex));
-                        break;
-                    case DOUBLE:
-                        row.addDouble(columnIndex, (Double) 
element.getField(columnIndex));
-                        break;
-                    case STRING:
-                        row.addString(columnIndex, 
element.getField(columnIndex).toString());
-                        break;
-                    case BINARY:
-                        if (element.getField(columnIndex) instanceof byte[]) {
-                            row.addBinary(columnIndex, (byte[]) 
element.getField(columnIndex));
-                        } else {
-                            row.addBinary(columnIndex, (ByteBuffer) 
element.getField(columnIndex));
-                        }
-                        break;
-                    case DECIMAL:
-                        row.addDecimal(columnIndex, (BigDecimal) 
element.getField(columnIndex));
-                        break;
-                    default:
-                        throw new KuduConnectorException(
-                                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                                "Unsupported column type: " + col.getType());
-                }
-            } catch (ClassCastException e) {
-                throw new KuduConnectorException(
-                        KuduConnectorErrorCode.DATA_TYPE_CAST_FILED,
-                        "Value type does not match column type "
-                                + col.getType()
-                                + " for column "
-                                + col.getName());
-            }
-        }
+    private SeaTunnelRowType seaTunnelRowType;
+
+    private transient AtomicInteger numPendingRequests;
+
+    public KuduOutputFormat(KuduSinkConfig kuduSinkConfig, SeaTunnelRowType 
seaTunnelRowType) {
+        this.kuduTableName = kuduSinkConfig.getTable();

Review Comment:
   `kuduSinkConfig` must not be null.



##########
docs/en/connector-v2/source/Kudu.md:
##########
@@ -2,58 +2,95 @@
 
 > Kudu source connector
 
-## Description
+## Support Those Engines
 
-Used to read data from Kudu.
-
-The tested kudu version is 1.11.1.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
 
 ## Key features
 
 - [x] [batch](../../concept/connector-v2-features.md)
-- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)

Review Comment:
   From the code, it seems `stream` is not actually supported.



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java:
##########
@@ -111,89 +98,33 @@ public String getPluginName() {
     }
 
     @Override
-    public void prepare(Config config) {
-        String kudumaster = "";
-        String tableName = "";
-        String columnslist = "";
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        config,
-                        KuduSourceConfig.KUDU_MASTER.key(),
-                        KuduSourceConfig.TABLE_NAME.key(),
-                        KuduSourceConfig.COLUMNS_LIST.key());
-        if (checkResult.isSuccess()) {
-            kudumaster = config.getString(KuduSourceConfig.KUDU_MASTER.key());
-            tableName = config.getString(KuduSourceConfig.TABLE_NAME.key());
-            columnslist = 
config.getString(KuduSourceConfig.COLUMNS_LIST.key());
-            kuduInputFormat = new KuduInputFormat(kudumaster, tableName, 
columnslist);
-        } else {
+    public void prepare(Config pluginConfig) {
+        ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
+        try {
+            kuduSourceConfig = new KuduSourceConfig(config);
+        } catch (Exception e) {
             throw new KuduConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format(
                             "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
checkResult.getMsg()));
-        }
-        try {
-            KuduClient.KuduClientBuilder kuduClientBuilder =
-                    new KuduClient.KuduClientBuilder(kudumaster);
-            kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
-
-            KuduClient kuduClient = kuduClientBuilder.build();
-            partitionParameter = initPartitionParameter(kuduClient, tableName);
-            SeaTunnelRowType seaTunnelRowType =
-                    
getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
-            rowTypeInfo = seaTunnelRowType;
-        } catch (KuduException e) {
-            throw new KuduConnectorException(
-                    KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, e);
+                            getPluginName(), PluginType.SINK, e.getMessage()));

Review Comment:
   Please replace `e.getMessage()` with `ExceptionUtils.getMessage(e)`.



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java:
##########
@@ -38,7 +40,18 @@ public String factoryIdentifier() {
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(KUDU_MASTER, TABLE_NAME, 
COLUMNS_LIST).build();
+        return OptionRule.builder()
+                .required(MASTER, TABLE_NAME)
+                .optional(CatalogTableUtil.SCHEMA)
+                .optional(KuduSourceConfig.WORKER_COUNT)
+                .optional(KuduSourceConfig.OPERATION_TIMEOUT)
+                .optional(KuduSourceConfig.ADMIN_OPERATION_TIMEOUT)
+                .optional(KuduSourceConfig.QUERY_TIMEOUT)
+                .optional(KuduSourceConfig.SCAN_BATCH_SIZE_BYTES)
+                .optional(KuduSourceConfig.FILTER)
+                .optional(KuduSinkConfig.KERBEROS_KRB5_CONF)

Review Comment:
   Add an option `ENABLE_KERBEROS` and add 
   ```
   .conditional(
                           ENABLE_KERBEROS,
                           true,
                           KERBEROS_KRB5_CONF,
                           KERBEROS_PRINCIPAL,
                           KERBEROS_KEYTAB)
   ```
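   As a sketch, the requested rule might look like the fragment below. The option name, description, and builder calls are illustrative and assume SeaTunnel's `Options`/`OptionRule` builder API, not the final shape of this PR:

```java
// Illustrative fragment only; assumes SeaTunnel's Options/OptionRule builders.
public static final Option<Boolean> ENABLE_KERBEROS =
        Options.key("enable_kerberos")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to authenticate to Kudu via Kerberos.");

@Override
public OptionRule optionRule() {
    return OptionRule.builder()
            .required(MASTER, TABLE_NAME)
            .optional(ENABLE_KERBEROS)
            // Kerberos settings become required only when enable_kerberos = true.
            .conditional(
                    ENABLE_KERBEROS,
                    true,
                    KERBEROS_KRB5_CONF,
                    KERBEROS_PRINCIPAL,
                    KERBEROS_KEYTAB)
            .build();
}
```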



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java:
##########
@@ -111,89 +98,33 @@ public String getPluginName() {
     }
 
     @Override
-    public void prepare(Config config) {
-        String kudumaster = "";
-        String tableName = "";
-        String columnslist = "";
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        config,
-                        KuduSourceConfig.KUDU_MASTER.key(),
-                        KuduSourceConfig.TABLE_NAME.key(),
-                        KuduSourceConfig.COLUMNS_LIST.key());
-        if (checkResult.isSuccess()) {
-            kudumaster = config.getString(KuduSourceConfig.KUDU_MASTER.key());
-            tableName = config.getString(KuduSourceConfig.TABLE_NAME.key());
-            columnslist = config.getString(KuduSourceConfig.COLUMNS_LIST.key());
-            kuduInputFormat = new KuduInputFormat(kudumaster, tableName, columnslist);
-        } else {
+    public void prepare(Config pluginConfig) {
+        ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
+        try {
+            kuduSourceConfig = new KuduSourceConfig(config);
+        } catch (Exception e) {
             throw new KuduConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format(
                             "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, checkResult.getMsg()));
-        }
-        try {
-            KuduClient.KuduClientBuilder kuduClientBuilder =
-                    new KuduClient.KuduClientBuilder(kudumaster);
-            kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
-
-            KuduClient kuduClient = kuduClientBuilder.build();
-            partitionParameter = initPartitionParameter(kuduClient, tableName);
-            SeaTunnelRowType seaTunnelRowType =
-                    getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
-            rowTypeInfo = seaTunnelRowType;
-        } catch (KuduException e) {
-            throw new KuduConnectorException(
-                    KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, e);
+                            getPluginName(), PluginType.SINK, e.getMessage()));
         }
-    }
 
-    private PartitionParameter initPartitionParameter(KuduClient kuduClient, String tableName) {
-        String keyColumn = null;
-        int maxKey = 0;
-        int minKey = 0;
-        boolean flag = true;
-        try {
-            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
-                    kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
-            ArrayList<String> columnsList = new ArrayList<String>();
-            keyColumn =
-                    kuduClient
-                            .openTable(tableName)
-                            .getSchema()
-                            .getPrimaryKeyColumns()
-                            .get(0)
-                            .getName();
-            columnsList.add("" + keyColumn);
-            kuduScannerBuilder.setProjectedColumnNames(columnsList);
-            KuduScanner kuduScanner = kuduScannerBuilder.build();
-            while (kuduScanner.hasMoreRows()) {
-                RowResultIterator rowResults = kuduScanner.nextRows();
-                while (rowResults.hasNext()) {
-                    RowResult row = rowResults.next();
-                    int id = row.getInt("" + keyColumn);
-                    if (flag) {
-                        maxKey = id;
-                        minKey = id;
-                        flag = false;
-                    } else {
-                        if (id >= maxKey) {
-                            maxKey = id;
-                        }
-                        if (id <= minKey) {
-                            minKey = id;
-                        }
-                    }
-                }
+        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
+            rowTypeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+        } else {
+            try (KuduClient kuduClient = KuduUtil.getKuduClient(kuduSourceConfig)) {

Review Comment:
   Can you implement a Kudu catalog?



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java:
##########
@@ -111,89 +98,33 @@ public String getPluginName() {
     }
 
     @Override
-    public void prepare(Config config) {
-        String kudumaster = "";
-        String tableName = "";
-        String columnslist = "";
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        config,
-                        KuduSourceConfig.KUDU_MASTER.key(),
-                        KuduSourceConfig.TABLE_NAME.key(),
-                        KuduSourceConfig.COLUMNS_LIST.key());
-        if (checkResult.isSuccess()) {
-            kudumaster = config.getString(KuduSourceConfig.KUDU_MASTER.key());
-            tableName = config.getString(KuduSourceConfig.TABLE_NAME.key());
-            columnslist = config.getString(KuduSourceConfig.COLUMNS_LIST.key());
-            kuduInputFormat = new KuduInputFormat(kudumaster, tableName, columnslist);
-        } else {
+    public void prepare(Config pluginConfig) {
+        ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
+        try {
+            kuduSourceConfig = new KuduSourceConfig(config);
+        } catch (Exception e) {
             throw new KuduConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format(
                             "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, checkResult.getMsg()));
-        }
-        try {
-            KuduClient.KuduClientBuilder kuduClientBuilder =
-                    new KuduClient.KuduClientBuilder(kudumaster);
-            kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
-
-            KuduClient kuduClient = kuduClientBuilder.build();
-            partitionParameter = initPartitionParameter(kuduClient, tableName);
-            SeaTunnelRowType seaTunnelRowType =
-                    getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
-            rowTypeInfo = seaTunnelRowType;
-        } catch (KuduException e) {
-            throw new KuduConnectorException(
-                    KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, e);
+                            getPluginName(), PluginType.SINK, e.getMessage()));
         }
-    }
 
-    private PartitionParameter initPartitionParameter(KuduClient kuduClient, String tableName) {
-        String keyColumn = null;
-        int maxKey = 0;
-        int minKey = 0;
-        boolean flag = true;
-        try {
-            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
-                    kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
-            ArrayList<String> columnsList = new ArrayList<String>();
-            keyColumn =
-                    kuduClient
-                            .openTable(tableName)
-                            .getSchema()
-                            .getPrimaryKeyColumns()
-                            .get(0)
-                            .getName();
-            columnsList.add("" + keyColumn);
-            kuduScannerBuilder.setProjectedColumnNames(columnsList);
-            KuduScanner kuduScanner = kuduScannerBuilder.build();
-            while (kuduScanner.hasMoreRows()) {
-                RowResultIterator rowResults = kuduScanner.nextRows();
-                while (rowResults.hasNext()) {
-                    RowResult row = rowResults.next();
-                    int id = row.getInt("" + keyColumn);
-                    if (flag) {
-                        maxKey = id;
-                        minKey = id;
-                        flag = false;
-                    } else {
-                        if (id >= maxKey) {
-                            maxKey = id;
-                        }
-                        if (id <= minKey) {
-                            minKey = id;
-                        }
-                    }
-                }
+        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
+            rowTypeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();

Review Comment:
   Kudu has its own table structure; why use a `schema` option to construct the table structure?



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java:
##########
@@ -18,109 +18,159 @@
 package org.apache.seatunnel.connectors.seatunnel.kudu.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class KuduSourceSplitEnumerator
         implements SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> {
 
+    private static final Logger log = LoggerFactory.getLogger(KuduSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext;
-    private PartitionParameter partitionParameter;
-    List<KuduSourceSplit> allSplit = new ArrayList<>();
-    private Long maxVal;
-    private Long minVal;
-    private Long batchSize;
-    private Integer batchNum;
+    private KuduSourceState checkpointState;
+    private KuduSourceConfig kuduSourceConfig;
+    private final Map<Integer, List<KuduSourceSplit>> pendingSplits;
+
+    private final KuduInputFormat kuduInputFormat;
+
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
+
+    public KuduSourceSplitEnumerator(
+            Context<KuduSourceSplit> enumeratorContext,
+            KuduSourceConfig kuduSourceConfig,
+            KuduInputFormat kuduInputFormat) {
+        this(enumeratorContext, kuduSourceConfig, kuduInputFormat, null);
+    }
 
     public KuduSourceSplitEnumerator(
             SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext,
-            PartitionParameter partitionParameter) {
+            KuduSourceConfig kuduSourceConfig,
+            KuduInputFormat kuduInputFormat,
+            KuduSourceState checkpointState) {
         this.enumeratorContext = enumeratorContext;
-        this.partitionParameter = partitionParameter;
+        this.kuduSourceConfig = kuduSourceConfig;
+        this.pendingSplits = new HashMap<>();
+        this.kuduInputFormat = kuduInputFormat;
+        this.shouldEnumerate = checkpointState == null;
+        this.checkpointState = checkpointState;
     }
 
     @Override
-    public void open() {}
+    public void open() {
+        kuduInputFormat.openInputFormat();
+    }
 
     @Override
-    public void run() {}
+    public void run() throws IOException {
+        Set<Integer> readers = enumeratorContext.registeredReaders();
+        if (shouldEnumerate) {
+            Set<KuduSourceSplit> newSplits = discoverySplits();
+
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
 
-    @Override
-    public void close() throws IOException {}
+            assignSplit(readers);
+        }
 
-    @Override
-    public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {}
+        log.debug(
+                "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(enumeratorContext::signalNoMoreSplits);
+    }
 
-    @Override
-    public int currentUnassignedSplitSize() {
-        return 0;
+    private Set<KuduSourceSplit> discoverySplits() throws IOException {
+        return kuduInputFormat.createInputSplits();
     }
 
     @Override
-    public void handleSplitRequest(int subtaskId) {}
+    public void close() throws IOException {
+        kuduInputFormat.closeInputFormat();
+    }
 
     @Override
-    public void registerReader(int subtaskId) {
-        int parallelism = enumeratorContext.currentParallelism();
-        if (allSplit.isEmpty()) {
-            if (null != partitionParameter) {
-                Serializable[][] parameterValues =
-                        getParameterValues(
-                                partitionParameter.minValue,
-                                partitionParameter.maxValue,
-                                parallelism);
-                for (int i = 0; i < parameterValues.length; i++) {
-                    allSplit.add(new KuduSourceSplit(parameterValues[i], i));
+    public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
+        log.debug("Add back splits {} to KuduSourceSplitEnumerator.", splits);
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> readers) {
+        log.debug("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<KuduSourceSplit> assignmentForReader = pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+                log.info("Assign splits {} to reader {}", assignmentForReader, reader);
+                try {
+                    enumeratorContext.assignSplit(reader, assignmentForReader);
+                } catch (Exception e) {
+                    log.error(
+                            "Failed to assign splits {} to reader {}",
+                            assignmentForReader,
+                            reader,
+                            e);
+                    pendingSplits.put(reader, assignmentForReader);
                 }
-            } else {
-                allSplit.add(new KuduSourceSplit(null, 0));
             }
         }
-        // Filter the split that the current task needs to run
-        List<KuduSourceSplit> splits =
-                allSplit.stream()
-                        .filter(p -> p.splitId % parallelism == subtaskId)
-                        .collect(Collectors.toList());
-        enumeratorContext.assignSplit(subtaskId, splits);
-        enumeratorContext.signalNoMoreSplits(subtaskId);
     }
 
-    private Serializable[][] getParameterValues(Long minVal, Long maxVal, int parallelism) {
-        this.maxVal = maxVal;
-        this.minVal = minVal;
-        long maxElemCount = (maxVal - minVal) + 1;
-        batchNum = parallelism;
-        getBatchSizeAndBatchNum(parallelism);
-        long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
-
-        Serializable[][] parameters = new Serializable[batchNum][2];
-        long start = minVal;
-        for (int i = 0; i < batchNum; i++) {
-            long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
-            parameters[i] = new Long[] {start, end};
-            start = end + 1;
+    private void addPendingSplit(Collection<KuduSourceSplit> splits) {
+        int readerCount = enumeratorContext.currentParallelism();
+        for (KuduSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            log.info("Assigning {} to {} reader.", split, ownerReader);
+            pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
         }
-        return parameters;
     }
 
-    private void getBatchSizeAndBatchNum(int parallelism) {
-        batchNum = parallelism;
-        long maxElemCount = (maxVal - minVal) + 1;
-        if (batchNum > maxElemCount) {
-            batchNum = (int) maxElemCount;
+    private int getSplitOwner(String splitId, int numReaders) {
+        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplits.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        throw new KuduConnectorException(
+                CommonErrorCode.UNSUPPORTED_OPERATION,
+                String.format("Unsupported handleSplitRequest: %d", subtaskId));
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        log.debug("Register reader {} to KuduSourceSplitEnumerator.", subtaskId);
+        if (!pendingSplits.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
         }
-        this.batchNum = batchNum;
-        this.batchSize = new Double(Math.ceil((double) maxElemCount / batchNum)).longValue();
     }
 
     @Override
     public KuduSourceState snapshotState(long checkpointId) throws Exception {
-        return null;
+        synchronized (stateLock) {
+            return new KuduSourceState(pendingSplits);

Review Comment:
   ```suggestion
               return new KuduSourceState(new HashMap<>(pendingSplits));
   ```
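   The defensive copy matters because the enumerator keeps mutating `pendingSplits` after the checkpoint is taken, so state that aliases the live map can change after `snapshotState` returns. A minimal, self-contained sketch with plain collections (variable names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotCopyDemo {
    public static void main(String[] args) {
        Map<Integer, String> pendingSplits = new HashMap<>();
        pendingSplits.put(0, "split-0");

        // Without a copy, the checkpoint state aliases the live map...
        Map<Integer, String> aliasedState = pendingSplits;
        // ...with a copy, the snapshot is frozen at checkpoint time.
        Map<Integer, String> copiedState = new HashMap<>(pendingSplits);

        // The enumerator keeps assigning (removing) splits after the checkpoint:
        pendingSplits.remove(0);

        System.out.println(aliasedState.size()); // 0 — snapshot mutated after the fact
        System.out.println(copiedState.size());  // 1 — snapshot preserved
    }
}
```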



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java:
##########
@@ -18,109 +18,159 @@
 package org.apache.seatunnel.connectors.seatunnel.kudu.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class KuduSourceSplitEnumerator
         implements SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> {
 
+    private static final Logger log = LoggerFactory.getLogger(KuduSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext;
-    private PartitionParameter partitionParameter;
-    List<KuduSourceSplit> allSplit = new ArrayList<>();
-    private Long maxVal;
-    private Long minVal;
-    private Long batchSize;
-    private Integer batchNum;
+    private KuduSourceState checkpointState;
+    private KuduSourceConfig kuduSourceConfig;
+    private final Map<Integer, List<KuduSourceSplit>> pendingSplits;
+
+    private final KuduInputFormat kuduInputFormat;
+
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
+
+    public KuduSourceSplitEnumerator(
+            Context<KuduSourceSplit> enumeratorContext,
+            KuduSourceConfig kuduSourceConfig,
+            KuduInputFormat kuduInputFormat) {
+        this(enumeratorContext, kuduSourceConfig, kuduInputFormat, null);
+    }
 
     public KuduSourceSplitEnumerator(
             SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext,
-            PartitionParameter partitionParameter) {
+            KuduSourceConfig kuduSourceConfig,
+            KuduInputFormat kuduInputFormat,
+            KuduSourceState checkpointState) {
         this.enumeratorContext = enumeratorContext;
-        this.partitionParameter = partitionParameter;
+        this.kuduSourceConfig = kuduSourceConfig;
+        this.pendingSplits = new HashMap<>();
+        this.kuduInputFormat = kuduInputFormat;
+        this.shouldEnumerate = checkpointState == null;
+        this.checkpointState = checkpointState;
     }
 
     @Override
-    public void open() {}
+    public void open() {
+        kuduInputFormat.openInputFormat();
+    }
 
     @Override
-    public void run() {}
+    public void run() throws IOException {
+        Set<Integer> readers = enumeratorContext.registeredReaders();
+        if (shouldEnumerate) {
+            Set<KuduSourceSplit> newSplits = discoverySplits();
+
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
 
-    @Override
-    public void close() throws IOException {}
+            assignSplit(readers);
+        }
 
-    @Override
-    public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {}
+        log.debug(
+                "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(enumeratorContext::signalNoMoreSplits);
+    }
 
-    @Override
-    public int currentUnassignedSplitSize() {
-        return 0;
+    private Set<KuduSourceSplit> discoverySplits() throws IOException {
+        return kuduInputFormat.createInputSplits();
     }
 
     @Override
-    public void handleSplitRequest(int subtaskId) {}
+    public void close() throws IOException {
+        kuduInputFormat.closeInputFormat();
+    }
 
     @Override
-    public void registerReader(int subtaskId) {
-        int parallelism = enumeratorContext.currentParallelism();
-        if (allSplit.isEmpty()) {
-            if (null != partitionParameter) {
-                Serializable[][] parameterValues =
-                        getParameterValues(
-                                partitionParameter.minValue,
-                                partitionParameter.maxValue,
-                                parallelism);
-                for (int i = 0; i < parameterValues.length; i++) {
-                    allSplit.add(new KuduSourceSplit(parameterValues[i], i));
+    public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
+        log.debug("Add back splits {} to KuduSourceSplitEnumerator.", splits);
+        if (!splits.isEmpty()) {

Review Comment:
   Please add `synchronized (stateLock)` here, because `pendingSplits` is not a thread-safe map.
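   A minimal sketch of the requested pattern, using plain collections and illustrative names: every access to the unsynchronized `HashMap` goes through the same `stateLock` that `run()` and `snapshotState()` already take.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative class/method names; mirrors addSplitsBack in spirit only.
public class GuardedSplitState {
    private final Map<Integer, List<String>> pendingSplits = new HashMap<>();
    private final Object stateLock = new Object();

    void addSplitsBack(List<String> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            // Mutation of the plain HashMap happens under the shared lock.
            synchronized (stateLock) {
                pendingSplits
                        .computeIfAbsent(subtaskId, r -> new ArrayList<>())
                        .addAll(splits);
            }
        }
    }

    int pendingCount(int subtaskId) {
        // Reads take the same lock so they never observe a half-updated map.
        synchronized (stateLock) {
            List<String> splits = pendingSplits.get(subtaskId);
            return splits == null ? 0 : splits.size();
        }
    }
}
```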



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java:
##########
@@ -18,109 +18,159 @@
 package org.apache.seatunnel.connectors.seatunnel.kudu.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class KuduSourceSplitEnumerator
         implements SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> {
 
+    private static final Logger log = LoggerFactory.getLogger(KuduSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext;
-    private PartitionParameter partitionParameter;
-    List<KuduSourceSplit> allSplit = new ArrayList<>();
-    private Long maxVal;
-    private Long minVal;
-    private Long batchSize;
-    private Integer batchNum;
+    private KuduSourceState checkpointState;
+    private KuduSourceConfig kuduSourceConfig;
+    private final Map<Integer, List<KuduSourceSplit>> pendingSplits;
+
+    private final KuduInputFormat kuduInputFormat;
+
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
+
+    public KuduSourceSplitEnumerator(
+            Context<KuduSourceSplit> enumeratorContext,
+            KuduSourceConfig kuduSourceConfig,
+            KuduInputFormat kuduInputFormat) {
+        this(enumeratorContext, kuduSourceConfig, kuduInputFormat, null);
+    }
 
     public KuduSourceSplitEnumerator(
             SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext,
-            PartitionParameter partitionParameter) {
+            KuduSourceConfig kuduSourceConfig,
+            KuduInputFormat kuduInputFormat,
+            KuduSourceState checkpointState) {
         this.enumeratorContext = enumeratorContext;
-        this.partitionParameter = partitionParameter;
+        this.kuduSourceConfig = kuduSourceConfig;
+        this.pendingSplits = new HashMap<>();
+        this.kuduInputFormat = kuduInputFormat;
+        this.shouldEnumerate = checkpointState == null;
+        this.checkpointState = checkpointState;
     }
 
     @Override
-    public void open() {}
+    public void open() {
+        kuduInputFormat.openInputFormat();
+    }
 
     @Override
-    public void run() {}
+    public void run() throws IOException {
+        Set<Integer> readers = enumeratorContext.registeredReaders();
+        if (shouldEnumerate) {
+            Set<KuduSourceSplit> newSplits = discoverySplits();
+
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
 
-    @Override
-    public void close() throws IOException {}
+            assignSplit(readers);
+        }
 
-    @Override
-    public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {}
+        log.debug(
+                "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(enumeratorContext::signalNoMoreSplits);
+    }
 
-    @Override
-    public int currentUnassignedSplitSize() {
-        return 0;
+    private Set<KuduSourceSplit> discoverySplits() throws IOException {
+        return kuduInputFormat.createInputSplits();
     }
 
     @Override
-    public void handleSplitRequest(int subtaskId) {}
+    public void close() throws IOException {
+        kuduInputFormat.closeInputFormat();
+    }
 
     @Override
-    public void registerReader(int subtaskId) {
-        int parallelism = enumeratorContext.currentParallelism();
-        if (allSplit.isEmpty()) {
-            if (null != partitionParameter) {
-                Serializable[][] parameterValues =
-                        getParameterValues(
-                                partitionParameter.minValue,
-                                partitionParameter.maxValue,
-                                parallelism);
-                for (int i = 0; i < parameterValues.length; i++) {
-                    allSplit.add(new KuduSourceSplit(parameterValues[i], i));
+    public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
+        log.debug("Add back splits {} to KuduSourceSplitEnumerator.", splits);
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> readers) {
+        log.debug("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<KuduSourceSplit> assignmentForReader = pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+                log.info("Assign splits {} to reader {}", assignmentForReader, reader);
+                try {
+                    enumeratorContext.assignSplit(reader, assignmentForReader);
+                } catch (Exception e) {
+                    log.error(
+                            "Failed to assign splits {} to reader {}",
+                            assignmentForReader,
+                            reader,
+                            e);
+                    pendingSplits.put(reader, assignmentForReader);
                 }
-            } else {
-                allSplit.add(new KuduSourceSplit(null, 0));
             }
         }
-        // Filter the split that the current task needs to run
-        List<KuduSourceSplit> splits =
-                allSplit.stream()
-                        .filter(p -> p.splitId % parallelism == subtaskId)
-                        .collect(Collectors.toList());
-        enumeratorContext.assignSplit(subtaskId, splits);
-        enumeratorContext.signalNoMoreSplits(subtaskId);
     }
 
-    private Serializable[][] getParameterValues(Long minVal, Long maxVal, int parallelism) {
-        this.maxVal = maxVal;
-        this.minVal = minVal;
-        long maxElemCount = (maxVal - minVal) + 1;
-        batchNum = parallelism;
-        getBatchSizeAndBatchNum(parallelism);
-        long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
-
-        Serializable[][] parameters = new Serializable[batchNum][2];
-        long start = minVal;
-        for (int i = 0; i < batchNum; i++) {
-            long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
-            parameters[i] = new Long[] {start, end};
-            start = end + 1;
+    private void addPendingSplit(Collection<KuduSourceSplit> splits) {
+        int readerCount = enumeratorContext.currentParallelism();
+        for (KuduSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            log.info("Assigning {} to {} reader.", split, ownerReader);
+            pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
         }
-        return parameters;
     }
 
-    private void getBatchSizeAndBatchNum(int parallelism) {
-        batchNum = parallelism;
-        long maxElemCount = (maxVal - minVal) + 1;
-        if (batchNum > maxElemCount) {
-            batchNum = (int) maxElemCount;
+    private int getSplitOwner(String splitId, int numReaders) {
+        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplits.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        throw new KuduConnectorException(
+                CommonErrorCode.UNSUPPORTED_OPERATION,
+                String.format("Unsupported handleSplitRequest: %d", subtaskId));
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        log.debug("Register reader {} to KuduSourceSplitEnumerator.", subtaskId);
+        if (!pendingSplits.isEmpty()) {

Review Comment:
   This needs `synchronized (stateLock)` too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

