This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 590a66fdf [arrow] Add ArrowFormatWriter: writing internal row to arrow vector (#3986)
590a66fdf is described below
commit 590a66fdf379d45a243c45a151e11c77068db39c
Author: YeJunHao <[email protected]>
AuthorDate: Mon Aug 19 11:14:45 2024 +0800
[arrow] Add ArrowFormatWriter: writing internal row to arrow vector (#3986)
---
paimon-arrow/pom.xml | 30 +++--
.../java/org/apache/paimon/arrow/ArrowUtils.java | 24 ++++
.../apache/paimon/arrow/vector/ArrowCStruct.java | 46 +++++++
.../paimon/arrow/vector/ArrowFormatWriter.java | 110 ++++++++++++++++
.../paimon/arrow/vector/ArrowFormatWriterTest.java | 144 +++++++++++++++++++++
.../apache/paimon/format/orc/OrcFileFormat.java | 2 +-
6 files changed, 346 insertions(+), 10 deletions(-)
diff --git a/paimon-arrow/pom.xml b/paimon-arrow/pom.xml
index 81183895c..78e1f7008 100644
--- a/paimon-arrow/pom.xml
+++ b/paimon-arrow/pom.xml
@@ -60,6 +60,27 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-c-data</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-unsafe</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
@@ -76,13 +97,6 @@ under the License.
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-memory-unsafe</artifactId>
- <version>${arrow.version}</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -106,7 +120,5 @@ under the License.
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
-
</dependencies>
-
</project>
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index 462de41f1..235ab9531 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.arrow;
+import org.apache.paimon.arrow.vector.ArrowCStruct;
import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
import org.apache.paimon.data.Timestamp;
@@ -26,11 +27,15 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
@@ -38,6 +43,8 @@ import org.apache.arrow.vector.types.pojo.Schema;
import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.OutputStream;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
@@ -122,6 +129,23 @@ public class ArrowUtils {
: zoneCastedTimestampZoneCastToEpoch(timestamp, precision, castZoneId);
}
+ public static ArrowCStruct serializeToCStruct(
+ VectorSchemaRoot vsr, ArrowArray array, ArrowSchema schema) {
+ BufferAllocator bufferAllocator = vsr.getVector(0).getAllocator();
+ Data.exportVectorSchemaRoot(bufferAllocator, vsr, null, array, schema);
+ return ArrowCStruct.of(array, schema);
+ }
+
+ public void serializeToIpc(VectorSchemaRoot vsr, OutputStream out) {
+ try (ArrowStreamWriter writer = new ArrowStreamWriter(vsr, null, out)) {
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize VectorSchemaRoot to IPC", e);
+ }
+ }
+
private static long nonCastedTimestampToEpoch(Timestamp timestamp, int precision) {
if (precision == 0) {
return timestamp.getMillisecond() / 1000;
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowCStruct.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowCStruct.java
new file mode 100644
index 000000000..f6c0da0d9
--- /dev/null
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowCStruct.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.vector;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+
+/** Cache for Arrow c struct. */
+public class ArrowCStruct {
+
+ private final ArrowArray array;
+ private final ArrowSchema schema;
+
+ public ArrowCStruct(ArrowArray array, ArrowSchema schema) {
+ this.array = array;
+ this.schema = schema;
+ }
+
+ public long arrayAddress() {
+ return array.memoryAddress();
+ }
+
+ public long schemaAddress() {
+ return schema.memoryAddress();
+ }
+
+ public static ArrowCStruct of(ArrowArray array, ArrowSchema schema) {
+ return new ArrowCStruct(array, schema);
+ }
+}
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
new file mode 100644
index 000000000..b148bfabe
--- /dev/null
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.vector;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.arrow.ArrowUtils;
+import org.apache.paimon.arrow.writer.ArrowFieldWriter;
+import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.util.OversizedAllocationException;
+
+/** Write from {@link InternalRow} to {@link VectorSchemaRoot}. */
+public class ArrowFormatWriter implements AutoCloseable {
+
+ private final VectorSchemaRoot vectorSchemaRoot;
+ private final ArrowFieldWriter[] fieldWriters;
+ private final ArrowArray array;
+ private final ArrowSchema schema;
+
+ private final int batchSize;
+
+ private final RootAllocator allocator;
+ private int rowId;
+
+ public ArrowFormatWriter(RowType rowType, int writeBatchSize) {
+ allocator = new RootAllocator();
+ array = ArrowArray.allocateNew(allocator);
+ schema = ArrowSchema.allocateNew(allocator);
+
+ vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, allocator, false);
+
+ fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
+
+ for (int i = 0; i < fieldWriters.length; i++) {
+ fieldWriters[i] =
+ rowType.getFields()
+ .get(i)
+ .type()
+ .accept(ArrowFieldWriterFactoryVisitor.INSTANCE)
+ .create(vectorSchemaRoot.getVector(i));
+ }
+
+ this.batchSize = writeBatchSize;
+ }
+
+ public ArrowCStruct flush() {
+ vectorSchemaRoot.setRowCount(rowId);
+ ArrowCStruct arrowCStruct = ArrowUtils.serializeToCStruct(vectorSchemaRoot, array, schema);
+ rowId = 0;
+ return arrowCStruct;
+ }
+
+ public boolean write(InternalRow currentRow) {
+ if (rowId >= batchSize) {
+ return false;
+ }
+ for (int i = 0; i < currentRow.getFieldCount(); i++) {
+ try {
+ fieldWriters[i].write(rowId, currentRow, i);
+ } catch (OversizedAllocationException | IndexOutOfBoundsException e) {
+ // maybe out of memory
+ return false;
+ }
+ }
+
+ rowId++;
+ return true;
+ }
+
+ public boolean empty() {
+ return rowId == 0;
+ }
+
+ @Override
+ public void close() {
+ array.release();
+ schema.release();
+ array.close();
+ schema.close();
+ vectorSchemaRoot.close();
+ allocator.close();
+ }
+
+ @VisibleForTesting
+ VectorSchemaRoot getVectorSchemaRoot() {
+ return vectorSchemaRoot;
+ }
+}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
new file mode 100644
index 000000000..1a256f91c
--- /dev/null
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.vector;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Test for {@link org.apache.paimon.arrow.vector.ArrowFormatWriter}. */
+public class ArrowFormatWriterTest {
+
+ private static final Random RND = ThreadLocalRandom.current();
+ private static final boolean[] NULLABLE;
+ private static final RowType PRIMITIVE_TYPE;
+
+ static {
+ int cnt = 18;
+ NULLABLE = new boolean[cnt];
+ for (int i = 0; i < cnt; i++) {
+ NULLABLE[i] = RND.nextBoolean();
+ }
+
+ List<DataField> dataFields = new ArrayList<>();
+ dataFields.add(new DataField(0, "char", DataTypes.CHAR(10).copy(NULLABLE[0])));
+ dataFields.add(new DataField(1, "varchar", DataTypes.VARCHAR(20).copy(NULLABLE[1])));
+ dataFields.add(new DataField(2, "boolean", DataTypes.BOOLEAN().copy(NULLABLE[2])));
+ dataFields.add(new DataField(3, "binary", DataTypes.BINARY(10).copy(NULLABLE[3])));
+ dataFields.add(new DataField(4, "varbinary", DataTypes.VARBINARY(20).copy(NULLABLE[4])));
+ dataFields.add(new DataField(5, "decimal1", DataTypes.DECIMAL(2, 2).copy(NULLABLE[5])));
+ dataFields.add(new DataField(6, "decimal2", DataTypes.DECIMAL(38, 2).copy(NULLABLE[6])));
+ dataFields.add(new DataField(7, "decimal3", DataTypes.DECIMAL(10, 1).copy(NULLABLE[7])));
+ dataFields.add(new DataField(8, "tinyint", DataTypes.TINYINT().copy(NULLABLE[8])));
+ dataFields.add(new DataField(9, "smallint", DataTypes.SMALLINT().copy(NULLABLE[9])));
+ dataFields.add(new DataField(10, "int", DataTypes.INT().copy(NULLABLE[10])));
+ dataFields.add(new DataField(11, "bigint", DataTypes.BIGINT().copy(NULLABLE[11])));
+ dataFields.add(new DataField(12, "float", DataTypes.FLOAT().copy(NULLABLE[12])));
+ dataFields.add(new DataField(13, "double", DataTypes.DOUBLE().copy(NULLABLE[13])));
+ dataFields.add(new DataField(14, "date", DataTypes.DATE().copy(NULLABLE[14])));
+ dataFields.add(new DataField(15, "timestamp3", DataTypes.TIMESTAMP(3).copy(NULLABLE[15])));
+ dataFields.add(new DataField(16, "timestamp6", DataTypes.TIMESTAMP(6).copy(NULLABLE[16])));
+ dataFields.add(
+ new DataField(
+ 17,
+ "timestampLZ9",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).copy(NULLABLE[17])));
+ PRIMITIVE_TYPE = new RowType(dataFields);
+ }
+
+ @Test
+ public void testWrite() {
+ try (ArrowFormatWriter writer = new ArrowFormatWriter(PRIMITIVE_TYPE, 4096)) {
+ List<InternalRow> list = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ list.add(GenericRow.of(randomRowValues(null)));
+ }
+
+ list.forEach(writer::write);
+
+ writer.flush();
+ VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
+
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+ Assertions.assertThat(list.get(i).getString(0).toString())
+ .isEqualTo(vectorSchemaRoot.getVector(0).getObject(i).toString());
+ }
+
+ for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+ Assertions.assertThat(list.get(i).getInt(14))
+ .isEqualTo(vectorSchemaRoot.getVector(14).getObject(i));
+ }
+ }
+ }
+
+ private Object[] randomRowValues(boolean[] nullable) {
+ Object[] values = new Object[18];
+ values[0] = BinaryString.fromString(StringUtils.getRandomString(RND, 10, 10));
+ values[1] = BinaryString.fromString(StringUtils.getRandomString(RND, 1, 20));
+ values[2] = RND.nextBoolean();
+ values[3] = randomBytes(10, 10);
+ values[4] = randomBytes(1, 20);
+ values[5] = Decimal.fromBigDecimal(new BigDecimal("0.22"), 2, 2);
+ values[6] = Decimal.fromBigDecimal(new BigDecimal("12312455.22"), 38, 2);
+ values[7] = Decimal.fromBigDecimal(new BigDecimal("12455.1"), 10, 1);
+ values[8] = (byte) RND.nextInt(Byte.MAX_VALUE);
+ values[9] = (short) RND.nextInt(Short.MAX_VALUE);
+ values[10] = RND.nextInt();
+ values[11] = RND.nextLong();
+ values[12] = RND.nextFloat();
+ values[13] = RND.nextDouble();
+ values[14] = RND.nextInt();
+ values[15] = Timestamp.fromEpochMillis(RND.nextInt(1000));
+ values[16] = Timestamp.fromEpochMillis(RND.nextInt(1000), RND.nextInt(1000) * 1000);
+ values[17] = Timestamp.fromEpochMillis(RND.nextInt(1000), RND.nextInt(1000_000));
+
+ for (int i = 0; i < 18; i++) {
+ if (nullable != null && nullable[i] && RND.nextBoolean()) {
+ values[i] = null;
+ }
+ }
+
+ return values;
+ }
+
+ private byte[] randomBytes(int minLength, int maxLength) {
+ int len = RND.nextInt(maxLength - minLength + 1) + minLength;
+ byte[] bytes = new byte[len];
+ for (int i = 0; i < len; i++) {
+ bytes[i] = (byte) RND.nextInt(10);
+ }
+ return bytes;
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index bfc6a447d..de28487b7 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -165,7 +165,7 @@ public class OrcFileFormat extends FileFormat {
return orcProperties;
}
- private static DataType refineDataType(DataType type) {
+ public static DataType refineDataType(DataType type) {
switch (type.getTypeRoot()) {
case BINARY:
case VARBINARY: