This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 629edfed7 [core] Support dv with avro format (#3105)
629edfed7 is described below

commit 629edfed75622f19fef55d620660530970118c94
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Mar 27 16:26:41 2024 +0800

    [core] Support dv with avro format (#3105)
---
 .../apache/paimon/reader/FileRecordIterator.java   |  3 +-
 .../paimon/utils/IteratorResultIterator.java       | 26 ++++++++++++-
 .../org/apache/paimon/schema/SchemaValidation.java |  6 ---
 .../apache/paimon/format/avro/AvroBulkFormat.java  |  9 ++++-
 .../paimon/format/avro/AvroFileFormatTest.java     | 43 ++++++++++++++++++++++
 .../paimon/spark/sql/DeletionVectorTest.scala      |  2 +-
 6 files changed, 78 insertions(+), 11 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
index 0cef8cc00..d22b27053 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
@@ -27,7 +27,8 @@ import java.io.IOException;
 import java.util.function.Function;
 
 /**
- * Wrap {@link RecordReader.RecordIterator} to support returning the record's row position.
+ * Wrap {@link RecordReader.RecordIterator} to support returning the record's row position and file
+ * Path.
  *
  * @param <T> The type of the record.
  */
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java b/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java
index a2bffd31d..cb42a371f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
 
 import javax.annotation.Nullable;
@@ -25,22 +27,42 @@ import javax.annotation.Nullable;
 import java.util.Iterator;
 
 /** A simple {@link RecordReader.RecordIterator} that returns the elements of an iterator. */
-public final class IteratorResultIterator<E> extends RecyclableIterator<E> {
+public final class IteratorResultIterator<E> extends RecyclableIterator<E>
+        implements FileRecordIterator<E> {
 
     private final Iterator<E> records;
+    private final Path filePath;
+    private long nextFilePos;
 
-    public IteratorResultIterator(final Iterator<E> records, final @Nullable Runnable recycler) {
+    public IteratorResultIterator(
+            final Iterator<E> records,
+            final @Nullable Runnable recycler,
+            final Path filePath,
+            long pos) {
         super(recycler);
         this.records = records;
+        this.filePath = filePath;
+        this.nextFilePos = pos;
     }
 
     @Nullable
     @Override
     public E next() {
         if (records.hasNext()) {
+            nextFilePos++;
             return records.next();
         } else {
             return null;
         }
     }
+
+    @Override
+    public long returnedPosition() {
+        return nextFilePos - 1;
+    }
+
+    @Override
+    public Path filePath() {
+        return filePath;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 7e59bb7b5..18c95cd2f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -49,8 +49,6 @@ import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
-import static org.apache.paimon.CoreOptions.FileFormatType.ORC;
-import static org.apache.paimon.CoreOptions.FileFormatType.PARQUET;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
 import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
@@ -473,10 +471,6 @@ public class SchemaValidation {
                 !schema.primaryKeys().isEmpty(),
                 "Deletion vectors mode is only supported for tables with primary keys.");
 
-        checkArgument(
-                options.formatType().equals(ORC) || options.formatType().equals(PARQUET),
-                "Deletion vectors mode is only supported for orc or parquet file format now.");
-
         checkArgument(
                 options.changelogProducer() == ChangelogProducer.NONE
                         || options.changelogProducer() == ChangelogProducer.LOOKUP,
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index abf82342a..083891214 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -61,6 +61,8 @@ public class AvroBulkFormat implements FormatReaderFactory {
 
         private final long end;
         private final Pool<Object> pool;
+        private final Path filePath;
+        private long currentRowPosition;
 
         private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException {
             this.fileIO = fileIO;
@@ -69,6 +71,8 @@ public class AvroBulkFormat implements FormatReaderFactory {
             this.reader.sync(0);
             this.pool = new Pool<>(1);
             this.pool.add(new Object());
+            this.filePath = path;
+            this.currentRowPosition = 0;
         }
 
         private DataFileReader<InternalRow> createReaderFromPath(Path path, long fileSize)
@@ -101,8 +105,11 @@ public class AvroBulkFormat implements FormatReaderFactory {
                 return null;
             }
 
+            long rowPosition = currentRowPosition;
+            currentRowPosition += reader.getBlockCount();
             Iterator<InternalRow> iterator = new AvroBlockIterator(reader.getBlockCount(), reader);
-            return new IteratorResultIterator<>(iterator, () -> pool.recycler().recycle(ticket));
+            return new IteratorResultIterator<>(
+                    iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition);
         }
 
         private boolean readNextBlock() throws IOException {
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
index 0aa15f7f2..e9caf3b24 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
@@ -18,19 +18,35 @@
 
 package org.apache.paimon.format.avro;
 
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for avro file format. */
 public class AvroFileFormatTest {
 
+    @TempDir java.nio.file.Path tempPath;
+
     private static AvroFileFormat fileFormat;
 
     @BeforeAll
@@ -85,4 +101,31 @@ public class AvroFileFormatTest {
         RowType rowType = new RowType(dataFields);
         fileFormat.validateDataFields(rowType);
     }
+
+    @Test
+    void testReadRowPosition() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull());
+        FileFormat format = new AvroFileFormat(new Options());
+
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString());
+
+        try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
+            FormatWriter writer = format.createWriterFactory(rowType).create(out, null);
+            for (int i = 0; i < 1000000; i++) {
+                writer.addElement(GenericRow.of(i));
+            }
+            writer.flush();
+            writer.finish();
+        }
+
+        try (RecordReader<InternalRow> reader =
+                format.createReaderFactory(rowType)
+                        .createReader(
+                                new FormatReaderContext(
+                                        fileIO, file, fileIO.getFileSize(file))); ) {
+            reader.forEachRemainingWithPosition(
+                    (rowPosition, row) -> assertThat(row.getInt(0) == rowPosition).isTrue());
+        }
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 887b6c8df..f45a22cae 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -101,7 +101,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
   test("Paimon deletionVector: e2e random write") {
     val bucket = Random.shuffle(Seq("-1", "1", "3")).head
     val changelogProducer = Random.shuffle(Seq("none", "lookup")).head
-    val format = Random.shuffle(Seq("orc", "parquet")).head
+    val format = Random.shuffle(Seq("orc", "parquet", "avro")).head
     val batchSize = Random.nextInt(1024) + 1
 
     val dvTbl = "deletion_vector_tbl"

Reply via email to