This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7b78d6c92 [Connectors-V2]Support IoTDB Source (#2431)
7b78d6c92 is described below
commit 7b78d6c9228e438dab0ac62539ab642c0ca11e41
Author: Kirs <[email protected]>
AuthorDate: Fri Aug 19 12:04:46 2022 +0800
[Connectors-V2]Support IoTDB Source (#2431)
* [Connectors-V2]Support IoTDB Source
* fix license header
* fix java doc
* fix check style
* improve read method
* used by SeatunelSchema get source field type
* fetch upstream
* reformat docs
---
docs/en/connector-v2/source/IoTDB.md | 137 ++++++++++++++
plugin-mapping.properties | 1 +
.../seatunnel/iotdb/config/SourceConfig.java | 102 ++++++++++
.../seatunnel/iotdb/constant/SourceConstants.java | 32 ++++
.../seatunnel/iotdb/source/IoTDBSource.java | 101 ++++++++++
.../seatunnel/iotdb/source/IoTDBSourceReader.java | 205 +++++++++++++++++++++
.../seatunnel/iotdb/source/IoTDBSourceSplit.java | 46 +++++
.../iotdb/source/IoTDBSourceSplitEnumerator.java | 184 ++++++++++++++++++
.../seatunnel/iotdb/state/IoTDBSourceState.java | 36 ++++
9 files changed, 844 insertions(+)
diff --git a/docs/en/connector-v2/source/IoTDB.md
b/docs/en/connector-v2/source/IoTDB.md
new file mode 100644
index 000000000..cd241e420
--- /dev/null
+++ b/docs/en/connector-v2/source/IoTDB.md
@@ -0,0 +1,137 @@
+# IoTDB
+
+> IoTDB source connector
+
+## Description
+
+Read external data source data through IoTDB. Currently supports Batch mode.
+
+## Options
+
+| name | type | required | default value |
+|----------------------------|---------|----------|---------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| node_urls | string | yes | - |
+| sql | string | yes | |
+| fields | config | yes | - |
+| fetch_size | int | no | - |
+| username | string | no | - |
+| password | string | no | - |
+| lower_bound | long | no | - |
+| upper_bound | long | no | - |
+| num_partitions | int | no | - |
+| thrift_default_buffer_size | int | no | - |
+| enable_cache_leader | boolean | no | - |
+| version | string | no | - |
+
+### single node, you need to set host and port to connect to the remote data source.
+
+**host** [string] the host of the IoTDB server to connect to
+
+**port** [int] the port of the IoTDB server to connect to
+
+### multi node, you need to set node_urls to connect to the remote data source.
+
+**node_urls** [string] the node_urls of the IoTDB when you select
+
+e.g.
+
+```
+127.0.0.1:8080,127.0.0.2:8080
+```
+
+### other parameters
+
+**sql** [string]
+execute sql statement e.g.
+
+```
+select name,age from test
+```
+
+### fields [config]
+
+the fields of the IoTDB when you select
+
+the field type is SeaTunnel field type
`org.apache.seatunnel.api.table.type.SqlType`
+
+e.g.
+
+```
+fields{
+ name=STRING
+ age=INT
+ }
+```
+
+### option parameters
+
+### fetch_size [int]
+
+the fetch_size of the IoTDB when you select
+
+### username [string]
+
+the username of the IoTDB when you select
+
+### password [string]
+
+the password of the IoTDB when you select
+
+### lower_bound [long]
+
+the lower_bound of the IoTDB when you select
+
+### upper_bound [long]
+
+the upper_bound of the IoTDB when you select
+
+### num_partitions [int]
+
+the num_partitions of the IoTDB when you select
+
+### thrift_default_buffer_size [int]
+
+the thrift_default_buffer_size of the IoTDB when you select
+
+### enable_cache_leader [boolean]
+
+enable_cache_leader of the IoTDB when you select
+
+### version [string]
+
+Version represents the SQL semantic version used by the client. It keeps the client compatible with the SQL
+semantics of 0.12 when upgrading to 0.13. The possible values are: V_0_12, V_0_13.
+
+### split partitions
+
+we can split the partitions of the IoTDB and we used time column split
+
+#### num_partitions [int]
+
+split num
+
+### upper_bound [long]
+
+upper bound of the time column
+
+### lower_bound [long]
+
+lower bound of the time column
+
+```
+ split the time range into numPartitions parts
+ if numPartitions is 1, use the whole time range
+ if numPartitions is larger than the time range (upper_bound - lower_bound), fewer partitions are used so that they fit the range
+
+ eg: lower_bound = 1, upper_bound = 10, numPartitions = 2
+ sql = "select * from test where age > 0 and age < 10"
+
+ split result
+
+ split 1: select * from test where (time >= 1 and time < 6) and ( age >
0 and age < 10 )
+
+ split 2: select * from test where (time >= 6 and time < 11) and ( age >
0 and age < 10 )
+
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 28bbf5041..9d6983871 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -115,4 +115,5 @@ seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
seatunnel.sink.elasticsearch = connector-elasticsearch
+seatunnel.source.IoTDB = connector-iotdb
seatunnel.sink.IoTDB = connector-iotdb
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
new file mode 100644
index 000000000..5c952b416
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
+
+/**
+ * SourceConfig is the configuration for the IotDBSource.
+ * <p>
+ * please see the following link for more details:
+ *
https://iotdb.apache.org/UserGuide/Master/API/Programming-Java-Native-API.html
+ */
+public class SourceConfig {
+ /** Option key of the query statement that the source executes against IoTDB. */
+ public static final String SQL = "sql";
+
+ /*---------------------- single node configurations -------------------------*/
+
+ /**
+ * The host of the IoTDB server.
+ */
+ public static final String HOST = "host";
+
+ /**
+ * The port of the IoTDB server.
+ */
+ public static final String PORT = "port";
+
+
+ /*---------------------- authentication and multiple node configurations -------------------------*/
+
+ /**
+ * Username for the source.
+ */
+ public static final String USERNAME = "username";
+
+ /**
+ * Password for the source.
+ */
+ public static final String PASSWORD = "password";
+
+ /**
+ * Comma separated list of node urls, used instead of host and port
+ * when connecting to multiple nodes, e.g. 127.0.0.1:8080,127.0.0.2:8080.
+ */
+ public static final String NODE_URLS = "node_urls";
+
+ /*---------------------- other configurations -------------------------*/
+
+ /**
+ * Fetch size handed to the IoTDB session builder.
+ */
+ public static final String FETCH_SIZE = "fetch_size";
+
+ /**
+ * thrift default buffer size
+ */
+ public static final String THRIFT_DEFAULT_BUFFER_SIZE =
"thrift_default_buffer_size";
+
+ /**
+ * thrift max frame size
+ */
+ public static final String THRIFT_MAX_FRAME_SIZE = "thrift_max_frame_size";
+
+ /**
+ * Boolean flag handed to the enableCacheLeader option of the IoTDB session builder.
+ */
+ public static final String ENABLE_CACHE_LEADER = "enable_cache_leader";
+
+ /**
+ * Version represents the SQL semantic version used by the client. It keeps the
+ * client compatible with the SQL semantics of 0.12 when upgrading to 0.13.
+ * The possible values are: V_0_12, V_0_13.
+ */
+ public static final String VERSION = "version";
+
+ /**
+ * Query lower bound of the time range to be read.
+ */
+ public static final String LOWER_BOUND = "lower_bound";
+
+ /**
+ * Query upper bound of the time range to be read.
+ */
+ public static final String UPPER_BOUND = "upper_bound";
+
+ /**
+ * Number of partitions (splits) the configured time range is divided into.
+ */
+ public static final String NUM_PARTITIONS = "num_partitions";
+
+}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java
new file mode 100644
index 000000000..25b657cfa
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/constant/SourceConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.constant;
+
+public class SourceConstants {
+ /** Separator between a field name and its type. NOTE(review): no usage visible in this commit - confirm it is still needed. */
+ public static final String FIELDS_K_V_SPLIT = ":";
+ /** Separator between field entries. NOTE(review): no usage visible in this commit - confirm it is still needed. */
+ public static final String FIELDS_SPLIT = ",";
+ /** Separator between the entries of the node_urls option. */
+ public static final String NODES_SPLIT = ",";
+ /** Keyword at which the configured sql is cut to insert the time range filter of a split. */
+ public static final String SQL_WHERE = "where";
+ /** Split id used when no num_partitions is configured and the whole query forms a single split. */
+ public static final String DEFAULT_PARTITIONS = "0";
+
+}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
new file mode 100644
index 000000000..987c43c84
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NODE_URLS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.iotdb.state.IoTDBSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * SeaTunnel source for IoTDB. It validates the connection options, derives the produced
+ * row type from the configured schema and creates the reader and the split enumerator.
+ * The source is bounded (batch mode).
+ */
+@AutoService(SeaTunnelSource.class)
+public class IoTDBSource implements SeaTunnelSource<SeaTunnelRow, IoTDBSourceSplit, IoTDBSourceState> {
+
+    private SeaTunnelContext seaTunnelContext;
+
+    /** Row type built from the schema of the plugin config, set by {@link #prepare(Config)}. */
+    private SeaTunnelRowType typeInfo;
+
+    /** Unwrapped plugin options, handed to the reader and the enumerator. */
+    private final Map<String, Object> configParams = new HashMap<>();
+
+    @Override
+    public String getPluginName() {
+        return "IoTDB";
+    }
+
+    /**
+     * Validates the plugin config and caches its values.
+     * Either host and port (single node) or node_urls (multiple nodes) must be present.
+     *
+     * @param pluginConfig the options of this source
+     * @throws PrepareFailException if neither host and port nor node_urls are configured
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HOST, PORT);
+        if (!result.isSuccess()) {
+            // single node options are incomplete, fall back to the multi node option
+            result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS);
+
+            if (!result.isSuccess()) {
+                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "host and port and node urls are both empty");
+            }
+        }
+        SeatunnelSchema seatunnelSchema = SeatunnelSchema.buildWithConfig(pluginConfig);
+        this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
+        pluginConfig.entrySet().forEach(entry -> configParams.put(entry.getKey(), entry.getValue().unwrapped()));
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.typeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, IoTDBSourceSplit> createReader(SourceReader.Context readerContext) {
+        return new IoTDBSourceReader(configParams, readerContext, typeInfo);
+    }
+
+    @Override
+    public SourceSplitEnumerator<IoTDBSourceSplit, IoTDBSourceState> createEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> enumeratorContext) throws Exception {
+        return new IoTDBSourceSplitEnumerator(enumeratorContext, configParams);
+    }
+
+    @Override
+    public SourceSplitEnumerator<IoTDBSourceSplit, IoTDBSourceState> restoreEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> enumeratorContext, IoTDBSourceState checkpointState) throws Exception {
+        return new IoTDBSourceSplitEnumerator(enumeratorContext, checkpointState, configParams);
+    }
+
+}
+
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
new file mode 100644
index 000000000..2fcac5ff4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.ENABLE_CACHE_LEADER;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.FETCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NODE_URLS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_MAX_FRAME_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.VERSION;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.NODES_SPLIT;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.session.util.Version;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reader that executes the query of every assigned {@link IoTDBSourceSplit} on one IoTDB
+ * session and emits each result row as a {@link SeaTunnelRow}.
+ */
+@Slf4j
+public class IoTDBSourceReader implements SourceReader<SeaTunnelRow, IoTDBSourceSplit> {
+
+    /** Time to wait in {@link #pollNext(Collector)} while no split has been assigned yet. */
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final Map<String, Object> conf;
+
+    /** Splits that were assigned but not completely read yet. */
+    private final Set<IoTDBSourceSplit> sourceSplits;
+
+    private final SourceReader.Context context;
+
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    private Session session;
+
+    public IoTDBSourceReader(Map<String, Object> conf, SourceReader.Context readerContext, SeaTunnelRowType seaTunnelRowType) {
+        this.conf = conf;
+        this.sourceSplits = new HashSet<>();
+        this.context = readerContext;
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public void open() throws IoTDBConnectionException {
+        session = buildSession(conf);
+        session.open();
+    }
+
+    /**
+     * Closes the IoTDB session if one was created.
+     *
+     * @throws IOException if the session cannot be closed
+     */
+    @Override
+    public void close() throws IOException {
+        if (session == null) {
+            // open() was never called or failed before the session was built
+            return;
+        }
+        try {
+            session.close();
+        } catch (IoTDBConnectionException e) {
+            throw new IOException("close IoTDB session failed", e);
+        }
+    }
+
+    /**
+     * Reads all currently assigned splits. A split is dropped once it was read completely,
+     * so that it is neither read again by a later call nor reported by
+     * {@link #snapshotState(long)} as outstanding work.
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        // iterate over a copy because finished splits are removed from the set
+        for (IoTDBSourceSplit split : new ArrayList<>(sourceSplits)) {
+            try {
+                read(split, output);
+            } catch (Exception e) {
+                throw new RuntimeException("IotDB source read error", e);
+            }
+            sourceSplits.remove(split);
+        }
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded IoTDB source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    /** Executes the query of the split and emits one row per returned record. */
+    private void read(IoTDBSourceSplit split, Collector<SeaTunnelRow> output) throws Exception {
+        try (SessionDataSet dataSet = session.executeQueryStatement(split.getQuery())) {
+            while (dataSet.hasNext()) {
+                RowRecord row = dataSet.next();
+                List<Field> fields = row.getFields();
+                Object[] datas = new Object[fields.size()];
+                for (int i = 0; i < datas.length; i++) {
+                    datas[i] = convertToDataType(fields.get(i));
+                }
+                output.collect(new SeaTunnelRow(datas));
+            }
+        }
+    }
+
+    /**
+     * Converts an IoTDB field into the matching Java value.
+     *
+     * @return the value, or {@code null} when the field or its data type is absent
+     * @throws IllegalArgumentException if the data type is not supported
+     */
+    private Object convertToDataType(Field field) {
+        if (field == null || field.getDataType() == null) {
+            // a switch on a null data type would fail with a NullPointerException
+            return null;
+        }
+        switch (field.getDataType()) {
+            case INT32:
+                return field.getIntV();
+            case INT64:
+                return field.getLongV();
+            case FLOAT:
+                return field.getFloatV();
+            case DOUBLE:
+                return field.getDoubleV();
+            case TEXT:
+                return field.getStringValue();
+            case BOOLEAN:
+                return field.getBoolV();
+            default:
+                throw new IllegalArgumentException("unknown TSData type: " + field.getDataType());
+        }
+    }
+
+    /**
+     * Builds the session from the plugin options. Host and port are used when both are
+     * present, otherwise node_urls, which mirrors the validation done by the source.
+     *
+     * @throws IllegalArgumentException if neither host and port nor node_urls are configured
+     */
+    private Session buildSession(Map<String, Object> conf) {
+        Session.Builder sessionBuilder = new Session.Builder();
+        if (null != conf.get(HOST) && null != conf.get(PORT)) {
+            sessionBuilder
+                .host(conf.get(HOST).toString())
+                .port(Integer.parseInt(conf.get(PORT).toString()));
+        } else if (null != conf.get(NODE_URLS)) {
+            String nodeUrlsString = conf.get(NODE_URLS).toString();
+            List<String> nodes = Stream.of(nodeUrlsString.split(NODES_SPLIT))
+                .map(String::trim)
+                .collect(Collectors.toList());
+            sessionBuilder.nodeUrls(nodes);
+        } else {
+            throw new IllegalArgumentException("either host and port or node_urls must be configured");
+        }
+        if (null != conf.get(FETCH_SIZE)) {
+            sessionBuilder.fetchSize(Integer.parseInt(conf.get(FETCH_SIZE).toString()));
+        }
+        // toString() instead of a cast: unquoted numeric values are unwrapped as numbers
+        if (null != conf.get(USERNAME)) {
+            sessionBuilder.username(conf.get(USERNAME).toString());
+        }
+        if (null != conf.get(PASSWORD)) {
+            sessionBuilder.password(conf.get(PASSWORD).toString());
+        }
+        if (null != conf.get(THRIFT_DEFAULT_BUFFER_SIZE)) {
+            sessionBuilder.thriftDefaultBufferSize(Integer.parseInt(conf.get(THRIFT_DEFAULT_BUFFER_SIZE).toString()));
+        }
+        if (null != conf.get(THRIFT_MAX_FRAME_SIZE)) {
+            sessionBuilder.thriftMaxFrameSize(Integer.parseInt(conf.get(THRIFT_MAX_FRAME_SIZE).toString()));
+        }
+        if (null != conf.get(ENABLE_CACHE_LEADER)) {
+            sessionBuilder.enableCacheLeader(Boolean.parseBoolean(conf.get(ENABLE_CACHE_LEADER).toString()));
+        }
+        if (null != conf.get(VERSION)) {
+            Version version = Version.valueOf(conf.get(VERSION).toString());
+            sessionBuilder.version(version);
+        }
+        return sessionBuilder.build();
+    }
+
+    /** Returns the splits that have not been read completely yet. */
+    @Override
+    public List<IoTDBSourceSplit> snapshotState(long checkpointId) {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    @Override
+    public void addSplits(List<IoTDBSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        // do nothing
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // do nothing
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java
new file mode 100644
index 000000000..513318054
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class IoTDBSourceSplit implements SourceSplit {
+
+ private static final long serialVersionUID = -1L;
+
+ private String splitId;
+
+ /**
+ * final query statement
+ */
+ private String query;
+
+ @Override
+ public String splitId() {
+ return splitId;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public IoTDBSourceSplit(String splitId, String query) {
+ this.splitId = splitId;
+ this.query = query;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
new file mode 100644
index 000000000..01852a488
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.LOWER_BOUND;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NUM_PARTITIONS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.SQL;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.UPPER_BOUND;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.DEFAULT_PARTITIONS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.SQL_WHERE;
+import static
org.apache.iotdb.tsfile.common.constant.QueryConstant.RESERVED_TIME;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.iotdb.state.IoTDBSourceState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Enumerator that turns the configured sql into one or more {@link IoTDBSourceSplit}s
+ * (optionally partitioned by time range) and assigns them to the registered readers.
+ */
+public class IoTDBSourceSplitEnumerator implements SourceSplitEnumerator<IoTDBSourceSplit, IoTDBSourceState> {
+
+    /**
+     * A SQL statement can contain at most one where
+     * We split the SQL using the where keyword
+     * Therefore, it can be split into two SQL at most
+     */
+    private static final int SQL_WHERE_SPLIT_LENGTH = 2;
+
+    private final Context<IoTDBSourceSplit> context;
+    private final Map<String, Object> conf;
+
+    // Created at construction time instead of in open(), otherwise open() would wipe
+    // the assigned splits restored from a checkpoint.
+    private final Set<IoTDBSourceSplit> pendingSplit = new HashSet<>();
+    private final Set<IoTDBSourceSplit> assignedSplit = new HashSet<>();
+
+    public IoTDBSourceSplitEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> context, Map<String, Object> conf) {
+        this.context = context;
+        this.conf = conf;
+    }
+
+    public IoTDBSourceSplitEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> context, IoTDBSourceState sourceState, Map<String, Object> conf) {
+        this(context, conf);
+        if (sourceState != null && sourceState.getAssignedSplit() != null) {
+            this.assignedSplit.addAll(sourceState.getAssignedSplit());
+        }
+    }
+
+    @Override
+    public void open() {
+        // nothing to do, the split collections are created by the constructors
+    }
+
+    /**
+     * Computes the splits and assigns them to the registered readers. Splits whose id
+     * is already part of the restored assigned state are not assigned a second time.
+     */
+    @Override
+    public void run() {
+        Set<String> assignedIds = new HashSet<>();
+        for (IoTDBSourceSplit split : assignedSplit) {
+            assignedIds.add(split.splitId());
+        }
+        for (IoTDBSourceSplit split : getIotDBSplit()) {
+            if (!assignedIds.contains(split.splitId())) {
+                pendingSplit.add(split);
+            }
+        }
+        assignSplit(context.registeredReaders());
+    }
+
+    /**
+     * split the inclusive time range [lower_bound, upper_bound] into numPartitions
+     * consecutive, non overlapping parts
+     * if numPartitions is not configured, the sql is used unchanged as a single split
+     * if numPartitions is larger than the number of timestamps in the range, one partition
+     * per timestamp is used
+     * <p>
+     * eg: start = 1, end = 10, numPartitions = 2
+     * sql = "select * from test where age > 0 and age < 10"
+     * <p>
+     * split result
+     * <p>
+     * split 1: select * from test where (time >= 1 and time < 6) and ( age > 0 and age < 10 )
+     * <p>
+     * split 2: select * from test where (time >= 6 and time < 11) and ( age > 0 and age < 10 )
+     * <p>
+     * The sql is cut at its where keyword, matched case insensitively as a whole word.
+     * Clauses that must stay behind the where condition (for example limit) are not
+     * recognized and end up inside the wrapped condition.
+     *
+     * @throws IllegalArgumentException if sql or a bound is missing, the bounds or the
+     *                                  partition count are invalid, or the sql contains
+     *                                  more than one where
+     */
+    private Set<IoTDBSourceSplit> getIotDBSplit() {
+        Object sqlValue = conf.get(SQL);
+        if (sqlValue == null) {
+            throw new IllegalArgumentException(SQL + " must be configured");
+        }
+        String sql = sqlValue.toString();
+        Set<IoTDBSourceSplit> iotDBSourceSplits = new HashSet<>();
+        // no need numPartitions, use one partition
+        if (!conf.containsKey(NUM_PARTITIONS)) {
+            iotDBSourceSplits.add(new IoTDBSourceSplit(DEFAULT_PARTITIONS, sql));
+            return iotDBSourceSplits;
+        }
+        long start = requiredLong(LOWER_BOUND);
+        long end = requiredLong(UPPER_BOUND);
+        int numPartitions = Integer.parseInt(conf.get(NUM_PARTITIONS).toString());
+        if (numPartitions < 1) {
+            throw new IllegalArgumentException(NUM_PARTITIONS + " must be greater than 0, but is " + numPartitions);
+        }
+        if (end < start) {
+            throw new IllegalArgumentException(UPPER_BOUND + " (" + end + ") must not be less than " + LOWER_BOUND + " (" + start + ")");
+        }
+        String[] sqls = sql.split("(?i)\\b" + SQL_WHERE + "\\b");
+        if (sqls.length > SQL_WHERE_SPLIT_LENGTH) {
+            throw new IllegalArgumentException("sql should not contain more than one where");
+        }
+        // number of timestamps in the inclusive range, computed in long to avoid int overflow
+        long total;
+        try {
+            total = Math.subtractExact(Math.addExact(end, 1L), start);
+        } catch (ArithmeticException e) {
+            throw new IllegalArgumentException("time range [" + start + ", " + end + "] is too large", e);
+        }
+        long partitions = Math.min(numPartitions, total);
+        long baseSize = total / partitions;
+        // the first 'remainder' partitions take one extra timestamp each
+        long remainder = total % partitions;
+        long currentStart = start;
+        for (int i = 0; i < partitions; i++) {
+            long currentEnd = currentStart + baseSize + (i < remainder ? 1 : 0);
+            String query = sqls[0] + " where (" + RESERVED_TIME + " >= " + currentStart + " and " + RESERVED_TIME + " < " + currentEnd + ") ";
+            if (sqls.length > 1) {
+                query = query + " and ( " + sqls[1] + " ) ";
+            }
+            // the partition index is a stable id, the same config yields the same ids after a restore
+            iotDBSourceSplits.add(new IoTDBSourceSplit(String.valueOf(i), query));
+            currentStart = currentEnd;
+        }
+        return iotDBSourceSplits;
+    }
+
+    /** Reads a mandatory long option and fails with a clear message when it is missing. */
+    private long requiredLong(String key) {
+        Object value = conf.get(key);
+        if (value == null) {
+            throw new IllegalArgumentException(key + " must be configured when " + NUM_PARTITIONS + " is set");
+        }
+        return Long.parseLong(value.toString());
+    }
+
+    @Override
+    public void addSplitsBack(List<IoTDBSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    /**
+     * Distributes all pending splits over the given readers. Without any reader the
+     * splits stay pending until a reader registers.
+     */
+    private void assignSplit(Collection<Integer> taskIDList) {
+        if (taskIDList.isEmpty()) {
+            return;
+        }
+        List<Integer> readers = new ArrayList<>(taskIDList);
+        Collections.sort(readers);
+        Map<Integer, List<IoTDBSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
+        for (int taskID : readers) {
+            readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+        }
+        // getSplitOwner yields an index into the reader list, not a task id
+        pendingSplit.forEach(s -> readySplit.get(readers.get(getSplitOwner(s.splitId(), readers.size())))
+            .add(s));
+        readySplit.forEach(context::assignSplit);
+        assignedSplit.addAll(pendingSplit);
+        pendingSplit.clear();
+    }
+
+    @Override
+    public IoTDBSourceState snapshotState(long checkpointId) throws Exception {
+        // copy, so that later assignments do not modify the snapshot
+        return new IoTDBSourceState(new HashSet<>(assignedSplit));
+    }
+
+    /** Returns a non negative index in [0, numReaders) derived from the hash of the split id. */
+    private static int getSplitOwner(String tp, int numReaders) {
+        // mask the sign bit, hashCode() may be negative
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        //nothing to do
+    }
+
+    @Override
+    public void close() {
+        //nothing to do
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        //nothing to do
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java
new file mode 100644
index 000000000..924cecd24
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/state/IoTDBSourceState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.state;
+
+import org.apache.seatunnel.connectors.seatunnel.iotdb.source.IoTDBSourceSplit;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Checkpoint state of the IoTDB split enumerator: the splits that were already
+ * assigned to readers.
+ */
+public class IoTDBSourceState implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<IoTDBSourceSplit> assignedSplit;
+
+    public IoTDBSourceState(Set<IoTDBSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    public Set<IoTDBSourceSplit> getAssignedSplit() {
+        return assignedSplit;
+    }
+}