This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch flink_iotdb
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5681888618bcf5c9288e38bd06207ff4891e98fb
Author: Ring-k <yuyuank...@hotmail.com>
AuthorDate: Thu Apr 1 17:30:32 2021 +0800

    IoTDBSource
---
 .../org/apache/iotdb/flink/FlinkIoTDBSink.java     |  5 +-
 .../java/org/apache/iotdb/flink/IoTDBSink.java     | 13 ++--
 .../java/org/apache/iotdb/flink/IoTDBSource.java   | 71 ++++++++++++++++++++++
 .../apache/iotdb/flink/options/IoTDBOptions.java   | 69 +++++++++++++++++++++
 .../IoTDBSinkOptions.java}                         | 51 +++-------------
 .../iotdb/flink/options/IoTDBSourceOptions.java    | 35 +++++++++++
 .../flink/DefaultIoTSerializationSchemaTest.java   |  6 +-
 .../iotdb/flink/IoTDBSinkBatchInsertTest.java      |  5 +-
 .../iotdb/flink/IoTDBSinkBatchTimerTest.java       |  5 +-
 .../apache/iotdb/flink/IoTDBSinkInsertTest.java    |  5 +-
 10 files changed, 205 insertions(+), 60 deletions(-)

diff --git 
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java 
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
index 0e45dc5..5620949 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.iotdb.flink;
 
