hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write URL: https://github.com/apache/flink/pull/11112#discussion_r382381840
########## File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.vector.ColumnVector; +import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader; +import org.apache.flink.table.runtime.arrow.readers.IntFieldReader; +import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader; +import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader; +import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.ArrowIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.ArrowSmallIntColumnVector; +import org.apache.flink.table.runtime.arrow.vectors.ArrowTinyIntColumnVector; +import org.apache.flink.table.runtime.arrow.writers.BaseRowBigIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BaseRowSmallIntWriter; +import 
org.apache.flink.table.runtime.arrow.writers.BaseRowTinyIntWriter; +import org.apache.flink.table.runtime.arrow.writers.BigIntWriter; +import org.apache.flink.table.runtime.arrow.writers.IntWriter; +import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter; +import org.apache.flink.table.runtime.arrow.writers.TinyIntWriter; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.SymbolType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import 
org.apache.flink.table.types.logical.YearMonthIntervalType; +import org.apache.flink.table.types.logical.ZonedTimestampType; +import org.apache.flink.types.Row; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Utilities for Arrow. + */ +@Internal +public final class ArrowUtils { + + public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE); + + /** + * Returns the Arrow schema of the specified type. + */ + public static Schema toArrowSchema(RowType rowType) { + Collection<Field> fields = rowType.getFields().stream() + .map(ArrowUtils::toArrowField) + .collect(Collectors.toCollection(ArrayList::new)); + return new Schema(fields); + } + + private static Field toArrowField(RowType.RowField rowField) { + FieldType fieldType = new FieldType( + rowField.getType().isNullable(), + rowField.getType().accept(LogicalTypeToArrowTypeConverter.INSTANCE), + null); + return new Field(rowField.getName(), fieldType, null); + } + + /** + * Creates an {@link ArrowWriter} for the specified {@link VectorSchemaRoot}. 
+ */ + public static ArrowWriter<Row> createArrowWriter(VectorSchemaRoot root) { + ArrowFieldWriter<Row>[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()]; + List<FieldVector> vectors = root.getFieldVectors(); + for (int i = 0; i < vectors.size(); i++) { + FieldVector vector = vectors.get(i); + vector.allocateNew(); + fieldWriters[i] = createArrowFieldWriter(vector); + } + + return new ArrowWriter<>(root, fieldWriters); + } + + private static ArrowFieldWriter<Row> createArrowFieldWriter(FieldVector vector) { + if (vector instanceof TinyIntVector) { + return new TinyIntWriter((TinyIntVector) vector); + } else if (vector instanceof SmallIntVector) { + return new SmallIntWriter((SmallIntVector) vector); + } else if (vector instanceof IntVector) { + return new IntWriter((IntVector) vector); + } else if (vector instanceof BigIntVector) { + return new BigIntWriter((BigIntVector) vector); + } else { + throw new UnsupportedOperationException(String.format( + "Unsupported type %s.", fromArrowField(vector.getField()))); + } + } + + /** + * Creates an {@link ArrowWriter} for blink planner for the specified {@link VectorSchemaRoot}. 
+ */ + public static ArrowWriter<BaseRow> createBlinkArrowWriter(VectorSchemaRoot root) { + ArrowFieldWriter<BaseRow>[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()]; + List<FieldVector> vectors = root.getFieldVectors(); + for (int i = 0; i < vectors.size(); i++) { + FieldVector vector = vectors.get(i); + vector.allocateNew(); + fieldWriters[i] = createBlinkArrowFieldWriter(vector); + } + + return new ArrowWriter<>(root, fieldWriters); + } + + private static ArrowFieldWriter<BaseRow> createBlinkArrowFieldWriter(FieldVector vector) { + if (vector instanceof TinyIntVector) { + return new BaseRowTinyIntWriter((TinyIntVector) vector); + } else if (vector instanceof SmallIntVector) { + return new BaseRowSmallIntWriter((SmallIntVector) vector); + } else if (vector instanceof IntVector) { + return new BaseRowIntWriter((IntVector) vector); + } else if (vector instanceof BigIntVector) { + return new BaseRowBigIntWriter((BigIntVector) vector); + } else { + throw new UnsupportedOperationException(String.format( + "Unsupported type %s.", fromArrowField(vector.getField()))); + } + } + + /** + * Creates an {@link ArrowReader} for the specified {@link VectorSchemaRoot}. + */ + public static RowArrowReader createArrowReader(VectorSchemaRoot root) { + List<ArrowFieldReader> fieldReaders = new ArrayList<>(); + for (FieldVector vector : root.getFieldVectors()) { + fieldReaders.add(createFieldReader(vector)); + } + + return new RowArrowReader(fieldReaders.toArray(new ArrowFieldReader[0])); + } + + private static ArrowFieldReader createFieldReader(FieldVector vector) { Review comment: Rename to `createArrowFieldReader`, making it consistent with `createArrowFieldWriter`. 
How about renaming the methods in this class as follows: ``` createArrowWriter -> createRowArrowWriter (corresponding with BaseRow) createArrowFieldWriter -> createRowArrowFieldWriter createBlinkArrowWriter -> createBaseRowArrowWriter (consistent with BaseRowXXXOperator and BaseRowXXXFunction) createBlinkArrowFieldWriter -> createBaseRowArrowFieldWriter createArrowReader -> createRowArrowReader createFieldReader -> createArrowFieldReader createBlinkArrowReader -> createBaseRowArrowReader fromArrowField -> fromArrowFieldToLogicalType (more explicit) fromArrowType -> fromArrowTypeToLogicalType ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
