This is an automated email from the ASF dual-hosted git repository.
spricoder pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new a060b095694 Add Model Writer and Reader
new 259fbb2948f Merge remote-tracking branch 'origin/object_type' into
object_type
a060b095694 is described below
commit a060b09569471b95aeaa21f201d9962e20f0b0cc
Author: spricoder <[email protected]>
AuthorDate: Mon Jul 7 13:14:01 2025 +0800
Add Model Writer and Reader
---
iotdb-client/service-rpc/pom.xml | 5 +
.../rpc/model/CompressedTsFileModelWriter.java | 188 +++++++++++
.../org/apache/iotdb/rpc/model/ModelWriter.java | 33 ++
.../apache/iotdb/rpc/model/ModelWriterType.java | 25 ++
.../rpc/model/UnCompressedTiffModelWriter.java | 34 ++
iotdb-core/datanode/pom.xml | 5 +
.../utils/model/CompressedTsFileModelReader.java | 343 +++++++++++++++++++++
.../apache/iotdb/db/utils/model/ModelReader.java | 37 +++
.../iotdb/db/utils/model/ModelReaderType.java | 25 ++
.../utils/model/UnCompressedTiffModelReader.java | 48 +++
10 files changed, 743 insertions(+)
diff --git a/iotdb-client/service-rpc/pom.xml b/iotdb-client/service-rpc/pom.xml
index d7efd73817c..c525afa7cfc 100644
--- a/iotdb-client/service-rpc/pom.xml
+++ b/iotdb-client/service-rpc/pom.xml
@@ -84,6 +84,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>net.imagej</groupId>
+ <artifactId>ij</artifactId>
+ <version>1.54g</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTsFileModelWriter.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTsFileModelWriter.java
new file mode 100644
index 00000000000..27468c696af
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTsFileModelWriter.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.model;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.ColumnSchemaBuilder;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.v4.DeviceTableModelWriter;
+import org.apache.tsfile.write.v4.ITsFileWriter;
+import org.apache.tsfile.write.writer.TsFileOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+
+public class CompressedTsFileModelWriter extends ModelWriter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CompressedTsFileModelWriter.class);
+ private static final int DEFAULT_CHUNK_NUMBER = 128 * 128;
+
+ @Override
+ void write(String filePath, float[] values, int width, int height) {
+ try {
+
+ TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(1);
+ String tableName = "t";
+ MemoryTsFileOutput tsFileOutput = new MemoryTsFileOutput(60 * 1024 *
1024);
+
+ TableSchema tableSchema =
+ new TableSchema(
+ tableName,
+ Collections.singletonList(
+ new ColumnSchemaBuilder()
+ .name("v")
+ .dataType(TSDataType.FLOAT)
+ .category(ColumnCategory.FIELD)
+ .build()));
+ try (ITsFileWriter writer = new DeviceTableModelWriter(tsFileOutput,
tableSchema, 1)) {
+ Tablet tablet =
+ new Tablet(
+ Collections.singletonList("v"),
+ Collections.singletonList(TSDataType.FLOAT),
+ DEFAULT_CHUNK_NUMBER);
+
+ for (int i = 0; i < values.length; i++) {
+ int row = tablet.getRowSize();
+ tablet.addTimestamp(row, i);
+ tablet.addValue(row, "v", values[i]);
+ // write
+ if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
+ writer.write(tablet);
+ tablet.reset();
+ }
+ }
+ // write
+ if (tablet.getRowSize() != 0) {
+ writer.write(tablet);
+ tablet.reset();
+ }
+ }
+ // SpriCoder write byteBuffer to file
+ ByteBuffer buffer = tsFileOutput.getByteBuffer();
+ createFile(filePath);
+ buffer.position(0); // 重置读取位置(安全措施)
+
+ try (FileChannel channel = FileChannel.open(Paths.get(filePath),
StandardOpenOption.WRITE)) {
+
+ while (buffer.hasRemaining()) {
+ channel.write(buffer); // 零拷贝直接写入
+ }
+ buffer.flip(); // 恢复原始状态
+
+ // 强制磁盘同步(可靠性要求高的场景使用)
+ channel.force(true); // 强制元数据和内容写入磁盘
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void createFile(String filePath) throws TException {
+ File file = new File(filePath);
+ File directory = file.getParentFile();
+
+ if (directory != null && !directory.exists()) {
+ boolean isCreated = directory.mkdirs();
+ if (!isCreated) {
+ LOGGER.error("directory create failed please check");
+ throw new TException("insert Grid error ");
+ }
+ }
+ try {
+ boolean newFile = file.createNewFile();
+ if (!newFile) {
+ LOGGER.error("createNewFile failed please check");
+ throw new TException("insert Grid error ");
+ }
+ } catch (IOException e) {
+ LOGGER.error("createNewFile failed please check");
+ throw new TException("insert Grid error ");
+ }
+ }
+
+ private static class MemoryTsFileOutput implements TsFileOutput {
+
+ private ByteArrayOutputStream baos;
+
+ public MemoryTsFileOutput(int initialSize) {
+ this.baos = new PublicBAOS(initialSize);
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return ByteBuffer.wrap(baos.toByteArray());
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ baos.write(b);
+ }
+
+ @Override
+ public void write(byte b) throws IOException {
+ baos.write(b);
+ }
+
+ @Override
+ public void write(ByteBuffer b) throws IOException {
+ baos.write(b.array());
+ }
+
+ @Override
+ public long getPosition() throws IOException {
+ return baos.size();
+ }
+
+ @Override
+ public void close() throws IOException {
+ baos.close();
+ }
+
+ @Override
+ public OutputStream wrapAsStream() throws IOException {
+ return baos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ baos.flush();
+ }
+
+ @Override
+ public void truncate(long size) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void force() throws IOException {}
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriter.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriter.java
new file mode 100644
index 00000000000..09f5f77a278
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.model;
+
+public abstract class ModelWriter {
+ abstract void write(String filePath, float[] values, int width, int height);
+
+ public ModelWriter getInstance(ModelWriterType modelFileType) {
+ switch (modelFileType) {
+ case UNCOMPRESSED_TIFF:
+ return new UnCompressedTiffModelWriter();
+ default:
+ return new CompressedTsFileModelWriter();
+ }
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriterType.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriterType.java
new file mode 100644
index 00000000000..a587a0bd568
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriterType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.model;
+
+public enum ModelWriterType {
+ COMPRESSED_TSFILE,
+ UNCOMPRESSED_TIFF
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/UnCompressedTiffModelWriter.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/UnCompressedTiffModelWriter.java
new file mode 100644
index 00000000000..629dfc8cde6
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/UnCompressedTiffModelWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.model;
+
+import ij.ImagePlus;
+import ij.io.FileSaver;
+import ij.process.FloatProcessor;
+
+public class UnCompressedTiffModelWriter extends ModelWriter {
+ @Override
+ void write(String filePath, float[] values, int width, int height) {
+ FloatProcessor floatProcessor = new FloatProcessor(width, height, values);
+ ImagePlus imp = new ImagePlus("first level", floatProcessor);
+ FileSaver fs = new FileSaver(imp);
+ fs.saveAsTiff(filePath);
+ }
+}
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 0a4d9418916..74f4adb3ced 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -392,6 +392,11 @@
<version>1.3.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>net.imagej</groupId>
+ <artifactId>ij</artifactId>
+ <version>1.54g</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTsFileModelReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTsFileModelReader.java
new file mode 100644
index 00000000000..6a4e66a08e1
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTsFileModelReader.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.model;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.encrypt.EncryptUtils;
+import org.apache.tsfile.encrypt.IDecryptor;
+import org.apache.tsfile.file.MetaMarker;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.reader.TsFileInput;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.tsfile.read.reader.chunk.ChunkReader.decryptAndUncompressPageData;
+
+public class CompressedTsFileModelReader extends ModelReader {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CompressedTsFileModelReader.class);
+
+ @Override
+ float[] readAll(String filePath) {
+ try {
+ float[] values = null;
+ int i = 0;
+ IDeviceID deviceID = new StringArrayDeviceID("t");
+ byte[] fileBytes = Files.readAllBytes(new File(filePath).toPath());
+ TsFileInput tsFileInput = new
ByteBufferTsFileInput(ByteBuffer.wrap(fileBytes));
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(tsFileInput)) {
+ TimeseriesMetadata timeseriesMetadata =
+ reader.getAllTimeseriesMetadata(false).get(deviceID).stream()
+ .filter(t -> t.getMeasurementId().equals("v"))
+ .collect(Collectors.toList())
+ .get(0);
+ values = new float[timeseriesMetadata.getStatistics().getCount()];
+
+ byte marker;
+ ChunkHeader chunkHeader;
+ reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length +
1);
+ while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+ case MetaMarker.TIME_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+ chunkHeader = reader.readChunkHeader(marker);
+ reader.position(reader.position() + chunkHeader.getDataSize());
+ break;
+ case MetaMarker.VALUE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+ chunkHeader = reader.readChunkHeader(marker);
+ ByteBuffer chunkDataBuffer = reader.readChunk(-1,
chunkHeader.getDataSize());
+
+ while (chunkDataBuffer.hasRemaining()) {
+ PageHeader pageHeader = null;
+ if (((byte) (chunkHeader.getChunkType() & 0x3F))
+ == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, new
FloatStatistics());
+ } else {
+ pageHeader =
+ PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
+ }
+
+ ByteBuffer pageData = readCompressedPageData(pageHeader,
chunkDataBuffer);
+ IDecryptor decryptor =
IDecryptor.getDecryptor(EncryptUtils.getEncryptParameter());
+ ByteBuffer uncompressedPageData =
+ decryptAndUncompressPageData(
+ pageHeader,
+
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
+ pageData,
+ decryptor);
+ Decoder decoder =
+ Decoder.getDecoderByType(
+ chunkHeader.getEncodingType(),
chunkHeader.getDataType());
+
+ byte[] bitmap = null;
+ if (uncompressedPageData.hasRemaining()) {
+ int size = ReadWriteIOUtils.readInt(uncompressedPageData);
+ bitmap = new byte[(size + 7) / 8];
+ uncompressedPageData.get(bitmap);
+ }
+ while (decoder.hasNext(uncompressedPageData)) {
+ values[i++] = decoder.readFloat(uncompressedPageData);
+ }
+ }
+ break;
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ reader.readChunkGroupHeader();
+ break;
+ default:
+ return values;
+ }
+ }
+ }
+ return values;
+ } catch (Exception e) {
+ LOGGER.error("Read TS file failed", e);
+ return new float[] {0};
+ }
+ }
+
+ public static class ByteBufferTsFileInput implements TsFileInput {
+
+ public ByteBuffer buffer;
+
+ public ByteBufferTsFileInput(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public long size() throws IOException {
+ return buffer.limit();
+ }
+
+ @Override
+ public long position() throws IOException {
+ return buffer.position();
+ }
+
+ @Override
+ public TsFileInput position(long newPosition) throws IOException {
+ buffer.position((int) newPosition);
+ return this;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) {
+ int bytesToRead = Math.min(dst.remaining(), buffer.remaining());
+ if (bytesToRead == 0) {
+ return 0;
+ }
+ ByteBuffer slice = buffer.slice();
+ slice.limit(bytesToRead);
+ dst.put(slice);
+ buffer.position(buffer.position() + bytesToRead);
+ return bytesToRead;
+ }
+
+ @Override
+ public int read(ByteBuffer dst, long position) {
+ ByteBuffer readBuffer = buffer.slice();
+ readBuffer.position((int) position);
+ int bytesToRead = Math.min(dst.remaining(), readBuffer.remaining());
+ if (bytesToRead > 0) {
+ readBuffer.limit(readBuffer.position() + bytesToRead);
+ dst.put(readBuffer);
+ }
+ return bytesToRead;
+ }
+
+ @Override
+ public InputStream wrapAsInputStream() throws IOException {
+ return new InputStream() {
+ @Override
+ public int read() throws IOException {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ return buffer.get() & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ int toRead = Math.min(len, buffer.remaining());
+ buffer.get(b, off, toRead);
+ return toRead;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return buffer.remaining();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public String getFilePath() {
+ return "memory tsfile data buffer";
+ }
+ }
+
+ @Override
+ List<float[]> penetrate(String filePath, List<List<Integer>>
startAndEndTimeArray) {
+ try {
+ List<float[]> results = new ArrayList<>(startAndEndTimeArray.size());
+ int currentQueryIndex = 0;
+ int currentResultSetIndex = 0;
+ for (List<Integer> ints : startAndEndTimeArray) {
+ results.add(new float[ints.get(1) - ints.get(0) + 1]);
+ }
+ IDecryptor decryptor = null;
+ IDeviceID deviceID = new StringArrayDeviceID("t");
+ int index = 0;
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+ TimeseriesMetadata timeseriesMetadata =
reader.readTimeseriesMetadata(deviceID, "v", true);
+ for (IChunkMetadata chunkMetadata :
timeseriesMetadata.getChunkMetadataList()) {
+ TimeRange timeRange =
+ new TimeRange(chunkMetadata.getStartTime(),
chunkMetadata.getEndTime());
+ boolean overlap = false;
+ for (int i = currentQueryIndex; i < startAndEndTimeArray.size();
i++) {
+ List<Integer> startAndEnd = startAndEndTimeArray.get(i);
+ if (timeRange.overlaps(new TimeRange(startAndEnd.get(0),
startAndEnd.get(1)))) {
+ overlap = true;
+ break;
+ }
+ }
+ if (!overlap) {
+ index += chunkMetadata.getStatistics().getCount();
+ continue;
+ }
+
+ Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadata);
+ ChunkHeader chunkHeader = chunk.getHeader();
+ ByteBuffer chunkDataBuffer = chunk.getData();
+ while (chunkDataBuffer.hasRemaining()) {
+ PageHeader pageHeader = null;
+ if (((byte) (chunkHeader.getChunkType() & 0x3F))
+ == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunk.getChunkStatistic());
+ } else {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
+ }
+
+ ByteBuffer pageData = readCompressedPageData(pageHeader,
chunkDataBuffer);
+ TimeRange pageTimeRange =
+ new TimeRange(pageHeader.getStartTime(),
pageHeader.getEndTime());
+ boolean pageOverlap = false;
+ for (int i = currentQueryIndex; i < startAndEndTimeArray.size();
i++) {
+ List<Integer> startAndEnd = startAndEndTimeArray.get(i);
+ if (pageTimeRange.overlaps(new TimeRange(startAndEnd.get(0),
startAndEnd.get(1)))) {
+ pageOverlap = true;
+ break;
+ }
+ }
+ if (!pageOverlap) {
+ index += pageHeader.getStatistics().getCount();
+ continue;
+ }
+ decryptor =
+ decryptor == null ?
IDecryptor.getDecryptor(chunk.getEncryptParam()) : decryptor;
+ ByteBuffer uncompressedPageData =
+ decryptAndUncompressPageData(
+ pageHeader,
+
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
+ pageData,
+ decryptor);
+ Decoder decoder =
+ Decoder.getDecoderByType(chunkHeader.getEncodingType(),
chunkHeader.getDataType());
+
+ byte[] bitmap = null;
+ if (uncompressedPageData.hasRemaining()) {
+ int size = ReadWriteIOUtils.readInt(uncompressedPageData);
+ bitmap = new byte[(size + 7) / 8];
+ uncompressedPageData.get(bitmap);
+ }
+ while (decoder.hasNext(uncompressedPageData)) {
+ float[] currentQueryResult = results.get(currentQueryIndex);
+ if (currentResultSetIndex >= currentQueryResult.length) {
+ currentQueryIndex++;
+ currentResultSetIndex = 0;
+ }
+ if (currentQueryIndex == startAndEndTimeArray.size()) {
+ return results;
+ }
+ currentQueryResult = results.get(currentQueryIndex);
+ float v = decoder.readFloat(uncompressedPageData);
+
+ List<Integer> currentQueryStartAndEnd =
startAndEndTimeArray.get(currentQueryIndex);
+ if (index >= currentQueryStartAndEnd.get(0)
+ && index <= currentQueryStartAndEnd.get(1)) {
+ currentQueryResult[currentResultSetIndex++] = v;
+ }
+ index++;
+ }
+ }
+ }
+ }
+ return results;
+ } catch (Exception e) {
+ LOGGER.error("Penetrate TS file failed", e);
+ return new ArrayList<>();
+ }
+ }
+
+ public static ByteBuffer readCompressedPageData(PageHeader pageHeader,
ByteBuffer chunkBuffer)
+ throws IOException {
+ int compressedPageBodyLength = pageHeader.getCompressedSize();
+ if (compressedPageBodyLength > chunkBuffer.remaining()) {
+ throw new IOException(
+ "do not have a complete page body. Expected:"
+ + compressedPageBodyLength
+ + ". Actual:"
+ + chunkBuffer.remaining());
+ }
+ ByteBuffer pageBodyBuffer = chunkBuffer.slice();
+ pageBodyBuffer.limit(compressedPageBodyLength);
+ chunkBuffer.position(chunkBuffer.position() + compressedPageBodyLength);
+ return pageBodyBuffer;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReader.java
new file mode 100644
index 00000000000..2dcb4c6834d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.model;
+
+import java.util.List;
+
+public abstract class ModelReader {
+ abstract float[] readAll(String filePath);
+
+ abstract List<float[]> penetrate(String filePath, List<List<Integer>>
startAndEndTimeArray);
+
+ public ModelReader getInstance(ModelReaderType modelFileType) {
+ switch (modelFileType) {
+ case UNCOMPRESSED_TIFF:
+ return new UnCompressedTiffModelReader();
+ default:
+ return new CompressedTsFileModelReader();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReaderType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReaderType.java
new file mode 100644
index 00000000000..fe07b4d0f8e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReaderType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.model;
+
+public enum ModelReaderType {
+ COMPRESSED_TSFILE,
+ UNCOMPRESSED_TIFF
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/UnCompressedTiffModelReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/UnCompressedTiffModelReader.java
new file mode 100644
index 00000000000..edfb994db87
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/UnCompressedTiffModelReader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.model;
+
+import ij.ImagePlus;
+import ij.io.Opener;
+import ij.process.FloatProcessor;
+import ij.process.ImageProcessor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class UnCompressedTiffModelReader extends ModelReader {
+
+ @Override
+ float[] readAll(String filePath) {
+ Opener opener = new Opener();
+ ImagePlus imagePlus = opener.openImage(filePath);
+ ImageProcessor processor = imagePlus.getProcessor();
+ if (processor instanceof FloatProcessor) {
+ return (float[]) processor.getPixels();
+ } else {
+ return new float[0];
+ }
+ }
+
+ @Override
+ List<float[]> penetrate(String filePath, List<List<Integer>>
startAndEndTimeArray) {
+ return Collections.emptyList();
+ }
+}