This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 629edfed7 [core] Support dv with avro format (#3105)
629edfed7 is described below
commit 629edfed75622f19fef55d620660530970118c94
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Mar 27 16:26:41 2024 +0800
[core] Support dv with avro format (#3105)
---
.../apache/paimon/reader/FileRecordIterator.java | 3 +-
.../paimon/utils/IteratorResultIterator.java | 26 ++++++++++++-
.../org/apache/paimon/schema/SchemaValidation.java | 6 ---
.../apache/paimon/format/avro/AvroBulkFormat.java | 9 ++++-
.../paimon/format/avro/AvroFileFormatTest.java | 43 ++++++++++++++++++++++
.../paimon/spark/sql/DeletionVectorTest.scala | 2 +-
6 files changed, 78 insertions(+), 11 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
index 0cef8cc00..d22b27053 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
@@ -27,7 +27,8 @@ import java.io.IOException;
import java.util.function.Function;
/**
- * Wrap {@link RecordReader.RecordIterator} to support returning the record's row position.
+ * Wrap {@link RecordReader.RecordIterator} to support returning the record's row position and file
+ * Path.
*
* @param <T> The type of the record.
*/
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java b/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java
index a2bffd31d..cb42a371f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java
@@ -18,6 +18,8 @@
package org.apache.paimon.utils;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
import javax.annotation.Nullable;
@@ -25,22 +27,42 @@ import javax.annotation.Nullable;
import java.util.Iterator;
/** A simple {@link RecordReader.RecordIterator} that returns the elements of an iterator. */
-public final class IteratorResultIterator<E> extends RecyclableIterator<E> {
+public final class IteratorResultIterator<E> extends RecyclableIterator<E>
+ implements FileRecordIterator<E> {
private final Iterator<E> records;
+ private final Path filePath;
+ private long nextFilePos;
- public IteratorResultIterator(final Iterator<E> records, final @Nullable Runnable recycler) {
+ public IteratorResultIterator(
+ final Iterator<E> records,
+ final @Nullable Runnable recycler,
+ final Path filePath,
+ long pos) {
super(recycler);
this.records = records;
+ this.filePath = filePath;
+ this.nextFilePos = pos;
}
@Nullable
@Override
public E next() {
if (records.hasNext()) {
+ nextFilePos++;
return records.next();
} else {
return null;
}
}
+
+ @Override
+ public long returnedPosition() {
+ return nextFilePos - 1;
+ }
+
+ @Override
+ public Path filePath() {
+ return filePath;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 7e59bb7b5..18c95cd2f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -49,8 +49,6 @@ import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
-import static org.apache.paimon.CoreOptions.FileFormatType.ORC;
-import static org.apache.paimon.CoreOptions.FileFormatType.PARQUET;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
@@ -473,10 +471,6 @@ public class SchemaValidation {
!schema.primaryKeys().isEmpty(),
"Deletion vectors mode is only supported for tables with primary keys.");
- checkArgument(
- options.formatType().equals(ORC) || options.formatType().equals(PARQUET),
- "Deletion vectors mode is only supported for orc or parquet file format now.");
-
checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
|| options.changelogProducer() == ChangelogProducer.LOOKUP,
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index abf82342a..083891214 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -61,6 +61,8 @@ public class AvroBulkFormat implements FormatReaderFactory {
private final long end;
private final Pool<Object> pool;
+ private final Path filePath;
+ private long currentRowPosition;
private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException {
this.fileIO = fileIO;
@@ -69,6 +71,8 @@ public class AvroBulkFormat implements FormatReaderFactory {
this.reader.sync(0);
this.pool = new Pool<>(1);
this.pool.add(new Object());
+ this.filePath = path;
+ this.currentRowPosition = 0;
}
private DataFileReader<InternalRow> createReaderFromPath(Path path, long fileSize)
@@ -101,8 +105,11 @@ public class AvroBulkFormat implements FormatReaderFactory {
return null;
}
+ long rowPosition = currentRowPosition;
+ currentRowPosition += reader.getBlockCount();
Iterator<InternalRow> iterator = new AvroBlockIterator(reader.getBlockCount(), reader);
- return new IteratorResultIterator<>(iterator, () -> pool.recycler().recycle(ticket));
+ return new IteratorResultIterator<>(
+ iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition);
}
private boolean readNextBlock() throws IOException {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
index 0aa15f7f2..e9caf3b24 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
@@ -18,19 +18,35 @@
package org.apache.paimon.format.avro;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** Test for avro file format. */
public class AvroFileFormatTest {
+ @TempDir java.nio.file.Path tempPath;
+
private static AvroFileFormat fileFormat;
@BeforeAll
@@ -85,4 +101,31 @@ public class AvroFileFormatTest {
RowType rowType = new RowType(dataFields);
fileFormat.validateDataFields(rowType);
}
+
+ @Test
+ void testReadRowPosition() throws IOException {
+ RowType rowType = DataTypes.ROW(DataTypes.INT().notNull());
+ FileFormat format = new AvroFileFormat(new Options());
+
+ LocalFileIO fileIO = LocalFileIO.create();
+ Path file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString());
+
+ try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
+ FormatWriter writer = format.createWriterFactory(rowType).create(out, null);
+ for (int i = 0; i < 1000000; i++) {
+ writer.addElement(GenericRow.of(i));
+ }
+ writer.flush();
+ writer.finish();
+ }
+
+ try (RecordReader<InternalRow> reader =
+ format.createReaderFactory(rowType)
+ .createReader(
+ new FormatReaderContext(
+ fileIO, file, fileIO.getFileSize(file))); ) {
+ reader.forEachRemainingWithPosition(
+ (rowPosition, row) -> assertThat(row.getInt(0) == rowPosition).isTrue());
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 887b6c8df..f45a22cae 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -101,7 +101,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
test("Paimon deletionVector: e2e random write") {
val bucket = Random.shuffle(Seq("-1", "1", "3")).head
val changelogProducer = Random.shuffle(Seq("none", "lookup")).head
- val format = Random.shuffle(Seq("orc", "parquet")).head
+ val format = Random.shuffle(Seq("orc", "parquet", "avro")).head
val batchSize = Random.nextInt(1024) + 1
val dvTbl = "deletion_vector_tbl"