This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b78d6c92 [Connectors-V2]Support IoTDB Source (#2431)
7b78d6c92 is described below

commit 7b78d6c9228e438dab0ac62539ab642c0ca11e41
Author: Kirs <[email protected]>
AuthorDate: Fri Aug 19 12:04:46 2022 +0800

    [Connectors-V2]Support IoTDB Source (#2431)
    
    * [Connectors-V2]Support IoTDB Source
    
    * fix license header
    
    * fix java doc
    
    * fix check style
    
    * improve read method
    
    * used by SeatunelSchema get source field type
    
    * fetch upstream
    
    * reformat docs
---
 docs/en/connector-v2/source/IoTDB.md               | 137 ++++++++++++++
 plugin-mapping.properties                          |   1 +
 .../seatunnel/iotdb/config/SourceConfig.java       | 102 ++++++++++
 .../seatunnel/iotdb/constant/SourceConstants.java  |  32 ++++
 .../seatunnel/iotdb/source/IoTDBSource.java        | 101 ++++++++++
 .../seatunnel/iotdb/source/IoTDBSourceReader.java  | 205 +++++++++++++++++++++
 .../seatunnel/iotdb/source/IoTDBSourceSplit.java   |  46 +++++
 .../iotdb/source/IoTDBSourceSplitEnumerator.java   | 184 ++++++++++++++++++
 .../seatunnel/iotdb/state/IoTDBSourceState.java    |  36 ++++
 9 files changed, 844 insertions(+)

diff --git a/docs/en/connector-v2/source/IoTDB.md 
b/docs/en/connector-v2/source/IoTDB.md
new file mode 100644
index 000000000..cd241e420
--- /dev/null
+++ b/docs/en/connector-v2/source/IoTDB.md
@@ -0,0 +1,137 @@
+# IoTDB
+
+> IoTDB source connector
+
+## Description
+
+Read external data source data through IoTDB. Currently supports Batch mode.
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| host                       | string  | yes      | -             |
+| port                       | Int     | yes      | -             |
+| node_urls                  | string  | yes      | -             |
+| sql                        | string  | yes      |          |
+| fields                     | config  | yes      | -             |
+| fetch_size                 | int     | no       | -             |
+| username                   | string  | no       | -             |
+| password   | string  | no       | -             |
+| lower_bound                | long    | no       | -             |
+| upper_bound                | long    | no       | -             |
+| num_partitions             | int     | no       | -             |
+| thrift_default_buffer_size | int     | no       | -             |
+| enable_cache_leader        | boolean | no       | -             |
+| version                    | string  | no       | -             |
+
+### single node, you need to set host and port to connect to the remote data 
source.
+
+**host** [string] the host of the IoTDB when you select host of the IoTDB
+
+**port** [int] the port of the IoTDB when you select
+
+### multi node, you need to set node_urls to connect to the remote data source.
+
+**node_urls** [string] the node_urls of the IoTDB when you select
+
+e.g.
+
+``` 127.0.0.1:8080,127.0.0.2:8080
+```
+
+### other parameters
+
+**sql** [string]
+execute sql statement e.g.
+
+```
+select name,age from test
+```
+
+### fields [string]
+
+the fields of the IoTDB when you select
+
+the field type is SeaTunnel field type 
`org.apache.seatunnel.api.table.type.SqlType`
+
+e.g.
+
+```
+fields{
+    name=STRING
+    age=INT
+    }
+```
+
+### option parameters
+
+### fetch_size [int]
+
+the fetch_size of the IoTDB when you select
+
+### username [string]
+
+the username of the IoTDB when you select
+
+### password [string]
+
+the password of the IoTDB when you select
+
+### lower_bound [long]
+
+the lower_bound of the IoTDB when you select
+
+### upper_bound [long]
+
+the upper_bound of the IoTDB when you select
+
+### num_partitions [int]
+
+the num_partitions of the IoTDB when you select
+
+### thrift_default_buffer_size [int]
+
+the thrift_default_buffer_size of the IoTDB when you select
+
+### enable_cache_leader [boolean]
+
+enable_cache_leader of the IoTDB when you select
+
+### version [string]
+
+Version represents the SQL semantic version used by the client, which is used 
to be compatible with the SQL semantics of
+0.12 when upgrading 0.13. The possible values are: V_0_12, V_0_13.
+
+### split partitions
+
+we can split the partitions of the IoTDB and we used time column split
+
+#### num_partitions [int]
+
+split num
+
+### upper_bound [long]
+
+upper bound of the time column
+
+### lower_bound [long]
+
+lower bound of the time column
+
+```
+     split the time range into numPartitions parts
+     if numPartitions is 1, use the whole time range
+     if numPartitions < (upper_bound - lower_bound), use (upper_bound - 
lower_bound) partitions
+     
+     eg: lower_bound = 1, upper_bound = 10, numPartitions = 2
+     sql = "select * from test where age > 0 and age < 10"
+     
+     split result
+
+     split 1: select * from test  where (time >= 1 and time < 6)  and (  age > 
0 and age < 10 )
+     
+     split 2: select * from test  where (time >= 6 and time < 11) and (  age > 
0 and age < 10 )
+
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 28bbf5041..9d6983871 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -115,4 +115,5 @@ seatunnel.source.Pulsar = connector-pulsar
 seatunnel.source.Hudi = connector-hudi
 seatunnel.sink.DingTalk = connector-dingtalk
 seatunnel.sink.elasticsearch = connector-elasticsearch
+seatunnel.source.IoTDB = connector-iotdb
 seatunnel.sink.IoTDB = connector-iotdb
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
new file mode 100644
index 000000000..5c952b416
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
+
+/**
+ * SourceConfig is the configuration for the IotDBSource.
+ * <p>
+ * please see the following link for more details:
+ * 
https://iotdb.apache.org/UserGuide/Master/API/Programming-Java-Native-API.html
+ */
+public class SourceConfig {
+
+    public static final String SQL = "sql";
+
+    /*---------------------- single node configurations 
-------------------------*/
+
+    /**
+     * The host of the IotDB server.
+     */
+    public static final String HOST = "host";
+
+    /*
+     * The port of the IotDB server.
+     */
+    public static final String PORT = "port";
+
+
+    /*---------------------- multiple node configurations 
-------------------------*/
+
+    /**
+     * Username for the source.
+     */
+    public static final String USERNAME = "username";
+
+    /**
+     * Password for the source.
+     */
+    public static final String PASSWORD = "password";
+
+    /**
+     * multiple nodes
+     */
+    public static final String NODE_URLS = "node_urls";
+
+    /*---------------------- other configurations -------------------------*/
+
+    /**
+     * Fetches the next batch of data from the source.
+     */
+    public static final String FETCH_SIZE = "fetch_size";
+
+    /**
+     * thrift default buffer size
+     */
+    public static final String THRIFT_DEFAULT_BUFFER_SIZE = 
"thrift_default_buffer_size";
+
+    /**
+     * thrift max frame size
+     */
+    public static final String THRIFT_MAX_FRAME_SIZE = "thrift_max_frame_size";
+
+    /**
+     * cassandra default buffer size
+     */
+    public static final String ENABLE_CACHE_LEADER = "enable_cache_leader";
+
+    /**
+     * Version represents the SQL semantic version used by the client, which 
is used to be compatible with the SQL semantics of 0.12 when upgrading 0.13. 
The possible values are: V_0_12, V_0_13.
+     */
+    public static final String VERSION = "version";
+
+    /**
+     * Query lower bound of the time range to be read.
+     */
+    public static final String LOWER_BOUND = "lower_bound";
+
+    /**
+     * Query upper bound of the time range to be read.
+     */
+    public static final String UPPER_BOUND = "upper_bound";
+
+    /**
+     * Query num partitions to be read.
+     */
+    public static final String NUM_PARTITIONS = "num_partitions";
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java
new file mode 100644
index 000000000..25b657cfa
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.constant;
+
+public class SourceConstants {
+
+    public static final String FIELDS_K_V_SPLIT = ":";
+
+    public static final String FIELDS_SPLIT = ",";
+
+    public static final String NODES_SPLIT = ",";
+
+    public static final String SQL_WHERE = "where";
+
+    public static final String DEFAULT_PARTITIONS = "0";
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
new file mode 100644
index 000000000..987c43c84
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NODE_URLS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.iotdb.state.IoTDBSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@AutoService(SeaTunnelSource.class)
+public class IoTDBSource implements SeaTunnelSource<SeaTunnelRow, 
IoTDBSourceSplit, IoTDBSourceState> {
+
+    private SeaTunnelContext seaTunnelContext;
+
+    private SeaTunnelRowType typeInfo;
+
+    private Map<String, Object> configParams = new HashMap();
+
+    @Override
+    public String getPluginName() {
+        return "IoTDB";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HOST, PORT);
+        if (!result.isSuccess()) {
+            result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS);
+
+            if (!result.isSuccess()) {
+                throw new PrepareFailException(getPluginName(), 
PluginType.SOURCE, "host and port and node urls are both empty");
+            }
+        }
+        SeatunnelSchema seatunnelSchema = 
SeatunnelSchema.buildWithConfig(pluginConfig);
+        this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
+        pluginConfig.entrySet().forEach(entry -> 
configParams.put(entry.getKey(), entry.getValue().unwrapped()));
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.typeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, IoTDBSourceSplit> 
createReader(SourceReader.Context readerContext) {
+        return new IoTDBSourceReader(configParams, readerContext, typeInfo);
+    }
+
+    @Override
+    public SourceSplitEnumerator<IoTDBSourceSplit, IoTDBSourceState> 
createEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> 
enumeratorContext) throws Exception {
+        return new IoTDBSourceSplitEnumerator(enumeratorContext, configParams);
+    }
+
+    @Override
+    public SourceSplitEnumerator<IoTDBSourceSplit, IoTDBSourceState> 
restoreEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> 
enumeratorContext, IoTDBSourceState checkpointState) throws Exception {
+        return new IoTDBSourceSplitEnumerator(enumeratorContext, 
checkpointState, configParams);
+    }
+
+}
+
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
new file mode 100644
index 000000000..2fcac5ff4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.ENABLE_CACHE_LEADER;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.FETCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NODE_URLS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_MAX_FRAME_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.VERSION;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.NODES_SPLIT;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.session.util.Version;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class IoTDBSourceReader implements SourceReader<SeaTunnelRow, 
IoTDBSourceSplit> {
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private Map<String, Object> conf;
+
+    private Set<IoTDBSourceSplit> sourceSplits;
+
+    private final SourceReader.Context context;
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    private Session session;
+
+    public IoTDBSourceReader(Map<String, Object> conf, SourceReader.Context 
readerContext, SeaTunnelRowType seaTunnelRowType) {
+        this.conf = conf;
+        this.sourceSplits = new HashSet<>();
+        this.context = readerContext;
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public void open() throws IoTDBConnectionException {
+        session = buildSession(conf);
+        session.open();
+    }
+
+    @Override
+    public void close() throws IOException {
+        //nothing to do
+        try {
+            session.close();
+        } catch (IoTDBConnectionException e) {
+            throw new IOException("close IoTDB session failed", e);
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        sourceSplits.forEach(source -> {
+            try {
+                read(source, output);
+            } catch (Exception e) {
+                throw new RuntimeException("IotDB source read error", e);
+            }
+        });
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded fake source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    private void read(IoTDBSourceSplit split, Collector<SeaTunnelRow> output) 
throws Exception {
+        try (SessionDataSet dataSet = 
session.executeQueryStatement(split.getQuery())) {
+            while (dataSet.hasNext()) {
+                RowRecord row = dataSet.next();
+                Object[] datas = new Object[row.getFields().size()];
+                for (int i = 0; i < row.getFields().size(); i++) {
+                    row.getFields().get(i).getDataType();
+                    datas[i] = convertToDataType(row.getFields().get(i));
+                }
+                output.collect(new SeaTunnelRow(datas));
+            }
+        }
+    }
+
+    private Object convertToDataType(Field field) {
+
+        switch (field.getDataType()) {
+            case INT32:
+                return field.getIntV();
+            case INT64:
+                return field.getLongV();
+            case FLOAT:
+                return field.getFloatV();
+            case DOUBLE:
+                return field.getDoubleV();
+            case TEXT:
+                return field.getStringValue();
+            case BOOLEAN:
+                return field.getBoolV();
+            default:
+                throw new IllegalArgumentException("unknown TSData type: " + 
field.getDataType());
+        }
+    }
+
+    private Session buildSession(Map<String, Object> conf) {
+        Session.Builder sessionBuilder = new Session.Builder();
+        if (conf.containsKey(HOST)) {
+            sessionBuilder
+                    .host((String) conf.get(HOST))
+                    .port(Integer.parseInt(conf.get(PORT).toString()))
+                    .build();
+        } else {
+            String nodeUrlsString = (String) conf.get(NODE_URLS);
+
+            List<String> nodes = 
Stream.of(nodeUrlsString.split(NODES_SPLIT)).collect(Collectors.toList());
+            sessionBuilder.nodeUrls(nodes);
+        }
+        if (null != conf.get(FETCH_SIZE)) {
+            
sessionBuilder.fetchSize(Integer.parseInt(conf.get(FETCH_SIZE).toString()));
+        }
+        if (null != conf.get(USERNAME)) {
+            sessionBuilder.username((String) conf.get(USERNAME));
+        }
+        if (null != conf.get(PASSWORD)) {
+            sessionBuilder.password((String) conf.get(PASSWORD));
+        }
+        if (null != conf.get(THRIFT_DEFAULT_BUFFER_SIZE)) {
+            
sessionBuilder.thriftDefaultBufferSize(Integer.parseInt(conf.get(THRIFT_DEFAULT_BUFFER_SIZE).toString()));
+        }
+        if (null != conf.get(THRIFT_MAX_FRAME_SIZE)) {
+            
sessionBuilder.thriftMaxFrameSize(Integer.parseInt(conf.get(THRIFT_MAX_FRAME_SIZE).toString()));
+        }
+        if (null != conf.get(ENABLE_CACHE_LEADER)) {
+            
sessionBuilder.enableCacheLeader(Boolean.parseBoolean(conf.get(ENABLE_CACHE_LEADER).toString()));
+        }
+        if (null != conf.get(VERSION)) {
+            Version version = Version.valueOf(conf.get(VERSION).toString());
+            sessionBuilder.version(version);
+        }
+        return sessionBuilder.build();
+    }
+
+    @Override
+    public List<IoTDBSourceSplit> snapshotState(long checkpointId) {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    @Override
+    public void addSplits(List<IoTDBSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        // do nothing
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // do nothing
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java
new file mode 100644
index 000000000..513318054
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class IoTDBSourceSplit implements SourceSplit {
+
+    private static final long serialVersionUID = -1L;
+
+    private String splitId;
+
+    /**
+     * final query statement
+     */
+    private String query;
+
+    @Override
+    public String splitId() {
+        return splitId;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    public IoTDBSourceSplit(String splitId, String query) {
+        this.splitId = splitId;
+        this.query = query;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
new file mode 100644
index 000000000..01852a488
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.LOWER_BOUND;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NUM_PARTITIONS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.SQL;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.UPPER_BOUND;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.DEFAULT_PARTITIONS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.SQL_WHERE;
+import static 
org.apache.iotdb.tsfile.common.constant.QueryConstant.RESERVED_TIME;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.iotdb.state.IoTDBSourceState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class IoTDBSourceSplitEnumerator implements 
SourceSplitEnumerator<IoTDBSourceSplit, IoTDBSourceState> {
+
+    private final Context<IoTDBSourceSplit> context;
+    private Set<IoTDBSourceSplit> pendingSplit;
+    private Set<IoTDBSourceSplit> assignedSplit;
+    private Map<String, Object> conf;
+
+    /**
+     * A SQL statement can contain at most one where
+     * We split the SQL using the where keyword
+     * Therefore, it can be split into two SQL at most
+     */
+    private static final int SQL_WHERE_SPLIT_LENGTH = 2;
+
+    public 
IoTDBSourceSplitEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> 
context, Map<String, Object> conf) {
+        this.context = context;
+        this.conf = conf;
+    }
+
+    public 
IoTDBSourceSplitEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> 
context, IoTDBSourceState sourceState, Map<String, Object> conf) {
+        this(context, conf);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+        this.assignedSplit = new HashSet<>();
+        this.pendingSplit = new HashSet<>();
+    }
+
+    @Override
+    public void run() {
+        pendingSplit = getIotDBSplit();
+        assignSplit(context.registeredReaders());
+    }
+
+    /**
+     * split the time range into numPartitions parts
+     * if numPartitions is 1, use the whole time range
+     * if numPartitions < (end - start), use (start-end) partitions
+     * <p>
+     * eg: start = 1, end = 10, numPartitions = 2
+     * sql = "select * from test where age > 0 and age < 10"
+     * <p>
+     * split result
+     * <p>
+     * split 1: select * from test  where (time >= 1 and time < 6)  and (  age 
> 0 and age < 10 )
+     * <p>
+     * split 2: select * from test  where (time >= 6 and time < 11) and (  age 
> 0 and age < 10 )
+     */
+    private Set<IoTDBSourceSplit> getIotDBSplit() {
+        String sql = conf.get(SQL).toString();
+        Set<IoTDBSourceSplit> iotDBSourceSplits = new HashSet<>();
+        // no need numPartitions, use one partition
+        if (!conf.containsKey(NUM_PARTITIONS)) {
+            iotDBSourceSplits.add(new IoTDBSourceSplit(DEFAULT_PARTITIONS, 
sql));
+            return iotDBSourceSplits;
+        }
+        long start = Long.parseLong(conf.get(LOWER_BOUND).toString());
+        long end = Long.parseLong(conf.get(UPPER_BOUND).toString());
+        int numPartitions = 
Integer.parseInt(conf.get(NUM_PARTITIONS).toString());
+        String[] sqls = sql.split(SQL_WHERE);
+        if (sqls.length > SQL_WHERE_SPLIT_LENGTH) {
+            throw new IllegalArgumentException("sql should not contain more 
than one where");
+        }
+        int size = (int) (end - start) / numPartitions + 1;
+        int remainder = (int) ((end + 1 - start) % numPartitions);
+        if (end - start < numPartitions) {
+            numPartitions = (int) (end - start);
+        }
+        long currentStart = start;
+        int i = 0;
+        while (i < numPartitions) {
+            String query = " where (" + RESERVED_TIME + " >= " + currentStart 
+ " and " + RESERVED_TIME + " < " + (currentStart + size) + ") ";
+            i++;
+            currentStart += size;
+            if (i + 1 <= numPartitions) {
+                currentStart = currentStart - remainder;
+            }
+            query = sqls[0] + query;
+            if (sqls.length > 1) {
+                query = query + " and ( " + sqls[1] + " ) ";
+            }
+            iotDBSourceSplits.add(new IoTDBSourceSplit(String.valueOf(i + 
System.nanoTime()), query));
+        }
+        return iotDBSourceSplits;
+    }
+
+    @Override
+    public void addSplitsBack(List<IoTDBSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> taskIDList) {
+        Map<Integer, List<IoTDBSourceSplit>> readySplit = new 
HashMap<>(Common.COLLECTION_SIZE);
+        for (int taskID : taskIDList) {
+            readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+        }
+        pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(), 
taskIDList.size()))
+                .add(s));
+        readySplit.forEach(context::assignSplit);
+        assignedSplit.addAll(pendingSplit);
+        pendingSplit.clear();
+    }
+
+    @Override
+    public IoTDBSourceState snapshotState(long checkpointId) throws Exception {
+        return new IoTDBSourceState(assignedSplit);
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return tp.hashCode() % numReaders;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        //nothing to do
+    }
+
+    @Override
+    public void close() {
+        //nothing to do
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        //nothing to do
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java
new file mode 100644
index 000000000..924cecd24
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.state;
+
+import org.apache.seatunnel.connectors.seatunnel.iotdb.source.IoTDBSourceSplit;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class IoTDBSourceState implements Serializable {
+
+    private Set<IoTDBSourceSplit> assignedSplit;
+
+    public IoTDBSourceState(Set<IoTDBSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    public Set<IoTDBSourceSplit> getAssignedSplit() {
+        return assignedSplit;
+    }
+}

Reply via email to