This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 62016e483 [core] Expose VectorizedRecordIterator to accelerate append table read (#3103)
62016e483 is described below
commit 62016e48352f4feec793d331648529cb0b44b25b
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 9 10:24:36 2024 +0800
[core] Expose VectorizedRecordIterator to accelerate append table read (#3103)
---
.../apache/paimon/data/columnar/ColumnarRow.java | 21 ++++--------
.../paimon/data/columnar/ColumnarRowIterator.java | 40 ++++++++++++----------
.../data/columnar/VectorizedColumnBatch.java | 24 +++++++------
.../paimon/reader/VectorizedRecordIterator.java | 28 +++++++++++++++
.../apache/paimon/format/orc/OrcReaderFactory.java | 5 ++-
.../format/parquet/ParquetReaderFactory.java | 2 +-
6 files changed, 72 insertions(+), 48 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
index f92ecd2f8..8bfe26606 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.columnar.BytesColumnVector.Bytes;
import org.apache.paimon.types.RowKind;
import java.io.Serializable;
@@ -58,6 +57,10 @@ public final class ColumnarRow implements InternalRow,
DataSetters, Serializable
this.rowId = 0;
}
+ public VectorizedColumnBatch batch() {
+ return vectorizedColumnBatch;
+ }
+
public void setRowId(int rowId) {
this.rowId = rowId;
}
@@ -119,8 +122,7 @@ public final class ColumnarRow implements InternalRow,
DataSetters, Serializable
@Override
public BinaryString getString(int pos) {
- Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
- return BinaryString.fromBytes(byteArray.data, byteArray.offset,
byteArray.len);
+ return vectorizedColumnBatch.getString(rowId, pos);
}
@Override
@@ -135,14 +137,7 @@ public final class ColumnarRow implements InternalRow,
DataSetters, Serializable
@Override
public byte[] getBinary(int pos) {
- Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
- if (byteArray.len == byteArray.data.length) {
- return byteArray.data;
- } else {
- byte[] ret = new byte[byteArray.len];
- System.arraycopy(byteArray.data, byteArray.offset, ret, 0,
byteArray.len);
- return ret;
- }
+ return vectorizedColumnBatch.getBinary(rowId, pos);
}
@Override
@@ -222,10 +217,6 @@ public final class ColumnarRow implements InternalRow,
DataSetters, Serializable
"ColumnarRowData do not support hashCode, please hash fields
one by one!");
}
- VectorizedColumnBatch vectorizedColumnBatch() {
- return vectorizedColumnBatch;
- }
-
public ColumnarRow copy(ColumnVector[] vectors) {
VectorizedColumnBatch vectorizedColumnBatchCopy =
vectorizedColumnBatch.copy(vectors);
ColumnarRow columnarRow = new ColumnarRow(vectorizedColumnBatchCopy,
rowId);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 13d706cf6..27e3d1c1d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.utils.RecyclableIterator;
import org.apache.paimon.utils.VectorMappingUtils;
@@ -33,40 +34,36 @@ import javax.annotation.Nullable;
* {@link ColumnarRow#setRowId}.
*/
public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
- implements FileRecordIterator<InternalRow> {
+ implements FileRecordIterator<InternalRow>, VectorizedRecordIterator {
private final Path filePath;
- private final ColumnarRow rowData;
+ private final ColumnarRow row;
private final Runnable recycler;
private int num;
private int nextPos;
- private long nextGlobalPos;
+ private long nextFilePos;
- public ColumnarRowIterator(Path filePath, ColumnarRow rowData, @Nullable
Runnable recycler) {
+ public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable
Runnable recycler) {
super(recycler);
this.filePath = filePath;
- this.rowData = rowData;
+ this.row = row;
this.recycler = recycler;
}
- /**
- * Reset the number of rows in the vectorized batch, the start position in
this batch and the
- * global position.
- */
- public void reset(int num, long nextGlobalPos) {
- this.num = num;
+ public void reset(long nextFilePos) {
+ this.num = row.batch().getNumRows();
this.nextPos = 0;
- this.nextGlobalPos = nextGlobalPos;
+ this.nextFilePos = nextFilePos;
}
@Nullable
@Override
public InternalRow next() {
if (nextPos < num) {
- rowData.setRowId(nextPos++);
- nextGlobalPos++;
- return rowData;
+ row.setRowId(nextPos++);
+ nextFilePos++;
+ return row;
} else {
return null;
}
@@ -74,7 +71,7 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
@Override
public long returnedPosition() {
- return nextGlobalPos - 1;
+ return nextFilePos - 1;
}
@Override
@@ -84,15 +81,15 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
public ColumnarRowIterator copy(ColumnVector[] vectors) {
ColumnarRowIterator newIterator =
- new ColumnarRowIterator(filePath, rowData.copy(vectors),
recycler);
- newIterator.reset(num, nextGlobalPos);
+ new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
+ newIterator.reset(nextFilePos);
return newIterator;
}
public ColumnarRowIterator mapping(
@Nullable PartitionInfo partitionInfo, @Nullable int[]
indexMapping) {
if (partitionInfo != null || indexMapping != null) {
- VectorizedColumnBatch vectorizedColumnBatch =
rowData.vectorizedColumnBatch();
+ VectorizedColumnBatch vectorizedColumnBatch = row.batch();
ColumnVector[] vectors = vectorizedColumnBatch.columns;
if (partitionInfo != null) {
vectors =
VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors);
@@ -104,4 +101,9 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
}
return this;
}
+
+ @Override
+ public VectorizedColumnBatch batch() {
+ return row.batch();
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
index 22882094e..dab535643 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
@@ -18,6 +18,7 @@
package org.apache.paimon.data.columnar;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -26,7 +27,6 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.BytesColumnVector.Bytes;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
/**
* A VectorizedColumnBatch is a set of rows, organized with each column as a
vector. It is the unit
@@ -35,6 +35,7 @@ import java.nio.charset.StandardCharsets;
* <p>{@code VectorizedColumnBatch}s are influenced by Apache Hive
VectorizedRowBatch.
*/
public class VectorizedColumnBatch implements Serializable {
+
private static final long serialVersionUID = 8180323238728166155L;
/**
@@ -44,7 +45,8 @@ public class VectorizedColumnBatch implements Serializable {
public static final int DEFAULT_SIZE = 2048;
private int numRows;
- public final org.apache.paimon.data.columnar.ColumnVector[] columns;
+
+ public final ColumnVector[] columns;
public VectorizedColumnBatch(ColumnVector[] vectors) {
this.columns = vectors;
@@ -94,22 +96,24 @@ public class VectorizedColumnBatch implements Serializable {
return ((DoubleColumnVector) columns[colId]).getDouble(rowId);
}
- public Bytes getByteArray(int rowId, int colId) {
- return ((BytesColumnVector) columns[colId]).getBytes(rowId);
+ public BinaryString getString(int rowId, int pos) {
+ Bytes byteArray = getByteArray(rowId, pos);
+ return BinaryString.fromBytes(byteArray.data, byteArray.offset,
byteArray.len);
}
- private byte[] getBytes(int rowId, int colId) {
- Bytes byteArray = getByteArray(rowId, colId);
+ public byte[] getBinary(int rowId, int pos) {
+ Bytes byteArray = getByteArray(rowId, pos);
if (byteArray.len == byteArray.data.length) {
return byteArray.data;
} else {
- return byteArray.getBytes();
+ byte[] ret = new byte[byteArray.len];
+ System.arraycopy(byteArray.data, byteArray.offset, ret, 0,
byteArray.len);
+ return ret;
}
}
- public String getString(int rowId, int colId) {
- Bytes byteArray = getByteArray(rowId, colId);
- return new String(byteArray.data, byteArray.offset, byteArray.len,
StandardCharsets.UTF_8);
+ public Bytes getByteArray(int rowId, int colId) {
+ return ((BytesColumnVector) columns[colId]).getBytes(rowId);
}
public Decimal getDecimal(int rowId, int colId, int precision, int scale) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/VectorizedRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/VectorizedRecordIterator.java
new file mode 100644
index 000000000..0288ffbce
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/VectorizedRecordIterator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+
+/** Wrap {@link RecordReader.RecordIterator} to support returning batch
directly. */
+public interface VectorizedRecordIterator extends
RecordReader.RecordIterator<InternalRow> {
+
+ VectorizedColumnBatch batch();
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 8cf95fad3..c084c7250 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -183,9 +183,8 @@ public class OrcReaderFactory implements
FormatReaderFactory {
VectorizedRowBatch orcBatch, long rowNumber) {
// no copying from the ORC column vectors to the Paimon columns
vectors necessary,
// because they point to the same data arrays internally design
- int batchSize = orcBatch.size;
- paimonColumnBatch.setNumRows(batchSize);
- result.reset(batchSize, rowNumber);
+ paimonColumnBatch.setNumRows(orcBatch.size);
+ result.reset(rowNumber);
return result;
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 004a0d655..64e1e2296 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -399,7 +399,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
}
public RecordIterator<InternalRow> convertAndGetIterator(long
rowNumber) {
- result.reset(columnarBatch.getNumRows(), rowNumber);
+ result.reset(rowNumber);
return result;
}
}