This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 10e791d60a [Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on kudu (#5789)
10e791d60a is described below
commit 10e791d60a1d287e2b4e94e2f1ec2deace3eb40a
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Wed Nov 8 11:50:36 2023 +0800
[Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on kudu (#5789)
---
.../seatunnel/kudu/kuduclient/KuduInputFormat.java | 4 ++
.../connectors/seatunnel/kudu/sink/KuduSink.java | 35 ++--------
.../seatunnel/kudu/sink/KuduSinkFactory.java | 12 ++++
.../seatunnel/kudu/source/KuduSource.java | 80 ++--------------------
.../seatunnel/kudu/source/KuduSourceFactory.java | 71 +++++++++++++++++++
5 files changed, 99 insertions(+), 103 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
index 006a8adb3c..6fec8ed6e9 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
@@ -109,4 +109,8 @@ public class KuduInputFormat implements Serializable {
public KuduScanner scanner(byte[] token) throws IOException {
return KuduScanToken.deserializeIntoScanner(token, kuduClient);
}
+
+ public SeaTunnelRowType getRowTypeInfo() {
+ return this.rowTypeInfo;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
index 0c8494afe8..0cc827f1ed 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -17,21 +17,13 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
-import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
@@ -52,29 +44,14 @@ public class KuduSink
private KuduSinkConfig kuduSinkConfig;
private SeaTunnelRowType seaTunnelRowType;
- @Override
- public String getPluginName() {
- return "kudu";
+ public KuduSink(KuduSinkConfig kuduSinkConfig, CatalogTable catalogTable) {
+ this.kuduSinkConfig = kuduSinkConfig;
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
- ConfigValidator.of(config).validate(new
KuduSinkFactory().optionRule());
- try {
- kuduSinkConfig = new KuduSinkConfig(config);
- } catch (Exception e) {
- throw new KuduConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
ExceptionUtils.getMessage(e)));
- }
+ public String getPluginName() {
+ return "kudu";
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
index c78e7859a7..fc94e94b88 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
@@ -17,9 +17,13 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
import com.google.auto.service.AutoService;
@@ -64,4 +68,12 @@ public class KuduSinkFactory implements TableSinkFactory {
KuduSinkConfig.KERBEROS_KEYTAB)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ ReadonlyConfig config = context.getOptions();
+ CatalogTable catalogTable = context.getCatalogTable();
+ KuduSinkConfig kuduSinkConfig = new KuduSinkConfig(config);
+ return () -> new KuduSink(kuduSinkConfig, catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index 3748de2522..d43a02c7c9 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -17,41 +17,20 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
-import
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
-import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.client.KuduClient;
import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
-import java.util.ArrayList;
-import java.util.List;
-
@Slf4j
@AutoService(SeaTunnelSource.class)
public class KuduSource
@@ -62,6 +41,12 @@ public class KuduSource
private KuduInputFormat kuduInputFormat;
private KuduSourceConfig kuduSourceConfig;
+ public KuduSource(KuduSourceConfig kuduSourceConfig, KuduInputFormat
kuduInputFormat) {
+ this.kuduSourceConfig = kuduSourceConfig;
+ this.kuduInputFormat = kuduInputFormat;
+ this.rowTypeInfo = kuduInputFormat.getRowTypeInfo();
+ }
+
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
@@ -96,57 +81,4 @@ public class KuduSource
public String getPluginName() {
return "Kudu";
}
-
- @Override
- public void prepare(Config pluginConfig) {
- ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
- ConfigValidator.of(config).validate(new
KuduSourceFactory().optionRule());
- try {
- kuduSourceConfig = new KuduSourceConfig(config);
- } catch (Exception e) {
- throw new KuduConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
ExceptionUtils.getMessage(e)));
- }
-
- if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
- rowTypeInfo =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- } else {
- try (KuduClient kuduClient =
KuduUtil.getKuduClient(kuduSourceConfig)) {
- rowTypeInfo =
- getSeaTunnelRowType(
- kuduClient
- .openTable(kuduSourceConfig.getTable())
- .getSchema()
- .getColumns());
- } catch (Exception e) {
- throw new
KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
- }
- }
- kuduInputFormat = new KuduInputFormat(kuduSourceConfig, rowTypeInfo);
- }
-
- public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema>
columnSchemaList) {
- ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
- ArrayList<String> fieldNames = new ArrayList<>();
- try {
-
- for (int i = 0; i < columnSchemaList.size(); i++) {
- fieldNames.add(columnSchemaList.get(i).getName());
-
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
- }
-
- } catch (Exception e) {
- throw new KuduConnectorException(
- CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- "Kudu", PluginType.SOURCE,
ExceptionUtils.getMessage(e)));
- }
- return new SeaTunnelRowType(
- fieldNames.toArray(new String[0]),
- seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
- }
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
index 5bb035633d..2aebca68d1 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
@@ -17,16 +17,38 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+import
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
import static
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.MASTER;
import static
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_NAME;
@@ -63,4 +85,53 @@ public class KuduSourceFactory implements TableSourceFactory
{
public Class<? extends SeaTunnelSource> getSourceClass() {
return KuduSource.class;
}
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ ReadonlyConfig config = context.getOptions();
+ KuduSourceConfig kuduSourceConfig = new KuduSourceConfig(config);
+ SeaTunnelRowType rowTypeInfo;
+ if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+ rowTypeInfo =
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+ } else {
+ try (KuduClient kuduClient =
KuduUtil.getKuduClient(kuduSourceConfig)) {
+ rowTypeInfo =
+ getSeaTunnelRowType(
+ kuduClient
+ .openTable(kuduSourceConfig.getTable())
+ .getSchema()
+ .getColumns());
+ } catch (Exception e) {
+ throw new
KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
+ }
+ }
+ KuduInputFormat kuduInputFormat = new
KuduInputFormat(kuduSourceConfig, rowTypeInfo);
+
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new KuduSource(kuduSourceConfig, kuduInputFormat);
+ }
+
+ public static SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema>
columnSchemaList) {
+ ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+ ArrayList<String> fieldNames = new ArrayList<>();
+ try {
+
+ for (int i = 0; i < columnSchemaList.size(); i++) {
+ fieldNames.add(columnSchemaList.get(i).getName());
+
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
+ }
+
+ } catch (Exception e) {
+ throw new KuduConnectorException(
+ CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ "Kudu", PluginType.SOURCE,
ExceptionUtils.getMessage(e)));
+ }
+ return new SeaTunnelRowType(
+ fieldNames.toArray(new String[0]),
+ seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
+ }
}