This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a7ca51b585  [Feature][Connector-V2][Tablestore] Support Source 
connector for Tablestore #7448  (#7467)
a7ca51b585 is described below

commit a7ca51b58593fc34758e0d713a85b131f99fd5fe
Author: cloud456 <[email protected]>
AuthorDate: Tue Aug 27 22:31:08 2024 +0800

     [Feature][Connector-V2][Tablestore] Support Source connector for 
Tablestore #7448  (#7467)
---
 docs/en/connector-v2/source/Tablestore.md          | 102 ++++++++++++
 plugin-mapping.properties                          |   1 +
 .../tablestore/config/TablestoreOptions.java       |  19 +++
 .../serialize/DefaultSeaTunnelRowDeserializer.java |  38 +++++
 .../serialize/SeaTunnelRowDeserializer.java        |  26 +++
 .../tablestore/source/TableStoreDBSource.java      | 102 ++++++++++++
 .../source/TableStoreDBSourceReader.java           | 175 +++++++++++++++++++++
 .../tablestore/source/TableStoreDBSourceSplit.java |  38 +++++
 .../source/TableStoreDBSourceSplitEnumerator.java  | 166 +++++++++++++++++++
 .../tablestore/source/TableStoreDBSourceState.java |  34 ++++
 .../source/TableStoreDbSourceFactory.java          |  64 ++++++++
 .../tablestore/source/TableStoreProcessor.java     |  95 +++++++++++
 12 files changed, 860 insertions(+)

diff --git a/docs/en/connector-v2/source/Tablestore.md 
b/docs/en/connector-v2/source/Tablestore.md
new file mode 100644
index 0000000000..8e0d1aeebc
--- /dev/null
+++ b/docs/en/connector-v2/source/Tablestore.md
@@ -0,0 +1,102 @@
+# Tablestore
+
+> Tablestore source connector
+
+## Description
+
+Read data from Alicloud Tablestore. Both full (existing data) and incremental (CDC) reading are supported.
+
+
+## Key features
+
+- [ ] [batch](../../concept/connector-v2-features.md)
+- [X] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                  | type   | required | default value |
+|-----------------------|--------|----------|---------------|
+| end_point             | string | yes      | -             |
+| instance_name         | string | yes      | -             |
+| access_key_id         | string | yes      | -             |
+| access_key_secret     | string | yes      | -             |
+| table                 | string | yes      | -             |
+| primary_keys          | array  | yes      | -             |
+| schema                | config | yes      | -             |
+
+
+### end_point [string]
+
+The endpoint of Tablestore.
+
+### instance_name [string]
+
+The instance name of Tablestore.
+
+### access_key_id [string]
+
+The AccessKey ID used to access Tablestore.
+
+### access_key_secret [string]
+
+The AccessKey secret used to access Tablestore.
+
+### table [string]
+
+The table name of Tablestore.
+
+### primary_keys [array]
+
+The primary key of the table. Specify one primary key column per table; when `table` lists several comma-separated tables, give the keys in the same order.
+
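+### schema [Config]
+
+The schema of the rows produced by this source, declared as a `fields` block that maps column names to types (see the example below).
+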
+
+## Example
+
+```bash
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  Tablestore {
+    end_point = "https://****.cn-zhangjiakou.tablestore.aliyuncs.com";
+    instance_name = "****"
+    access_key_id="***************2Ag5"
+    access_key_secret="***********2Dok"
+    table="test"
+    primary_keys=["id"]
+    schema={
+        fields {
+            id = string
+            name = string
+        }
+    }
+  }
+}
+
+
+sink {
+  MongoDB{
+    uri = "mongodb://localhost:27017"
+    database = "test"
+    collection = "test"
+    primary-key = ["id"]
+    schema = {
+      fields {
+        id = string
+        name = string
+      }
+    }
+  }
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index d77b70e5e8..ece3bd0c77 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -85,6 +85,7 @@ seatunnel.sink.InfluxDB = connector-influxdb
 seatunnel.source.GoogleSheets = connector-google-sheets
 seatunnel.sink.GoogleFirestore = connector-google-firestore
 seatunnel.sink.Tablestore = connector-tablestore
+seatunnel.source.Tablestore = connector-tablestore
 seatunnel.source.Lemlist = connector-http-lemlist
 seatunnel.source.Klaviyo = connector-http-klaviyo
 seatunnel.sink.Slack = connector-slack
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
index 7b2aa6bae6..be12181893 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
@@ -19,11 +19,14 @@ package 
org.apache.seatunnel.connectors.seatunnel.tablestore.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
 
@@ -45,6 +48,8 @@ public class TablestoreOptions implements Serializable {
 
     public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
 
+    /** Creates empty options; {@link #of(ReadonlyConfig)} uses this and then fills the fields via setters. */
+    public TablestoreOptions() {}
+
     public TablestoreOptions(Config config) {
         this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
         this.instanceName = 
config.getString(TablestoreConfig.INSTANCE_NAME.key());
@@ -57,4 +62,18 @@ public class TablestoreOptions implements Serializable {
             this.batchSize = config.getInt(BATCH_SIZE.key());
         }
     }
+
+    /**
+     * Builds options from a {@link ReadonlyConfig}.
+     *
+     * <p>{@code primary_keys} is taken from the raw source map and cast to a string list, so it is
+     * {@code null} when the option is absent. The batch size is not read here and keeps its
+     * field default.
+     *
+     * @param config connector configuration
+     * @return options populated from {@code config}
+     */
+    public static TablestoreOptions of(ReadonlyConfig config) {
+        Map<String, Object> rawOptions = config.getSourceMap();
+        TablestoreOptions options = new TablestoreOptions();
+        options.setEndpoint(config.get(TablestoreConfig.END_POINT));
+        options.setInstanceName(config.get(TablestoreConfig.INSTANCE_NAME));
+        options.setAccessKeyId(config.get(TablestoreConfig.ACCESS_KEY_ID));
+        options.setAccessKeySecret(config.get(TablestoreConfig.ACCESS_KEY_SECRET));
+        options.setTable(config.get(TablestoreConfig.TABLE));
+        // The raw map holds the list as parsed from the job config; the element type is assumed.
+        @SuppressWarnings("unchecked")
+        List<String> primaryKeys =
+                (List<String>) rawOptions.get(TablestoreConfig.PRIMARY_KEYS.key());
+        options.setPrimaryKeys(primaryKeys);
+        return options;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java
new file mode 100644
index 0000000000..9bdb060a49
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowDeserializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.alicloud.openservices.tablestore.model.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer {
+
+    @Override
+    public SeaTunnelRow deserialize(StreamRecord r) {
+        List<Object> fields = new ArrayList<>();
+        r.getColumns()
+                .forEach(
+                        k -> {
+                            fields.add(k.getColumn().getValue());
+                        });
+        return new SeaTunnelRow(fields.toArray());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java
new file mode 100644
index 0000000000..44a2560693
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowDeserializer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.alicloud.openservices.tablestore.model.StreamRecord;
+
+/** Converts a Tablestore {@link StreamRecord} into a {@link SeaTunnelRow}. */
+public interface SeaTunnelRowDeserializer {
+
+    /**
+     * Deserializes one stream record.
+     *
+     * @param streamRecord Tablestore stream record to convert
+     * @return the corresponding SeaTunnel row
+     */
+    SeaTunnelRow deserialize(StreamRecord streamRecord);
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
new file mode 100644
index 0000000000..85c0062ed3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceReader.Context;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.JobMode;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+/**
+ * SeaTunnel source that reads Tablestore tables through Tablestore tunnels. One split is created
+ * per configured table; readers consume each split through a tunnel worker.
+ */
+@Slf4j
+public class TableStoreDBSource
+        implements SeaTunnelSource<SeaTunnelRow, TableStoreDBSourceSplit, TableStoreDBSourceState>,
+                SupportParallelism,
+                SupportColumnProjection {
+
+    private final TablestoreOptions tablestoreOptions;
+    /** Table description built once from the {@code schema} option. */
+    private final CatalogTable catalogTable;
+    private final SeaTunnelRowType typeInfo;
+    private JobContext jobContext;
+
+    /**
+     * Creates the source from the job configuration.
+     *
+     * @param config connector options (endpoint, credentials, table, primary_keys, schema)
+     */
+    public TableStoreDBSource(ReadonlyConfig config) {
+        this.tablestoreOptions = TablestoreOptions.of(config);
+        this.catalogTable = CatalogTableUtil.buildWithConfig(config);
+        this.typeInfo = catalogTable.getSeaTunnelRowType();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Tablestore";
+    }
+
+    /**
+     * Returns the single table described by the {@code schema} option, rather than relying on the
+     * interface default.
+     */
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return java.util.Collections.singletonList(catalogTable);
+    }
+
+    /**
+     * Bounded only when the job runs in BATCH mode. If no job context has been injected yet, the
+     * source reports itself as unbounded, which matches the streaming mode the connector documents.
+     */
+    @Override
+    public Boundedness getBoundedness() {
+        if (jobContext != null && JobMode.BATCH.equals(jobContext.getJobMode())) {
+            return Boundedness.BOUNDED;
+        }
+        return Boundedness.UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> createReader(Context readerContext)
+            throws Exception {
+        return new TableStoreDBSourceReader(readerContext, tablestoreOptions, typeInfo);
+    }
+
+    @Override
+    public SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState> createEnumerator(
+            org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreDBSourceSplit>
+                    enumeratorContext)
+            throws Exception {
+        return new TableStoreDBSourceSplitEnumerator(enumeratorContext, tablestoreOptions);
+    }
+
+    @Override
+    public SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState>
+            restoreEnumerator(
+                    org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<
+                                    TableStoreDBSourceSplit>
+                            enumeratorContext,
+                    TableStoreDBSourceState checkpointState)
+                    throws Exception {
+        return new TableStoreDBSourceSplitEnumerator(
+                enumeratorContext, tablestoreOptions, checkpointState);
+    }
+
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
new file mode 100644
index 0000000000..eefd4aae03
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
@@ -0,0 +1,175 @@
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import com.alicloud.openservices.tablestore.SyncClient;
+import com.alicloud.openservices.tablestore.TunnelClient;
+import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
+import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
+import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelRequest;
+import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelResponse;
+import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelRequest;
+import 
com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelResponse;
+import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
+import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
+import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+@Slf4j
+public class TableStoreDBSourceReader
+        implements SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> {
+
+    protected SourceReader.Context context;
+    protected TablestoreOptions tablestoreOptions;
+    protected SeaTunnelRowType seaTunnelRowType;
+    Queue<TableStoreDBSourceSplit> pendingSplits = new 
ConcurrentLinkedDeque<>();
+    private SyncClient client;
+    private volatile boolean noMoreSplit;
+    private TunnelClient tunnelClient;
+
+    public TableStoreDBSourceReader(
+            SourceReader.Context context,
+            TablestoreOptions options,
+            SeaTunnelRowType seaTunnelRowType) {
+
+        this.context = context;
+        this.tablestoreOptions = options;
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public void open() throws Exception {
+        client =
+                new SyncClient(
+                        tablestoreOptions.getEndpoint(),
+                        tablestoreOptions.getAccessKeyId(),
+                        tablestoreOptions.getAccessKeySecret(),
+                        tablestoreOptions.getInstanceName());
+        tunnelClient =
+                new TunnelClient(
+                        tablestoreOptions.getEndpoint(),
+                        tablestoreOptions.getAccessKeyId(),
+                        tablestoreOptions.getAccessKeySecret(),
+                        tablestoreOptions.getInstanceName());
+    }
+
+    @Override
+    public void close() throws IOException {
+        tunnelClient.shutdown();
+        client.shutdown();
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        synchronized (output.getCheckpointLock()) {
+            TableStoreDBSourceSplit split = pendingSplits.poll();
+            if (Objects.nonNull(split)) {
+                read(split, output);
+            }
+            /*if (split == null) {
+                log.info(
+                        "TableStore Source Reader [{}] waiting for splits",
+                        context.getIndexOfSubtask());
+            }*/
+            if (noMoreSplit) {
+                // signal to the source that we have reached the end of the 
data.
+                log.info("Closed the bounded tablestore source");
+                context.signalNoMoreElement();
+                Thread.sleep(2000L);
+            } else {
+                Thread.sleep(1000L);
+            }
+        }
+    }
+
+    private void read(TableStoreDBSourceSplit split, Collector<SeaTunnelRow> 
output) {
+        String tunnelId = getTunel(split);
+        TableStoreProcessor processor =
+                new TableStoreProcessor(split.getTableName(), 
split.getPrimaryKey(), output);
+        TunnelWorkerConfig workerConfig = new TunnelWorkerConfig(processor);
+        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, 
workerConfig);
+        try {
+            worker.connectAndWorking();
+        } catch (Exception e) {
+            log.error("Start OTS tunnel failed.", e);
+            worker.shutdown();
+        }
+    }
+
+    public String getTunel(TableStoreDBSourceSplit split) {
+        deleteTunel(split);
+        String tunnelId = null;
+        String tunnelName = split.getTableName() + "_migration2aws_tunnel4" + 
split.getSplitId();
+
+        try {
+            DescribeTunnelRequest drequest = new DescribeTunnelRequest("test", 
tunnelName);
+            DescribeTunnelResponse dresp = 
tunnelClient.describeTunnel(drequest);
+            tunnelId = dresp.getTunnelInfo().getTunnelId();
+        } catch (Exception be) {
+            CreateTunnelRequest crequest =
+                    new CreateTunnelRequest(
+                            split.getTableName(), tunnelName, 
TunnelType.valueOf("BaseAndStream"));
+            CreateTunnelResponse cresp = tunnelClient.createTunnel(crequest);
+            tunnelId = cresp.getTunnelId();
+        }
+        log.info("Tunnel found, Id: " + tunnelId);
+        return tunnelId;
+    }
+
+    public void deleteTunel(TableStoreDBSourceSplit split) {
+        String tunnelName = split.getTableName() + "_migration2aws_tunnel4" + 
split.getSplitId();
+        try {
+            DeleteTunnelRequest drequest =
+                    new DeleteTunnelRequest(split.getTableName(), tunnelName);
+            DeleteTunnelResponse dresp = tunnelClient.deleteTunnel(drequest);
+            log.info("Tunnel has been deleted: " + dresp.toString());
+        } catch (Exception be) {
+            log.warn("Tunnel deletion failed due to not found: " + tunnelName);
+        }
+    }
+
+    @Override
+    public List<TableStoreDBSourceSplit> snapshotState(long checkpointId) 
throws Exception {
+        return new ArrayList<>(pendingSplits);
+    }
+
+    @Override
+    public void addSplits(List<TableStoreDBSourceSplit> splits) {
+        this.pendingSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        log.info("Reader [{}] received noMoreSplit event.", 
context.getIndexOfSubtask());
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
new file mode 100644
index 0000000000..24328b0a6f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+/** A unit of work for the Tablestore source: one table read through one tunnel. */
+@AllArgsConstructor
+@Getter
+@Setter
+@lombok.ToString
+public class TableStoreDBSourceSplit implements SourceSplit {
+
+    // Splits are checkpointed; pin the serialized form explicitly.
+    private static final long serialVersionUID = 1L;
+
+    /** Numeric id; the enumerator derives the owning reader from it (id % parallelism). */
+    private Integer splitId;
+    private String tableName;
+    private String primaryKey;
+
+    @Override
+    public String splitId() {
+        return splitId.toString();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
new file mode 100644
index 0000000000..3dd58b7e69
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class TableStoreDBSourceSplitEnumerator
+        implements SourceSplitEnumerator<TableStoreDBSourceSplit, 
TableStoreDBSourceState> {
+
+    private final SourceSplitEnumerator.Context<TableStoreDBSourceSplit> 
enumeratorContext;
+    private final Map<Integer, List<TableStoreDBSourceSplit>> pendingSplits;
+    private final TablestoreOptions tablestoreOptions;
+
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
+
+    /**
+     * @param enumeratorContext
+     * @param tablestoreOptions
+     */
+    public TableStoreDBSourceSplitEnumerator(
+            Context<TableStoreDBSourceSplit> enumeratorContext,
+            TablestoreOptions tablestoreOptions) {
+        this(enumeratorContext, tablestoreOptions, null);
+    }
+
+    public TableStoreDBSourceSplitEnumerator(
+            Context<TableStoreDBSourceSplit> enumeratorContext,
+            TablestoreOptions tablestoreOptions,
+            TableStoreDBSourceState sourceState) {
+        this.enumeratorContext = enumeratorContext;
+        this.tablestoreOptions = tablestoreOptions;
+        this.pendingSplits = new HashMap<>();
+        this.shouldEnumerate = sourceState == null;
+        if (sourceState != null) {
+            this.shouldEnumerate = sourceState.isShouldEnumerate();
+            this.pendingSplits.putAll(sourceState.getPendingSplits());
+        }
+    }
+
+    @Override
+    public void open() {}
+
+    @Override
+    public void run() throws Exception {
+        Set<Integer> readers = enumeratorContext.registeredReaders();
+        if (shouldEnumerate) {
+            Set<TableStoreDBSourceSplit> newSplits = 
getTableStoreDBSourceSplit();
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
+            assignSplit(readers);
+        }
+    }
+
+    private void assignSplit(Set<Integer> readers) {
+        for (int reader : readers) {
+            List<TableStoreDBSourceSplit> assignmentForReader = 
pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) 
{
+                log.info("Assign splits {} to reader {}", assignmentForReader, 
reader);
+                try {
+                    enumeratorContext.assignSplit(reader, assignmentForReader);
+                } catch (Exception e) {
+                    log.error(
+                            "Failed to assign splits {} to reader {}",
+                            assignmentForReader,
+                            reader,
+                            e);
+                    pendingSplits.put(reader, assignmentForReader);
+                }
+            }
+        }
+    }
+
+    private Set<TableStoreDBSourceSplit> getTableStoreDBSourceSplit() {
+
+        Set<TableStoreDBSourceSplit> allSplit = new HashSet<>();
+        String tables = tablestoreOptions.getTable();
+        String[] tableArr = tables.split(",");
+        for (int i = 0; i < tableArr.length; i++) {
+            allSplit.add(
+                    new TableStoreDBSourceSplit(
+                            i, tableArr[i], 
tablestoreOptions.getPrimaryKeys().get(i)));
+        }
+        return allSplit;
+    }
+
+    private void addPendingSplit(Collection<TableStoreDBSourceSplit> splits) {
+        int readerCount = enumeratorContext.currentParallelism();
+        for (TableStoreDBSourceSplit split : splits) {
+            int ownerReader = split.getSplitId() % readerCount;
+            pendingSplits.computeIfAbsent(ownerReader, k -> new 
ArrayList<>()).add(split);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // TODO Auto-generated method stub
+        throw new UnsupportedOperationException("Unimplemented method 
'close'");
+    }
+
+    @Override
+    public void addSplitsBack(List<TableStoreDBSourceSplit> splits, int 
subtaskId) {
+        log.debug("Add back splits {} to tablestore.", splits);
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            assignSplit(Collections.singleton(subtaskId));
+            enumeratorContext.signalNoMoreSplits(subtaskId);
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplits.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {}
+
+    @Override
+    public void registerReader(int subtaskId) {
+        log.debug("Register reader {} to TablestoreSplitEnumerator.", 
subtaskId);
+        if (!pendingSplits.isEmpty()) {
+            assignSplit(Collections.singleton(subtaskId));
+        }
+    }
+
+    @Override
+    public TableStoreDBSourceState snapshotState(long checkpointId) throws 
Exception {
+        synchronized (stateLock) {
+            return new TableStoreDBSourceState(shouldEnumerate, pendingSplits);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
new file mode 100644
index 0000000000..05a73a6310
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Setter
+@AllArgsConstructor
+public class TableStoreDBSourceState implements Serializable {
+
+    private boolean shouldEnumerate;
+    private Map<Integer, List<TableStoreDBSourceSplit>> pendingSplits;
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
new file mode 100644
index 0000000000..f93ae4bfe3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class TableStoreDbSourceFactory implements TableSourceFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "Tablestore";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        TablestoreConfig.END_POINT,
+                        TablestoreConfig.INSTANCE_NAME,
+                        TablestoreConfig.ACCESS_KEY_ID,
+                        TablestoreConfig.ACCESS_KEY_SECRET,
+                        TablestoreConfig.TABLE,
+                        TablestoreConfig.PRIMARY_KEYS)
+                .build();
+    }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>) new 
TableStoreDBSource(context.getOptions());
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return TableStoreDBSource.class;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java
new file mode 100644
index 0000000000..ba5334a85e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreProcessor.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowDeserializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alicloud.openservices.tablestore.model.StreamRecord;
+import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
+import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TableStoreProcessor implements IChannelProcessor {
+    private String tableName = null;
+    private String primaryKey = null;
+    private Collector<SeaTunnelRow> output = null;
+    protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
+    private static final Logger log = 
LoggerFactory.getLogger(TableStoreProcessor.class);
+
+    public TableStoreProcessor(
+            String tableName, String primaryKey, Collector<SeaTunnelRow> 
output) {
+        this.tableName = tableName;
+        this.primaryKey = primaryKey;
+        this.output = output;
+    }
+
+    @Override
+    public void process(ProcessRecordsInput input) {
+        log.info("Default record processor, would print records count");
+
+        log.info(
+                String.format(
+                        "Process %d records, NextToken: %s",
+                        input.getRecords().size(), input.getNextToken()));
+
+        for (StreamRecord r : input.getRecords()) {
+            try {
+                List<Object> fields = new ArrayList<>();
+                Arrays.stream(r.getPrimaryKey().getPrimaryKeyColumns())
+                        .forEach(
+                                k -> {
+                                    fields.add(k.getValue().toString());
+                                });
+                r.getColumns()
+                        .forEach(
+                                k -> {
+                                    
fields.add(k.getColumn().getValue().toString());
+                                });
+                SeaTunnelRow row = new SeaTunnelRow(fields.toArray());
+                row.setTableId(tableName);
+                switch ((r.getRecordType())) {
+                    case PUT:
+                        row.setRowKind(RowKind.INSERT);
+                        break;
+                    case UPDATE:
+                        row.setRowKind(RowKind.UPDATE_AFTER);
+                        break;
+                    case DELETE:
+                        row.setRowKind(RowKind.DELETE);
+                        break;
+                }
+                output.collect(row);
+            } catch (Exception e) {
+                log.error("send to target failed with record: " + 
r.toString(), e);
+            }
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        log.info("process shutdown du to finished for table: " + tableName);
+    }
+}


Reply via email to