This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit af8943e480d9857e69b6b4e5a251535e9198e894 Author: Wei Zhong <[email protected]> AuthorDate: Wed Apr 29 20:02:52 2020 +0800 address comments --- .../apache/iotdb/flink/FlinkTsFileBatchSink.java | 2 +- .../apache/iotdb/flink/FlinkTsFileStreamSink.java | 2 +- flink-tsfile-connector/README.md | 2 +- .../iotdb/flink/tsfile/RowTSRecordConverter.java | 2 +- .../iotdb/flink/tsfile/TSRecordConverter.java | 27 +++++++++++++++++++++- .../iotdb/flink/tsfile/TSRecordOutputFormat.java | 4 ++-- .../iotdb/flink/tsfile/TsFileOutputFormat.java | 4 ++++ 7 files changed, 36 insertions(+), 7 deletions(-) diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java index 28aaf83..2394824 100644 --- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java +++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java @@ -40,7 +40,7 @@ import java.util.List; import java.util.stream.Collectors; /** - * The example of writing TsFile via Flink DataSet API. + * The example of writing to TsFile via Flink DataSet API. */ public class FlinkTsFileBatchSink { diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java index 2043c87..cdf1390 100644 --- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java +++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java @@ -40,7 +40,7 @@ import java.util.List; import java.util.stream.Collectors; /** - * The example of writing TsFile via Flink DataStream API. + * The example of writing to TsFile via Flink DataStream API. */ public class FlinkTsFileStreamSink { diff --git a/flink-tsfile-connector/README.md b/flink-tsfile-connector/README.md index 8b8b88d..9e576ba 100644 --- a/flink-tsfile-connector/README.md +++ b/flink-tsfile-connector/README.md @@ -91,7 +91,7 @@ for (String s : result) { } ``` -### TSRecordOutputFormat Example +### Example of TSRecordOutputFormat 1. create TSRecordOutputFormat with default RowTSRecordConverter. diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java index 238085a..640b9c6 100644 --- a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java +++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java @@ -107,7 +107,7 @@ public class RowTSRecordConverter implements TSRecordConverter<Row> { } @Override - public void covertAndCollect(Row input, Collector<TSRecord> collector) throws IOException { + public void convert(Row input, Collector<TSRecord> collector) throws IOException { long timestamp = (long) input.getField(timeIndex); for (TSRecord tsRecord : reuse) { tsRecord.dataPointList.clear(); diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java index 0d66425..97810d9 100644 --- a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java +++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java @@ -26,11 +26,36 @@ import org.apache.iotdb.tsfile.write.schema.Schema; import java.io.IOException; import java.io.Serializable; +/** + * The converter describes how to turn a data object into multiple TSRecord objects, which is required by the + * {@link TSRecordOutputFormat}. + * + * @param <T> The type of the upstream data. + */ public interface TSRecordConverter<T> extends Serializable { + /** + * Opens current converter. + * + * @param schema The schema of the TSRecord. + */ void open(Schema schema) throws IOException; - void covertAndCollect(T input, Collector<TSRecord> collector) throws IOException; + /** + * Converts the input data into one or multiple TSRecords. The collector in param list is used to collect the + * output. + * + * When this method is called, the converter is guaranteed to be opened. + * + * @param input The input data. + * @param collector The collector used to collect the output. + */ + void convert(T input, Collector<TSRecord> collector) throws IOException; + /** + * Method that marks the end of the life-cycle of this converter. + * + * When this method is called, the converter is guaranteed to be opened. + */ void close() throws IOException; } diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java index 9050da9..4bbda7c 100644 --- a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java +++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java @@ -70,7 +70,7 @@ public class TSRecordOutputFormat<T> extends TsFileOutputFormat<T> { @Override public void writeRecord(T t) throws IOException { try { - converter.covertAndCollect(t, tsRecordCollector); + converter.convert(t, tsRecordCollector); } catch (FlinkRuntimeException e) { throw new IOException(e.getCause()); } @@ -96,4 +96,4 @@ public class TSRecordOutputFormat<T> extends TsFileOutputFormat<T> { public TSRecordConverter<T> getConverter() { return converter; } -} \ No newline at end of file +} diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java index 859e08f..2d90913 100644 --- a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java +++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java @@ -40,6 +40,10 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Optional; +/** + * The abstract base class of the output formats which write data to TsFile. + * @param <T> The input data type. + */ public abstract class TsFileOutputFormat<T> extends FileOutputFormat<T> { protected Schema schema;
