This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 36ae4ae01 [core] Optimize write performance (#3039)
36ae4ae01 is described below

commit 36ae4ae01f951c7698f7595504b8b17f12f35c94
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 19 14:46:33 2024 +0800

    [core] Optimize write performance (#3039)
---
 .../paimon/benchmark/TableWriterBenchmark.java     |   2 +-
 .../apache/paimon/data/AbstractBinaryWriter.java   |   6 -
 .../data/serializer/InternalRowSerializer.java     |  28 +-
 .../data/serializer/InternalRowSerializerTest.java |  12 +-
 .../paimon/format/orc/writer/FieldWriter.java      |  28 ++
 .../format/orc/writer/FieldWriterFactory.java      | 310 +++++++++++++++++++++
 .../format/orc/writer/RowDataVectorizer.java       | 266 ++----------------
 7 files changed, 373 insertions(+), 279 deletions(-)

diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
index 04d1d7342..9812afebb 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
@@ -100,7 +100,7 @@ public class TableWriterBenchmark extends TableBenchmark {
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
          * orc:               Best/Avg Time(ms)    Row Rate(K/s)      Per Row(ns)   Relative
          * ---------------------------------------------------------------------------------
-         * orc_write           36489 / 36697             82.2          12163.1       1.0X
+         * orc_write           31812 / 32223             94.3          10604.1       1.0X
          */
     }
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
index f8de8538e..b8bc3bd93 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
@@ -177,12 +177,6 @@ abstract class AbstractBinaryWriter implements BinaryWriter {
         }
     }
 
-    private void zeroBytes(int offset, int size) {
-        for (int i = offset; i < offset + size; i++) {
-            segment.put(i, (byte) 0);
-        }
-    }
-
     protected void zeroOutPaddingBytes(int numBytes) {
         if ((numBytes & 0x07) > 0) {
             segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index ab2ff7c8b..8b4810f57 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -23,8 +23,10 @@ import org.apache.paimon.data.AbstractPagedOutputView;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.data.BinaryWriter.ValueSetter;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.data.NestedRow;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
@@ -33,16 +35,17 @@ import org.apache.paimon.types.RowType;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.stream.IntStream;
 
 /** Serializer for {@link InternalRow}. */
 public class InternalRowSerializer extends 
AbstractRowDataSerializer<InternalRow> {
+
     private static final long serialVersionUID = 1L;
 
     private final BinaryRowSerializer binarySerializer;
     private final DataType[] types;
     private final Serializer[] fieldSerializers;
-    private final InternalRow.FieldGetter[] fieldGetters;
+    private final FieldGetter[] fieldGetters;
+    private final ValueSetter[] valueSetters;
 
     private transient BinaryRow reuseRow;
     private transient BinaryRowWriter reuseWriter;
@@ -65,10 +68,13 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
         this.types = types;
         this.fieldSerializers = fieldSerializers;
         this.binarySerializer = new BinaryRowSerializer(types.length);
-        this.fieldGetters =
-                IntStream.range(0, types.length)
-                        .mapToObj(i -> InternalRow.createFieldGetter(types[i], 
i))
-                        .toArray(InternalRow.FieldGetter[]::new);
+        this.fieldGetters = new FieldGetter[types.length];
+        this.valueSetters = new ValueSetter[types.length];
+        for (int i = 0; i < types.length; i++) {
+            DataType type = types[i];
+            fieldGetters[i] = InternalRow.createFieldGetter(type, i);
+            valueSetters[i] = BinaryWriter.createValueSetter(type);
+        }
     }
 
     @Override
@@ -149,15 +155,11 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
         reuseWriter.reset();
         reuseWriter.writeRowKind(row.getRowKind());
         for (int i = 0; i < types.length; i++) {
-            if (row.isNullAt(i)) {
+            Object field = fieldGetters[i].getFieldOrNull(row);
+            if (field == null) {
                 reuseWriter.setNullAt(i);
             } else {
-                BinaryWriter.write(
-                        reuseWriter,
-                        i,
-                        fieldGetters[i].getFieldOrNull(row),
-                        types[i],
-                        fieldSerializers[i]);
+                valueSetters[i].setValue(reuseWriter, i, field);
             }
         }
         reuseWriter.complete();
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalRowSerializerTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalRowSerializerTest.java
index 192c19514..c6feb9d58 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalRowSerializerTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalRowSerializerTest.java
@@ -50,11 +50,7 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
 
     @Override
     protected boolean deepEquals(InternalRow o1, InternalRow o2) {
-        return deepEqualsInternalRow(
-                o1,
-                o2,
-                (InternalRowSerializer) serializer.duplicate(),
-                (InternalRowSerializer) serializer.duplicate());
+        return deepEqualsInternalRow(o1, o2, serializer.duplicate(), 
serializer.duplicate());
     }
 
     // 
----------------------------------------------------------------------------------------------
@@ -113,11 +109,7 @@ abstract class InternalRowSerializerTest extends SerializerTestInstance<Internal
     private void checkDeepEquals(InternalRow should, InternalRow is, boolean 
checkClass) {
         boolean equals =
                 deepEqualsInternalRow(
-                        should,
-                        is,
-                        (InternalRowSerializer) serializer.duplicate(),
-                        (InternalRowSerializer) serializer.duplicate(),
-                        checkClass);
+                        should, is, serializer.duplicate(), 
serializer.duplicate(), checkClass);
         assertThat(equals).isTrue();
     }
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
new file mode 100644
index 000000000..bf4d0e026
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.orc.writer;
+
+import org.apache.paimon.data.DataGetters;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/** Orc field writer. */
+public interface FieldWriter {
+    void write(int rowId, ColumnVector column, DataGetters getters, int 
columnId);
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
new file mode 100644
index 000000000..77fe130a1
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.orc.writer;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Factory to create {@link FieldWriter}. */
+public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
+
+    public static final FieldWriterFactory WRITER_FACTORY = new 
FieldWriterFactory();
+
+    private static final FieldWriter STRING_WRITER =
+            (rowId, column, getters, columnId) -> {
+                BytesColumnVector vector = (BytesColumnVector) column;
+                byte[] bytes = getters.getString(columnId).toBytes();
+                vector.setVal(rowId, bytes, 0, bytes.length);
+            };
+
+    private static final FieldWriter BYTES_WRITER =
+            (rowId, column, getters, columnId) -> {
+                BytesColumnVector vector = (BytesColumnVector) column;
+                byte[] bytes = getters.getBinary(columnId);
+                vector.setVal(rowId, bytes, 0, bytes.length);
+            };
+
+    private static final FieldWriter BOOLEAN_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] =
+                            getters.getBoolean(columnId) ? 1 : 0;
+
+    private static final FieldWriter INT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] = 
getters.getInt(columnId);
+
+    private static final FieldWriter TINYINT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] = 
getters.getByte(columnId);
+
+    private static final FieldWriter SMALLINT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] = 
getters.getShort(columnId);
+
+    private static final FieldWriter BIGINT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] = 
getters.getLong(columnId);
+
+    private static final FieldWriter FLOAT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((DoubleColumnVector) column).vector[rowId] = 
getters.getFloat(columnId);
+
+    private static final FieldWriter DOUBLE_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((DoubleColumnVector) column).vector[rowId] = 
getters.getDouble(columnId);
+
+    @Override
+    public FieldWriter visit(CharType charType) {
+        return STRING_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(VarCharType varCharType) {
+        return STRING_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(BooleanType booleanType) {
+        return BOOLEAN_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(BinaryType binaryType) {
+        return BYTES_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(VarBinaryType varBinaryType) {
+        return BYTES_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(TinyIntType tinyIntType) {
+        return TINYINT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(SmallIntType smallIntType) {
+        return SMALLINT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(IntType intType) {
+        return INT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(BigIntType bigIntType) {
+        return BIGINT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(FloatType floatType) {
+        return FLOAT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(DoubleType doubleType) {
+        return DOUBLE_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(DateType dateType) {
+        return INT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(TimeType timeType) {
+        return INT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(TimestampType timestampType) {
+        return (rowId, column, getters, columnId) -> {
+            Timestamp timestamp =
+                    getters.getTimestamp(columnId, 
timestampType.getPrecision()).toSQLTimestamp();
+            TimestampColumnVector vector = (TimestampColumnVector) column;
+            vector.set(rowId, timestamp);
+        };
+    }
+
+    @Override
+    public FieldWriter visit(LocalZonedTimestampType localZonedTimestampType) {
+        return (rowId, column, getters, columnId) -> {
+            Timestamp timestamp =
+                    getters.getTimestamp(columnId, 
localZonedTimestampType.getPrecision())
+                            .toSQLTimestamp();
+            TimestampColumnVector vector = (TimestampColumnVector) column;
+            vector.set(rowId, timestamp);
+        };
+    }
+
+    @Override
+    public FieldWriter visit(DecimalType decimalType) {
+        return (rowId, column, getters, columnId) -> {
+            DecimalColumnVector vector = (DecimalColumnVector) column;
+            Decimal decimal =
+                    getters.getDecimal(
+                            columnId, decimalType.getPrecision(), 
decimalType.getScale());
+            HiveDecimal hiveDecimal = 
HiveDecimal.create(decimal.toBigDecimal());
+            vector.set(rowId, hiveDecimal);
+        };
+    }
+
+    @Override
+    public FieldWriter visit(ArrayType arrayType) {
+        FieldWriter elementWriter = arrayType.getElementType().accept(this);
+        return (rowId, column, getters, columnId) -> {
+            ListColumnVector listColumnVector = (ListColumnVector) column;
+            InternalArray arrayData = getters.getArray(columnId);
+            listColumnVector.lengths[rowId] = arrayData.size();
+            listColumnVector.offsets[rowId] = listColumnVector.childCount;
+            listColumnVector.childCount += listColumnVector.lengths[rowId];
+            ensureSize(
+                    listColumnVector.child,
+                    listColumnVector.childCount,
+                    listColumnVector.offsets[rowId] != 0);
+
+            for (int i = 0; i < arrayData.size(); i++) {
+                ColumnVector fieldColumn = listColumnVector.child;
+                int fieldIndex = (int) listColumnVector.offsets[rowId] + i;
+                if (arrayData.isNullAt(i)) {
+                    fieldColumn.noNulls = false;
+                    fieldColumn.isNull[fieldIndex] = true;
+                } else {
+                    elementWriter.write(fieldIndex, fieldColumn, arrayData, i);
+                }
+            }
+        };
+    }
+
+    @Override
+    public FieldWriter visit(MapType mapType) {
+        FieldWriter keyWriter = mapType.getKeyType().accept(this);
+        FieldWriter valueWriter = mapType.getValueType().accept(this);
+        return (rowId, column, getters, columnId) -> {
+            MapColumnVector mapColumnVector = (MapColumnVector) column;
+            InternalMap mapData = getters.getMap(columnId);
+            InternalArray keyArray = mapData.keyArray();
+            InternalArray valueArray = mapData.valueArray();
+            mapColumnVector.lengths[rowId] = mapData.size();
+            mapColumnVector.offsets[rowId] = mapColumnVector.childCount;
+            mapColumnVector.childCount += mapColumnVector.lengths[rowId];
+            ensureSize(
+                    mapColumnVector.keys,
+                    mapColumnVector.childCount,
+                    mapColumnVector.offsets[rowId] != 0);
+            ensureSize(
+                    mapColumnVector.values,
+                    mapColumnVector.childCount,
+                    mapColumnVector.offsets[rowId] != 0);
+
+            for (int i = 0; i < keyArray.size(); i++) {
+                int fieldIndex = (int) mapColumnVector.offsets[rowId] + i;
+
+                ColumnVector keyColumn = mapColumnVector.keys;
+                if (keyArray.isNullAt(i)) {
+                    keyColumn.noNulls = false;
+                    keyColumn.isNull[fieldIndex] = true;
+                } else {
+                    keyWriter.write(fieldIndex, keyColumn, keyArray, i);
+                }
+
+                ColumnVector valueColumn = mapColumnVector.values;
+                if (valueArray.isNullAt(i)) {
+                    valueColumn.noNulls = false;
+                    valueColumn.isNull[fieldIndex] = true;
+                } else {
+                    valueWriter.write(fieldIndex, valueColumn, valueArray, i);
+                }
+            }
+        };
+    }
+
+    @Override
+    public FieldWriter visit(RowType rowType) {
+        List<FieldWriter> fieldWriters =
+                rowType.getFieldTypes().stream()
+                        .map(t -> t.accept(this))
+                        .collect(Collectors.toList());
+        return (rowId, column, getters, columnId) -> {
+            StructColumnVector structColumnVector = (StructColumnVector) 
column;
+            InternalRow structRow = getters.getRow(columnId, 
structColumnVector.fields.length);
+            for (int i = 0; i < structRow.getFieldCount(); i++) {
+                ColumnVector fieldColumn = structColumnVector.fields[i];
+                if (structRow.isNullAt(i)) {
+                    fieldColumn.noNulls = false;
+                    fieldColumn.isNull[rowId] = true;
+                } else {
+                    fieldWriters.get(i).write(rowId, fieldColumn, structRow, 
i);
+                }
+            }
+        };
+    }
+
+    @Override
+    public FieldWriter visit(MultisetType multisetType) {
+        throw new UnsupportedOperationException("Unsupported multisetType: " + 
multisetType);
+    }
+
+    private static void ensureSize(ColumnVector cv, int size, boolean 
preserveData) {
+        int currentLength = cv.isNull.length;
+        if (currentLength < size) {
+            cv.ensureSize(Math.max(currentLength * 2, size), preserveData);
+        }
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
index b05b5e208..21443cdf9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
@@ -18,274 +18,42 @@
 
 package org.apache.paimon.format.orc.writer;
 
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DecimalType;
-import org.apache.paimon.types.LocalZonedTimestampType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TimestampType;
 
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
-import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.format.orc.writer.FieldWriterFactory.WRITER_FACTORY;
 
 /** A {@link Vectorizer} of {@link InternalRow} type element. */
 public class RowDataVectorizer extends Vectorizer<InternalRow> {
 
-    private final DataType[] fieldTypes;
+    private final List<FieldWriter> fieldWriters;
 
     public RowDataVectorizer(String schema, DataType[] fieldTypes) {
         super(schema);
-        this.fieldTypes = fieldTypes;
+        this.fieldWriters =
+                Arrays.stream(fieldTypes)
+                        .map(t -> t.accept(WRITER_FACTORY))
+                        .collect(Collectors.toList());
     }
 
     @Override
     public void vectorize(InternalRow row, VectorizedRowBatch batch) {
         int rowId = batch.size++;
         for (int i = 0; i < row.getFieldCount(); ++i) {
-            setColumn(rowId, batch.cols[i], fieldTypes[i], row, i);
-        }
-    }
-
-    private static void setColumn(
-            int rowId, ColumnVector column, DataType type, InternalRow row, 
int columnId) {
-        if (row.isNullAt(columnId)) {
-            column.noNulls = false;
-            column.isNull[rowId] = true;
-            return;
-        }
-
-        switch (type.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-                {
-                    BytesColumnVector vector = (BytesColumnVector) column;
-                    byte[] bytes = row.getString(columnId).toBytes();
-                    vector.setVal(rowId, bytes, 0, bytes.length);
-                    break;
-                }
-            case BOOLEAN:
-                {
-                    LongColumnVector vector = (LongColumnVector) column;
-                    vector.vector[rowId] = row.getBoolean(columnId) ? 1 : 0;
-                    break;
-                }
-            case BINARY:
-            case VARBINARY:
-                {
-                    BytesColumnVector vector = (BytesColumnVector) column;
-                    byte[] bytes = row.getBinary(columnId);
-                    vector.setVal(rowId, bytes, 0, bytes.length);
-                    break;
-                }
-            case DECIMAL:
-                {
-                    DecimalType dt = (DecimalType) type;
-                    DecimalColumnVector vector = (DecimalColumnVector) column;
-                    vector.set(
-                            rowId,
-                            HiveDecimal.create(
-                                    row.getDecimal(columnId, 
dt.getPrecision(), dt.getScale())
-                                            .toBigDecimal()));
-                    break;
-                }
-            case TINYINT:
-                {
-                    LongColumnVector vector = (LongColumnVector) column;
-                    vector.vector[rowId] = row.getByte(columnId);
-                    break;
-                }
-            case SMALLINT:
-                {
-                    LongColumnVector vector = (LongColumnVector) column;
-                    vector.vector[rowId] = row.getShort(columnId);
-                    break;
-                }
-            case DATE:
-            case TIME_WITHOUT_TIME_ZONE:
-            case INTEGER:
-                {
-                    LongColumnVector vector = (LongColumnVector) column;
-                    vector.vector[rowId] = row.getInt(columnId);
-                    break;
-                }
-            case BIGINT:
-                {
-                    LongColumnVector vector = (LongColumnVector) column;
-                    vector.vector[rowId] = row.getLong(columnId);
-                    break;
-                }
-            case FLOAT:
-                {
-                    DoubleColumnVector vector = (DoubleColumnVector) column;
-                    vector.vector[rowId] = row.getFloat(columnId);
-                    break;
-                }
-            case DOUBLE:
-                {
-                    DoubleColumnVector vector = (DoubleColumnVector) column;
-                    vector.vector[rowId] = row.getDouble(columnId);
-                    break;
-                }
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                {
-                    TimestampType tt = (TimestampType) type;
-                    Timestamp timestamp =
-                            row.getTimestamp(columnId, 
tt.getPrecision()).toSQLTimestamp();
-                    TimestampColumnVector vector = (TimestampColumnVector) 
column;
-                    vector.set(rowId, timestamp);
-                    break;
-                }
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                {
-                    LocalZonedTimestampType lt = (LocalZonedTimestampType) 
type;
-                    Timestamp timestamp =
-                            row.getTimestamp(columnId, 
lt.getPrecision()).toSQLTimestamp();
-                    TimestampColumnVector vector = (TimestampColumnVector) 
column;
-                    vector.set(rowId, timestamp);
-                    break;
-                }
-            case ARRAY:
-                {
-                    ListColumnVector listColumnVector = (ListColumnVector) 
column;
-                    setColumn(rowId, listColumnVector, type, row, columnId);
-                    break;
-                }
-            case MAP:
-                {
-                    MapColumnVector mapColumnVector = (MapColumnVector) column;
-                    setColumn(rowId, mapColumnVector, type, row, columnId);
-                    break;
-                }
-            case ROW:
-                {
-                    StructColumnVector structColumnVector = 
(StructColumnVector) column;
-                    setColumn(rowId, structColumnVector, type, row, columnId);
-                    break;
-                }
-            default:
-                throw new UnsupportedOperationException("Unsupported type: " + 
type);
-        }
-    }
-
-    private static void setColumn(
-            int rowId,
-            ListColumnVector listColumnVector,
-            DataType type,
-            InternalRow row,
-            int columnId) {
-        InternalArray arrayData = row.getArray(columnId);
-        ArrayType arrayType = (ArrayType) type;
-        listColumnVector.lengths[rowId] = arrayData.size();
-        listColumnVector.offsets[rowId] = listColumnVector.childCount;
-        listColumnVector.childCount += listColumnVector.lengths[rowId];
-        ensureSize(
-                listColumnVector.child,
-                listColumnVector.childCount,
-                listColumnVector.offsets[rowId] != 0);
-
-        InternalRow convertedRowData = convert(arrayData, 
arrayType.getElementType());
-        for (int i = 0; i < arrayData.size(); i++) {
-            setColumn(
-                    (int) listColumnVector.offsets[rowId] + i,
-                    listColumnVector.child,
-                    arrayType.getElementType(),
-                    convertedRowData,
-                    i);
-        }
-    }
-
-    private static void setColumn(
-            int rowId,
-            MapColumnVector mapColumnVector,
-            DataType type,
-            InternalRow row,
-            int columnId) {
-        InternalMap mapData = row.getMap(columnId);
-        MapType mapType = (MapType) type;
-        InternalArray keyArray = mapData.keyArray();
-        InternalArray valueArray = mapData.valueArray();
-        mapColumnVector.lengths[rowId] = mapData.size();
-        mapColumnVector.offsets[rowId] = mapColumnVector.childCount;
-        mapColumnVector.childCount += mapColumnVector.lengths[rowId];
-        ensureSize(
-                mapColumnVector.keys,
-                mapColumnVector.childCount,
-                mapColumnVector.offsets[rowId] != 0);
-        ensureSize(
-                mapColumnVector.values,
-                mapColumnVector.childCount,
-                mapColumnVector.offsets[rowId] != 0);
-
-        InternalRow convertedKeyRowData = convert(keyArray, 
mapType.getKeyType());
-        InternalRow convertedValueRowData = convert(valueArray, 
mapType.getValueType());
-        for (int i = 0; i < keyArray.size(); i++) {
-            setColumn(
-                    (int) mapColumnVector.offsets[rowId] + i,
-                    mapColumnVector.keys,
-                    mapType.getKeyType(),
-                    convertedKeyRowData,
-                    i);
-            setColumn(
-                    (int) mapColumnVector.offsets[rowId] + i,
-                    mapColumnVector.values,
-                    mapType.getValueType(),
-                    convertedValueRowData,
-                    i);
-        }
-    }
-
-    private static void setColumn(
-            int rowId,
-            StructColumnVector structColumnVector,
-            DataType type,
-            InternalRow row,
-            int columnId) {
-        InternalRow structRow = row.getRow(columnId, 
structColumnVector.fields.length);
-        RowType rowType = (RowType) type;
-        for (int i = 0; i < structRow.getFieldCount(); i++) {
-            ColumnVector cv = structColumnVector.fields[i];
-            setColumn(rowId, cv, rowType.getTypeAt(i), structRow, i);
-        }
-    }
-
-    private static void ensureSize(ColumnVector cv, int size, boolean 
preserveData) {
-        int currentLength = cv.isNull.length;
-        if (currentLength < size) {
-            cv.ensureSize(Math.max(currentLength * 2, size), preserveData);
-        }
-    }
-
-    /**
-     * Converting ArrayData to RowData for calling {@link 
RowDataVectorizer#setColumn(int,
-     * ColumnVector, DataType, InternalRow, int)} recursively with array.
-     *
-     * @param arrayData input ArrayData.
-     * @param arrayFieldType DataType of input ArrayData.
-     * @return RowData.
-     */
-    private static InternalRow convert(InternalArray arrayData, DataType 
arrayFieldType) {
-        GenericRow rowData = new GenericRow(arrayData.size());
-        InternalArray.ElementGetter elementGetter =
-                InternalArray.createElementGetter(arrayFieldType);
-        for (int i = 0; i < arrayData.size(); i++) {
-            rowData.setField(i, elementGetter.getElementOrNull(arrayData, i));
+            ColumnVector fieldColumn = batch.cols[i];
+            if (row.isNullAt(i)) {
+                fieldColumn.noNulls = false;
+                fieldColumn.isNull[rowId] = true;
+            } else {
+                fieldWriters.get(i).write(rowId, fieldColumn, row, i);
+            }
         }
-        return rowData;
     }
 }

Reply via email to