This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c796439b6 [core] Use readFully in DeletionVectorsIndexFile and explicitly specify dvRanges' type as LinkedHashMap (#3283)
c796439b6 is described below

commit c796439b6a2bd16b4a89503c9199a6b416bdd249
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Apr 29 11:56:59 2024 +0800

    [core] Use readFully in DeletionVectorsIndexFile and explicitly specify dvRanges' type as LinkedHashMap (#3283)
---
 .../paimon/deletionvectors/DeletionVector.java     |  4 +-
 .../deletionvectors/DeletionVectorsIndexFile.java  | 43 ++++++-----
 .../org/apache/paimon/index/IndexFileHandler.java  |  6 +-
 .../org/apache/paimon/index/IndexFileMeta.java     | 13 ++--
 .../paimon/index/IndexFileMetaSerializer.java      |  8 +-
 .../DeletionVectorsIndexFileTest.java              | 85 ++++++++++++++++++----
 .../paimon/index/IndexFileMetaSerializerTest.java  |  4 +-
 7 files changed, 113 insertions(+), 50 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
index be20d3891..60d69a436 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
@@ -115,7 +115,9 @@ public interface DeletionVector {
                         "Size not match, actual size: "
                                 + actualLength
                                 + ", expert size: "
-                                + deletionFile.length());
+                                + deletionFile.length()
+                                + ", file path: "
+                                + path);
             }
             int magicNum = dis.readInt();
             if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 313435928..733fb7825 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -57,10 +57,10 @@ public class DeletionVectorsIndexFile extends IndexFile {
      * @throws UncheckedIOException If an I/O error occurs while reading from 
the file.
      */
     public Map<String, DeletionVector> readAllDeletionVectors(
-            String fileName, Map<String, Pair<Integer, Integer>> 
deletionVectorRanges) {
+            String fileName, LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorRanges) {
         Map<String, DeletionVector> deletionVectors = new HashMap<>();
-        try (SeekableInputStream inputStream =
-                fileIO.newInputStream(pathFactory.toPath(fileName))) {
+        Path filePath = pathFactory.toPath(fileName);
+        try (SeekableInputStream inputStream = 
fileIO.newInputStream(filePath)) {
             checkVersion(inputStream);
             DataInputStream dataInputStream = new DataInputStream(inputStream);
             for (Map.Entry<String, Pair<Integer, Integer>> entry :
@@ -69,9 +69,13 @@ public class DeletionVectorsIndexFile extends IndexFile {
                         entry.getKey(),
                         readDeletionVector(dataInputStream, 
entry.getValue().getRight()));
             }
-        } catch (IOException e) {
-            throw new UncheckedIOException(
-                    "Unable to read deletion vectors from file: " + fileName, 
e);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Unable to read deletion vectors from file: "
+                            + filePath
+                            + ", deletionVectorRanges: "
+                            + deletionVectorRanges,
+                    e);
         }
         return deletionVectors;
     }
@@ -87,15 +91,19 @@ public class DeletionVectorsIndexFile extends IndexFile {
      */
     public DeletionVector readDeletionVector(
             String fileName, Pair<Integer, Integer> deletionVectorRange) {
-        try (SeekableInputStream inputStream =
-                fileIO.newInputStream(pathFactory.toPath(fileName))) {
+        Path filePath = pathFactory.toPath(fileName);
+        try (SeekableInputStream inputStream = 
fileIO.newInputStream(filePath)) {
             checkVersion(inputStream);
             inputStream.seek(deletionVectorRange.getLeft());
             DataInputStream dataInputStream = new DataInputStream(inputStream);
             return readDeletionVector(dataInputStream, 
deletionVectorRange.getRight());
-        } catch (IOException e) {
-            throw new UncheckedIOException(
-                    "Unable to read deletion vector from file: " + fileName, 
e);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Unable to read deletion vector from file: "
+                            + filePath
+                            + ", deletionVectorRange: "
+                            + deletionVectorRange,
+                    e);
         }
     }
 
@@ -111,12 +119,11 @@ public class DeletionVectorsIndexFile extends IndexFile {
      *     data is located.
      * @throws UncheckedIOException If an I/O error occurs while writing to 
the file.
      */
-    public Pair<String, Map<String, Pair<Integer, Integer>>> write(
+    public Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> write(
             Map<String, DeletionVector> input) {
         int size = input.size();
-        // use LinkedHashMap to ensure that the order of DeletionVectorRanges 
and the written
-        // DeletionVectors is consistent.
-        Map<String, Pair<Integer, Integer>> deletionVectorRanges = new 
LinkedHashMap<>(size);
+        LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
+                new LinkedHashMap<>(size);
         Path path = pathFactory.newPath();
         try (DataOutputStream dataOutputStream =
                 new DataOutputStream(fileIO.newOutputStream(path, true))) {
@@ -158,11 +165,7 @@ public class DeletionVectorsIndexFile extends IndexFile {
 
             // read DeletionVector bytes
             byte[] bytes = new byte[size];
-            int readSize = inputStream.read(bytes);
-            if (readSize != size) {
-                throw new RuntimeException(
-                        "Size not match, actual size: " + readSize + ", expert 
size: " + size);
-            }
+            inputStream.readFully(bytes);
 
             // check checksum
             int checkSum = calculateChecksum(bytes);
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 97cdbf43e..fdc2fe754 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -171,7 +172,8 @@ public class IndexFileHandler {
             throw new IllegalArgumentException(
                     "Input file is not deletion vectors index " + 
fileMeta.indexType());
         }
-        Map<String, Pair<Integer, Integer>> deleteIndexRange = 
fileMeta.deletionVectorsRanges();
+        LinkedHashMap<String, Pair<Integer, Integer>> deleteIndexRange =
+                fileMeta.deletionVectorsRanges();
         if (deleteIndexRange == null || deleteIndexRange.isEmpty()) {
             return Collections.emptyMap();
         }
@@ -193,7 +195,7 @@ public class IndexFileHandler {
     }
 
     public IndexFileMeta writeDeletionVectorsIndex(Map<String, DeletionVector> 
deletionVectors) {
-        Pair<String, Map<String, Pair<Integer, Integer>>> pair =
+        Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
                 deletionVectorsIndex.write(deletionVectors);
         return new IndexFileMeta(
                 DELETION_VECTORS_INDEX,
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
index 1b3f405fe..b6afd0c44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
@@ -29,8 +29,8 @@ import org.apache.paimon.utils.Pair;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -43,8 +43,11 @@ public class IndexFileMeta {
     private final long fileSize;
     private final long rowCount;
 
-    /** Metadata only used by {@link DeletionVectorsIndexFile}. */
-    private final @Nullable Map<String, Pair<Integer, Integer>> 
deletionVectorsRanges;
+    /**
+     * Metadata only used by {@link DeletionVectorsIndexFile}, use 
LinkedHashMap to ensure that the
+     * order of DeletionVectorRanges and the written DeletionVectors is 
consistent.
+     */
+    private final @Nullable LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorsRanges;
 
     public IndexFileMeta(String indexType, String fileName, long fileSize, 
long rowCount) {
         this(indexType, fileName, fileSize, rowCount, null);
@@ -55,7 +58,7 @@ public class IndexFileMeta {
             String fileName,
             long fileSize,
             long rowCount,
-            @Nullable Map<String, Pair<Integer, Integer>> 
deletionVectorsRanges) {
+            @Nullable LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorsRanges) {
         this.indexType = indexType;
         this.fileName = fileName;
         this.fileSize = fileSize;
@@ -79,7 +82,7 @@ public class IndexFileMeta {
         return rowCount;
     }
 
-    public @Nullable Map<String, Pair<Integer, Integer>> 
deletionVectorsRanges() {
+    public @Nullable LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorsRanges() {
         return deletionVectorsRanges;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
index 005868540..e9d82af46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -28,7 +28,6 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
 import java.util.LinkedHashMap;
-import java.util.Map;
 
 /** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */
 public class IndexFileMetaSerializer extends ObjectSerializer<IndexFileMeta> {
@@ -60,7 +59,7 @@ public class IndexFileMetaSerializer extends 
ObjectSerializer<IndexFileMeta> {
     }
 
     public static InternalArray dvRangesToRowArrayData(
-            Map<String, Pair<Integer, Integer>> dvRanges) {
+            LinkedHashMap<String, Pair<Integer, Integer>> dvRanges) {
         return new GenericArray(
                 dvRanges.entrySet().stream()
                         .map(
@@ -72,9 +71,10 @@ public class IndexFileMetaSerializer extends 
ObjectSerializer<IndexFileMeta> {
                         .toArray(GenericRow[]::new));
     }
 
-    public static Map<String, Pair<Integer, Integer>> rowArrayDataToDvRanges(
+    public static LinkedHashMap<String, Pair<Integer, Integer>> 
rowArrayDataToDvRanges(
             InternalArray arrayData) {
-        Map<String, Pair<Integer, Integer>> dvRanges = new 
LinkedHashMap<>(arrayData.size());
+        LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
+                new LinkedHashMap<>(arrayData.size());
         for (int i = 0; i < arrayData.size(); i++) {
             InternalRow row = arrayData.getRow(i, 3);
             dvRanges.put(row.getString(0).toString(), Pair.of(row.getInt(1), 
row.getInt(2)));
diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index 228508f33..fbaa407d2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -27,7 +27,9 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -37,20 +39,8 @@ public class DeletionVectorsIndexFileTest {
     @TempDir java.nio.file.Path tempPath;
 
     @Test
-    public void test0() {
-        Path dir = new Path(tempPath.toUri());
-        PathFactory pathFactory =
-                new PathFactory() {
-                    @Override
-                    public Path newPath() {
-                        return new Path(dir, UUID.randomUUID().toString());
-                    }
-
-                    @Override
-                    public Path toPath(String fileName) {
-                        return new Path(dir, fileName);
-                    }
-                };
+    public void testReadDvIndex() {
+        PathFactory pathFactory = getPathFactory();
 
         DeletionVectorsIndexFile deletionVectorsIndexFile =
                 new DeletionVectorsIndexFile(LocalFileIO.create(), 
pathFactory);
@@ -70,10 +60,10 @@ public class DeletionVectorsIndexFileTest {
         index3.delete(3);
         deleteMap.put("file33.parquet", index3);
 
-        Pair<String, Map<String, Pair<Integer, Integer>>> pair =
+        Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
                 deletionVectorsIndexFile.write(deleteMap);
         String fileName = pair.getLeft();
-        Map<String, Pair<Integer, Integer>> deletionVectorRanges = 
pair.getRight();
+        LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges = 
pair.getRight();
 
         // read
         Map<String, DeletionVector> actualDeleteMap =
@@ -94,4 +84,67 @@ public class DeletionVectorsIndexFileTest {
         deletionVectorsIndexFile.delete(fileName);
         assertThat(deletionVectorsIndexFile.exists(fileName)).isFalse();
     }
+
+    @Test
+    public void testReadDvIndexWithCopiousDv() {
+        PathFactory pathFactory = getPathFactory();
+        DeletionVectorsIndexFile deletionVectorsIndexFile =
+                new DeletionVectorsIndexFile(LocalFileIO.create(), 
pathFactory);
+
+        // write
+        Random random = new Random();
+        HashMap<String, DeletionVector> deleteMap = new HashMap<>();
+        for (int i = 0; i < 100000; i++) {
+            BitmapDeletionVector index = new BitmapDeletionVector();
+            index.delete(random.nextInt(1000000));
+            deleteMap.put(String.format("file%s.parquet", i), index);
+        }
+
+        Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
+                deletionVectorsIndexFile.write(deleteMap);
+
+        // read
+        Map<String, DeletionVector> dvs =
+                
deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), 
pair.getRight());
+        assertThat(dvs.size()).isEqualTo(100000);
+    }
+
+    @Test
+    public void testReadDvIndexWithEnormousDv() {
+        PathFactory pathFactory = getPathFactory();
+        DeletionVectorsIndexFile deletionVectorsIndexFile =
+                new DeletionVectorsIndexFile(LocalFileIO.create(), 
pathFactory);
+
+        // write
+        Random random = new Random();
+        HashMap<String, DeletionVector> deleteMap = new HashMap<>();
+        BitmapDeletionVector index = new BitmapDeletionVector();
+        // dv index's size is about 20M
+        for (int i = 0; i < 10000000; i++) {
+            index.delete(random.nextInt(Integer.MAX_VALUE));
+        }
+        deleteMap.put("largeFile.parquet", index);
+        Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
+                deletionVectorsIndexFile.write(deleteMap);
+
+        // read
+        Map<String, DeletionVector> dvs =
+                
deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), 
pair.getRight());
+        assertThat(dvs.size()).isEqualTo(1);
+    }
+
+    private PathFactory getPathFactory() {
+        Path dir = new Path(tempPath.toUri());
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                return new Path(dir, UUID.randomUUID().toString());
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                return new Path(dir, fileName);
+            }
+        };
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index 0e8cfc430..53058a05f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -24,7 +24,6 @@ import org.apache.paimon.utils.ObjectSerializerTestBase;
 import org.apache.paimon.utils.Pair;
 
 import java.util.LinkedHashMap;
-import java.util.Map;
 import java.util.Random;
 
 /** Test for {@link org.apache.paimon.index.IndexFileMetaSerializer}. */
@@ -49,7 +48,8 @@ public class IndexFileMetaSerializerTest extends 
ObjectSerializerTestBase<IndexF
                     rnd.nextInt(),
                     rnd.nextInt());
         } else {
-            Map<String, Pair<Integer, Integer>> deletionVectorsRanges = new 
LinkedHashMap<>();
+            LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorsRanges =
+                    new LinkedHashMap<>();
             deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
             deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
             return new IndexFileMeta(

Reply via email to