This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch tsfile_hdfs_example in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 920fd7b78e5fcdae3afa7459839aeef6cbb03420 Author: samperson1997 <[email protected]> AuthorDate: Tue Apr 7 09:40:55 2020 +0800 Add TsFile writing to HDFS example --- .../iotdb/hadoop/tsfile/TsFileWriteToHDFS.java | 77 ++++++++++++++++++++++ .../fileOutputFactory/LocalFSOutputFactory.java | 4 +- .../write/writer/ForceAppendTsFileWriter.java | 2 +- ...ultTsFileOutput.java => LocalTsFileOutput.java} | 11 +--- .../write/writer/RestorableTsFileIOWriter.java | 2 - .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 5 +- 6 files changed, 86 insertions(+), 15 deletions(-) diff --git a/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java new file mode 100644 index 0000000..bdd6fd9 --- /dev/null +++ b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.hadoop.tsfile; + +import java.io.File; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.TsFileWriter; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +public class TsFileWriteToHDFS { + + private static TSFileConfig config = TSFileDescriptor.getInstance().getConfig(); + + + public static void main(String[] args) { + config.setTSFileStorageFs("HDFS"); + + try { + String path = "hdfs://localhost:9000/test.tsfile"; + File f = FSFactoryProducer.getFSFactory().getFile(path); + if (!f.exists()) { + f.createNewFile(); + } + TsFileWriter tsFileWriter = new TsFileWriter(f); + tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_1), + new MeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE)); + tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_2), + new MeasurementSchema(Constant.SENSOR_2, TSDataType.INT64, TSEncoding.RLE)); + tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_3), + new MeasurementSchema(Constant.SENSOR_3, TSDataType.INT64, TSEncoding.RLE)); + + // construct TSRecord + for (int i = 0; i < 100; i++) { + TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_1); + DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i); + DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i); + DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i); + tsRecord.addTuple(dPoint1); + tsRecord.addTuple(dPoint2); + tsRecord.addTuple(dPoint3); + + // write TSRecord + tsFileWriter.write(tsRecord); + } + + tsFileWriter.close(); + } catch (Throwable e) { + e.printStackTrace(); + System.out.println(e.getMessage()); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java index 52dfd6d..c21caed 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java @@ -25,7 +25,7 @@ import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.iotdb.tsfile.write.writer.DefaultTsFileOutput; +import org.apache.iotdb.tsfile.write.writer.LocalTsFileOutput; import org.apache.iotdb.tsfile.write.writer.TsFileOutput; public class LocalFSOutputFactory implements FileOutputFactory { @@ -34,7 +34,7 @@ public class LocalFSOutputFactory implements FileOutputFactory { public TsFileOutput getTsFileOutput(String filePath, boolean append) { try { - return new DefaultTsFileOutput(new FileOutputStream(filePath, append)); + return new LocalTsFileOutput(new FileOutputStream(filePath, append)); } catch (IOException e) { logger.error("Failed to get TsFile output of file: {}, ", filePath, e); return null; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java index 04aee9c..d1b0114 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java @@ -43,7 +43,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter { if (resourceLogger.isInfoEnabled()) { resourceLogger.info("{} writer is opened.", file.getName()); } - this.out = new DefaultTsFileOutput(file, true); + this.out = new LocalTsFileOutput(file, true); this.file = file; // file doesn't exist diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java similarity index 85% rename from tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java rename to tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java index 684b270..1e6e105 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java @@ -31,22 +31,17 @@ import java.nio.ByteBuffer; * existed, it will be created. Otherwise the file will be written from position * 0. */ -public class DefaultTsFileOutput implements TsFileOutput { +public class LocalTsFileOutput implements TsFileOutput { private FileOutputStream outputStream; private BufferedOutputStream bufferedStream; - DefaultTsFileOutput(File file) throws FileNotFoundException { - this.outputStream = new FileOutputStream(file); - this.bufferedStream = new BufferedOutputStream(outputStream); - } - - DefaultTsFileOutput(File file, boolean append) throws FileNotFoundException { + LocalTsFileOutput(File file, boolean append) throws FileNotFoundException { this.outputStream = new FileOutputStream(file, append); this.bufferedStream = new BufferedOutputStream(outputStream); } - public DefaultTsFileOutput(FileOutputStream outputStream) { + public LocalTsFileOutput(FileOutputStream outputStream) { this.outputStream = outputStream; this.bufferedStream = new BufferedOutputStream(outputStream); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java index fcae9cd..ee94550 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -47,8 +47,6 @@ import org.slf4j.LoggerFactory; */ public class RestorableTsFileIOWriter extends TsFileIOWriter { - private static final Logger logger = LoggerFactory - .getLogger(RestorableTsFileIOWriter.class); private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor"); private long truncatedPosition = -1; private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index a8146c8..d8d9ad2 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.BytesUtils; @@ -98,7 +99,7 @@ public class TsFileIOWriter { * @throws IOException if I/O error occurs */ public TsFileIOWriter(File file) throws IOException { - this.out = new DefaultTsFileOutput(file); + this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false); this.file = file; if (resourceLogger.isInfoEnabled()) { resourceLogger.info("{} writer is opened.", file.getName()); @@ -234,7 +235,7 @@ public class TsFileIOWriter { } Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(chunkMetadataListMap); - + TsFileMetadata tsFileMetaData = new TsFileMetadata(); tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataMap); tsFileMetaData.setVersionInfo(versionInfo);
