This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 36ae4ae01 [core] Optimize write performance (#3039)
36ae4ae01 is described below
commit 36ae4ae01f951c7698f7595504b8b17f12f35c94
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 19 14:46:33 2024 +0800
[core] Optimize write performance (#3039)
---
.../paimon/benchmark/TableWriterBenchmark.java | 2 +-
.../apache/paimon/data/AbstractBinaryWriter.java | 6 -
.../data/serializer/InternalRowSerializer.java | 28 +-
.../data/serializer/InternalRowSerializerTest.java | 12 +-
.../paimon/format/orc/writer/FieldWriter.java | 28 ++
.../format/orc/writer/FieldWriterFactory.java | 310 +++++++++++++++++++++
.../format/orc/writer/RowDataVectorizer.java | 266 ++----------------
7 files changed, 373 insertions(+), 279 deletions(-)
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
index 04d1d7342..9812afebb 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
@@ -100,7 +100,7 @@ public class TableWriterBenchmark extends TableBenchmark {
* Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
* orc: Best/Avg Time(ms) Row Rate(K/s) Per
Row(ns) Relative
*
---------------------------------------------------------------------------------
- * orc_write 36489 / 36697 82.2 12163.1
1.0X
+ * orc_write 31812 / 32223 94.3 10604.1
1.0X
*/
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
index f8de8538e..b8bc3bd93 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
@@ -177,12 +177,6 @@ abstract class AbstractBinaryWriter implements
BinaryWriter {
}
}
- private void zeroBytes(int offset, int size) {
- for (int i = offset; i < offset + size; i++) {
- segment.put(i, (byte) 0);
- }
- }
-
protected void zeroOutPaddingBytes(int numBytes) {
if ((numBytes & 0x07) > 0) {
segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index ab2ff7c8b..8b4810f57 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -23,8 +23,10 @@ import org.apache.paimon.data.AbstractPagedOutputView;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.data.BinaryWriter.ValueSetter;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.data.NestedRow;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
@@ -33,16 +35,17 @@ import org.apache.paimon.types.RowType;
import java.io.IOException;
import java.util.Arrays;
-import java.util.stream.IntStream;
/** Serializer for {@link InternalRow}. */
public class InternalRowSerializer extends
AbstractRowDataSerializer<InternalRow> {
+
private static final long serialVersionUID = 1L;
private final BinaryRowSerializer binarySerializer;
private final DataType[] types;
private final Serializer[] fieldSerializers;
- private final InternalRow.FieldGetter[] fieldGetters;
+ private final FieldGetter[] fieldGetters;
+ private final ValueSetter[] valueSetters;
private transient BinaryRow reuseRow;
private transient BinaryRowWriter reuseWriter;
@@ -65,10 +68,13 @@ public class InternalRowSerializer extends
AbstractRowDataSerializer<InternalRow
this.types = types;
this.fieldSerializers = fieldSerializers;
this.binarySerializer = new BinaryRowSerializer(types.length);
- this.fieldGetters =
- IntStream.range(0, types.length)
- .mapToObj(i -> InternalRow.createFieldGetter(types[i],
i))
- .toArray(InternalRow.FieldGetter[]::new);
+ this.fieldGetters = new FieldGetter[types.length];
+ this.valueSetters = new ValueSetter[types.length];
+ for (int i = 0; i < types.length; i++) {
+ DataType type = types[i];
+ fieldGetters[i] = InternalRow.createFieldGetter(type, i);
+ valueSetters[i] = BinaryWriter.createValueSetter(type);
+ }
}
@Override
@@ -149,15 +155,11 @@ public class InternalRowSerializer extends
AbstractRowDataSerializer<InternalRow
reuseWriter.reset();
reuseWriter.writeRowKind(row.getRowKind());
for (int i = 0; i < types.length; i++) {
- if (row.isNullAt(i)) {
+ Object field = fieldGetters[i].getFieldOrNull(row);
+ if (field == null) {
reuseWriter.setNullAt(i);
} else {
- BinaryWriter.write(
- reuseWriter,
- i,
- fieldGetters[i].getFieldOrNull(row),
- types[i],
- fieldSerializers[i]);
+ valueSetters[i].setValue(reuseWriter, i, field);
}
}
reuseWriter.complete();
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalRowSerializerTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalRowSerializerTest.java
index 192c19514..c6feb9d58 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalRowSerializerTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/InternalRowSerializerTest.java
@@ -50,11 +50,7 @@ abstract class InternalRowSerializerTest extends
SerializerTestInstance<Internal
@Override
protected boolean deepEquals(InternalRow o1, InternalRow o2) {
- return deepEqualsInternalRow(
- o1,
- o2,
- (InternalRowSerializer) serializer.duplicate(),
- (InternalRowSerializer) serializer.duplicate());
+ return deepEqualsInternalRow(o1, o2, serializer.duplicate(),
serializer.duplicate());
}
//
----------------------------------------------------------------------------------------------
@@ -113,11 +109,7 @@ abstract class InternalRowSerializerTest extends
SerializerTestInstance<Internal
private void checkDeepEquals(InternalRow should, InternalRow is, boolean
checkClass) {
boolean equals =
deepEqualsInternalRow(
- should,
- is,
- (InternalRowSerializer) serializer.duplicate(),
- (InternalRowSerializer) serializer.duplicate(),
- checkClass);
+ should, is, serializer.duplicate(),
serializer.duplicate(), checkClass);
assertThat(equals).isTrue();
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
new file mode 100644
index 000000000..bf4d0e026
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.orc.writer;
+
+import org.apache.paimon.data.DataGetters;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/**
+ * Writes one non-null field value from a Paimon {@link DataGetters} (a row, or the element / key /
+ * value array of a collection) into a slot of an ORC {@link ColumnVector}.
+ *
+ * <p>Implementations are built per data type by {@link FieldWriterFactory} and are expressed as
+ * lambdas, hence this is a functional interface.
+ */
+@FunctionalInterface
+public interface FieldWriter {
+
+    /**
+     * Copies the value at position {@code columnId} of {@code getters} into slot {@code rowId} of
+     * {@code column}.
+     *
+     * <p>The value is expected to be non-null: callers check {@code isNullAt} and set the ORC null
+     * flags ({@code noNulls}, {@code isNull}) themselves before delegating here.
+     *
+     * @param rowId target slot in {@code column}
+     * @param column ORC column vector to write into; its concrete subclass must match the field type
+     * @param getters source container to read the field from
+     * @param columnId position of the field inside {@code getters}
+     */
+    void write(int rowId, ColumnVector column, DataGetters getters, int columnId);
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
new file mode 100644
index 000000000..77fe130a1
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.orc.writer;
+
+import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Creates a {@link FieldWriter} for each Paimon {@link org.apache.paimon.types.DataType}.
+ *
+ * <p>The type dispatch happens once, while the writers are being built, so writing a row only
+ * invokes the pre-resolved lambdas. Writers for nested types (array, map, row) capture the writers
+ * of their element / key / value / field types and mark null children themselves; the null check
+ * for the value passed to a returned writer is left to the caller.
+ */
+public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
+
+    /** Shared instance; the factory keeps no per-instance state. */
+    public static final FieldWriterFactory WRITER_FACTORY = new FieldWriterFactory();
+
+    private static final FieldWriter STRING_WRITER =
+            (rowId, column, getters, columnId) -> {
+                BytesColumnVector vector = (BytesColumnVector) column;
+                byte[] bytes = getters.getString(columnId).toBytes();
+                vector.setVal(rowId, bytes, 0, bytes.length);
+            };
+
+    private static final FieldWriter BYTES_WRITER =
+            (rowId, column, getters, columnId) -> {
+                BytesColumnVector vector = (BytesColumnVector) column;
+                byte[] bytes = getters.getBinary(columnId);
+                vector.setVal(rowId, bytes, 0, bytes.length);
+            };
+
+    private static final FieldWriter BOOLEAN_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] =
+                            getters.getBoolean(columnId) ? 1 : 0;
+
+    private static final FieldWriter INT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] = getters.getInt(columnId);
+
+    private static final FieldWriter TINYINT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] = getters.getByte(columnId);
+
+    private static final FieldWriter SMALLINT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] = getters.getShort(columnId);
+
+    private static final FieldWriter BIGINT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((LongColumnVector) column).vector[rowId] = getters.getLong(columnId);
+
+    private static final FieldWriter FLOAT_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((DoubleColumnVector) column).vector[rowId] = getters.getFloat(columnId);
+
+    private static final FieldWriter DOUBLE_WRITER =
+            (rowId, column, getters, columnId) ->
+                    ((DoubleColumnVector) column).vector[rowId] = getters.getDouble(columnId);
+
+    @Override
+    public FieldWriter visit(CharType charType) {
+        return STRING_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(VarCharType varCharType) {
+        return STRING_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(BooleanType booleanType) {
+        return BOOLEAN_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(BinaryType binaryType) {
+        return BYTES_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(VarBinaryType varBinaryType) {
+        return BYTES_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(TinyIntType tinyIntType) {
+        return TINYINT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(SmallIntType smallIntType) {
+        return SMALLINT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(IntType intType) {
+        return INT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(BigIntType bigIntType) {
+        return BIGINT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(FloatType floatType) {
+        return FLOAT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(DoubleType doubleType) {
+        return DOUBLE_WRITER;
+    }
+
+    /** DATE values are read with {@code getInt}, exactly like INT. */
+    @Override
+    public FieldWriter visit(DateType dateType) {
+        return INT_WRITER;
+    }
+
+    /** TIME values are read with {@code getInt}, exactly like INT. */
+    @Override
+    public FieldWriter visit(TimeType timeType) {
+        return INT_WRITER;
+    }
+
+    @Override
+    public FieldWriter visit(TimestampType timestampType) {
+        return timestampWriter(timestampType.getPrecision());
+    }
+
+    @Override
+    public FieldWriter visit(LocalZonedTimestampType localZonedTimestampType) {
+        return timestampWriter(localZonedTimestampType.getPrecision());
+    }
+
+    @Override
+    public FieldWriter visit(DecimalType decimalType) {
+        int precision = decimalType.getPrecision();
+        int scale = decimalType.getScale();
+        return (rowId, column, getters, columnId) -> {
+            DecimalColumnVector vector = (DecimalColumnVector) column;
+            Decimal decimal = getters.getDecimal(columnId, precision, scale);
+            HiveDecimal hiveDecimal = HiveDecimal.create(decimal.toBigDecimal());
+            vector.set(rowId, hiveDecimal);
+        };
+    }
+
+    @Override
+    public FieldWriter visit(ArrayType arrayType) {
+        FieldWriter elementWriter = arrayType.getElementType().accept(this);
+        return (rowId, column, getters, columnId) -> {
+            ListColumnVector listColumnVector = (ListColumnVector) column;
+            InternalArray arrayData = getters.getArray(columnId);
+            int size = arrayData.size();
+            // Elements of this row are appended after all children written so far.
+            int offset = listColumnVector.childCount;
+            listColumnVector.lengths[rowId] = size;
+            listColumnVector.offsets[rowId] = offset;
+            listColumnVector.childCount = offset + size;
+            // Existing child data only needs to be preserved if earlier rows wrote some.
+            ensureSize(listColumnVector.child, listColumnVector.childCount, offset != 0);
+
+            ColumnVector child = listColumnVector.child;
+            for (int i = 0; i < size; i++) {
+                writeNullable(elementWriter, child, offset + i, arrayData, i);
+            }
+        };
+    }
+
+    @Override
+    public FieldWriter visit(MapType mapType) {
+        FieldWriter keyWriter = mapType.getKeyType().accept(this);
+        FieldWriter valueWriter = mapType.getValueType().accept(this);
+        return (rowId, column, getters, columnId) -> {
+            MapColumnVector mapColumnVector = (MapColumnVector) column;
+            InternalMap mapData = getters.getMap(columnId);
+            InternalArray keyArray = mapData.keyArray();
+            InternalArray valueArray = mapData.valueArray();
+            // Entries of this row are appended after all children written so far.
+            int offset = mapColumnVector.childCount;
+            mapColumnVector.lengths[rowId] = mapData.size();
+            mapColumnVector.offsets[rowId] = offset;
+            mapColumnVector.childCount = offset + mapData.size();
+            // Existing child data only needs to be preserved if earlier rows wrote some.
+            ensureSize(mapColumnVector.keys, mapColumnVector.childCount, offset != 0);
+            ensureSize(mapColumnVector.values, mapColumnVector.childCount, offset != 0);
+
+            ColumnVector keyColumn = mapColumnVector.keys;
+            ColumnVector valueColumn = mapColumnVector.values;
+            for (int i = 0; i < keyArray.size(); i++) {
+                int fieldIndex = offset + i;
+                writeNullable(keyWriter, keyColumn, fieldIndex, keyArray, i);
+                writeNullable(valueWriter, valueColumn, fieldIndex, valueArray, i);
+            }
+        };
+    }
+
+    @Override
+    public FieldWriter visit(RowType rowType) {
+        List<FieldWriter> fieldWriters =
+                rowType.getFieldTypes().stream()
+                        .map(t -> t.accept(this))
+                        .collect(Collectors.toList());
+        return (rowId, column, getters, columnId) -> {
+            StructColumnVector structColumnVector = (StructColumnVector) column;
+            ColumnVector[] fieldColumns = structColumnVector.fields;
+            InternalRow structRow = getters.getRow(columnId, fieldColumns.length);
+            // Struct children share the parent's slot, so every field is written at rowId.
+            for (int i = 0; i < structRow.getFieldCount(); i++) {
+                writeNullable(fieldWriters.get(i), fieldColumns[i], rowId, structRow, i);
+            }
+        };
+    }
+
+    @Override
+    public FieldWriter visit(MultisetType multisetType) {
+        throw new UnsupportedOperationException("Unsupported multisetType: " + multisetType);
+    }
+
+    /**
+     * Returns a writer that reads a timestamp with the given precision and stores it as a {@link
+     * Timestamp} in a {@link TimestampColumnVector}. It is shared by TIMESTAMP and
+     * TIMESTAMP_WITH_LOCAL_TIME_ZONE, which are written identically.
+     */
+    private static FieldWriter timestampWriter(int precision) {
+        return (rowId, column, getters, columnId) -> {
+            Timestamp timestamp = getters.getTimestamp(columnId, precision).toSQLTimestamp();
+            ((TimestampColumnVector) column).set(rowId, timestamp);
+        };
+    }
+
+    /**
+     * Writes position {@code pos} of {@code source} into slot {@code rowId} of {@code column}. If the
+     * source value is null, the slot is marked null instead and {@code noNulls} is cleared.
+     */
+    private static void writeNullable(
+            FieldWriter writer, ColumnVector column, int rowId, DataGetters source, int pos) {
+        if (source.isNullAt(pos)) {
+            column.noNulls = false;
+            column.isNull[rowId] = true;
+        } else {
+            writer.write(rowId, column, source, pos);
+        }
+    }
+
+    /**
+     * Grows {@code cv} to hold at least {@code size} entries. The new capacity is at least double
+     * the current one, so repeated appends do not reallocate on every row.
+     */
+    private static void ensureSize(ColumnVector cv, int size, boolean preserveData) {
+        int currentLength = cv.isNull.length;
+        if (currentLength < size) {
+            cv.ensureSize(Math.max(currentLength * 2, size), preserveData);
+        }
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
index b05b5e208..21443cdf9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
@@ -18,274 +18,42 @@
package org.apache.paimon.format.orc.writer;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DecimalType;
-import org.apache.paimon.types.LocalZonedTimestampType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TimestampType;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.paimon.format.orc.writer.FieldWriterFactory.WRITER_FACTORY;
/** A {@link Vectorizer} of {@link InternalRow} type element. */
public class RowDataVectorizer extends Vectorizer<InternalRow> {
- private final DataType[] fieldTypes;
+ private final List<FieldWriter> fieldWriters;
public RowDataVectorizer(String schema, DataType[] fieldTypes) {
super(schema);
- this.fieldTypes = fieldTypes;
+ this.fieldWriters =
+ Arrays.stream(fieldTypes)
+ .map(t -> t.accept(WRITER_FACTORY))
+ .collect(Collectors.toList());
}
@Override
public void vectorize(InternalRow row, VectorizedRowBatch batch) {
int rowId = batch.size++;
for (int i = 0; i < row.getFieldCount(); ++i) {
- setColumn(rowId, batch.cols[i], fieldTypes[i], row, i);
- }
- }
-
- private static void setColumn(
- int rowId, ColumnVector column, DataType type, InternalRow row,
int columnId) {
- if (row.isNullAt(columnId)) {
- column.noNulls = false;
- column.isNull[rowId] = true;
- return;
- }
-
- switch (type.getTypeRoot()) {
- case CHAR:
- case VARCHAR:
- {
- BytesColumnVector vector = (BytesColumnVector) column;
- byte[] bytes = row.getString(columnId).toBytes();
- vector.setVal(rowId, bytes, 0, bytes.length);
- break;
- }
- case BOOLEAN:
- {
- LongColumnVector vector = (LongColumnVector) column;
- vector.vector[rowId] = row.getBoolean(columnId) ? 1 : 0;
- break;
- }
- case BINARY:
- case VARBINARY:
- {
- BytesColumnVector vector = (BytesColumnVector) column;
- byte[] bytes = row.getBinary(columnId);
- vector.setVal(rowId, bytes, 0, bytes.length);
- break;
- }
- case DECIMAL:
- {
- DecimalType dt = (DecimalType) type;
- DecimalColumnVector vector = (DecimalColumnVector) column;
- vector.set(
- rowId,
- HiveDecimal.create(
- row.getDecimal(columnId,
dt.getPrecision(), dt.getScale())
- .toBigDecimal()));
- break;
- }
- case TINYINT:
- {
- LongColumnVector vector = (LongColumnVector) column;
- vector.vector[rowId] = row.getByte(columnId);
- break;
- }
- case SMALLINT:
- {
- LongColumnVector vector = (LongColumnVector) column;
- vector.vector[rowId] = row.getShort(columnId);
- break;
- }
- case DATE:
- case TIME_WITHOUT_TIME_ZONE:
- case INTEGER:
- {
- LongColumnVector vector = (LongColumnVector) column;
- vector.vector[rowId] = row.getInt(columnId);
- break;
- }
- case BIGINT:
- {
- LongColumnVector vector = (LongColumnVector) column;
- vector.vector[rowId] = row.getLong(columnId);
- break;
- }
- case FLOAT:
- {
- DoubleColumnVector vector = (DoubleColumnVector) column;
- vector.vector[rowId] = row.getFloat(columnId);
- break;
- }
- case DOUBLE:
- {
- DoubleColumnVector vector = (DoubleColumnVector) column;
- vector.vector[rowId] = row.getDouble(columnId);
- break;
- }
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- {
- TimestampType tt = (TimestampType) type;
- Timestamp timestamp =
- row.getTimestamp(columnId,
tt.getPrecision()).toSQLTimestamp();
- TimestampColumnVector vector = (TimestampColumnVector)
column;
- vector.set(rowId, timestamp);
- break;
- }
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- {
- LocalZonedTimestampType lt = (LocalZonedTimestampType)
type;
- Timestamp timestamp =
- row.getTimestamp(columnId,
lt.getPrecision()).toSQLTimestamp();
- TimestampColumnVector vector = (TimestampColumnVector)
column;
- vector.set(rowId, timestamp);
- break;
- }
- case ARRAY:
- {
- ListColumnVector listColumnVector = (ListColumnVector)
column;
- setColumn(rowId, listColumnVector, type, row, columnId);
- break;
- }
- case MAP:
- {
- MapColumnVector mapColumnVector = (MapColumnVector) column;
- setColumn(rowId, mapColumnVector, type, row, columnId);
- break;
- }
- case ROW:
- {
- StructColumnVector structColumnVector =
(StructColumnVector) column;
- setColumn(rowId, structColumnVector, type, row, columnId);
- break;
- }
- default:
- throw new UnsupportedOperationException("Unsupported type: " +
type);
- }
- }
-
- private static void setColumn(
- int rowId,
- ListColumnVector listColumnVector,
- DataType type,
- InternalRow row,
- int columnId) {
- InternalArray arrayData = row.getArray(columnId);
- ArrayType arrayType = (ArrayType) type;
- listColumnVector.lengths[rowId] = arrayData.size();
- listColumnVector.offsets[rowId] = listColumnVector.childCount;
- listColumnVector.childCount += listColumnVector.lengths[rowId];
- ensureSize(
- listColumnVector.child,
- listColumnVector.childCount,
- listColumnVector.offsets[rowId] != 0);
-
- InternalRow convertedRowData = convert(arrayData,
arrayType.getElementType());
- for (int i = 0; i < arrayData.size(); i++) {
- setColumn(
- (int) listColumnVector.offsets[rowId] + i,
- listColumnVector.child,
- arrayType.getElementType(),
- convertedRowData,
- i);
- }
- }
-
- private static void setColumn(
- int rowId,
- MapColumnVector mapColumnVector,
- DataType type,
- InternalRow row,
- int columnId) {
- InternalMap mapData = row.getMap(columnId);
- MapType mapType = (MapType) type;
- InternalArray keyArray = mapData.keyArray();
- InternalArray valueArray = mapData.valueArray();
- mapColumnVector.lengths[rowId] = mapData.size();
- mapColumnVector.offsets[rowId] = mapColumnVector.childCount;
- mapColumnVector.childCount += mapColumnVector.lengths[rowId];
- ensureSize(
- mapColumnVector.keys,
- mapColumnVector.childCount,
- mapColumnVector.offsets[rowId] != 0);
- ensureSize(
- mapColumnVector.values,
- mapColumnVector.childCount,
- mapColumnVector.offsets[rowId] != 0);
-
- InternalRow convertedKeyRowData = convert(keyArray,
mapType.getKeyType());
- InternalRow convertedValueRowData = convert(valueArray,
mapType.getValueType());
- for (int i = 0; i < keyArray.size(); i++) {
- setColumn(
- (int) mapColumnVector.offsets[rowId] + i,
- mapColumnVector.keys,
- mapType.getKeyType(),
- convertedKeyRowData,
- i);
- setColumn(
- (int) mapColumnVector.offsets[rowId] + i,
- mapColumnVector.values,
- mapType.getValueType(),
- convertedValueRowData,
- i);
- }
- }
-
- private static void setColumn(
- int rowId,
- StructColumnVector structColumnVector,
- DataType type,
- InternalRow row,
- int columnId) {
- InternalRow structRow = row.getRow(columnId,
structColumnVector.fields.length);
- RowType rowType = (RowType) type;
- for (int i = 0; i < structRow.getFieldCount(); i++) {
- ColumnVector cv = structColumnVector.fields[i];
- setColumn(rowId, cv, rowType.getTypeAt(i), structRow, i);
- }
- }
-
- private static void ensureSize(ColumnVector cv, int size, boolean
preserveData) {
- int currentLength = cv.isNull.length;
- if (currentLength < size) {
- cv.ensureSize(Math.max(currentLength * 2, size), preserveData);
- }
- }
-
- /**
- * Converting ArrayData to RowData for calling {@link
RowDataVectorizer#setColumn(int,
- * ColumnVector, DataType, InternalRow, int)} recursively with array.
- *
- * @param arrayData input ArrayData.
- * @param arrayFieldType DataType of input ArrayData.
- * @return RowData.
- */
- private static InternalRow convert(InternalArray arrayData, DataType
arrayFieldType) {
- GenericRow rowData = new GenericRow(arrayData.size());
- InternalArray.ElementGetter elementGetter =
- InternalArray.createElementGetter(arrayFieldType);
- for (int i = 0; i < arrayData.size(); i++) {
- rowData.setField(i, elementGetter.getElementOrNull(arrayData, i));
+ ColumnVector fieldColumn = batch.cols[i];
+ if (row.isNullAt(i)) {
+ fieldColumn.noNulls = false;
+ fieldColumn.isNull[rowId] = true;
+ } else {
+ fieldWriters.get(i).write(rowId, fieldColumn, row, i);
+ }
}
- return rowData;
}
}