This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type by this push:
     new a060b095694 Add Model Writer and Reader
     new 259fbb2948f Merge remote-tracking branch 'origin/object_type' into 
object_type
a060b095694 is described below

commit a060b09569471b95aeaa21f201d9962e20f0b0cc
Author: spricoder <[email protected]>
AuthorDate: Mon Jul 7 13:14:01 2025 +0800

    Add Model Writer and Reader
---
 iotdb-client/service-rpc/pom.xml                   |   5 +
 .../rpc/model/CompressedTsFileModelWriter.java     | 188 +++++++++++
 .../org/apache/iotdb/rpc/model/ModelWriter.java    |  33 ++
 .../apache/iotdb/rpc/model/ModelWriterType.java    |  25 ++
 .../rpc/model/UnCompressedTiffModelWriter.java     |  34 ++
 iotdb-core/datanode/pom.xml                        |   5 +
 .../utils/model/CompressedTsFileModelReader.java   | 343 +++++++++++++++++++++
 .../apache/iotdb/db/utils/model/ModelReader.java   |  37 +++
 .../iotdb/db/utils/model/ModelReaderType.java      |  25 ++
 .../utils/model/UnCompressedTiffModelReader.java   |  48 +++
 10 files changed, 743 insertions(+)

diff --git a/iotdb-client/service-rpc/pom.xml b/iotdb-client/service-rpc/pom.xml
index d7efd73817c..c525afa7cfc 100644
--- a/iotdb-client/service-rpc/pom.xml
+++ b/iotdb-client/service-rpc/pom.xml
@@ -84,6 +84,11 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>net.imagej</groupId>
+            <artifactId>ij</artifactId>
+            <version>1.54g</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTsFileModelWriter.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTsFileModelWriter.java
new file mode 100644
index 00000000000..27468c696af
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTsFileModelWriter.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.model;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.ColumnSchemaBuilder;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.v4.DeviceTableModelWriter;
+import org.apache.tsfile.write.v4.ITsFileWriter;
+import org.apache.tsfile.write.writer.TsFileOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+
+public class CompressedTsFileModelWriter extends ModelWriter {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CompressedTsFileModelWriter.class);
+  private static final int DEFAULT_CHUNK_NUMBER = 128 * 128;
+
+  @Override
+  void write(String filePath, float[] values, int width, int height) {
+    try {
+
+      TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(1);
+      String tableName = "t";
+      MemoryTsFileOutput tsFileOutput = new MemoryTsFileOutput(60 * 1024 * 
1024);
+
+      TableSchema tableSchema =
+          new TableSchema(
+              tableName,
+              Collections.singletonList(
+                  new ColumnSchemaBuilder()
+                      .name("v")
+                      .dataType(TSDataType.FLOAT)
+                      .category(ColumnCategory.FIELD)
+                      .build()));
+      try (ITsFileWriter writer = new DeviceTableModelWriter(tsFileOutput, 
tableSchema, 1)) {
+        Tablet tablet =
+            new Tablet(
+                Collections.singletonList("v"),
+                Collections.singletonList(TSDataType.FLOAT),
+                DEFAULT_CHUNK_NUMBER);
+
+        for (int i = 0; i < values.length; i++) {
+          int row = tablet.getRowSize();
+          tablet.addTimestamp(row, i);
+          tablet.addValue(row, "v", values[i]);
+          // write
+          if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
+            writer.write(tablet);
+            tablet.reset();
+          }
+        }
+        // write
+        if (tablet.getRowSize() != 0) {
+          writer.write(tablet);
+          tablet.reset();
+        }
+      }
+      // SpriCoder write byteBuffer to file
+      ByteBuffer buffer = tsFileOutput.getByteBuffer();
+      createFile(filePath);
+      buffer.position(0); // 重置读取位置(安全措施)
+
+      try (FileChannel channel = FileChannel.open(Paths.get(filePath), 
StandardOpenOption.WRITE)) {
+
+        while (buffer.hasRemaining()) {
+          channel.write(buffer); // 零拷贝直接写入
+        }
+        buffer.flip(); // 恢复原始状态
+
+        // 强制磁盘同步(可靠性要求高的场景使用)
+        channel.force(true); // 强制元数据和内容写入磁盘
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void createFile(String filePath) throws TException {
+    File file = new File(filePath);
+    File directory = file.getParentFile();
+
+    if (directory != null && !directory.exists()) {
+      boolean isCreated = directory.mkdirs();
+      if (!isCreated) {
+        LOGGER.error("directory create failed please check");
+        throw new TException("insert Grid error ");
+      }
+    }
+    try {
+      boolean newFile = file.createNewFile();
+      if (!newFile) {
+        LOGGER.error("createNewFile failed please check");
+        throw new TException("insert Grid error ");
+      }
+    } catch (IOException e) {
+      LOGGER.error("createNewFile failed please check");
+      throw new TException("insert Grid error ");
+    }
+  }
+
+  private static class MemoryTsFileOutput implements TsFileOutput {
+
+    private ByteArrayOutputStream baos;
+
+    public MemoryTsFileOutput(int initialSize) {
+      this.baos = new PublicBAOS(initialSize);
+    }
+
+    public ByteBuffer getByteBuffer() {
+      return ByteBuffer.wrap(baos.toByteArray());
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      baos.write(b);
+    }
+
+    @Override
+    public void write(byte b) throws IOException {
+      baos.write(b);
+    }
+
+    @Override
+    public void write(ByteBuffer b) throws IOException {
+      baos.write(b.array());
+    }
+
+    @Override
+    public long getPosition() throws IOException {
+      return baos.size();
+    }
+
+    @Override
+    public void close() throws IOException {
+      baos.close();
+    }
+
+    @Override
+    public OutputStream wrapAsStream() throws IOException {
+      return baos;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      baos.flush();
+    }
+
+    @Override
+    public void truncate(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void force() throws IOException {}
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriter.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriter.java
new file mode 100644
index 00000000000..09f5f77a278
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.model;
+
+/** Base class of the writers that persist an array of float values to a model file. */
+public abstract class ModelWriter {
+  /**
+   * Writes {@code values} to {@code filePath} in the format of the concrete writer.
+   *
+   * @param filePath destination file
+   * @param values values to store
+   * @param width passed to the TIFF writer as image width; the TsFile writer does not use it
+   * @param height passed to the TIFF writer as image height; the TsFile writer does not use it
+   */
+  abstract void write(String filePath, float[] values, int width, int height);
+
+  /**
+   * Creates the writer for the requested file format.
+   *
+   * <p>This factory is static so that it can be called without already holding a writer
+   * instance; calls made through an existing instance still compile.
+   *
+   * @param modelFileType requested format; every type other than {@code UNCOMPRESSED_TIFF}
+   *     yields a {@link CompressedTsFileModelWriter}
+   * @return a new writer instance
+   */
+  public static ModelWriter getInstance(ModelWriterType modelFileType) {
+    switch (modelFileType) {
+      case UNCOMPRESSED_TIFF:
+        return new UnCompressedTiffModelWriter();
+      default:
+        return new CompressedTsFileModelWriter();
+    }
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriterType.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriterType.java
new file mode 100644
index 00000000000..a587a0bd568
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/ModelWriterType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.model;
+
+public enum ModelWriterType { // File formats that ModelWriter#getInstance can be asked for.
+  COMPRESSED_TSFILE, // Falls into the default branch, which returns CompressedTsFileModelWriter.
+  UNCOMPRESSED_TIFF // Mapped to UnCompressedTiffModelWriter.
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/UnCompressedTiffModelWriter.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/UnCompressedTiffModelWriter.java
new file mode 100644
index 00000000000..629dfc8cde6
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/UnCompressedTiffModelWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.model;
+
+import ij.ImagePlus;
+import ij.io.FileSaver;
+import ij.process.FloatProcessor;
+
+public class UnCompressedTiffModelWriter extends ModelWriter {
+  @Override
+  void write(String filePath, float[] values, int width, int height) {
+    FloatProcessor floatProcessor = new FloatProcessor(width, height, values);
+    ImagePlus imp = new ImagePlus("first level", floatProcessor);
+    FileSaver fs = new FileSaver(imp);
+    fs.saveAsTiff(filePath);
+  }
+}
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 0a4d9418916..74f4adb3ced 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -392,6 +392,11 @@
             <version>1.3.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>net.imagej</groupId>
+            <artifactId>ij</artifactId>
+            <version>1.54g</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTsFileModelReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTsFileModelReader.java
new file mode 100644
index 00000000000..6a4e66a08e1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTsFileModelReader.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.model;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.encrypt.EncryptUtils;
+import org.apache.tsfile.encrypt.IDecryptor;
+import org.apache.tsfile.file.MetaMarker;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.reader.TsFileInput;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.tsfile.read.reader.chunk.ChunkReader.decryptAndUncompressPageData;
+
+/**
+ * Reads float values back from a TsFile that stores them in column {@code "v"} of device
+ * {@code "t"}.
+ */
+public class CompressedTsFileModelReader extends ModelReader {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CompressedTsFileModelReader.class);
+
+  /**
+   * Loads the whole file into memory and decodes every value of column {@code "v"}.
+   *
+   * @param filePath TsFile to read
+   * @return the decoded values in file order; a single-element array {@code {0}} if reading
+   *     fails (the failure is logged)
+   */
+  @Override
+  float[] readAll(String filePath) {
+    try {
+      float[] values = null;
+      int i = 0;
+      IDeviceID deviceID = new StringArrayDeviceID("t");
+      byte[] fileBytes = Files.readAllBytes(new File(filePath).toPath());
+      TsFileInput tsFileInput = new ByteBufferTsFileInput(ByteBuffer.wrap(fileBytes));
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFileInput)) {
+        // The point count recorded in the metadata of "v" sizes the result array.
+        TimeseriesMetadata timeseriesMetadata =
+            reader.getAllTimeseriesMetadata(false).get(deviceID).stream()
+                .filter(t -> t.getMeasurementId().equals("v"))
+                .collect(Collectors.toList())
+                .get(0);
+        values = new float[timeseriesMetadata.getStatistics().getCount()];
+
+        byte marker;
+        ChunkHeader chunkHeader;
+        // Start scanning right after the magic string plus one more byte
+        // (presumably the version byte - TODO confirm).
+        reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+        while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+          switch (marker) {
+            case MetaMarker.CHUNK_HEADER:
+            case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+            case MetaMarker.TIME_CHUNK_HEADER:
+            case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+              // Chunks of these kinds are not decoded: jump over their data.
+              chunkHeader = reader.readChunkHeader(marker);
+              reader.position(reader.position() + chunkHeader.getDataSize());
+              break;
+            case MetaMarker.VALUE_CHUNK_HEADER:
+            case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+              chunkHeader = reader.readChunkHeader(marker);
+              ByteBuffer chunkDataBuffer = reader.readChunk(-1, chunkHeader.getDataSize());
+
+              while (chunkDataBuffer.hasRemaining()) {
+                PageHeader pageHeader = null;
+                if (((byte) (chunkHeader.getChunkType() & 0x3F))
+                    == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+                  pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, new FloatStatistics());
+                } else {
+                  pageHeader =
+                      PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
+                }
+
+                ByteBuffer pageData = readCompressedPageData(pageHeader, chunkDataBuffer);
+                IDecryptor decryptor = IDecryptor.getDecryptor(EncryptUtils.getEncryptParameter());
+                ByteBuffer uncompressedPageData =
+                    decryptAndUncompressPageData(
+                        pageHeader,
+                        IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
+                        pageData,
+                        decryptor);
+                Decoder decoder =
+                    Decoder.getDecoderByType(
+                        chunkHeader.getEncodingType(), chunkHeader.getDataType());
+
+                // The page body starts with a size followed by a bitmap of (size + 7) / 8
+                // bytes. It is read only to move past it; its content is not evaluated.
+                byte[] bitmap = null;
+                if (uncompressedPageData.hasRemaining()) {
+                  int size = ReadWriteIOUtils.readInt(uncompressedPageData);
+                  bitmap = new byte[(size + 7) / 8];
+                  uncompressedPageData.get(bitmap);
+                }
+                while (decoder.hasNext(uncompressedPageData)) {
+                  values[i++] = decoder.readFloat(uncompressedPageData);
+                }
+              }
+              break;
+            case MetaMarker.CHUNK_GROUP_HEADER:
+              reader.readChunkGroupHeader();
+              break;
+            default:
+              // Any other marker ends the scan with what has been decoded so far.
+              return values;
+          }
+        }
+      }
+      return values;
+    } catch (Exception e) {
+      LOGGER.error("Read TS file failed", e);
+      return new float[] {0};
+    }
+  }
+
+  /**
+   * {@link TsFileInput} backed by a {@link ByteBuffer} instead of a file.
+   *
+   * <p>The position of the wrapped buffer serves as the sequential read position. Positions are
+   * narrowed to {@code int}, which is the most a {@link ByteBuffer} can address.
+   */
+  public static class ByteBufferTsFileInput implements TsFileInput {
+
+    public ByteBuffer buffer;
+
+    public ByteBufferTsFileInput(ByteBuffer buffer) {
+      this.buffer = buffer;
+    }
+
+    @Override
+    public long size() throws IOException {
+      return buffer.limit();
+    }
+
+    @Override
+    public long position() throws IOException {
+      return buffer.position();
+    }
+
+    @Override
+    public TsFileInput position(long newPosition) throws IOException {
+      buffer.position((int) newPosition);
+      return this;
+    }
+
+    /**
+     * Sequential read: copies bytes from the current position into {@code dst} and advances the
+     * position.
+     *
+     * @return the number of bytes copied, {@code 0} if {@code dst} has no room, or {@code -1}
+     *     if the end of the data has been reached (same convention as {@code
+     *     java.nio.channels.ReadableByteChannel#read})
+     */
+    @Override
+    public int read(ByteBuffer dst) {
+      if (!dst.hasRemaining()) {
+        return 0;
+      }
+      if (!buffer.hasRemaining()) {
+        // Report end-of-data explicitly so read loops that wait for -1 terminate.
+        return -1;
+      }
+      int bytesToRead = Math.min(dst.remaining(), buffer.remaining());
+      ByteBuffer slice = buffer.slice();
+      slice.limit(bytesToRead);
+      dst.put(slice);
+      buffer.position(buffer.position() + bytesToRead);
+      return bytesToRead;
+    }
+
+    /**
+     * Positional read: copies bytes starting at the absolute offset {@code position} into
+     * {@code dst} without moving the sequential read position.
+     *
+     * @return the number of bytes copied, {@code 0} if {@code dst} has no room, or {@code -1}
+     *     if {@code position} is at or beyond the end of the data
+     * @throws IllegalArgumentException if {@code position} is negative
+     */
+    @Override
+    public int read(ByteBuffer dst, long position) {
+      if (position < 0) {
+        throw new IllegalArgumentException("Negative position: " + position);
+      }
+      if (!dst.hasRemaining()) {
+        return 0;
+      }
+      if (position >= buffer.limit()) {
+        return -1;
+      }
+      // duplicate() keeps absolute indexing; slice() would make the offset relative to the
+      // current sequential position.
+      ByteBuffer readBuffer = buffer.duplicate();
+      readBuffer.position((int) position);
+      int bytesToRead = Math.min(dst.remaining(), readBuffer.remaining());
+      readBuffer.limit(readBuffer.position() + bytesToRead);
+      dst.put(readBuffer);
+      return bytesToRead;
+    }
+
+    /** Returns a stream view that reads from, and advances, the sequential position. */
+    @Override
+    public InputStream wrapAsInputStream() throws IOException {
+      return new InputStream() {
+        @Override
+        public int read() throws IOException {
+          if (!buffer.hasRemaining()) {
+            return -1;
+          }
+          return buffer.get() & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+          if (!buffer.hasRemaining()) {
+            return -1;
+          }
+          int toRead = Math.min(len, buffer.remaining());
+          buffer.get(b, off, toRead);
+          return toRead;
+        }
+
+        @Override
+        public int available() throws IOException {
+          return buffer.remaining();
+        }
+      };
+    }
+
+    /** Nothing to release: the data only lives in memory. */
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    public String getFilePath() {
+      return "memory tsfile data buffer";
+    }
+  }
+
+  /**
+   * Reads only the values whose running index falls into the requested ranges, skipping chunks
+   * and pages whose time range overlaps none of the pending ranges.
+   *
+   * <p>NOTE(review): the query cursor only moves forward, so the ranges presumably have to be
+   * sorted ascending and non-overlapping - confirm against callers.
+   *
+   * @param filePath TsFile to read
+   * @param startAndEndTimeArray one entry per range, holding the inclusive start at index 0 and
+   *     the inclusive end at index 1
+   * @return one array per range, each of length {@code end - start + 1}; an empty list if
+   *     reading fails (the failure is logged)
+   */
+  @Override
+  List<float[]> penetrate(String filePath, List<List<Integer>> startAndEndTimeArray) {
+    try {
+      List<float[]> results = new ArrayList<>(startAndEndTimeArray.size());
+      // Range currently being filled and the next free slot in its result array.
+      int currentQueryIndex = 0;
+      int currentResultSetIndex = 0;
+      for (List<Integer> ints : startAndEndTimeArray) {
+        results.add(new float[ints.get(1) - ints.get(0) + 1]);
+      }
+      IDecryptor decryptor = null;
+      IDeviceID deviceID = new StringArrayDeviceID("t");
+      // Number of values passed so far, whether decoded or skipped.
+      int index = 0;
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+        TimeseriesMetadata timeseriesMetadata = reader.readTimeseriesMetadata(deviceID, "v", true);
+        for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
+          TimeRange timeRange =
+              new TimeRange(chunkMetadata.getStartTime(), chunkMetadata.getEndTime());
+          boolean overlap = false;
+          for (int i = currentQueryIndex; i < startAndEndTimeArray.size(); i++) {
+            List<Integer> startAndEnd = startAndEndTimeArray.get(i);
+            if (timeRange.overlaps(new TimeRange(startAndEnd.get(0), startAndEnd.get(1)))) {
+              overlap = true;
+              break;
+            }
+          }
+          if (!overlap) {
+            // Skip the whole chunk but keep the running index in step.
+            index += chunkMetadata.getStatistics().getCount();
+            continue;
+          }
+
+          Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadata);
+          ChunkHeader chunkHeader = chunk.getHeader();
+          ByteBuffer chunkDataBuffer = chunk.getData();
+          while (chunkDataBuffer.hasRemaining()) {
+            PageHeader pageHeader = null;
+            if (((byte) (chunkHeader.getChunkType() & 0x3F))
+                == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+              pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunk.getChunkStatistic());
+            } else {
+              pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
+            }
+
+            // Always consume the page body so the buffer points at the next page header.
+            ByteBuffer pageData = readCompressedPageData(pageHeader, chunkDataBuffer);
+            TimeRange pageTimeRange =
+                new TimeRange(pageHeader.getStartTime(), pageHeader.getEndTime());
+            boolean pageOverlap = false;
+            for (int i = currentQueryIndex; i < startAndEndTimeArray.size(); i++) {
+              List<Integer> startAndEnd = startAndEndTimeArray.get(i);
+              if (pageTimeRange.overlaps(new TimeRange(startAndEnd.get(0), startAndEnd.get(1)))) {
+                pageOverlap = true;
+                break;
+              }
+            }
+            if (!pageOverlap) {
+              // Skip the page without decompressing it.
+              index += pageHeader.getStatistics().getCount();
+              continue;
+            }
+            // The decryptor is created once and reused for all following pages.
+            decryptor =
+                decryptor == null ? IDecryptor.getDecryptor(chunk.getEncryptParam()) : decryptor;
+            ByteBuffer uncompressedPageData =
+                decryptAndUncompressPageData(
+                    pageHeader,
+                    IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
+                    pageData,
+                    decryptor);
+            Decoder decoder =
+                Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
+
+            // The page body starts with a size followed by a bitmap of (size + 7) / 8 bytes.
+            // It is read only to move past it; its content is not evaluated.
+            byte[] bitmap = null;
+            if (uncompressedPageData.hasRemaining()) {
+              int size = ReadWriteIOUtils.readInt(uncompressedPageData);
+              bitmap = new byte[(size + 7) / 8];
+              uncompressedPageData.get(bitmap);
+            }
+            while (decoder.hasNext(uncompressedPageData)) {
+              float[] currentQueryResult = results.get(currentQueryIndex);
+              // Move on to the next range once the current result array is full.
+              if (currentResultSetIndex >= currentQueryResult.length) {
+                currentQueryIndex++;
+                currentResultSetIndex = 0;
+              }
+              // All ranges are filled: stop before decoding anything else.
+              if (currentQueryIndex == startAndEndTimeArray.size()) {
+                return results;
+              }
+              currentQueryResult = results.get(currentQueryIndex);
+              float v = decoder.readFloat(uncompressedPageData);
+
+              List<Integer> currentQueryStartAndEnd = startAndEndTimeArray.get(currentQueryIndex);
+              if (index >= currentQueryStartAndEnd.get(0)
+                  && index <= currentQueryStartAndEnd.get(1)) {
+                currentQueryResult[currentResultSetIndex++] = v;
+              }
+              index++;
+            }
+          }
+        }
+      }
+      return results;
+    } catch (Exception e) {
+      LOGGER.error("Penetrate TS file failed", e);
+      return new ArrayList<>();
+    }
+  }
+
+  /**
+   * Cuts the compressed body of one page out of {@code chunkBuffer} and advances the buffer
+   * past it.
+   *
+   * @param pageHeader header of the page; supplies the compressed body length
+   * @param chunkBuffer chunk data positioned at the start of the page body
+   * @return a view of exactly the compressed page body, sharing content with {@code chunkBuffer}
+   * @throws IOException if fewer bytes remain than the header announces
+   */
+  public static ByteBuffer readCompressedPageData(PageHeader pageHeader, ByteBuffer chunkBuffer)
+      throws IOException {
+    int compressedPageBodyLength = pageHeader.getCompressedSize();
+    if (compressedPageBodyLength > chunkBuffer.remaining()) {
+      throw new IOException(
+          "do not have a complete page body. Expected:"
+              + compressedPageBodyLength
+              + ". Actual:"
+              + chunkBuffer.remaining());
+    }
+    ByteBuffer pageBodyBuffer = chunkBuffer.slice();
+    pageBodyBuffer.limit(compressedPageBodyLength);
+    chunkBuffer.position(chunkBuffer.position() + compressedPageBodyLength);
+    return pageBodyBuffer;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReader.java
new file mode 100644
index 00000000000..2dcb4c6834d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.model;
+
+import java.util.List;
+
+/** Base class of the readers that load float values from a model file. */
+public abstract class ModelReader {
+  /**
+   * Reads every value stored in the file.
+   *
+   * @param filePath file to read
+   * @return the values in file order
+   */
+  abstract float[] readAll(String filePath);
+
+  /**
+   * Reads only the values that fall into the given ranges.
+   *
+   * @param filePath file to read
+   * @param startAndEndTimeArray one entry per range, holding the start at index 0 and the end
+   *     at index 1
+   * @return one array per requested range
+   */
+  abstract List<float[]> penetrate(String filePath, List<List<Integer>> startAndEndTimeArray);
+
+  /**
+   * Creates the reader for the requested file format.
+   *
+   * <p>This factory is static so that it can be called without already holding a reader
+   * instance; calls made through an existing instance still compile.
+   *
+   * @param modelFileType requested format; every type other than {@code UNCOMPRESSED_TIFF}
+   *     yields a {@link CompressedTsFileModelReader}
+   * @return a new reader instance
+   */
+  public static ModelReader getInstance(ModelReaderType modelFileType) {
+    switch (modelFileType) {
+      case UNCOMPRESSED_TIFF:
+        return new UnCompressedTiffModelReader();
+      default:
+        return new CompressedTsFileModelReader();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReaderType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReaderType.java
new file mode 100644
index 00000000000..fe07b4d0f8e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/ModelReaderType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.model;
+
+public enum ModelReaderType { // File formats that ModelReader#getInstance can be asked for.
+  COMPRESSED_TSFILE, // Falls into the default branch, which returns CompressedTsFileModelReader.
+  UNCOMPRESSED_TIFF // Mapped to UnCompressedTiffModelReader.
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/UnCompressedTiffModelReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/UnCompressedTiffModelReader.java
new file mode 100644
index 00000000000..edfb994db87
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/UnCompressedTiffModelReader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.model;
+
+import ij.ImagePlus;
+import ij.io.Opener;
+import ij.process.FloatProcessor;
+import ij.process.ImageProcessor;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Reads float values from an image file opened through ImageJ's {@link Opener}. */
+public class UnCompressedTiffModelReader extends ModelReader {
+
+  /**
+   * Returns the pixel array of the image stored at {@code filePath}.
+   *
+   * @param filePath image file to open
+   * @return the float pixels of the image, or an empty array if the file cannot be opened or
+   *     the image is not backed by a {@link FloatProcessor}
+   */
+  @Override
+  float[] readAll(String filePath) {
+    ImagePlus imagePlus = new Opener().openImage(filePath);
+    // openImage yields null when the file is missing or not a readable image.
+    if (imagePlus == null) {
+      return new float[0];
+    }
+    ImageProcessor processor = imagePlus.getProcessor();
+    if (processor instanceof FloatProcessor) {
+      return (float[]) processor.getPixels();
+    }
+    return new float[0];
+  }
+
+  /**
+   * Range reads are not implemented for this format.
+   *
+   * @return always an empty list
+   */
+  @Override
+  List<float[]> penetrate(String filePath, List<List<Integer>> startAndEndTimeArray) {
+    return Collections.emptyList();
+  }
+}

Reply via email to