This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c796439b6 [core] Use readFully in DeletionVectorsIndexFile and
explicitly specify dvRanges' type as LinkedHashMap (#3283)
c796439b6 is described below
commit c796439b6a2bd16b4a89503c9199a6b416bdd249
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Apr 29 11:56:59 2024 +0800
[core] Use readFully in DeletionVectorsIndexFile and explicitly specify
dvRanges' type as LinkedHashMap (#3283)
---
.../paimon/deletionvectors/DeletionVector.java | 4 +-
.../deletionvectors/DeletionVectorsIndexFile.java | 43 ++++++-----
.../org/apache/paimon/index/IndexFileHandler.java | 6 +-
.../org/apache/paimon/index/IndexFileMeta.java | 13 ++--
.../paimon/index/IndexFileMetaSerializer.java | 8 +-
.../DeletionVectorsIndexFileTest.java | 85 ++++++++++++++++++----
.../paimon/index/IndexFileMetaSerializerTest.java | 4 +-
7 files changed, 113 insertions(+), 50 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
index be20d3891..60d69a436 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
@@ -115,7 +115,9 @@ public interface DeletionVector {
"Size not match, actual size: "
+ actualLength
+ ", expert size: "
- + deletionFile.length());
+ + deletionFile.length()
+ + ", file path: "
+ + path);
}
int magicNum = dis.readInt();
if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 313435928..733fb7825 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -57,10 +57,10 @@ public class DeletionVectorsIndexFile extends IndexFile {
* @throws UncheckedIOException If an I/O error occurs while reading from
the file.
*/
public Map<String, DeletionVector> readAllDeletionVectors(
- String fileName, Map<String, Pair<Integer, Integer>> deletionVectorRanges) {
+ String fileName, LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges) {
Map<String, DeletionVector> deletionVectors = new HashMap<>();
- try (SeekableInputStream inputStream =
- fileIO.newInputStream(pathFactory.toPath(fileName))) {
+ Path filePath = pathFactory.toPath(fileName);
+ try (SeekableInputStream inputStream =
fileIO.newInputStream(filePath)) {
checkVersion(inputStream);
DataInputStream dataInputStream = new DataInputStream(inputStream);
for (Map.Entry<String, Pair<Integer, Integer>> entry :
@@ -69,9 +69,13 @@ public class DeletionVectorsIndexFile extends IndexFile {
entry.getKey(),
readDeletionVector(dataInputStream,
entry.getValue().getRight()));
}
- } catch (IOException e) {
- throw new UncheckedIOException(
- "Unable to read deletion vectors from file: " + fileName, e);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to read deletion vectors from file: "
+ + filePath
+ + ", deletionVectorRanges: "
+ + deletionVectorRanges,
+ e);
}
return deletionVectors;
}
@@ -87,15 +91,19 @@ public class DeletionVectorsIndexFile extends IndexFile {
*/
public DeletionVector readDeletionVector(
String fileName, Pair<Integer, Integer> deletionVectorRange) {
- try (SeekableInputStream inputStream =
- fileIO.newInputStream(pathFactory.toPath(fileName))) {
+ Path filePath = pathFactory.toPath(fileName);
+ try (SeekableInputStream inputStream =
fileIO.newInputStream(filePath)) {
checkVersion(inputStream);
inputStream.seek(deletionVectorRange.getLeft());
DataInputStream dataInputStream = new DataInputStream(inputStream);
return readDeletionVector(dataInputStream,
deletionVectorRange.getRight());
- } catch (IOException e) {
- throw new UncheckedIOException(
- "Unable to read deletion vector from file: " + fileName, e);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to read deletion vector from file: "
+ + filePath
+ + ", deletionVectorRange: "
+ + deletionVectorRange,
+ e);
}
}
@@ -111,12 +119,11 @@ public class DeletionVectorsIndexFile extends IndexFile {
* data is located.
* @throws UncheckedIOException If an I/O error occurs while writing to
the file.
*/
- public Pair<String, Map<String, Pair<Integer, Integer>>> write(
+ public Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> write(
Map<String, DeletionVector> input) {
int size = input.size();
- // use LinkedHashMap to ensure that the order of DeletionVectorRanges and the written
- // DeletionVectors is consistent.
- Map<String, Pair<Integer, Integer>> deletionVectorRanges = new LinkedHashMap<>(size);
+ LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
+ new LinkedHashMap<>(size);
Path path = pathFactory.newPath();
try (DataOutputStream dataOutputStream =
new DataOutputStream(fileIO.newOutputStream(path, true))) {
@@ -158,11 +165,7 @@ public class DeletionVectorsIndexFile extends IndexFile {
// read DeletionVector bytes
byte[] bytes = new byte[size];
- int readSize = inputStream.read(bytes);
- if (readSize != size) {
- throw new RuntimeException(
- "Size not match, actual size: " + readSize + ", expert size: " + size);
- }
+ inputStream.readFully(bytes);
// check checksum
int checkSum = calculateChecksum(bytes);
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 97cdbf43e..fdc2fe754 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -171,7 +172,8 @@ public class IndexFileHandler {
throw new IllegalArgumentException(
"Input file is not deletion vectors index " +
fileMeta.indexType());
}
- Map<String, Pair<Integer, Integer>> deleteIndexRange =
fileMeta.deletionVectorsRanges();
+ LinkedHashMap<String, Pair<Integer, Integer>> deleteIndexRange =
+ fileMeta.deletionVectorsRanges();
if (deleteIndexRange == null || deleteIndexRange.isEmpty()) {
return Collections.emptyMap();
}
@@ -193,7 +195,7 @@ public class IndexFileHandler {
}
public IndexFileMeta writeDeletionVectorsIndex(Map<String, DeletionVector>
deletionVectors) {
- Pair<String, Map<String, Pair<Integer, Integer>>> pair =
+ Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
deletionVectorsIndex.write(deletionVectors);
return new IndexFileMeta(
DELETION_VECTORS_INDEX,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
index 1b3f405fe..b6afd0c44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
@@ -29,8 +29,8 @@ import org.apache.paimon.utils.Pair;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -43,8 +43,11 @@ public class IndexFileMeta {
private final long fileSize;
private final long rowCount;
- /** Metadata only used by {@link DeletionVectorsIndexFile}. */
- private final @Nullable Map<String, Pair<Integer, Integer>>
deletionVectorsRanges;
+ /**
+ * Metadata only used by {@link DeletionVectorsIndexFile}, use
LinkedHashMap to ensure that the
+ * order of DeletionVectorRanges and the written DeletionVectors is
consistent.
+ */
+ private final @Nullable LinkedHashMap<String, Pair<Integer, Integer>>
deletionVectorsRanges;
public IndexFileMeta(String indexType, String fileName, long fileSize,
long rowCount) {
this(indexType, fileName, fileSize, rowCount, null);
@@ -55,7 +58,7 @@ public class IndexFileMeta {
String fileName,
long fileSize,
long rowCount,
- @Nullable Map<String, Pair<Integer, Integer>>
deletionVectorsRanges) {
+ @Nullable LinkedHashMap<String, Pair<Integer, Integer>>
deletionVectorsRanges) {
this.indexType = indexType;
this.fileName = fileName;
this.fileSize = fileSize;
@@ -79,7 +82,7 @@ public class IndexFileMeta {
return rowCount;
}
- public @Nullable Map<String, Pair<Integer, Integer>>
deletionVectorsRanges() {
+ public @Nullable LinkedHashMap<String, Pair<Integer, Integer>>
deletionVectorsRanges() {
return deletionVectorsRanges;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
index 005868540..e9d82af46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -28,7 +28,6 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.VersionedObjectSerializer;
import java.util.LinkedHashMap;
-import java.util.Map;
/** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */
public class IndexFileMetaSerializer extends ObjectSerializer<IndexFileMeta> {
@@ -60,7 +59,7 @@ public class IndexFileMetaSerializer extends
ObjectSerializer<IndexFileMeta> {
}
public static InternalArray dvRangesToRowArrayData(
- Map<String, Pair<Integer, Integer>> dvRanges) {
+ LinkedHashMap<String, Pair<Integer, Integer>> dvRanges) {
return new GenericArray(
dvRanges.entrySet().stream()
.map(
@@ -72,9 +71,10 @@ public class IndexFileMetaSerializer extends
ObjectSerializer<IndexFileMeta> {
.toArray(GenericRow[]::new));
}
- public static Map<String, Pair<Integer, Integer>> rowArrayDataToDvRanges(
+ public static LinkedHashMap<String, Pair<Integer, Integer>>
rowArrayDataToDvRanges(
InternalArray arrayData) {
- Map<String, Pair<Integer, Integer>> dvRanges = new
LinkedHashMap<>(arrayData.size());
+ LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
+ new LinkedHashMap<>(arrayData.size());
for (int i = 0; i < arrayData.size(); i++) {
InternalRow row = arrayData.getRow(i, 3);
dvRanges.put(row.getString(0).toString(), Pair.of(row.getInt(1),
row.getInt(2)));
diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index 228508f33..fbaa407d2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -27,7 +27,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Random;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -37,20 +39,8 @@ public class DeletionVectorsIndexFileTest {
@TempDir java.nio.file.Path tempPath;
@Test
- public void test0() {
- Path dir = new Path(tempPath.toUri());
- PathFactory pathFactory =
- new PathFactory() {
- @Override
- public Path newPath() {
- return new Path(dir, UUID.randomUUID().toString());
- }
-
- @Override
- public Path toPath(String fileName) {
- return new Path(dir, fileName);
- }
- };
+ public void testReadDvIndex() {
+ PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
new DeletionVectorsIndexFile(LocalFileIO.create(),
pathFactory);
@@ -70,10 +60,10 @@ public class DeletionVectorsIndexFileTest {
index3.delete(3);
deleteMap.put("file33.parquet", index3);
- Pair<String, Map<String, Pair<Integer, Integer>>> pair =
+ Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
deletionVectorsIndexFile.write(deleteMap);
String fileName = pair.getLeft();
- Map<String, Pair<Integer, Integer>> deletionVectorRanges =
pair.getRight();
+ LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
pair.getRight();
// read
Map<String, DeletionVector> actualDeleteMap =
@@ -94,4 +84,67 @@ public class DeletionVectorsIndexFileTest {
deletionVectorsIndexFile.delete(fileName);
assertThat(deletionVectorsIndexFile.exists(fileName)).isFalse();
}
+
+ @Test
+ public void testReadDvIndexWithCopiousDv() {
+ PathFactory pathFactory = getPathFactory();
+ DeletionVectorsIndexFile deletionVectorsIndexFile =
+ new DeletionVectorsIndexFile(LocalFileIO.create(),
pathFactory);
+
+ // write
+ Random random = new Random();
+ HashMap<String, DeletionVector> deleteMap = new HashMap<>();
+ for (int i = 0; i < 100000; i++) {
+ BitmapDeletionVector index = new BitmapDeletionVector();
+ index.delete(random.nextInt(1000000));
+ deleteMap.put(String.format("file%s.parquet", i), index);
+ }
+
+ Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
+ deletionVectorsIndexFile.write(deleteMap);
+
+ // read
+ Map<String, DeletionVector> dvs =
+ deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), pair.getRight());
+ assertThat(dvs.size()).isEqualTo(100000);
+ }
+
+ @Test
+ public void testReadDvIndexWithEnormousDv() {
+ PathFactory pathFactory = getPathFactory();
+ DeletionVectorsIndexFile deletionVectorsIndexFile =
+ new DeletionVectorsIndexFile(LocalFileIO.create(),
pathFactory);
+
+ // write
+ Random random = new Random();
+ HashMap<String, DeletionVector> deleteMap = new HashMap<>();
+ BitmapDeletionVector index = new BitmapDeletionVector();
+ // dv index's size is about 20M
+ for (int i = 0; i < 10000000; i++) {
+ index.delete(random.nextInt(Integer.MAX_VALUE));
+ }
+ deleteMap.put("largeFile.parquet", index);
+ Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
+ deletionVectorsIndexFile.write(deleteMap);
+
+ // read
+ Map<String, DeletionVector> dvs =
+ deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), pair.getRight());
+ assertThat(dvs.size()).isEqualTo(1);
+ }
+
+ private PathFactory getPathFactory() {
+ Path dir = new Path(tempPath.toUri());
+ return new PathFactory() {
+ @Override
+ public Path newPath() {
+ return new Path(dir, UUID.randomUUID().toString());
+ }
+
+ @Override
+ public Path toPath(String fileName) {
+ return new Path(dir, fileName);
+ }
+ };
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index 0e8cfc430..53058a05f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -24,7 +24,6 @@ import org.apache.paimon.utils.ObjectSerializerTestBase;
import org.apache.paimon.utils.Pair;
import java.util.LinkedHashMap;
-import java.util.Map;
import java.util.Random;
/** Test for {@link org.apache.paimon.index.IndexFileMetaSerializer}. */
@@ -49,7 +48,8 @@ public class IndexFileMetaSerializerTest extends
ObjectSerializerTestBase<IndexF
rnd.nextInt(),
rnd.nextInt());
} else {
- Map<String, Pair<Integer, Integer>> deletionVectorsRanges = new
LinkedHashMap<>();
+ LinkedHashMap<String, Pair<Integer, Integer>>
deletionVectorsRanges =
+ new LinkedHashMap<>();
deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(),
rnd.nextInt()));
deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(),
rnd.nextInt()));
return new IndexFileMeta(