This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 62016e483 [core] Expose VectorizedRecordIterator to accelerate append table read (#3103)
62016e483 is described below

commit 62016e48352f4feec793d331648529cb0b44b25b
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 9 10:24:36 2024 +0800

    [core] Expose VectorizedRecordIterator to accelerate append table read (#3103)
---
 .../apache/paimon/data/columnar/ColumnarRow.java   | 21 ++++--------
 .../paimon/data/columnar/ColumnarRowIterator.java  | 40 ++++++++++++----------
 .../data/columnar/VectorizedColumnBatch.java       | 24 +++++++------
 .../paimon/reader/VectorizedRecordIterator.java    | 28 +++++++++++++++
 .../apache/paimon/format/orc/OrcReaderFactory.java |  5 ++-
 .../format/parquet/ParquetReaderFactory.java       |  2 +-
 6 files changed, 72 insertions(+), 48 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
index f92ecd2f8..8bfe26606 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.columnar.BytesColumnVector.Bytes;
 import org.apache.paimon.types.RowKind;
 
 import java.io.Serializable;
@@ -58,6 +57,10 @@ public final class ColumnarRow implements InternalRow, 
DataSetters, Serializable
         this.rowId = 0;
     }
 
+    public VectorizedColumnBatch batch() {
+        return vectorizedColumnBatch;
+    }
+
     public void setRowId(int rowId) {
         this.rowId = rowId;
     }
@@ -119,8 +122,7 @@ public final class ColumnarRow implements InternalRow, 
DataSetters, Serializable
 
     @Override
     public BinaryString getString(int pos) {
-        Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
-        return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+        return vectorizedColumnBatch.getString(rowId, pos);
     }
 
     @Override
@@ -135,14 +137,7 @@ public final class ColumnarRow implements InternalRow, 
DataSetters, Serializable
 
     @Override
     public byte[] getBinary(int pos) {
-        Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
-        if (byteArray.len == byteArray.data.length) {
-            return byteArray.data;
-        } else {
-            byte[] ret = new byte[byteArray.len];
-            System.arraycopy(byteArray.data, byteArray.offset, ret, 0, 
byteArray.len);
-            return ret;
-        }
+        return vectorizedColumnBatch.getBinary(rowId, pos);
     }
 
     @Override
@@ -222,10 +217,6 @@ public final class ColumnarRow implements InternalRow, 
DataSetters, Serializable
                 "ColumnarRowData do not support hashCode, please hash fields 
one by one!");
     }
 
