This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 89666aba Fix TsFileDeviceIterator (#222)
89666aba is described below

commit 89666aba473f9712067c0ef4b0c91e880507efea
Author: shuwenwei <[email protected]>
AuthorDate: Mon Sep 2 10:45:10 2024 +0800

    Fix TsFileDeviceIterator (#222)
    
    * fix TsFileDeviceIterator
    
    * add ut
    
    * fix review
---
 .../apache/tsfile/read/TsFileDeviceIterator.java   | 173 ++++++++++++++++++---
 .../apache/tsfile/read/TsFileSequenceReader.java   | 111 +------------
 .../tsfile/read/TsFileDeviceIteratorTest.java      | 141 +++++++++++++++++
 3 files changed, 291 insertions(+), 134 deletions(-)

diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileDeviceIterator.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileDeviceIterator.java
index 232f3052..3454f5dd 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileDeviceIterator.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileDeviceIterator.java
@@ -19,35 +19,45 @@
 
 package org.apache.tsfile.read;
 
+import org.apache.tsfile.compatibility.DeserializeConfig;
+import org.apache.tsfile.exception.StopReadTsFileByInterruptException;
 import org.apache.tsfile.exception.TsFileRuntimeException;
+import org.apache.tsfile.file.IMetadataIndexEntry;
+import org.apache.tsfile.file.metadata.DeviceMetadataIndexEntry;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 
 public class TsFileDeviceIterator implements Iterator<Pair<IDeviceID, Boolean>> {
-  private final TsFileSequenceReader reader;
 
-  // device -> firstMeasurmentNode offset
-  private final Queue<Pair<IDeviceID, long[]>> queue;
+  private final TsFileSequenceReader reader;
+  private final DeserializeConfig deserializeConfig;
+  private final Iterator<MetadataIndexNode> tableMetadataIndexNodeIterator;
+  private final Queue<Pair<IDeviceID, long[]>> queue = new LinkedList<>();
+  private final List<long[]> leafDeviceNodeOffsetList = new LinkedList<>();
   private Pair<IDeviceID, Boolean> currentDevice = null;
   private MetadataIndexNode measurementNode;
 
-  // <startOffset, endOffset>, device leaf node offset in this file
-  private final List<long[]> leafDeviceNodeOffsetList;
+  private static final Logger logger = LoggerFactory.getLogger(TsFileDeviceIterator.class);
 
-  public TsFileDeviceIterator(
-      TsFileSequenceReader reader,
-      List<long[]> leafDeviceNodeOffsetList,
-      Queue<Pair<IDeviceID, long[]>> queue) {
+  public TsFileDeviceIterator(TsFileSequenceReader reader) throws IOException {
     this.reader = reader;
-    this.queue = queue;
-    this.leafDeviceNodeOffsetList = leafDeviceNodeOffsetList;
+    this.deserializeConfig = reader.getDeserializeContext();
+    this.tableMetadataIndexNodeIterator =
+        reader.readFileMetadata().getTableMetadataIndexNodeMap().values().iterator();
   }
 
   public Pair<IDeviceID, Boolean> current() {
@@ -56,21 +66,38 @@ public class TsFileDeviceIterator implements Iterator<Pair<IDeviceID, Boolean>>
 
   @Override
   public boolean hasNext() {
-    if (!queue.isEmpty()) {
-      return true;
-    } else if (leafDeviceNodeOffsetList.isEmpty()) {
-      // device queue is empty and all device leaf node has been read
-      return false;
-    } else {
-      // queue is empty but there are still some devices on leaf node not being read yet
-      long[] nextDeviceLeafNodeOffset = leafDeviceNodeOffsetList.remove(0);
-      try {
-        reader.getDevicesAndEntriesOfOneLeafNode(
+    try {
+      prepareNextTable();
+      if (!queue.isEmpty()) {
+        return true;
+      } else if (leafDeviceNodeOffsetList.isEmpty()) {
+        // device queue is empty and all device leaf node has been read
+        return false;
+      } else {
+        // queue is empty but there are still some devices on leaf node not being read yet
+        long[] nextDeviceLeafNodeOffset = leafDeviceNodeOffsetList.remove(0);
+        getDevicesAndEntriesOfOneLeafNode(
             nextDeviceLeafNodeOffset[0], nextDeviceLeafNodeOffset[1], queue);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+        return true;
       }
-      return true;
+    } catch (IOException e) {
+      throw new TsFileRuntimeException(e);
+    }
+  }
+
+  private void prepareNextTable() throws IOException {
+    if (!queue.isEmpty() || !leafDeviceNodeOffsetList.isEmpty()) {
+      return;
+    }
+    if (!tableMetadataIndexNodeIterator.hasNext()) {
+      return;
+    }
+    MetadataIndexNode nextTableMetadataIndexNode = tableMetadataIndexNodeIterator.next();
+
+    if (nextTableMetadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
+      getDevicesOfLeafNode(nextTableMetadataIndexNode, queue);
+    } else {
+      getAllDeviceLeafNodeOffset(nextTableMetadataIndexNode, leafDeviceNodeOffsetList);
     }
   }
 
@@ -96,4 +123,102 @@ public class TsFileDeviceIterator implements Iterator<Pair<IDeviceID, Boolean>>
   public MetadataIndexNode getFirstMeasurementNodeOfCurrentDevice() {
     return measurementNode;
   }
+
+  /**
+   * Get devices and first measurement node offset.
+   *
+   * @param startOffset start offset of device leaf node
+   * @param endOffset end offset of device leaf node
+   * @param measurementNodeOffsetQueue device -> first measurement node offset
+   */
+  public void getDevicesAndEntriesOfOneLeafNode(
+      Long startOffset, Long endOffset, Queue<Pair<IDeviceID, long[]>> measurementNodeOffsetQueue)
+      throws IOException {
+    try {
+      ByteBuffer nextBuffer = reader.readData(startOffset, endOffset);
+      MetadataIndexNode deviceLeafNode =
+          deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
+              nextBuffer, deserializeConfig);
+      getDevicesOfLeafNode(deviceLeafNode, measurementNodeOffsetQueue);
+    } catch (StopReadTsFileByInterruptException e) {
+      throw e;
+    } catch (Exception e) {
+      logger.error(
+          "Something error happened while getting all devices of file {}", reader.getFileName());
+      throw e;
+    }
+  }
+
+  /**
+   * Get all devices and its corresponding entries on the specific device leaf node.
+   *
+   * @param deviceLeafNode this node must be device leaf node
+   */
+  private void getDevicesOfLeafNode(
+      MetadataIndexNode deviceLeafNode, Queue<Pair<IDeviceID, long[]>> measurementNodeOffsetQueue) {
+    if (!deviceLeafNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
+      throw new IllegalStateException("the first param should be device leaf node.");
+    }
+    List<IMetadataIndexEntry> childrenEntries = deviceLeafNode.getChildren();
+    for (int i = 0; i < childrenEntries.size(); i++) {
+      IMetadataIndexEntry deviceEntry = childrenEntries.get(i);
+      long childStartOffset = deviceEntry.getOffset();
+      long childEndOffset =
+          i == childrenEntries.size() - 1
+              ? deviceLeafNode.getEndOffset()
+              : childrenEntries.get(i + 1).getOffset();
+      long[] offset = {childStartOffset, childEndOffset};
+      measurementNodeOffsetQueue.add(
+          new Pair<>(((DeviceMetadataIndexEntry) deviceEntry).getDeviceID(), offset));
+    }
+  }
+
+  /**
+   * Get the device leaf node offset under the specific device internal node.
+   *
+   * @param deviceInternalNode this node must be device internal node
+   */
+  private void getAllDeviceLeafNodeOffset(
+      MetadataIndexNode deviceInternalNode, List<long[]> leafDeviceNodeOffsets) throws IOException {
+    if (!deviceInternalNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE)) {
+      throw new IllegalStateException("the first param should be device internal node.");
+    }
+    try {
+      int metadataIndexListSize = deviceInternalNode.getChildren().size();
+      boolean isCurrentLayerLeafNode = false;
+      for (int i = 0; i < metadataIndexListSize; i++) {
+        IMetadataIndexEntry entry = deviceInternalNode.getChildren().get(i);
+        long startOffset = entry.getOffset();
+        long endOffset = deviceInternalNode.getEndOffset();
+        if (i != metadataIndexListSize - 1) {
+          endOffset = deviceInternalNode.getChildren().get(i + 1).getOffset();
+        }
+        if (i == 0) {
+          // check is current layer device leaf node or device internal node. Just need to check the
+          // first entry, because the rest are the same
+          MetadataIndexNodeType nodeType =
+              MetadataIndexNodeType.deserialize(
+                  ReadWriteIOUtils.readByte(reader.readData(endOffset - 1, endOffset)));
+          isCurrentLayerLeafNode = nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE);
+        }
+        if (isCurrentLayerLeafNode) {
+          // is device leaf node
+          long[] offset = {startOffset, endOffset};
+          leafDeviceNodeOffsets.add(offset);
+          continue;
+        }
+        ByteBuffer nextBuffer = reader.readData(startOffset, endOffset);
+        getAllDeviceLeafNodeOffset(
+            deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
+                nextBuffer, deserializeConfig),
+            leafDeviceNodeOffsets);
+      }
+    } catch (StopReadTsFileByInterruptException e) {
+      throw e;
+    } catch (Exception e) {
+      logger.error(
+          "Something error happened while getting all devices of file {}", reader.getFileName());
+      throw e;
+    }
+  }
 }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index a69d7872..57c2500d 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -880,116 +880,7 @@ public class TsFileSequenceReader implements AutoCloseable {
    *     on one device leaf node each time to save memory.
    */
   public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws IOException {
-    readFileMetadata();
-    Queue<Pair<IDeviceID, long[]>> queue = new LinkedList<>();
-    List<long[]> leafDeviceNodeOffsets = new ArrayList<>();
-    for (MetadataIndexNode metadataIndexNode :
-        tsFileMetaData.getTableMetadataIndexNodeMap().values()) {
-      if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
-        // the first node of index tree is device leaf node, then get the devices directly
-        getDevicesOfLeafNode(metadataIndexNode, queue);
-      } else {
-        // get all device leaf node offset
-        getAllDeviceLeafNodeOffset(metadataIndexNode, leafDeviceNodeOffsets);
-      }
-    }
-    return new TsFileDeviceIterator(this, leafDeviceNodeOffsets, queue);
-  }
-
-  /**
-   * Get devices and first measurement node offset.
-   *
-   * @param startOffset start offset of device leaf node
-   * @param endOffset end offset of device leaf node
-   * @param measurementNodeOffsetQueue device -> first measurement node offset
-   */
-  public void getDevicesAndEntriesOfOneLeafNode(
-      Long startOffset, Long endOffset, Queue<Pair<IDeviceID, long[]>> measurementNodeOffsetQueue)
-      throws IOException {
-    try {
-      ByteBuffer nextBuffer = readData(startOffset, endOffset);
-      MetadataIndexNode deviceLeafNode =
-          deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
-              nextBuffer, deserializeConfig);
-      getDevicesOfLeafNode(deviceLeafNode, measurementNodeOffsetQueue);
-    } catch (StopReadTsFileByInterruptException e) {
-      throw e;
-    } catch (Exception e) {
-      logger.error("Something error happened while getting all devices of file {}", file);
-      throw e;
-    }
-  }
-
-  /**
-   * Get all devices and its corresponding entries on the specific device leaf node.
-   *
-   * @param deviceLeafNode this node must be device leaf node
-   */
-  private void getDevicesOfLeafNode(
-      MetadataIndexNode deviceLeafNode, Queue<Pair<IDeviceID, long[]>> measurementNodeOffsetQueue) {
-    if (!deviceLeafNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
-      throw new IllegalStateException("the first param should be device leaf node.");
-    }
-    List<IMetadataIndexEntry> childrenEntries = deviceLeafNode.getChildren();
-    for (int i = 0; i < childrenEntries.size(); i++) {
-      IMetadataIndexEntry deviceEntry = childrenEntries.get(i);
-      long childStartOffset = deviceEntry.getOffset();
-      long childEndOffset =
-          i == childrenEntries.size() - 1
-              ? deviceLeafNode.getEndOffset()
-              : childrenEntries.get(i + 1).getOffset();
-      long[] offset = {childStartOffset, childEndOffset};
-      measurementNodeOffsetQueue.add(
-          new Pair<>(((DeviceMetadataIndexEntry) deviceEntry).getDeviceID(), offset));
-    }
-  }
-
-  /**
-   * Get the device leaf node offset under the specific device internal node.
-   *
-   * @param deviceInternalNode this node must be device internal node
-   */
-  private void getAllDeviceLeafNodeOffset(
-      MetadataIndexNode deviceInternalNode, List<long[]> leafDeviceNodeOffsets) throws IOException {
-    if (!deviceInternalNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE)) {
-      throw new IllegalStateException("the first param should be device internal node.");
-    }
-    try {
-      int metadataIndexListSize = deviceInternalNode.getChildren().size();
-      boolean isCurrentLayerLeafNode = false;
-      for (int i = 0; i < metadataIndexListSize; i++) {
-        IMetadataIndexEntry entry = deviceInternalNode.getChildren().get(i);
-        long startOffset = entry.getOffset();
-        long endOffset = deviceInternalNode.getEndOffset();
-        if (i != metadataIndexListSize - 1) {
-          endOffset = deviceInternalNode.getChildren().get(i + 1).getOffset();
-        }
-        if (i == 0) {
-          // check is current layer device leaf node or device internal node. Just need to check the
-          // first entry, because the rest are the same
-          MetadataIndexNodeType nodeType =
-              MetadataIndexNodeType.deserialize(
-                  ReadWriteIOUtils.readByte(readData(endOffset - 1, endOffset)));
-          isCurrentLayerLeafNode = nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE);
-        }
-        if (isCurrentLayerLeafNode) {
-          // is device leaf node
-          long[] offset = {startOffset, endOffset};
-          leafDeviceNodeOffsets.add(offset);
-          continue;
-        }
-        ByteBuffer nextBuffer = readData(startOffset, endOffset);
-        getAllDeviceLeafNodeOffset(
-            deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
-                nextBuffer, deserializeConfig),
-            leafDeviceNodeOffsets);
-      }
-    } catch (StopReadTsFileByInterruptException e) {
-      throw e;
-    } catch (Exception e) {
-      logger.error("Something error happened while getting all devices of file {}", file);
-      throw e;
-    }
+    return new TsFileDeviceIterator(this);
   }
 
   /**
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java
new file mode 100644
index 00000000..6290d476
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read;
+
+import org.apache.tsfile.constant.TestConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TsFileDeviceIteratorTest {
+  private static final String FILE_PATH =
+      TestConstant.BASE_OUTPUT_PATH.concat("TsFileDeviceIterator.tsfile");
+
+  @After
+  public void teardown() {
+    new File(FILE_PATH).delete();
+  }
+
+  @Test
+  public void test() throws IOException {
+    int totalDeviceNum = 0;
+    try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH))) {
+      for (int i = 1; i <= 10; i++) {
+        String tableName = "table" + i;
+        registerTableSchema(writer, tableName);
+        int deviceNum = i;
+        if (i % 2 == 0) {
+          deviceNum *= 10000;
+        } else {
+          deviceNum *= 10;
+        }
+        totalDeviceNum += deviceNum;
+        generateDevice(writer, tableName, deviceNum);
+      }
+      writer.endFile();
+    }
+    int deviceFromIterator = 0;
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+      TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
+      IDeviceID previous = null;
+      while (deviceIterator.hasNext()) {
+        Pair<IDeviceID, Boolean> next = deviceIterator.next();
+        deviceFromIterator++;
+        if (previous != null) {
+          Assert.assertTrue(previous.compareTo(next.getLeft()) < 0);
+        }
+        previous = next.getLeft();
+      }
+    }
+    Assert.assertEquals(totalDeviceNum, deviceFromIterator);
+  }
+
+  private void registerTableSchema(TsFileIOWriter writer, String tableName) {
+    List<IMeasurementSchema> schemas =
+        Arrays.asList(
+            new MeasurementSchema(
+                "id", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED),
+            new MeasurementSchema("s1", TSDataType.INT64),
+            new MeasurementSchema("s2", TSDataType.INT64),
+            new MeasurementSchema("s3", TSDataType.INT64),
+            new MeasurementSchema("s4", TSDataType.INT64));
+    List<Tablet.ColumnType> columnTypes =
+        Arrays.asList(
+            Tablet.ColumnType.ID,
+            Tablet.ColumnType.MEASUREMENT,
+            Tablet.ColumnType.MEASUREMENT,
+            Tablet.ColumnType.MEASUREMENT,
+            Tablet.ColumnType.MEASUREMENT);
+    TableSchema tableSchema = new TableSchema(tableName, schemas, columnTypes);
+    writer.getSchema().registerTableSchema(tableSchema);
+  }
+
+  private void generateDevice(TsFileIOWriter writer, String tableName, int deviceNum)
+      throws IOException {
+    for (int i = 0; i < deviceNum; i++) {
+      IDeviceID deviceID =
+          IDeviceID.Factory.DEFAULT_FACTORY.create(new String[] {tableName, "d" + i});
+      writer.startChunkGroup(deviceID);
+      generateSimpleAlignedSeriesToCurrentDevice(
+          writer, Arrays.asList("s1", "s2", "s3", "s4"), new TimeRange[] {new TimeRange(10, 20)});
+      writer.endChunkGroup();
+    }
+  }
+
+  public void generateSimpleAlignedSeriesToCurrentDevice(
+      TsFileIOWriter writer, List<String> measurementNames, TimeRange[] toGenerateChunkTimeRanges)
+      throws IOException {
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+    for (String measurementName : measurementNames) {
+      measurementSchemas.add(
+          new MeasurementSchema(
+              measurementName, TSDataType.INT64, TSEncoding.RLE, CompressionType.LZ4));
+    }
+    for (TimeRange toGenerateChunk : toGenerateChunkTimeRanges) {
+      AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
+      for (long time = toGenerateChunk.getMin(); time <= toGenerateChunk.getMax(); time++) {
+        alignedChunkWriter.getTimeChunkWriter().write(time);
+        for (int i = 0; i < measurementNames.size(); i++) {
+          alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, time, false);
+        }
+      }
+      alignedChunkWriter.writeToFileWriter(writer);
+    }
+  }
+}

Reply via email to