This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 590a66fdf [arrow] Add ArrowFormatWriter: writing internal row to arrow vector (#3986)
590a66fdf is described below

commit 590a66fdf379d45a243c45a151e11c77068db39c
Author: YeJunHao <[email protected]>
AuthorDate: Mon Aug 19 11:14:45 2024 +0800

    [arrow] Add ArrowFormatWriter: writing internal row to arrow vector (#3986)
---
 paimon-arrow/pom.xml                               |  30 +++--
 .../java/org/apache/paimon/arrow/ArrowUtils.java   |  24 ++++
 .../apache/paimon/arrow/vector/ArrowCStruct.java   |  46 +++++++
 .../paimon/arrow/vector/ArrowFormatWriter.java     | 110 ++++++++++++++++
 .../paimon/arrow/vector/ArrowFormatWriterTest.java | 144 +++++++++++++++++++++
 .../apache/paimon/format/orc/OrcFileFormat.java    |   2 +-
 6 files changed, 346 insertions(+), 10 deletions(-)

diff --git a/paimon-arrow/pom.xml b/paimon-arrow/pom.xml
index 81183895c..78e1f7008 100644
--- a/paimon-arrow/pom.xml
+++ b/paimon-arrow/pom.xml
@@ -60,6 +60,27 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-c-data</artifactId>
+            <version>${arrow.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-core</artifactId>
+            <version>${arrow.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-unsafe</artifactId>
+            <version>${arrow.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test dependencies -->
 
         <dependency>
@@ -76,13 +97,6 @@ under the License.
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.arrow</groupId>
-            <artifactId>arrow-memory-unsafe</artifactId>
-            <version>${arrow.version}</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
@@ -106,7 +120,5 @@ under the License.
             <version>${hadoop.version}</version>
             <scope>test</scope>
         </dependency>
-
     </dependencies>
-
 </project>
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index 462de41f1..235ab9531 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.arrow;
 
+import org.apache.paimon.arrow.vector.ArrowCStruct;
 import org.apache.paimon.arrow.writer.ArrowFieldWriter;
 import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
 import org.apache.paimon.data.Timestamp;
@@ -26,11 +27,15 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
@@ -38,6 +43,8 @@ import org.apache.arrow.vector.types.pojo.Schema;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Arrays;
@@ -122,6 +129,23 @@ public class ArrowUtils {
                 : zoneCastedTimestampZoneCastToEpoch(timestamp, precision, 
castZoneId);
     }
 
+    public static ArrowCStruct serializeToCStruct(
+            VectorSchemaRoot vsr, ArrowArray array, ArrowSchema schema) {
+        BufferAllocator bufferAllocator = vsr.getVector(0).getAllocator();
+        Data.exportVectorSchemaRoot(bufferAllocator, vsr, null, array, schema);
+        return ArrowCStruct.of(array, schema);
+    }
+
+    public void serializeToIpc(VectorSchemaRoot vsr, OutputStream out) {
+        try (ArrowStreamWriter writer = new ArrowStreamWriter(vsr, null, out)) 
{
+            writer.start();
+            writer.writeBatch();
+            writer.end();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize VectorSchemaRoot 
to IPC", e);
+        }
+    }
+
     private static long nonCastedTimestampToEpoch(Timestamp timestamp, int 
precision) {
         if (precision == 0) {
             return timestamp.getMillisecond() / 1000;
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowCStruct.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowCStruct.java
new file mode 100644
index 000000000..f6c0da0d9
--- /dev/null
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowCStruct.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.vector;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+
+/** Cache for Arrow c struct. */
+public class ArrowCStruct {
+
+    private final ArrowArray array;
+    private final ArrowSchema schema;
+
+    public ArrowCStruct(ArrowArray array, ArrowSchema schema) {
+        this.array = array;
+        this.schema = schema;
+    }
+
+    public long arrayAddress() {
+        return array.memoryAddress();
+    }
+
+    public long schemaAddress() {
+        return schema.memoryAddress();
+    }
+
+    public static ArrowCStruct of(ArrowArray array, ArrowSchema schema) {
+        return new ArrowCStruct(array, schema);
+    }
+}
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
new file mode 100644
index 000000000..b148bfabe
--- /dev/null
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.vector;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.arrow.ArrowUtils;
+import org.apache.paimon.arrow.writer.ArrowFieldWriter;
+import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.util.OversizedAllocationException;
+
+/** Write from {@link InternalRow} to {@link VectorSchemaRoot}. */
+public class ArrowFormatWriter implements AutoCloseable {
+
+    private final VectorSchemaRoot vectorSchemaRoot;
+    private final ArrowFieldWriter[] fieldWriters;
+    private final ArrowArray array;
+    private final ArrowSchema schema;
+
+    private final int batchSize;
+
+    private final RootAllocator allocator;
+    private int rowId;
+
+    public ArrowFormatWriter(RowType rowType, int writeBatchSize) {
+        allocator = new RootAllocator();
+        array = ArrowArray.allocateNew(allocator);
+        schema = ArrowSchema.allocateNew(allocator);
+
+        vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, 
allocator, false);
+
+        fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
+
+        for (int i = 0; i < fieldWriters.length; i++) {
+            fieldWriters[i] =
+                    rowType.getFields()
+                            .get(i)
+                            .type()
+                            .accept(ArrowFieldWriterFactoryVisitor.INSTANCE)
+                            .create(vectorSchemaRoot.getVector(i));
+        }
+
+        this.batchSize = writeBatchSize;
+    }
+
+    public ArrowCStruct flush() {
+        vectorSchemaRoot.setRowCount(rowId);
+        ArrowCStruct arrowCStruct = 
ArrowUtils.serializeToCStruct(vectorSchemaRoot, array, schema);
+        rowId = 0;
+        return arrowCStruct;
+    }
+
+    public boolean write(InternalRow currentRow) {
+        if (rowId >= batchSize) {
+            return false;
+        }
+        for (int i = 0; i < currentRow.getFieldCount(); i++) {
+            try {
+                fieldWriters[i].write(rowId, currentRow, i);
+            } catch (OversizedAllocationException | IndexOutOfBoundsException 
e) {
+                // maybe out of memory
+                return false;
+            }
+        }
+
+        rowId++;
+        return true;
+    }
+
+    public boolean empty() {
+        return rowId == 0;
+    }
+
+    @Override
+    public void close() {
+        array.release();
+        schema.release();
+        array.close();
+        schema.close();
+        vectorSchemaRoot.close();
+        allocator.close();
+    }
+
+    @VisibleForTesting
+    VectorSchemaRoot getVectorSchemaRoot() {
+        return vectorSchemaRoot;
+    }
+}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
new file mode 100644
index 000000000..1a256f91c
--- /dev/null
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.vector;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Test for {@link org.apache.paimon.arrow.vector.ArrowFormatWriter}. */
+public class ArrowFormatWriterTest {
+
+    private static final Random RND = ThreadLocalRandom.current();
+    private static final boolean[] NULLABLE;
+    private static final RowType PRIMITIVE_TYPE;
+
+    static {
+        int cnt = 18;
+        NULLABLE = new boolean[cnt];
+        for (int i = 0; i < cnt; i++) {
+            NULLABLE[i] = RND.nextBoolean();
+        }
+
+        List<DataField> dataFields = new ArrayList<>();
+        dataFields.add(new DataField(0, "char", 
DataTypes.CHAR(10).copy(NULLABLE[0])));
+        dataFields.add(new DataField(1, "varchar", 
DataTypes.VARCHAR(20).copy(NULLABLE[1])));
+        dataFields.add(new DataField(2, "boolean", 
DataTypes.BOOLEAN().copy(NULLABLE[2])));
+        dataFields.add(new DataField(3, "binary", 
DataTypes.BINARY(10).copy(NULLABLE[3])));
+        dataFields.add(new DataField(4, "varbinary", 
DataTypes.VARBINARY(20).copy(NULLABLE[4])));
+        dataFields.add(new DataField(5, "decimal1", DataTypes.DECIMAL(2, 
2).copy(NULLABLE[5])));
+        dataFields.add(new DataField(6, "decimal2", DataTypes.DECIMAL(38, 
2).copy(NULLABLE[6])));
+        dataFields.add(new DataField(7, "decimal3", DataTypes.DECIMAL(10, 
1).copy(NULLABLE[7])));
+        dataFields.add(new DataField(8, "tinyint", 
DataTypes.TINYINT().copy(NULLABLE[8])));
+        dataFields.add(new DataField(9, "smallint", 
DataTypes.SMALLINT().copy(NULLABLE[9])));
+        dataFields.add(new DataField(10, "int", 
DataTypes.INT().copy(NULLABLE[10])));
+        dataFields.add(new DataField(11, "bigint", 
DataTypes.BIGINT().copy(NULLABLE[11])));
+        dataFields.add(new DataField(12, "float", 
DataTypes.FLOAT().copy(NULLABLE[12])));
+        dataFields.add(new DataField(13, "double", 
DataTypes.DOUBLE().copy(NULLABLE[13])));
+        dataFields.add(new DataField(14, "date", 
DataTypes.DATE().copy(NULLABLE[14])));
+        dataFields.add(new DataField(15, "timestamp3", 
DataTypes.TIMESTAMP(3).copy(NULLABLE[15])));
+        dataFields.add(new DataField(16, "timestamp6", 
DataTypes.TIMESTAMP(6).copy(NULLABLE[16])));
+        dataFields.add(
+                new DataField(
+                        17,
+                        "timestampLZ9",
+                        
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).copy(NULLABLE[17])));
+        PRIMITIVE_TYPE = new RowType(dataFields);
+    }
+
+    @Test
+    public void testWrite() {
+        try (ArrowFormatWriter writer = new ArrowFormatWriter(PRIMITIVE_TYPE, 
4096)) {
+            List<InternalRow> list = new ArrayList<>();
+            for (int i = 0; i < 1000; i++) {
+                list.add(GenericRow.of(randomRowValues(null)));
+            }
+
+            list.forEach(writer::write);
+
+            writer.flush();
+            VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
+
+            for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+                Assertions.assertThat(list.get(i).getString(0).toString())
+                        
.isEqualTo(vectorSchemaRoot.getVector(0).getObject(i).toString());
+            }
+
+            for (int i = 0; i < vectorSchemaRoot.getRowCount(); i++) {
+                Assertions.assertThat(list.get(i).getInt(14))
+                        
.isEqualTo(vectorSchemaRoot.getVector(14).getObject(i));
+            }
+        }
+    }
+
+    private Object[] randomRowValues(boolean[] nullable) {
+        Object[] values = new Object[18];
+        values[0] = BinaryString.fromString(StringUtils.getRandomString(RND, 
10, 10));
+        values[1] = BinaryString.fromString(StringUtils.getRandomString(RND, 
1, 20));
+        values[2] = RND.nextBoolean();
+        values[3] = randomBytes(10, 10);
+        values[4] = randomBytes(1, 20);
+        values[5] = Decimal.fromBigDecimal(new BigDecimal("0.22"), 2, 2);
+        values[6] = Decimal.fromBigDecimal(new BigDecimal("12312455.22"), 38, 
2);
+        values[7] = Decimal.fromBigDecimal(new BigDecimal("12455.1"), 10, 1);
+        values[8] = (byte) RND.nextInt(Byte.MAX_VALUE);
+        values[9] = (short) RND.nextInt(Short.MAX_VALUE);
+        values[10] = RND.nextInt();
+        values[11] = RND.nextLong();
+        values[12] = RND.nextFloat();
+        values[13] = RND.nextDouble();
+        values[14] = RND.nextInt();
+        values[15] = Timestamp.fromEpochMillis(RND.nextInt(1000));
+        values[16] = Timestamp.fromEpochMillis(RND.nextInt(1000), 
RND.nextInt(1000) * 1000);
+        values[17] = Timestamp.fromEpochMillis(RND.nextInt(1000), 
RND.nextInt(1000_000));
+
+        for (int i = 0; i < 18; i++) {
+            if (nullable != null && nullable[i] && RND.nextBoolean()) {
+                values[i] = null;
+            }
+        }
+
+        return values;
+    }
+
+    private byte[] randomBytes(int minLength, int maxLength) {
+        int len = RND.nextInt(maxLength - minLength + 1) + minLength;
+        byte[] bytes = new byte[len];
+        for (int i = 0; i < len; i++) {
+            bytes[i] = (byte) RND.nextInt(10);
+        }
+        return bytes;
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index bfc6a447d..de28487b7 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -165,7 +165,7 @@ public class OrcFileFormat extends FileFormat {
         return orcProperties;
     }
 
-    private static DataType refineDataType(DataType type) {
+    public static DataType refineDataType(DataType type) {
         switch (type.getTypeRoot()) {
             case BINARY:
             case VARBINARY:

Reply via email to