-    VectorizedColumnBatch vectorizedColumnBatch() {
-        return vectorizedColumnBatch;
-    }
-
     public ColumnarRow copy(ColumnVector[] vectors) {
         VectorizedColumnBatch vectorizedColumnBatchCopy = 
vectorizedColumnBatch.copy(vectors);
         ColumnarRow columnarRow = new ColumnarRow(vectorizedColumnBatchCopy, 
rowId);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 13d706cf6..27e3d1c1d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.VectorizedRecordIterator;
 import org.apache.paimon.utils.RecyclableIterator;
 import org.apache.paimon.utils.VectorMappingUtils;
 
@@ -33,40 +34,36 @@ import javax.annotation.Nullable;
  * {@link ColumnarRow#setRowId}.
  */
 public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
-        implements FileRecordIterator<InternalRow> {
+        implements FileRecordIterator<InternalRow>, VectorizedRecordIterator {
 
     private final Path filePath;
-    private final ColumnarRow rowData;
+    private final ColumnarRow row;
     private final Runnable recycler;
 
     private int num;
     private int nextPos;
-    private long nextGlobalPos;
+    private long nextFilePos;
 
-    public ColumnarRowIterator(Path filePath, ColumnarRow rowData, @Nullable 
Runnable recycler) {
+    public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable 
Runnable recycler) {
         super(recycler);
         this.filePath = filePath;
-        this.rowData = rowData;
+        this.row = row;
         this.recycler = recycler;
     }
 
-    /**
-     * Reset the number of rows in the vectorized batch, the start position in 
this batch and the
-     * global position.
-     */
-    public void reset(int num, long nextGlobalPos) {
-        this.num = num;
+    public void reset(long nextFilePos) {
+        this.num = row.batch().getNumRows();
         this.nextPos = 0;
-        this.nextGlobalPos = nextGlobalPos;
+        this.nextFilePos = nextFilePos;
     }
 
     @Nullable
     @Override
     public InternalRow next() {
         if (nextPos < num) {
-            rowData.setRowId(nextPos++);
-            nextGlobalPos++;
-            return rowData;
+            row.setRowId(nextPos++);
+            nextFilePos++;
+            return row;
         } else {
             return null;
         }
@@ -74,7 +71,7 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
 
     @Override
     public long returnedPosition() {
-        return nextGlobalPos - 1;
+        return nextFilePos - 1;
     }
 
     @Override
@@ -84,15 +81,15 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
 
     public ColumnarRowIterator copy(ColumnVector[] vectors) {
         ColumnarRowIterator newIterator =
-                new ColumnarRowIterator(filePath, rowData.copy(vectors), 
recycler);
-        newIterator.reset(num, nextGlobalPos);
+                new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
+        newIterator.reset(nextFilePos);
         return newIterator;
     }
 
     public ColumnarRowIterator mapping(
             @Nullable PartitionInfo partitionInfo, @Nullable int[] 
indexMapping) {
         if (partitionInfo != null || indexMapping != null) {
-            VectorizedColumnBatch vectorizedColumnBatch = 
rowData.vectorizedColumnBatch();
+            VectorizedColumnBatch vectorizedColumnBatch = row.batch();
             ColumnVector[] vectors = vectorizedColumnBatch.columns;
             if (partitionInfo != null) {
                 vectors = 
VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors);
@@ -104,4 +101,9 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
         }
         return this;
     }
+
+    @Override
+    public VectorizedColumnBatch batch() {
+        return row.batch();
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
index 22882094e..dab535643 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.data.columnar;
 
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -26,7 +27,6 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.columnar.BytesColumnVector.Bytes;
 
 import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
 
 /**
  * A VectorizedColumnBatch is a set of rows, organized with each column as a 
vector. It is the unit
@@ -35,6 +35,7 @@ import java.nio.charset.StandardCharsets;
  * <p>{@code VectorizedColumnBatch}s are influenced by Apache Hive 
VectorizedRowBatch.
  */
 public class VectorizedColumnBatch implements Serializable {
+
     private static final long serialVersionUID = 8180323238728166155L;
 
     /**
@@ -44,7 +45,8 @@ public class VectorizedColumnBatch implements Serializable {
     public static final int DEFAULT_SIZE = 2048;
 
     private int numRows;
-    public final org.apache.paimon.data.columnar.ColumnVector[] columns;
+
+    public final ColumnVector[] columns;
 
     public VectorizedColumnBatch(ColumnVector[] vectors) {
         this.columns = vectors;
@@ -94,22 +96,24 @@ public class VectorizedColumnBatch implements Serializable {
         return ((DoubleColumnVector) columns[colId]).getDouble(rowId);
     }
 
-    public Bytes getByteArray(int rowId, int colId) {
-        return ((BytesColumnVector) columns[colId]).getBytes(rowId);
+    public BinaryString getString(int rowId, int pos) {
+        Bytes byteArray = getByteArray(rowId, pos);
+        return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
     }
 
-    private byte[] getBytes(int rowId, int colId) {
-        Bytes byteArray = getByteArray(rowId, colId);
+    public byte[] getBinary(int rowId, int pos) {
+        Bytes byteArray = getByteArray(rowId, pos);
         if (byteArray.len == byteArray.data.length) {
             return byteArray.data;
         } else {
-            return byteArray.getBytes();
+            byte[] ret = new byte[byteArray.len];
+            System.arraycopy(byteArray.data, byteArray.offset, ret, 0, 
byteArray.len);
+            return ret;
         }
     }
 
-    public String getString(int rowId, int colId) {
-        Bytes byteArray = getByteArray(rowId, colId);
-        return new String(byteArray.data, byteArray.offset, byteArray.len, 
StandardCharsets.UTF_8);
+    public Bytes getByteArray(int rowId, int colId) {
+        return ((BytesColumnVector) columns[colId]).getBytes(rowId);
     }
 
     public Decimal getDecimal(int rowId, int colId, int precision, int scale) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/VectorizedRecordIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/reader/VectorizedRecordIterator.java
new file mode 100644
index 000000000..0288ffbce
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/reader/VectorizedRecordIterator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+
+/** Wrap {@link RecordReader.RecordIterator} to support returning batch 
directly. */
+public interface VectorizedRecordIterator extends 
RecordReader.RecordIterator<InternalRow> {
+
+    VectorizedColumnBatch batch();
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 8cf95fad3..c084c7250 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -183,9 +183,8 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
                 VectorizedRowBatch orcBatch, long rowNumber) {
             // no copying from the ORC column vectors to the Paimon columns 
vectors necessary,
             // because they point to the same data arrays internally design
-            int batchSize = orcBatch.size;
-            paimonColumnBatch.setNumRows(batchSize);
-            result.reset(batchSize, rowNumber);
+            paimonColumnBatch.setNumRows(orcBatch.size);
+            result.reset(rowNumber);
             return result;
         }
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 004a0d655..64e1e2296 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -399,7 +399,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
         }
 
         public RecordIterator<InternalRow> convertAndGetIterator(long 
rowNumber) {
-            result.reset(columnarBatch.getNumRows(), rowNumber);
+            result.reset(rowNumber);
             return result;
         }
     }

Reply via email to