This is an automated email from the ASF dual-hosted git repository. yuyuankang pushed a commit to branch flink_iotdb in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5681888618bcf5c9288e38bd06207ff4891e98fb Author: Ring-k <yuyuank...@hotmail.com> AuthorDate: Thu Apr 1 17:30:32 2021 +0800 IoTDBSource --- .../org/apache/iotdb/flink/FlinkIoTDBSink.java | 5 +- .../java/org/apache/iotdb/flink/IoTDBSink.java | 13 ++-- .../java/org/apache/iotdb/flink/IoTDBSource.java | 71 ++++++++++++++++++++++ .../apache/iotdb/flink/options/IoTDBOptions.java | 69 +++++++++++++++++++++ .../IoTDBSinkOptions.java} | 51 +++------------- .../iotdb/flink/options/IoTDBSourceOptions.java | 35 +++++++++++ .../flink/DefaultIoTSerializationSchemaTest.java | 6 +- .../iotdb/flink/IoTDBSinkBatchInsertTest.java | 5 +- .../iotdb/flink/IoTDBSinkBatchTimerTest.java | 5 +- .../apache/iotdb/flink/IoTDBSinkInsertTest.java | 5 +- 10 files changed, 205 insertions(+), 60 deletions(-) diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java index 0e45dc5..5620949 100644 --- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java +++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java @@ -17,6 +17,7 @@ */ package org.apache.iotdb.flink; +import org.apache.iotdb.flink.options.IoTDBSinkOptions; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; @@ -35,7 +36,7 @@ public class FlinkIoTDBSink { // run the flink job on local mini cluster StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - IoTDBOptions options = new IoTDBOptions(); + IoTDBSinkOptions options = new IoTDBSinkOptions(); options.setHost("127.0.0.1"); options.setPort(6667); options.setUser("root"); @@ -46,7 +47,7 @@ public class FlinkIoTDBSink { // here. options.setTimeseriesOptionList( Lists.newArrayList( - new IoTDBOptions.TimeseriesOption( + new IoTDBSinkOptions.TimeseriesOption( "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY))); IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema(); diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java index a56bbe0..eaba61d 100644 --- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java +++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java @@ -18,6 +18,7 @@ package org.apache.iotdb.flink; +import org.apache.iotdb.flink.options.IoTDBSinkOptions; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.pool.SessionPool; @@ -50,9 +51,9 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class); - private IoTDBOptions options; + private IoTDBSinkOptions options; private IoTSerializationSchema<IN> serializationSchema; - private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap; + private Map<String, IoTDBSinkOptions.TimeseriesOption> timeseriesOptionMap; private transient SessionPool pool; private transient ScheduledExecutorService scheduledExecutor; @@ -61,12 +62,12 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> { private List<Event> batchList; private int sessionPoolSize = 2; - public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) { + public IoTDBSink(IoTDBSinkOptions options, IoTSerializationSchema<IN> schema) { this.options = options; this.serializationSchema = schema; this.batchList = new LinkedList<>(); this.timeseriesOptionMap = new HashMap<>(); - for (IoTDBOptions.TimeseriesOption timeseriesOption : options.getTimeseriesOptionList()) { + for (IoTDBSinkOptions.TimeseriesOption timeseriesOption : options.getTimeseriesOptionList()) { timeseriesOptionMap.put(timeseriesOption.getPath(), timeseriesOption); } } @@ -94,7 +95,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> { } } - for (IoTDBOptions.TimeseriesOption option : options.getTimeseriesOptionList()) { + for (IoTDBSinkOptions.TimeseriesOption option : options.getTimeseriesOptionList()) { if (!pool.checkTimeseriesExists(option.getPath())) { pool.createTimeseries( option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor()); @@ -191,7 +192,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> { && measurements.size() == values.size()) { for (int i = 0; i < measurements.size(); i++) { String measurement = device + TsFileConstant.PATH_SEPARATOR + measurements.get(i); - IoTDBOptions.TimeseriesOption timeseriesOption = timeseriesOptionMap.get(measurement); + IoTDBSinkOptions.TimeseriesOption timeseriesOption = timeseriesOptionMap.get(measurement); if (timeseriesOption != null && TSDataType.TEXT.equals(timeseriesOption.getDataType())) { // The TEXT data type should be covered by " or ' values.set(i, "'" + values.get(i) + "'"); diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSource.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSource.java new file mode 100644 index 0000000..3432a65 --- /dev/null +++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSource.java @@ -0,0 +1,71 @@ +package org.apache.iotdb.flink; + +import org.apache.iotdb.flink.options.IoTDBSourceOptions; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.tsfile.read.common.RowRecord; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class IoTDBSource<T> extends RichSourceFunction<T> { + + private static final Logger LOG = LoggerFactory.getLogger(IoTDBSource.class); + private static final long serialVersionUID = 1L; + private IoTDBSourceOptions sourceOptions; + + private transient Session session; + SessionDataSet dataSet; + + public IoTDBSource(IoTDBSourceOptions ioTDBSourceOptions) { + this.sourceOptions = ioTDBSourceOptions; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + initSession(); + } + + public abstract T convert(RowRecord rowRecord); + + @Override + public void run(SourceContext<T> sourceContext) throws Exception { + dataSet = session.executeQueryStatement(sourceOptions.getSql()); + dataSet.setFetchSize(1024); // default is 10000 + while (dataSet.hasNext()) { + sourceContext.collect(convert(dataSet.next())); + } + dataSet.closeOperationHandle(); + } + + @Override + public void cancel() { + try { + dataSet.closeOperationHandle(); + } catch (StatementExecutionException e) { + e.printStackTrace(); + } catch (IoTDBConnectionException e) { + e.printStackTrace(); + } + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + } + + void initSession() throws Exception { + session = + new Session( + sourceOptions.getHost(), + sourceOptions.getPort(), + sourceOptions.getUser(), + sourceOptions.getPassword()); + } +} diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java new file mode 100644 index 0000000..a619836 --- /dev/null +++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.flink.options; + +import java.io.Serializable; + +public class IoTDBOptions implements Serializable { + + protected String host; + protected int port; + protected String user; + protected String password; + + public IoTDBOptions(String host, int port, String user, String password) { + this.host = host; + this.port = port; + this.user = user; + this.password = password; + } + + public IoTDBOptions() {} + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } +} diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java similarity index 79% rename from flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java rename to flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java index 6dc83e9..60bab60 100644 --- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java +++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.flink; +package org.apache.iotdb.flink.options; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -26,63 +26,25 @@ import java.io.Serializable; import java.util.List; /** IoTDBOptions describes the configuration related information for IoTDB and timeseries. */ -public class IoTDBOptions implements Serializable { - private String host; - private int port; - private String user; - private String password; +public class IoTDBSinkOptions extends IoTDBOptions { + private String storageGroup; private List<TimeseriesOption> timeseriesOptionList; - public IoTDBOptions() {} + public IoTDBSinkOptions() {} - public IoTDBOptions( + public IoTDBSinkOptions( String host, int port, String user, String password, String storageGroup, List<TimeseriesOption> timeseriesOptionList) { - this.host = host; - this.port = port; - this.user = user; - this.password = password; + super(host, port, user, password); this.storageGroup = storageGroup; this.timeseriesOptionList = timeseriesOptionList; } - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getUser() { - return user; - } - - public void setUser(String user) { - this.user = user; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - public String getStorageGroup() { return storageGroup; } @@ -100,6 +62,7 @@ public class IoTDBOptions implements Serializable { } public static class TimeseriesOption implements Serializable { + private String path; private TSDataType dataType = TSDataType.TEXT; private TSEncoding encoding = TSEncoding.PLAIN; diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSourceOptions.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSourceOptions.java new file mode 100644 index 0000000..4d3c4ce --- /dev/null +++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSourceOptions.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.flink.options; + +public class IoTDBSourceOptions extends IoTDBOptions { + private String sql; + + public IoTDBSourceOptions(String host, int port, String user, String password, String sql) { + super(host, port, user, password); + this.sql = sql; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } +} diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java index 58dd779..d1c0714 100644 --- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java +++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java @@ -18,6 +18,8 @@ package org.apache.iotdb.flink; +import org.apache.iotdb.flink.options.IoTDBSinkOptions; + import com.google.common.collect.Lists; import org.junit.Test; @@ -30,9 +32,9 @@ public class DefaultIoTSerializationSchemaTest { @Test public void serialize() { - IoTDBOptions options = new IoTDBOptions(); + IoTDBSinkOptions options = new IoTDBSinkOptions(); options.setTimeseriesOptionList( - Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature"))); + Lists.newArrayList(new IoTDBSinkOptions.TimeseriesOption("root.sg.D01.temperature"))); DefaultIoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema(); Map<String, String> tuple = new HashMap(); diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java index 779b636..68cd7b4 100644 --- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java +++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java @@ -18,6 +18,7 @@ package org.apache.iotdb.flink; +import org.apache.iotdb.flink.options.IoTDBSinkOptions; import org.apache.iotdb.session.pool.SessionPool; import com.google.common.collect.Lists; @@ -40,9 +41,9 @@ public class IoTDBSinkBatchInsertTest { @Before public void setUp() { - IoTDBOptions options = new IoTDBOptions(); + IoTDBSinkOptions options = new IoTDBSinkOptions(); options.setTimeseriesOptionList( - Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature"))); + Lists.newArrayList(new IoTDBSinkOptions.TimeseriesOption("root.sg.D01.temperature"))); ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema()); ioTDBSink.withBatchSize(3); diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java index 338f845..72397c8 100644 --- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java +++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java @@ -18,6 +18,7 @@ package org.apache.iotdb.flink; +import org.apache.iotdb.flink.options.IoTDBSinkOptions; import org.apache.iotdb.session.pool.SessionPool; import com.google.common.collect.Lists; @@ -40,9 +41,9 @@ public class IoTDBSinkBatchTimerTest { @Before public void setUp() { - IoTDBOptions options = new IoTDBOptions(); + IoTDBSinkOptions options = new IoTDBSinkOptions(); options.setTimeseriesOptionList( - Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature"))); + Lists.newArrayList(new IoTDBSinkOptions.TimeseriesOption("root.sg.D01.temperature"))); ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema()); ioTDBSink.withBatchSize(3); ioTDBSink.withFlushIntervalMs(1000); diff --git a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java index b42faf6..9efcef7 100644 --- a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java +++ b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java @@ -18,6 +18,7 @@ package org.apache.iotdb.flink; +import org.apache.iotdb.flink.options.IoTDBSinkOptions; import org.apache.iotdb.session.pool.SessionPool; import com.google.common.collect.Lists; @@ -39,9 +40,9 @@ public class IoTDBSinkInsertTest { @Before public void setUp() throws Exception { - IoTDBOptions options = new IoTDBOptions(); + IoTDBSinkOptions options = new IoTDBSinkOptions(); options.setTimeseriesOptionList( - Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature"))); + Lists.newArrayList(new IoTDBSinkOptions.TimeseriesOption("root.sg.D01.temperature"))); ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema()); pool = mock(SessionPool.class);