This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a7ca51b585 [Feature][Connector-V2][Tablestore] Support Source
connector for Tablestore #7448 (#7467)
a7ca51b585 is described below
commit a7ca51b58593fc34758e0d713a85b131f99fd5fe
Author: cloud456 <[email protected]>
AuthorDate: Tue Aug 27 22:31:08 2024 +0800
[Feature][Connector-V2][Tablestore] Support Source connector for
Tablestore #7448 (#7467)
---
docs/en/connector-v2/source/Tablestore.md | 102 ++++++++++++
plugin-mapping.properties | 1 +
.../tablestore/config/TablestoreOptions.java | 19 +++
.../serialize/DefaultSeaTunnelRowDeserializer.java | 38 +++++
.../serialize/SeaTunnelRowDeserializer.java | 26 +++
.../tablestore/source/TableStoreDBSource.java | 102 ++++++++++++
.../source/TableStoreDBSourceReader.java | 175 +++++++++++++++++++++
.../tablestore/source/TableStoreDBSourceSplit.java | 38 +++++
.../source/TableStoreDBSourceSplitEnumerator.java | 166 +++++++++++++++++++
.../tablestore/source/TableStoreDBSourceState.java | 34 ++++
.../source/TableStoreDbSourceFactory.java | 64 ++++++++
.../tablestore/source/TableStoreProcessor.java | 95 +++++++++++
12 files changed, 860 insertions(+)
diff --git a/docs/en/connector-v2/source/Tablestore.md
b/docs/en/connector-v2/source/Tablestore.md
new file mode 100644
index 0000000000..8e0d1aeebc
--- /dev/null
+++ b/docs/en/connector-v2/source/Tablestore.md
@@ -0,0 +1,102 @@
+# Tablestore
+
+> Tablestore source connector
+
+## Description
+
+Read data from Alicloud Tablestore, supporting both full and CDC reads.
+
+
+## Key features
+
+- [ ] [batch](../../concept/connector-v2-features.md)
+- [X] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|-----------------------|--------|----------|---------------|
+| end_point | string | yes | - |
+| instance_name | string | yes | - |
+| access_key_id | string | yes | - |
+| access_key_secret | string | yes | - |
+| table | string | yes | - |
+| primary_keys | array | yes | - |
+| schema | config | yes | - |
+
+
+### end_point [string]
+
+The endpoint of Tablestore.
+
+### instance_name [string]
+
+The instance name of Tablestore.
+
+### access_key_id [string]
+
+The access id of Tablestore.
+
+### access_key_secret [string]
+
+The access secret of Tablestore.
+
+### table [string]
+
+The table name of Tablestore.
+
+### primary_keys [array]
+
+The primary key of the table. Specify one unique primary key column for each table listed in `table`.
+
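+### schema [Config]
+
+The schema of the rows produced by this source, declared as a `fields` block that maps column names to types (see the example below).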
+
+## Example
+
+```bash
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+ Tablestore {
+ end_point = "https://****.cn-zhangjiakou.tablestore.aliyuncs.com"
+ instance_name = "****"
+ access_key_id="***************2Ag5"
+ access_key_secret="***********2Dok"
+ table="test"
+ primary_keys=["id"]
+ schema={
+ fields {
+ id = string
+ name = string
+ }
+ }
+ }
+}
+
+
+sink {
+ MongoDB{
+ uri = "mongodb://localhost:27017"
+ database = "test"
+ collection = "test"
+ primary-key = ["id"]
+ schema = {
+ fields {
+ id = string
+ name = string
+ }
+ }
+ }
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index d77b70e5e8..ece3bd0c77 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -85,6 +85,7 @@ seatunnel.sink.InfluxDB = connector-influxdb
seatunnel.source.GoogleSheets = connector-google-sheets
seatunnel.sink.GoogleFirestore = connector-google-firestore
seatunnel.sink.Tablestore = connector-tablestore
+seatunnel.source.Tablestore = connector-tablestore
seatunnel.source.Lemlist = connector-http-lemlist
seatunnel.source.Klaviyo = connector-http-klaviyo
seatunnel.sink.Slack = connector-slack
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
index 7b2aa6bae6..be12181893 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
@@ -19,11 +19,14 @@ package
org.apache.seatunnel.connectors.seatunnel.tablestore.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
@@ -45,6 +48,8 @@ public class TablestoreOptions implements Serializable {
public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
+ /** Creates an empty instance; {@link #of(ReadonlyConfig)} uses it and fills the fields via setters. */
+ public TablestoreOptions() {}
+
public TablestoreOptions(Config config) {
this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
this.instanceName =
config.getString(TablestoreConfig.INSTANCE_NAME.key());
@@ -57,4 +62,18 @@ public class TablestoreOptions implements Serializable {
this.batchSize = config.getInt(BATCH_SIZE.key());
}
}
+
+    /**
+     * Builds connector options from a {@link ReadonlyConfig}.
+     *
+     * <p>The connection settings and table name are read through their typed options, while
+     * {@code primary_keys} is taken from the raw source map and cast to a list of strings.
+     * Fields not assigned here (for example the batch size) keep their declared defaults.
+     *
+     * @param config the connector configuration
+     * @return a newly populated options instance
+     */
+    public static TablestoreOptions of(ReadonlyConfig config) {
+        Map<String, Object> rawConfig = config.getSourceMap();
+        TablestoreOptions options = new TablestoreOptions();
+        options.setEndpoint(config.get(TablestoreConfig.END_POINT));
+        options.setInstanceName(config.get(TablestoreConfig.INSTANCE_NAME));
+        options.setAccessKeyId(config.get(TablestoreConfig.ACCESS_KEY_ID));
+        options.setAccessKeySecret(config.get(TablestoreConfig.ACCESS_KEY_SECRET));
+        options.setTable(config.get(TablestoreConfig.TABLE));
+        // The raw map holds the list as parsed from the job config; assumed to contain strings.
+        @SuppressWarnings("unchecked")
+        List<String> primaryKeys =
+                (List<String>) rawConfig.get(TablestoreConfig.PRIMARY_KEYS.key());
+        options.setPrimaryKeys(primaryKeys);
+        return options;
+    }
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java
new file mode 100644
index 0000000000..9bdb060a49
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.alicloud.openservices.tablestore.model.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Converts a Tablestore {@link StreamRecord} into a {@link SeaTunnelRow} of its column values. */
+public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer {
+
+    /**
+     * Copies the value of every column returned by {@code r.getColumns()}, in that order, into a
+     * new row.
+     *
+     * @param r the stream record to convert
+     * @return a row whose fields are the record's column values
+     */
+    @Override
+    public SeaTunnelRow deserialize(StreamRecord r) {
+        List<Object> columnValues = new ArrayList<>();
+        r.getColumns().stream()
+                .map(recordColumn -> recordColumn.getColumn().getValue())
+                .forEach(columnValues::add);
+        return new SeaTunnelRow(columnValues.toArray());
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java
new file mode 100644
index 0000000000..44a2560693
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.alicloud.openservices.tablestore.model.StreamRecord;
+
+/** Strategy for turning Tablestore stream records into SeaTunnel rows. */
+public interface SeaTunnelRowDeserializer {
+
+    /**
+     * Converts a single Tablestore stream record.
+     *
+     * @param streamRecord the record to convert
+     * @return the row built from {@code streamRecord}
+     */
+    SeaTunnelRow deserialize(StreamRecord streamRecord);
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
new file mode 100644
index 0000000000..85c0062ed3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceReader.Context;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.JobMode;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+/**
+ * SeaTunnel source that reads Tablestore tables through Tablestore tunnels.
+ *
+ * <p>Splits are produced by {@link TableStoreDBSourceSplitEnumerator} and consumed by {@link
+ * TableStoreDBSourceReader}.
+ */
+@Slf4j
+public class TableStoreDBSource
+        implements SeaTunnelSource<SeaTunnelRow, TableStoreDBSourceSplit, TableStoreDBSourceState>,
+                SupportParallelism,
+                SupportColumnProjection {
+
+    private final TablestoreOptions tablestoreOptions;
+    /** The table produced by this source, built once from the configured schema. */
+    private final CatalogTable catalogTable;
+    private final SeaTunnelRowType typeInfo;
+    private JobContext jobContext;
+
+    /**
+     * Creates the source from the connector configuration.
+     *
+     * @param config connector options, including the {@code schema} used to build the produced
+     *     catalog table
+     */
+    public TableStoreDBSource(ReadonlyConfig config) {
+        this.tablestoreOptions = TablestoreOptions.of(config);
+        this.catalogTable = CatalogTableUtil.buildWithConfig(config);
+        this.typeInfo = catalogTable.getSeaTunnelRowType();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Tablestore";
+    }
+
+    /**
+     * Returns the single table described by the configured schema.
+     *
+     * <p>Overridden explicitly: the interface default derives the table from {@code
+     * getProducedType()}, which this class does not implement.
+     */
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return java.util.Collections.singletonList(catalogTable);
+    }
+
+    /** Bounded in BATCH job mode, unbounded otherwise; requires {@link #setJobContext} first. */
+    @Override
+    public Boundedness getBoundedness() {
+        return JobMode.BATCH.equals(jobContext.getJobMode())
+                ? Boundedness.BOUNDED
+                : Boundedness.UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> createReader(Context readerContext)
+            throws Exception {
+        return new TableStoreDBSourceReader(readerContext, tablestoreOptions, typeInfo);
+    }
+
+    @Override
+    public SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState> createEnumerator(
+            org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreDBSourceSplit>
+                    enumeratorContext)
+            throws Exception {
+        return new TableStoreDBSourceSplitEnumerator(enumeratorContext, tablestoreOptions);
+    }
+
+    @Override
+    public SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState>
+            restoreEnumerator(
+                    org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<
+                                    TableStoreDBSourceSplit>
+                            enumeratorContext,
+                    TableStoreDBSourceState checkpointState)
+                    throws Exception {
+        return new TableStoreDBSourceSplitEnumerator(
+                enumeratorContext, tablestoreOptions, checkpointState);
+    }
+
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
new file mode 100644
index 0000000000..eefd4aae03
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
@@ -0,0 +1,175 @@
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import com.alicloud.openservices.tablestore.SyncClient;
+import com.alicloud.openservices.tablestore.TunnelClient;
+import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
+import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
+import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelRequest;
+import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelResponse;
+import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelRequest;
+import
com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelResponse;
+import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
+import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
+import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * Reader that starts one Tablestore tunnel worker per assigned split.
+ *
+ * <p>Rows are emitted by {@link TableStoreProcessor}, which receives the collector passed to
+ * {@link #pollNext}.
+ */
+@Slf4j
+public class TableStoreDBSourceReader
+        implements SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> {
+
+    protected SourceReader.Context context;
+    protected TablestoreOptions tablestoreOptions;
+    protected SeaTunnelRowType seaTunnelRowType;
+    Queue<TableStoreDBSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
+    private SyncClient client;
+    private volatile boolean noMoreSplit;
+    private TunnelClient tunnelClient;
+    /** Workers started by {@link #read}; shut down in {@link #close()}. */
+    private final Queue<TunnelWorker> workers = new ConcurrentLinkedDeque<>();
+    /** Configs of the started workers, released via {@code TunnelWorkerConfig#shutdown()}. */
+    private final Queue<TunnelWorkerConfig> workerConfigs = new ConcurrentLinkedDeque<>();
+
+    public TableStoreDBSourceReader(
+            SourceReader.Context context,
+            TablestoreOptions options,
+            SeaTunnelRowType seaTunnelRowType) {
+        this.context = context;
+        this.tablestoreOptions = options;
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /** Creates the Tablestore data and tunnel clients from the configured credentials. */
+    @Override
+    public void open() throws Exception {
+        client =
+                new SyncClient(
+                        tablestoreOptions.getEndpoint(),
+                        tablestoreOptions.getAccessKeyId(),
+                        tablestoreOptions.getAccessKeySecret(),
+                        tablestoreOptions.getInstanceName());
+        tunnelClient =
+                new TunnelClient(
+                        tablestoreOptions.getEndpoint(),
+                        tablestoreOptions.getAccessKeyId(),
+                        tablestoreOptions.getAccessKeySecret(),
+                        tablestoreOptions.getInstanceName());
+    }
+
+    /**
+     * Stops every tunnel worker this reader started, then the clients.
+     *
+     * <p>Tolerates a reader that was never opened.
+     */
+    @Override
+    public void close() throws IOException {
+        TunnelWorker worker;
+        while ((worker = workers.poll()) != null) {
+            worker.shutdown();
+        }
+        TunnelWorkerConfig workerConfig;
+        while ((workerConfig = workerConfigs.poll()) != null) {
+            workerConfig.shutdown();
+        }
+        if (tunnelClient != null) {
+            tunnelClient.shutdown();
+        }
+        if (client != null) {
+            client.shutdown();
+        }
+    }
+
+    /**
+     * Starts a tunnel worker for the next pending split, if any, and signals end of input once
+     * the enumerator has reported that no more splits will arrive.
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        boolean finished;
+        synchronized (output.getCheckpointLock()) {
+            TableStoreDBSourceSplit split = pendingSplits.poll();
+            if (Objects.nonNull(split)) {
+                read(split, output);
+            }
+            finished = noMoreSplit;
+            if (finished) {
+                // signal to the source that we have reached the end of the data.
+                log.info("Closed the bounded tablestore source");
+                context.signalNoMoreElement();
+            }
+        }
+        // Back off outside the checkpoint lock so checkpoints are not blocked while idle.
+        Thread.sleep(finished ? 2000L : 1000L);
+    }
+
+    /**
+     * Connects a tunnel worker for {@code split}; on success the worker is tracked for {@link
+     * #close()}, on failure it is shut down and the error is logged.
+     */
+    private void read(TableStoreDBSourceSplit split, Collector<SeaTunnelRow> output) {
+        String tunnelId = getTunel(split);
+        TableStoreProcessor processor =
+                new TableStoreProcessor(split.getTableName(), split.getPrimaryKey(), output);
+        TunnelWorkerConfig workerConfig = new TunnelWorkerConfig(processor);
+        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, workerConfig);
+        try {
+            worker.connectAndWorking();
+            workers.add(worker);
+            workerConfigs.add(workerConfig);
+        } catch (Exception e) {
+            log.error("Start OTS tunnel failed.", e);
+            worker.shutdown();
+            workerConfig.shutdown();
+        }
+    }
+
+    /** Name of the tunnel used for {@code split}: derived from its table name and split id. */
+    private static String tunnelName(TableStoreDBSourceSplit split) {
+        return split.getTableName() + "_migration2aws_tunnel4" + split.getSplitId();
+    }
+
+    /**
+     * Returns the id of the tunnel for {@code split}, creating a BaseAndStream tunnel on the
+     * split's table when it cannot be described.
+     */
+    public String getTunel(TableStoreDBSourceSplit split) {
+        // NOTE(review): deleting first means the tunnel is recreated on every call, so tunnel
+        // progress is not reused across restarts; confirm this is intended.
+        deleteTunel(split);
+        String tableName = split.getTableName();
+        String tunnelName = tunnelName(split);
+        String tunnelId;
+        try {
+            DescribeTunnelResponse dresp =
+                    tunnelClient.describeTunnel(new DescribeTunnelRequest(tableName, tunnelName));
+            tunnelId = dresp.getTunnelInfo().getTunnelId();
+        } catch (Exception notFound) {
+            CreateTunnelResponse cresp =
+                    tunnelClient.createTunnel(
+                            new CreateTunnelRequest(
+                                    tableName, tunnelName, TunnelType.BaseAndStream));
+            tunnelId = cresp.getTunnelId();
+        }
+        log.info("Tunnel found, Id: {}", tunnelId);
+        return tunnelId;
+    }
+
+    /** Best-effort deletion of the tunnel for {@code split}; failures are only logged. */
+    public void deleteTunel(TableStoreDBSourceSplit split) {
+        String tunnelName = tunnelName(split);
+        try {
+            DeleteTunnelResponse dresp =
+                    tunnelClient.deleteTunnel(
+                            new DeleteTunnelRequest(split.getTableName(), tunnelName));
+            log.info("Tunnel has been deleted: {}", dresp);
+        } catch (Exception e) {
+            log.warn("Tunnel deletion failed due to not found: {} ({})", tunnelName, e.toString());
+        }
+    }
+
+    /** Snapshots only splits not yet started; splits already handed to a worker are not included. */
+    @Override
+    public List<TableStoreDBSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(pendingSplits);
+    }
+
+    @Override
+    public void addSplits(List<TableStoreDBSourceSplit> splits) {
+        this.pendingSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        log.info("Reader [{}] received noMoreSplit event.", context.getIndexOfSubtask());
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
new file mode 100644
index 0000000000..24328b0a6f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+/** One unit of work: a Tablestore table together with its split id and primary key column. */
+@Getter
+@Setter
+@AllArgsConstructor
+public class TableStoreDBSourceSplit implements SourceSplit {
+
+    private Integer splitId;
+    private String tableName;
+    private String primaryKey;
+
+    /** Returns the numeric split id rendered as a string. */
+    @Override
+    public String splitId() {
+        return Integer.toString(splitId);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
new file mode 100644
index 0000000000..3dd58b7e69
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Creates one split per table named in the {@code table} option and assigns each split to the
+ * reader {@code splitId % parallelism}.
+ */
+@Slf4j
+public class TableStoreDBSourceSplitEnumerator
+        implements SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState> {
+
+    private final SourceSplitEnumerator.Context<TableStoreDBSourceSplit> enumeratorContext;
+    /** Unassigned splits keyed by owning reader index; guarded by {@link #stateLock}. */
+    private final Map<Integer, List<TableStoreDBSourceSplit>> pendingSplits;
+    private final TablestoreOptions tablestoreOptions;
+
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
+
+    /**
+     * Creates an enumerator for a fresh (non-restored) job.
+     *
+     * @param enumeratorContext context used to look up readers and assign splits
+     * @param tablestoreOptions connector options; {@code table} and {@code primaryKeys} define
+     *     the splits
+     */
+    public TableStoreDBSourceSplitEnumerator(
+            Context<TableStoreDBSourceSplit> enumeratorContext,
+            TablestoreOptions tablestoreOptions) {
+        this(enumeratorContext, tablestoreOptions, null);
+    }
+
+    /**
+     * Creates an enumerator, optionally restoring it from a checkpoint.
+     *
+     * @param enumeratorContext context used to look up readers and assign splits
+     * @param tablestoreOptions connector options
+     * @param sourceState checkpointed state, or {@code null} to enumerate from scratch
+     */
+    public TableStoreDBSourceSplitEnumerator(
+            Context<TableStoreDBSourceSplit> enumeratorContext,
+            TablestoreOptions tablestoreOptions,
+            TableStoreDBSourceState sourceState) {
+        this.enumeratorContext = enumeratorContext;
+        this.tablestoreOptions = tablestoreOptions;
+        this.pendingSplits = new HashMap<>();
+        if (sourceState == null) {
+            this.shouldEnumerate = true;
+        } else {
+            this.shouldEnumerate = sourceState.isShouldEnumerate();
+            this.pendingSplits.putAll(sourceState.getPendingSplits());
+        }
+    }
+
+    @Override
+    public void open() {}
+
+    /** Enumerates the splits once (unless restored as already enumerated) and assigns them. */
+    @Override
+    public void run() throws Exception {
+        Set<Integer> readers = enumeratorContext.registeredReaders();
+        if (shouldEnumerate) {
+            Set<TableStoreDBSourceSplit> newSplits = getTableStoreDBSourceSplit();
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
+            assignSplit(readers);
+        }
+    }
+
+    /**
+     * Hands each reader its pending splits. The context call is made outside the lock, and on
+     * failure the splits are merged back into the pending map.
+     */
+    private void assignSplit(Set<Integer> readers) {
+        for (int reader : readers) {
+            List<TableStoreDBSourceSplit> assignmentForReader;
+            synchronized (stateLock) {
+                assignmentForReader = pendingSplits.remove(reader);
+            }
+            if (assignmentForReader == null || assignmentForReader.isEmpty()) {
+                continue;
+            }
+            log.info("Assign splits {} to reader {}", assignmentForReader, reader);
+            try {
+                enumeratorContext.assignSplit(reader, assignmentForReader);
+            } catch (Exception e) {
+                log.error(
+                        "Failed to assign splits {} to reader {}", assignmentForReader, reader, e);
+                synchronized (stateLock) {
+                    pendingSplits
+                            .computeIfAbsent(reader, k -> new ArrayList<>())
+                            .addAll(assignmentForReader);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds one split per comma-separated table name, pairing table {@code i} with primary key
+     * {@code i}.
+     *
+     * @throws IllegalArgumentException if fewer primary keys than tables are configured
+     */
+    private Set<TableStoreDBSourceSplit> getTableStoreDBSourceSplit() {
+        String[] tableNames = tablestoreOptions.getTable().split(",");
+        List<String> primaryKeys = tablestoreOptions.getPrimaryKeys();
+        if (primaryKeys == null || primaryKeys.size() < tableNames.length) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "primary_keys must contain one entry per table: got %s for tables [%s]",
+                            primaryKeys, tablestoreOptions.getTable()));
+        }
+        Set<TableStoreDBSourceSplit> allSplit = new HashSet<>();
+        for (int i = 0; i < tableNames.length; i++) {
+            // Trim so "a, b" yields table "b" rather than " b".
+            allSplit.add(new TableStoreDBSourceSplit(i, tableNames[i].trim(), primaryKeys.get(i)));
+        }
+        return allSplit;
+    }
+
+    /** Queues each split for reader {@code splitId % currentParallelism}. */
+    private void addPendingSplit(Collection<TableStoreDBSourceSplit> splits) {
+        int readerCount = enumeratorContext.currentParallelism();
+        synchronized (stateLock) {
+            for (TableStoreDBSourceSplit split : splits) {
+                int ownerReader = split.getSplitId() % readerCount;
+                pendingSplits.computeIfAbsent(ownerReader, k -> new ArrayList<>()).add(split);
+            }
+        }
+    }
+
+    /** No-op: this enumerator opens no external resources. */
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    public void addSplitsBack(List<TableStoreDBSourceSplit> splits, int subtaskId) {
+        log.debug("Add back splits {} to tablestore.", splits);
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            assignSplit(Collections.singleton(subtaskId));
+            enumeratorContext.signalNoMoreSplits(subtaskId);
+        }
+    }
+
+    /** Number of splits not yet assigned, summed across all readers. */
+    @Override
+    public int currentUnassignedSplitSize() {
+        synchronized (stateLock) {
+            return pendingSplits.values().stream().mapToInt(List::size).sum();
+        }
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {}
+
+    @Override
+    public void registerReader(int subtaskId) {
+        log.debug("Register reader {} to TablestoreSplitEnumerator.", subtaskId);
+        // assignSplit is a no-op when nothing is pending for this reader.
+        assignSplit(Collections.singleton(subtaskId));
+    }
+
+    /** Returns a copy of the enumerator state that later assignments cannot mutate. */
+    @Override
+    public TableStoreDBSourceState snapshotState(long checkpointId) throws Exception {
+        synchronized (stateLock) {
+            Map<Integer, List<TableStoreDBSourceSplit>> snapshot = new HashMap<>();
+            pendingSplits.forEach((reader, splits) -> snapshot.put(reader, new ArrayList<>(splits)));
+            return new TableStoreDBSourceState(shouldEnumerate, snapshot);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
new file mode 100644
index 0000000000..05a73a6310
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/** Checkpointed state of {@link TableStoreDBSourceSplitEnumerator}. */
+@AllArgsConstructor
+@Getter
+@Setter
+public class TableStoreDBSourceState implements Serializable {
+
+    /** Whether the enumerator still has to create splits from the configured tables. */
+    private boolean shouldEnumerate;
+
+    /** Splits not yet assigned, keyed by the index of the reader that owns them. */
+    private Map<Integer, List<TableStoreDBSourceSplit>> pendingSplits;
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
new file mode 100644
index 0000000000..f93ae4bfe3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class TableStoreDbSourceFactory implements TableSourceFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "Tablestore";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ TablestoreConfig.END_POINT,
+ TablestoreConfig.INSTANCE_NAME,
+ TablestoreConfig.ACCESS_KEY_ID,
+ TablestoreConfig.ACCESS_KEY_SECRET,
+ TablestoreConfig.TABLE,
+ TablestoreConfig.PRIMARY_KEYS)
+ .build();
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>) new
TableStoreDBSource(context.getOptions());
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return TableStoreDBSource.class;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java
new file mode 100644
index 0000000000..ba5334a85e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowDeserializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alicloud.openservices.tablestore.model.StreamRecord;
+import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
+import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TableStoreProcessor implements IChannelProcessor {
+ private String tableName = null;
+ private String primaryKey = null;
+ private Collector<SeaTunnelRow> output = null;
+ protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
+ private static final Logger log =
LoggerFactory.getLogger(TableStoreProcessor.class);
+
+ public TableStoreProcessor(
+ String tableName, String primaryKey, Collector<SeaTunnelRow>
output) {
+ this.tableName = tableName;
+ this.primaryKey = primaryKey;
+ this.output = output;
+ }
+
+ @Override
+ public void process(ProcessRecordsInput input) {
+ log.info("Default record processor, would print records count");
+
+ log.info(
+ String.format(
+ "Process %d records, NextToken: %s",
+ input.getRecords().size(), input.getNextToken()));
+
+ for (StreamRecord r : input.getRecords()) {
+ try {
+ List<Object> fields = new ArrayList<>();
+ Arrays.stream(r.getPrimaryKey().getPrimaryKeyColumns())
+ .forEach(
+ k -> {
+ fields.add(k.getValue().toString());
+ });
+ r.getColumns()
+ .forEach(
+ k -> {
+
fields.add(k.getColumn().getValue().toString());
+ });
+ SeaTunnelRow row = new SeaTunnelRow(fields.toArray());
+ row.setTableId(tableName);
+ switch ((r.getRecordType())) {
+ case PUT:
+ row.setRowKind(RowKind.INSERT);
+ break;
+ case UPDATE:
+ row.setRowKind(RowKind.UPDATE_AFTER);
+ break;
+ case DELETE:
+ row.setRowKind(RowKind.DELETE);
+ break;
+ }
+ output.collect(row);
+ } catch (Exception e) {
+ log.error("send to target failed with record: " +
r.toString(), e);
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ log.info("process shutdown du to finished for table: " + tableName);
+ }
+}