+import org.apache.iotdb.flink.options.IoTDBSinkOptions;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -35,7 +36,7 @@ public class FlinkIoTDBSink {
     // run the flink job on local mini cluster
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-    IoTDBOptions options = new IoTDBOptions();
+    IoTDBSinkOptions options = new IoTDBSinkOptions();
     options.setHost("127.0.0.1");
     options.setPort(6667);
     options.setUser("root");
@@ -46,7 +47,7 @@ public class FlinkIoTDBSink {
     // here.
     options.setTimeseriesOptionList(
         Lists.newArrayList(
-            new IoTDBOptions.TimeseriesOption(
+            new IoTDBSinkOptions.TimeseriesOption(
                 "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, 
CompressionType.SNAPPY)));
 
     IoTSerializationSchema serializationSchema = new 
DefaultIoTSerializationSchema();
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index a56bbe0..eaba61d 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.iotdb.flink;
 
+import org.apache.iotdb.flink.options.IoTDBSinkOptions;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.pool.SessionPool;
@@ -50,9 +51,9 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
 
-  private IoTDBOptions options;
+  private IoTDBSinkOptions options;
   private IoTSerializationSchema<IN> serializationSchema;
-  private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
+  private Map<String, IoTDBSinkOptions.TimeseriesOption> timeseriesOptionMap;
   private transient SessionPool pool;
   private transient ScheduledExecutorService scheduledExecutor;
 
@@ -61,12 +62,12 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
   private List<Event> batchList;
   private int sessionPoolSize = 2;
 
-  public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
+  public IoTDBSink(IoTDBSinkOptions options, IoTSerializationSchema<IN> 
schema) {
     this.options = options;
     this.serializationSchema = schema;
     this.batchList = new LinkedList<>();
     this.timeseriesOptionMap = new HashMap<>();
-    for (IoTDBOptions.TimeseriesOption timeseriesOption : 
options.getTimeseriesOptionList()) {
+    for (IoTDBSinkOptions.TimeseriesOption timeseriesOption : 
options.getTimeseriesOptionList()) {
       timeseriesOptionMap.put(timeseriesOption.getPath(), timeseriesOption);
     }
   }
@@ -94,7 +95,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
       }
     }
 
-    for (IoTDBOptions.TimeseriesOption option : 
options.getTimeseriesOptionList()) {
+    for (IoTDBSinkOptions.TimeseriesOption option : 
options.getTimeseriesOptionList()) {
       if (!pool.checkTimeseriesExists(option.getPath())) {
         pool.createTimeseries(
             option.getPath(), option.getDataType(), option.getEncoding(), 
option.getCompressor());
@@ -191,7 +192,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
         && measurements.size() == values.size()) {
       for (int i = 0; i < measurements.size(); i++) {
         String measurement = device + TsFileConstant.PATH_SEPARATOR + 
measurements.get(i);
-        IoTDBOptions.TimeseriesOption timeseriesOption = 
timeseriesOptionMap.get(measurement);
+        IoTDBSinkOptions.TimeseriesOption timeseriesOption = 
timeseriesOptionMap.get(measurement);
         if (timeseriesOption != null && 
TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
           // The TEXT data type should be covered by " or '
           values.set(i, "'" + values.get(i) + "'");
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSource.java 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSource.java
new file mode 100644
index 0000000..3432a65
--- /dev/null
+++ 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSource.java
@@ -0,0 +1,71 @@
+package org.apache.iotdb.flink;
+
+import org.apache.iotdb.flink.options.IoTDBSourceOptions;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class IoTDBSource<T> extends RichSourceFunction<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IoTDBSource.class);
+  private static final long serialVersionUID = 1L;
+  private IoTDBSourceOptions sourceOptions;
+
+  private transient Session session;
+  SessionDataSet dataSet;
+
+  public IoTDBSource(IoTDBSourceOptions ioTDBSourceOptions) {
+    this.sourceOptions = ioTDBSourceOptions;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    initSession();
+  }
+
+  public abstract T convert(RowRecord rowRecord);
+
+  @Override
+  public void run(SourceContext<T> sourceContext) throws Exception {
+    dataSet = session.executeQueryStatement(sourceOptions.getSql());
+    dataSet.setFetchSize(1024); // default is 10000
+    while (dataSet.hasNext()) {
+      sourceContext.collect(convert(dataSet.next()));
+    }
+    dataSet.closeOperationHandle();
+  }
+
+  @Override
+  public void cancel() {
+    try {
+      dataSet.closeOperationHandle();
+    } catch (StatementExecutionException e) {
+      e.printStackTrace();
+    } catch (IoTDBConnectionException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    session.close();
+  }
+
+  void initSession() throws Exception {
+    session =
+        new Session(
+            sourceOptions.getHost(),
+            sourceOptions.getPort(),
+            sourceOptions.getUser(),
+            sourceOptions.getPassword());
+  }
+}
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java
 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java
new file mode 100644
index 0000000..a619836
--- /dev/null
+++ 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBOptions.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink.options;
+
+import java.io.Serializable;
+
+public class IoTDBOptions implements Serializable {
+
+  protected String host;
+  protected int port;
+  protected String user;
+  protected String password;
+
+  public IoTDBOptions(String host, int port, String user, String password) {
+    this.host = host;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+  }
+
+  public IoTDBOptions() {}
+
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+}
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
similarity index 79%
rename from 
flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java
rename to 
flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
index 6dc83e9..60bab60 100644
--- 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBOptions.java
+++ 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.iotdb.flink;
+package org.apache.iotdb.flink.options;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -26,63 +26,25 @@ import java.io.Serializable;
 import java.util.List;
 
 /** IoTDBOptions describes the configuration related information for IoTDB and 
timeseries. */
-public class IoTDBOptions implements Serializable {
-  private String host;
-  private int port;
-  private String user;
-  private String password;
+public class IoTDBSinkOptions extends IoTDBOptions {
+
   private String storageGroup;
   private List<TimeseriesOption> timeseriesOptionList;
 
-  public IoTDBOptions() {}
+  public IoTDBSinkOptions() {}
 
-  public IoTDBOptions(
+  public IoTDBSinkOptions(
       String host,
       int port,
       String user,
       String password,
       String storageGroup,
       List<TimeseriesOption> timeseriesOptionList) {
-    this.host = host;
-    this.port = port;
-    this.user = user;
-    this.password = password;
+    super(host, port, user, password);
     this.storageGroup = storageGroup;
     this.timeseriesOptionList = timeseriesOptionList;
   }
 
-  public String getHost() {
-    return host;
-  }
-
-  public void setHost(String host) {
-    this.host = host;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  public void setPort(int port) {
-    this.port = port;
-  }
-
-  public String getUser() {
-    return user;
-  }
-
-  public void setUser(String user) {
-    this.user = user;
-  }
-
-  public String getPassword() {
-    return password;
-  }
-
-  public void setPassword(String password) {
-    this.password = password;
-  }
-
   public String getStorageGroup() {
     return storageGroup;
   }
@@ -100,6 +62,7 @@ public class IoTDBOptions implements Serializable {
   }
 
   public static class TimeseriesOption implements Serializable {
+
     private String path;
     private TSDataType dataType = TSDataType.TEXT;
     private TSEncoding encoding = TSEncoding.PLAIN;
diff --git 
a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSourceOptions.java
 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSourceOptions.java
new file mode 100644
index 0000000..4d3c4ce
--- /dev/null
+++ 
b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSourceOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink.options;
+
+public class IoTDBSourceOptions extends IoTDBOptions {
+  private String sql;
+
+  public IoTDBSourceOptions(String host, int port, String user, String 
password, String sql) {
+    super(host, port, user, password);
+    this.sql = sql;
+  }
+
+  public String getSql() {
+    return sql;
+  }
+
+  public void setSql(String sql) {
+    this.sql = sql;
+  }
+}
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
index 58dd779..d1c0714 100644
--- 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/DefaultIoTSerializationSchemaTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.iotdb.flink;
 
+import org.apache.iotdb.flink.options.IoTDBSinkOptions;
+
 import com.google.common.collect.Lists;
 import org.junit.Test;
 
@@ -30,9 +32,9 @@ public class DefaultIoTSerializationSchemaTest {
 
   @Test
   public void serialize() {
-    IoTDBOptions options = new IoTDBOptions();
+    IoTDBSinkOptions options = new IoTDBSinkOptions();
     options.setTimeseriesOptionList(
-        Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+        Lists.newArrayList(new 
IoTDBSinkOptions.TimeseriesOption("root.sg.D01.temperature")));
     DefaultIoTSerializationSchema serializationSchema = new 
DefaultIoTSerializationSchema();
 
     Map<String, String> tuple = new HashMap();
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
index 779b636..68cd7b4 100644
--- 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchInsertTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.iotdb.flink;
 
+import org.apache.iotdb.flink.options.IoTDBSinkOptions;
 import org.apache.iotdb.session.pool.SessionPool;
 
 import com.google.common.collect.Lists;
@@ -40,9 +41,9 @@ public class IoTDBSinkBatchInsertTest {
 
   @Before
   public void setUp() {
-    IoTDBOptions options = new IoTDBOptions();
+    IoTDBSinkOptions options = new IoTDBSinkOptions();
     options.setTimeseriesOptionList(
-        Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+        Lists.newArrayList(new 
IoTDBSinkOptions.TimeseriesOption("root.sg.D01.temperature")));
     ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
     ioTDBSink.withBatchSize(3);
 
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
index 338f845..72397c8 100644
--- 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.iotdb.flink;
 
+import org.apache.iotdb.flink.options.IoTDBSinkOptions;
 import org.apache.iotdb.session.pool.SessionPool;
 
 import com.google.common.collect.Lists;
@@ -40,9 +41,9 @@ public class IoTDBSinkBatchTimerTest {
 
   @Before
   public void setUp() {
-    IoTDBOptions options = new IoTDBOptions();
+    IoTDBSinkOptions options = new IoTDBSinkOptions();
     options.setTimeseriesOptionList(
-        Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+        Lists.newArrayList(new 
IoTDBSinkOptions.TimeseriesOption("root.sg.D01.temperature")));
     ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
     ioTDBSink.withBatchSize(3);
     ioTDBSink.withFlushIntervalMs(1000);
diff --git 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
index b42faf6..9efcef7 100644
--- 
a/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
+++ 
b/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkInsertTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.iotdb.flink;
 
+import org.apache.iotdb.flink.options.IoTDBSinkOptions;
 import org.apache.iotdb.session.pool.SessionPool;
 
 import com.google.common.collect.Lists;
@@ -39,9 +40,9 @@ public class IoTDBSinkInsertTest {
 
   @Before
   public void setUp() throws Exception {
-    IoTDBOptions options = new IoTDBOptions();
+    IoTDBSinkOptions options = new IoTDBSinkOptions();
     options.setTimeseriesOptionList(
-        Lists.newArrayList(new 
IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
+        Lists.newArrayList(new 
IoTDBSinkOptions.TimeseriesOption("root.sg.D01.temperature")));
     ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
 
     pool = mock(SessionPool.class);

Reply via email to