This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 10e791d60a [Feature][Connector-V2] Support 
TableSourceFactory/TableSinkFactory on kudu (#5789)
10e791d60a is described below

commit 10e791d60a1d287e2b4e94e2f1ec2deace3eb40a
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Wed Nov 8 11:50:36 2023 +0800

    [Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on kudu 
(#5789)
---
 .../seatunnel/kudu/kuduclient/KuduInputFormat.java |  4 ++
 .../connectors/seatunnel/kudu/sink/KuduSink.java   | 35 ++--------
 .../seatunnel/kudu/sink/KuduSinkFactory.java       | 12 ++++
 .../seatunnel/kudu/source/KuduSource.java          | 80 ++--------------------
 .../seatunnel/kudu/source/KuduSourceFactory.java   | 71 +++++++++++++++++++
 5 files changed, 99 insertions(+), 103 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
index 006a8adb3c..6fec8ed6e9 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
@@ -109,4 +109,8 @@ public class KuduInputFormat implements Serializable {
     public KuduScanner scanner(byte[] token) throws IOException {
         return KuduScanToken.deserializeIntoScanner(token, kuduClient);
     }
+
+    public SeaTunnelRowType getRowTypeInfo() {
+        return this.rowTypeInfo;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
index 0c8494afe8..0cc827f1ed 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -17,21 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
@@ -52,29 +44,14 @@ public class KuduSink
     private KuduSinkConfig kuduSinkConfig;
     private SeaTunnelRowType seaTunnelRowType;
 
-    @Override
-    public String getPluginName() {
-        return "kudu";
+    public KuduSink(KuduSinkConfig kuduSinkConfig, CatalogTable catalogTable) {
+        this.kuduSinkConfig = kuduSinkConfig;
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
-        ConfigValidator.of(config).validate(new 
KuduSinkFactory().optionRule());
-        try {
-            kuduSinkConfig = new KuduSinkConfig(config);
-        } catch (Exception e) {
-            throw new KuduConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
ExceptionUtils.getMessage(e)));
-        }
+    public String getPluginName() {
+        return "kudu";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
index c78e7859a7..fc94e94b88 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
@@ -17,9 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
 
 import com.google.auto.service.AutoService;
@@ -64,4 +68,12 @@ public class KuduSinkFactory implements TableSinkFactory {
                         KuduSinkConfig.KERBEROS_KEYTAB)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig config = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+        KuduSinkConfig kuduSinkConfig = new KuduSinkConfig(config);
+        return () -> new KuduSink(kuduSinkConfig, catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index 3748de2522..d43a02c7c9 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -17,41 +17,20 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
-import 
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
-import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.client.KuduClient;
 
 import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.ArrayList;
-import java.util.List;
-
 @Slf4j
 @AutoService(SeaTunnelSource.class)
 public class KuduSource
@@ -62,6 +41,12 @@ public class KuduSource
     private KuduInputFormat kuduInputFormat;
     private KuduSourceConfig kuduSourceConfig;
 
+    public KuduSource(KuduSourceConfig kuduSourceConfig, KuduInputFormat 
kuduInputFormat) {
+        this.kuduSourceConfig = kuduSourceConfig;
+        this.kuduInputFormat = kuduInputFormat;
+        this.rowTypeInfo = kuduInputFormat.getRowTypeInfo();
+    }
+
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
@@ -96,57 +81,4 @@ public class KuduSource
     public String getPluginName() {
         return "Kudu";
     }
-
-    @Override
-    public void prepare(Config pluginConfig) {
-        ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
-        ConfigValidator.of(config).validate(new 
KuduSourceFactory().optionRule());
-        try {
-            kuduSourceConfig = new KuduSourceConfig(config);
-        } catch (Exception e) {
-            throw new KuduConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
ExceptionUtils.getMessage(e)));
-        }
-
-        if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            rowTypeInfo = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-        } else {
-            try (KuduClient kuduClient = 
KuduUtil.getKuduClient(kuduSourceConfig)) {
-                rowTypeInfo =
-                        getSeaTunnelRowType(
-                                kuduClient
-                                        .openTable(kuduSourceConfig.getTable())
-                                        .getSchema()
-                                        .getColumns());
-            } catch (Exception e) {
-                throw new 
KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
-            }
-        }
-        kuduInputFormat = new KuduInputFormat(kuduSourceConfig, rowTypeInfo);
-    }
-
-    public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> 
columnSchemaList) {
-        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
-        ArrayList<String> fieldNames = new ArrayList<>();
-        try {
-
-            for (int i = 0; i < columnSchemaList.size(); i++) {
-                fieldNames.add(columnSchemaList.get(i).getName());
-                
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
-            }
-
-        } catch (Exception e) {
-            throw new KuduConnectorException(
-                    CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            "Kudu", PluginType.SOURCE, 
ExceptionUtils.getMessage(e)));
-        }
-        return new SeaTunnelRowType(
-                fieldNames.toArray(new String[0]),
-                seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
index 5bb035633d..2aebca68d1 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
@@ -17,16 +17,38 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.source;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
 import static 
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.MASTER;
 import static 
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_NAME;
 
@@ -63,4 +85,53 @@ public class KuduSourceFactory implements TableSourceFactory 
{
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return KuduSource.class;
     }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        ReadonlyConfig config = context.getOptions();
+        KuduSourceConfig kuduSourceConfig = new KuduSourceConfig(config);
+        SeaTunnelRowType rowTypeInfo;
+        if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+            rowTypeInfo = 
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+        } else {
+            try (KuduClient kuduClient = 
KuduUtil.getKuduClient(kuduSourceConfig)) {
+                rowTypeInfo =
+                        getSeaTunnelRowType(
+                                kuduClient
+                                        .openTable(kuduSourceConfig.getTable())
+                                        .getSchema()
+                                        .getColumns());
+            } catch (Exception e) {
+                throw new 
KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
+            }
+        }
+        KuduInputFormat kuduInputFormat = new 
KuduInputFormat(kuduSourceConfig, rowTypeInfo);
+
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new KuduSource(kuduSourceConfig, kuduInputFormat);
+    }
+
+    public static SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> 
columnSchemaList) {
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        try {
+
+            for (int i = 0; i < columnSchemaList.size(); i++) {
+                fieldNames.add(columnSchemaList.get(i).getName());
+                
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
+            }
+
+        } catch (Exception e) {
+            throw new KuduConnectorException(
+                    CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            "Kudu", PluginType.SOURCE, 
ExceptionUtils.getMessage(e)));
+        }
+        return new SeaTunnelRowType(
+                fieldNames.toArray(new String[0]),
+                seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
+    }
 }

Reply via email to