This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit af8943e480d9857e69b6b4e5a251535e9198e894
Author: Wei Zhong <[email protected]>
AuthorDate: Wed Apr 29 20:02:52 2020 +0800

    address comments
---
 .../apache/iotdb/flink/FlinkTsFileBatchSink.java   |  2 +-
 .../apache/iotdb/flink/FlinkTsFileStreamSink.java  |  2 +-
 flink-tsfile-connector/README.md                   |  2 +-
 .../iotdb/flink/tsfile/RowTSRecordConverter.java   |  2 +-
 .../iotdb/flink/tsfile/TSRecordConverter.java      | 27 +++++++++++++++++++++-
 .../iotdb/flink/tsfile/TSRecordOutputFormat.java   |  4 ++--
 .../iotdb/flink/tsfile/TsFileOutputFormat.java     |  4 ++++
 7 files changed, 36 insertions(+), 7 deletions(-)

diff --git 
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java 
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java
index 28aaf83..2394824 100644
--- 
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java
+++ 
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * The example of writing TsFile via Flink DataSet API.
+ * The example of writing to TsFile via Flink DataSet API.
  */
 public class FlinkTsFileBatchSink {
 
diff --git 
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java 
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java
index 2043c87..cdf1390 100644
--- 
a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java
+++ 
b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * The example of writing TsFile via Flink DataStream API.
+ * The example of writing to TsFile via Flink DataStream API.
  */
 public class FlinkTsFileStreamSink {
 
diff --git a/flink-tsfile-connector/README.md b/flink-tsfile-connector/README.md
index 8b8b88d..9e576ba 100644
--- a/flink-tsfile-connector/README.md
+++ b/flink-tsfile-connector/README.md
@@ -91,7 +91,7 @@ for (String s : result) {
 }
 ```
 
-### TSRecordOutputFormat Example
+### Example of TSRecordOutputFormat 
 
 1. create TSRecordOutputFormat with default RowTSRecordConverter.
 
diff --git 
a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java
 
b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java
index 238085a..640b9c6 100644
--- 
a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java
+++ 
b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java
@@ -107,7 +107,7 @@ public class RowTSRecordConverter implements 
TSRecordConverter<Row> {
        }
 
        @Override
-       public void covertAndCollect(Row input, Collector<TSRecord> collector) 
throws IOException {
+       public void convert(Row input, Collector<TSRecord> collector) throws 
IOException {
                long timestamp = (long) input.getField(timeIndex);
                for (TSRecord tsRecord : reuse) {
                        tsRecord.dataPointList.clear();
diff --git 
a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java
 
b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java
index 0d66425..97810d9 100644
--- 
a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java
+++ 
b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java
@@ -26,11 +26,36 @@ import org.apache.iotdb.tsfile.write.schema.Schema;
 import java.io.IOException;
 import java.io.Serializable;
 
+/**
+ * The converter describes how to turn a data object into multiple TSRecord 
objects, which is required by the
+ * {@link TSRecordOutputFormat}.
+ *
+ * @param <T> The type of the upstream data.
+ */
 public interface TSRecordConverter<T> extends Serializable {
 
+       /**
+        * Opens current converter.
+        *
+        * @param schema The schema of the TSRecord.
+        */
        void open(Schema schema) throws IOException;
 
-       void covertAndCollect(T input, Collector<TSRecord> collector) throws 
IOException;
+       /**
+        * Converts the input data into one or multiple TSRecords. The 
collector in param list is used to collect the
+        * output.
+        *
+        * When this method is called, the converter is guaranteed to be opened.
+        *
+        * @param input The input data.
+        * @param collector The collector used to collect the output.
+        */
+       void convert(T input, Collector<TSRecord> collector) throws IOException;
 
+       /**
+        * Method that marks the end of the life-cycle of this converter.
+        *
+        * When this method is called, the converter is guaranteed to be opened.
+        */
        void close() throws IOException;
 }
diff --git 
a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java
 
b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java
index 9050da9..4bbda7c 100644
--- 
a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java
+++ 
b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java
@@ -70,7 +70,7 @@ public class TSRecordOutputFormat<T> extends 
TsFileOutputFormat<T> {
        @Override
        public void writeRecord(T t) throws IOException {
                try {
-                       converter.covertAndCollect(t, tsRecordCollector);
+                       converter.convert(t, tsRecordCollector);
                } catch (FlinkRuntimeException e) {
                        throw new IOException(e.getCause());
                }
@@ -96,4 +96,4 @@ public class TSRecordOutputFormat<T> extends 
TsFileOutputFormat<T> {
        public TSRecordConverter<T> getConverter() {
                return converter;
        }
-}
\ No newline at end of file
+}
diff --git 
a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java
 
b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java
index 859e08f..2d90913 100644
--- 
a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java
+++ 
b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java
@@ -40,6 +40,10 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Optional;
 
+/**
+ * The abstract base class of the output formats which write data to TsFile.
+ * @param <T> The input data type.
+ */
 public abstract class TsFileOutputFormat<T> extends FileOutputFormat<T> {
 
        protected Schema schema;

Reply via email to