openinx commented on a change in pull request #2566: URL: https://github.com/apache/iceberg/pull/2566#discussion_r632330112
########## File path: flink/src/main/java/org/apache/iceberg/flink/data/vectorized/ConstantColumnVectors.java ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data.vectorized; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.vector.BooleanColumnVector; +import org.apache.flink.table.data.vector.BytesColumnVector; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.DecimalColumnVector; +import org.apache.flink.table.data.vector.DoubleColumnVector; +import org.apache.flink.table.data.vector.FloatColumnVector; +import org.apache.flink.table.data.vector.IntColumnVector; +import org.apache.flink.table.data.vector.LongColumnVector; +import org.apache.flink.table.data.vector.TimestampColumnVector; + +class ConstantColumnVectors { + private ConstantColumnVectors() { + } + + static ColumnVector ints(Object constant) { + return new ConstantIntColumnVector(constant); + } + + static ColumnVector longs(Object constant) { + return new ConstantLongColumnVongector(constant); + } + + static ColumnVector booleans(Object constant) { + return new ConstantBooleanColumnVector(constant); + } + + static ColumnVector doubles(Object constant) { + return new ConstantDoubleColumnVector(constant); + } + + static ColumnVector floats(Object constant) { + return new ConstantFloatColumnVector(constant); + } + + static ColumnVector decimals(Object constant) { + return new ConstantDecimalColumnVector(constant); + } + + static ColumnVector timestamps(Object constant) { + return new ConstantTimestampColumnVector(constant); + } + + static ColumnVector bytes(Object constant) { + return new ConstantBytesColumnVector(constant); + } + + private static class ConstantIntColumnVector implements IntColumnVector { + + private final Object constant; + + private ConstantIntColumnVector(Object constant) { + this.constant = constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } + + @Override + public int getInt(int i) { + return (int) constant; + } + } + + private static class ConstantLongColumnVongector implements LongColumnVector { Review comment: Typo ? `ConstantLongColumnVongector` -> `ConstantLongColumnVector` ########## File path: flink/src/main/java/org/apache/iceberg/flink/data/vectorized/VectorizedFlinkOrcReaders.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data.vectorized; + +import java.util.List; +import java.util.Map; +import org.apache.flink.orc.nohive.vector.OrcNoHiveBytesVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDecimalVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDoubleVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveLongVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveTimestampVector; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.ColumnarArrayData; +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ArrayColumnVector; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.RowColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.orc.OrcBatchReader; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class VectorizedFlinkOrcReaders { + private VectorizedFlinkOrcReaders() { + } + + public static OrcBatchReader<VectorizedColumnBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema, + Map<Integer, ?> idToConstant) { + Converter converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, fileSchema, new ReadBuilder(idToConstant)); + + return new OrcBatchReader<VectorizedColumnBatch>() { + private long batchOffsetInFile; + + @Override + public VectorizedColumnBatch read(VectorizedRowBatch batch) { + FlinkRowColumnVector cv = (FlinkRowColumnVector) converter.convert( + new StructColumnVector(batch.size, batch.cols), batch.size, batchOffsetInFile); + + VectorizedColumnBatch columnarBatch = new VectorizedColumnBatch(cv.getFieldVectors()); + columnarBatch.setNumRows(batch.size); + return columnarBatch; + } + + @Override + public void setBatchContext(long batchOffsetInFile) { + this.batchOffsetInFile = batchOffsetInFile; + } + }; + } + + private interface Converter { + ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector, int batchSize, + long batchOffsetInFile); + } + + private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> { + private final Map<Integer, ?> idToConstant; + + private 
ReadBuilder(Map<Integer, ?> idToConstant) { + this.idToConstant = idToConstant; + } + + @Override + public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names, + List<Converter> fields) { + return new StructConverter(iStruct, fields, idToConstant); + } + + @Override + public Converter list(Types.ListType iList, TypeDescription array, Converter element) { + return new StructConverter.ArrayConverter(element); + } + + @Override + public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) { + throw new UnsupportedOperationException("Unsupported vectorized read for map type."); + } + + @Override + public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + return (vector, batchSize, batchOffsetInFile) -> { + if (vector instanceof LongColumnVector) { + return new OrcNoHiveLongVector((LongColumnVector) vector); + } else if (vector instanceof DoubleColumnVector) { + return new OrcNoHiveDoubleVector((DoubleColumnVector) vector); + } else if (vector instanceof BytesColumnVector) { + return new OrcNoHiveBytesVector((BytesColumnVector) vector); + } else if (vector instanceof DecimalColumnVector) { + return new OrcNoHiveDecimalVector((DecimalColumnVector) vector); + } else if (vector instanceof TimestampColumnVector) { + return new OrcNoHiveTimestampVector((TimestampColumnVector) vector); + } else { + throw new UnsupportedOperationException( + "Unsupported vector: " + vector.getClass().getName()); + } + }; + } + } + + private static class RowPositionColumnVector implements org.apache.flink.table.data.vector.LongColumnVector { + private final long batchOffsetInFile; + + RowPositionColumnVector(long batchOffsetInFile) { + this.batchOffsetInFile = batchOffsetInFile; + } + + @Override + public boolean isNullAt(int i) { + return false; + } + + @Override + public long getLong(int i) { + return batchOffsetInFile + i; + } + } + + private static class StructConverter implements Converter { + private final Types.StructType structType; + private final List<Converter> fieldConverters; + private final Map<Integer, ?> idToConstant; + + private StructConverter(Types.StructType structType, List<Converter> fieldConverters, + Map<Integer, ?> idToConstant) { + this.structType = structType; + this.fieldConverters = fieldConverters; + this.idToConstant = idToConstant; + } + + @Override + public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize, + long batchOffsetInFile) { + StructColumnVector structVector = (StructColumnVector) vector; + List<Types.NestedField> fields = structType.fields(); + ColumnVector[] fieldVectors = new ColumnVector[fields.size()]; + for (int pos = 0; pos < fields.size(); pos++) { + Types.NestedField field = fields.get(pos); + if (idToConstant.containsKey(field.fieldId())) { + fieldVectors[pos] = toConstantColumnVector(field.type(), idToConstant.get(field.fieldId())); + } else if (field.equals(MetadataColumns.ROW_POSITION)) { + fieldVectors[pos] = new RowPositionColumnVector(batchOffsetInFile); + } else { + fieldVectors[pos] = fieldConverters.get(pos) + .convert(structVector.fields[pos], batchSize, batchOffsetInFile); + } + } + + return new FlinkRowColumnVector(fieldVectors, structVector); + } + + private ColumnVector toConstantColumnVector(Type type, Object constant) { + Type.TypeID typeID = type.typeId(); + switch (typeID) { + case INTEGER: + case DATE: + case TIME: + return ConstantColumnVectors.ints(constant); + + case LONG: + return 
ConstantColumnVectors.longs(constant); + + case BOOLEAN: + return ConstantColumnVectors.booleans(constant); + + case DOUBLE: + return ConstantColumnVectors.doubles(constant); + + case FLOAT: + return ConstantColumnVectors.floats(constant); + + case DECIMAL: + return ConstantColumnVectors.decimals(constant); + + case TIMESTAMP: + return ConstantColumnVectors.timestamps(constant); + + case FIXED: + case UUID: + case BINARY: Review comment: Be careful that the `FIXED` data type has three possible constant types: * byte[] * GenericData.Fixed * ByteBuffer We will need to make the `ConstantBytesColumnVector` handle all of those types! (See the sketch below.) ########## File path: flink/src/main/java/org/apache/iceberg/flink/data/vectorized/VectorizedFlinkOrcReaders.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data.vectorized; + +import java.util.List; +import java.util.Map; +import org.apache.flink.orc.nohive.vector.OrcNoHiveBytesVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDecimalVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDoubleVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveLongVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveTimestampVector; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.ColumnarArrayData; +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ArrayColumnVector; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.RowColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.orc.OrcBatchReader; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class VectorizedFlinkOrcReaders { + private VectorizedFlinkOrcReaders() { + } + + public static OrcBatchReader<VectorizedColumnBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema, + Map<Integer, ?> idToConstant) { + Converter converter = 
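
A rough sketch of how `ConstantBytesColumnVector` could normalize all three `FIXED` constant representations up front (an illustration under the assumptions named in the comment above, taking `GenericData.Fixed` to be Avro's class; not the PR's implementation):

```java
import java.nio.ByteBuffer;
import org.apache.avro.generic.GenericData;

// Hypothetical helper: normalize a FIXED/BINARY constant to byte[] once,
// so the constant column vector never has to convert per row.
class ConstantBytesUtil {
  static byte[] toBytes(Object constant) {
    if (constant instanceof byte[]) {
      return (byte[]) constant;
    } else if (constant instanceof GenericData.Fixed) {
      return ((GenericData.Fixed) constant).bytes();
    } else if (constant instanceof ByteBuffer) {
      ByteBuffer buffer = ((ByteBuffer) constant).duplicate();
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes);  // copy without moving the original buffer's position
      return bytes;
    }
    throw new UnsupportedOperationException(
        "Unsupported constant type: " + constant.getClass().getName());
  }
}
```
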
OrcSchemaWithTypeVisitor.visit(expectedSchema, fileSchema, new ReadBuilder(idToConstant)); + + return new OrcBatchReader<VectorizedColumnBatch>() { + private long batchOffsetInFile; + + @Override + public VectorizedColumnBatch read(VectorizedRowBatch batch) { + FlinkRowColumnVector cv = (FlinkRowColumnVector) converter.convert( + new StructColumnVector(batch.size, batch.cols), batch.size, batchOffsetInFile); + + VectorizedColumnBatch columnarBatch = new VectorizedColumnBatch(cv.getFieldVectors()); + columnarBatch.setNumRows(batch.size); + return columnarBatch; + } + + @Override + public void setBatchContext(long batchOffsetInFile) { + this.batchOffsetInFile = batchOffsetInFile; + } + }; + } + + private interface Converter { + ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector, int batchSize, + long batchOffsetInFile); + } + + private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> { + private final Map<Integer, ?> idToConstant; + + private ReadBuilder(Map<Integer, ?> idToConstant) { + this.idToConstant = idToConstant; + } + + @Override + public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names, + List<Converter> fields) { + return new StructConverter(iStruct, fields, idToConstant); + } + + @Override + public Converter list(Types.ListType iList, TypeDescription array, Converter element) { + return new StructConverter.ArrayConverter(element); + } + + @Override + public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) { + throw new UnsupportedOperationException("Unsupported vectorized read for map type."); + } + + @Override + public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + return (vector, batchSize, batchOffsetInFile) -> { + if (vector instanceof LongColumnVector) { + return new OrcNoHiveLongVector((LongColumnVector) vector); + } else if (vector instanceof DoubleColumnVector) { + return new OrcNoHiveDoubleVector((DoubleColumnVector) vector); + } else if (vector instanceof BytesColumnVector) { + return new OrcNoHiveBytesVector((BytesColumnVector) vector); + } else if (vector instanceof DecimalColumnVector) { + return new OrcNoHiveDecimalVector((DecimalColumnVector) vector); + } else if (vector instanceof TimestampColumnVector) { + return new OrcNoHiveTimestampVector((TimestampColumnVector) vector); + } else { + throw new UnsupportedOperationException( + "Unsupported vector: " + vector.getClass().getName()); + } + }; + } + } + + private static class RowPositionColumnVector implements org.apache.flink.table.data.vector.LongColumnVector { + private final long batchOffsetInFile; + + RowPositionColumnVector(long batchOffsetInFile) { + this.batchOffsetInFile = batchOffsetInFile; + } + + @Override + public boolean isNullAt(int i) { + return false; + } + + @Override + public long getLong(int i) { + return batchOffsetInFile + i; + } + } + + private static class StructConverter implements Converter { + private final Types.StructType structType; + private final List<Converter> fieldConverters; + private final Map<Integer, ?> idToConstant; + + private StructConverter(Types.StructType structType, List<Converter> fieldConverters, + Map<Integer, ?> idToConstant) { + this.structType = structType; + this.fieldConverters = fieldConverters; + this.idToConstant = idToConstant; + } + + @Override + public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize, + long batchOffsetInFile) { + StructColumnVector 
structVector = (StructColumnVector) vector; + List<Types.NestedField> fields = structType.fields(); + ColumnVector[] fieldVectors = new ColumnVector[fields.size()]; + for (int pos = 0; pos < fields.size(); pos++) { + Types.NestedField field = fields.get(pos); + if (idToConstant.containsKey(field.fieldId())) { + fieldVectors[pos] = toConstantColumnVector(field.type(), idToConstant.get(field.fieldId())); + } else if (field.equals(MetadataColumns.ROW_POSITION)) { + fieldVectors[pos] = new RowPositionColumnVector(batchOffsetInFile); + } else { + fieldVectors[pos] = fieldConverters.get(pos) + .convert(structVector.fields[pos], batchSize, batchOffsetInFile); + } + } + + return new FlinkRowColumnVector(fieldVectors, structVector); + } + + private ColumnVector toConstantColumnVector(Type type, Object constant) { + Type.TypeID typeID = type.typeId(); + switch (typeID) { + case INTEGER: + case DATE: + case TIME: + return ConstantColumnVectors.ints(constant); + + case LONG: + return ConstantColumnVectors.longs(constant); + + case BOOLEAN: + return ConstantColumnVectors.booleans(constant); + + case DOUBLE: + return ConstantColumnVectors.doubles(constant); + + case FLOAT: + return ConstantColumnVectors.floats(constant); + + case DECIMAL: + return ConstantColumnVectors.decimals(constant); + + case TIMESTAMP: + return ConstantColumnVectors.timestamps(constant); + + case FIXED: + case UUID: + case BINARY: Review comment: I still don't think we can use the `ConstantBytesColumnVector` to read the `BINARY` & `FIXED` data types, because it assumes its constant object is a [StringData](https://github.com/apache/iceberg/blob/90225d6c9413016d611e2ce5eff37db1bc1b4fc5/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java#L66): the constant for those types is a `byte[]`, so we will definitely encounter a class cast exception when casting it to `BinaryStringData`. Do we have a unit test to cover this? (A short illustration follows below.) ########## File path: flink/src/main/java/org/apache/iceberg/flink/data/vectorized/VectorizedFlinkOrcReaders.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
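
A minimal illustration of the failure mode described in the comment above (this is plain JVM cast behavior, not a test from the PR):

```java
import org.apache.flink.table.data.binary.BinaryStringData;

class CastFailureDemo {
  public static void main(String[] args) {
    Object constant = new byte[] {0x01, 0x02};  // a BINARY/FIXED constant value
    // Throws ClassCastException: byte[] cannot be cast to BinaryStringData.
    BinaryStringData str = (BinaryStringData) constant;
    System.out.println(str);
  }
}
```
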
+ */ + +package org.apache.iceberg.flink.data.vectorized; + +import java.util.List; +import java.util.Map; +import org.apache.flink.orc.nohive.vector.OrcNoHiveBytesVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDecimalVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDoubleVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveLongVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveTimestampVector; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.ColumnarArrayData; +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ArrayColumnVector; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.RowColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.orc.OrcBatchReader; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class VectorizedFlinkOrcReaders { + private VectorizedFlinkOrcReaders() { + } + + public static OrcBatchReader<VectorizedColumnBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema, + Map<Integer, ?> idToConstant) { + Converter converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, fileSchema, new ReadBuilder(idToConstant)); + + return new OrcBatchReader<VectorizedColumnBatch>() { + private long batchOffsetInFile; + + @Override + public VectorizedColumnBatch read(VectorizedRowBatch batch) { + FlinkRowColumnVector cv = (FlinkRowColumnVector) converter.convert( + new StructColumnVector(batch.size, batch.cols), batch.size, batchOffsetInFile); + + VectorizedColumnBatch columnarBatch = new VectorizedColumnBatch(cv.getFieldVectors()); + columnarBatch.setNumRows(batch.size); + return columnarBatch; + } + + @Override + public void setBatchContext(long batchOffsetInFile) { + this.batchOffsetInFile = batchOffsetInFile; + } + }; + } + + private interface Converter { + ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector, int batchSize, + long batchOffsetInFile); + } + + private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> { + private final Map<Integer, ?> idToConstant; + + private ReadBuilder(Map<Integer, ?> idToConstant) { + this.idToConstant = idToConstant; + } + + @Override + public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names, + List<Converter> fields) { + return new StructConverter(iStruct, fields, idToConstant); + } + + @Override + public Converter list(Types.ListType iList, TypeDescription array, Converter element) { + return new StructConverter.ArrayConverter(element); + } + + @Override + public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) { + throw new 
UnsupportedOperationException("Unsupported vectorized read for map type."); + } + + @Override + public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + return (vector, batchSize, batchOffsetInFile) -> { + if (vector instanceof LongColumnVector) { + return new OrcNoHiveLongVector((LongColumnVector) vector); + } else if (vector instanceof DoubleColumnVector) { Review comment: The flink's `OrcNoHiveDoubleVector` have a very strange design, sounds like it could read both `float` & `double` data type from it. I was confused by this name a lot. ########## File path: flink/src/main/java/org/apache/iceberg/flink/data/vectorized/ConstantColumnVectors.java ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data.vectorized; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.vector.BooleanColumnVector; +import org.apache.flink.table.data.vector.BytesColumnVector; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.DecimalColumnVector; +import org.apache.flink.table.data.vector.DoubleColumnVector; +import org.apache.flink.table.data.vector.FloatColumnVector; +import org.apache.flink.table.data.vector.IntColumnVector; +import org.apache.flink.table.data.vector.LongColumnVector; +import org.apache.flink.table.data.vector.TimestampColumnVector; + +class ConstantColumnVectors { + private ConstantColumnVectors() { + } + + static ColumnVector ints(Object constant) { + return new ConstantIntColumnVector(constant); + } + + static ColumnVector longs(Object constant) { + return new ConstantLongColumnVongector(constant); + } + + static ColumnVector booleans(Object constant) { + return new ConstantBooleanColumnVector(constant); + } + + static ColumnVector doubles(Object constant) { + return new ConstantDoubleColumnVector(constant); + } + + static ColumnVector floats(Object constant) { + return new ConstantFloatColumnVector(constant); + } + + static ColumnVector decimals(Object constant) { + return new ConstantDecimalColumnVector(constant); + } + + static ColumnVector timestamps(Object constant) { + return new ConstantTimestampColumnVector(constant); + } + + static ColumnVector bytes(Object constant) { + return new ConstantBytesColumnVector(constant); + } + + private static class ConstantIntColumnVector implements IntColumnVector { + + private final Object constant; + + private ConstantIntColumnVector(Object constant) { + this.constant = constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } + + @Override + public int getInt(int i) { 
+ return (int) constant; + } + } + + private static class ConstantLongColumnVongector implements LongColumnVector { + + private final Object constant; + + private ConstantLongColumnVongector(Object constant) { + this.constant = constant; + } + + @Override + public long getLong(int i) { + return (long) constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } + } + + private static class ConstantBooleanColumnVector implements BooleanColumnVector { + private final Object constant; + + private ConstantBooleanColumnVector(Object constant) { + this.constant = constant; + } + + @Override + public boolean getBoolean(int i) { + return (boolean) constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } + } + + private static class ConstantDoubleColumnVector implements DoubleColumnVector { + private final Object constant; + + private ConstantDoubleColumnVector(Object constant) { + this.constant = constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } + + @Override + public double getDouble(int i) { + return (double) constant; + } + } + + private static class ConstantFloatColumnVector implements FloatColumnVector { + private final Object constant; + + private ConstantFloatColumnVector(Object constant) { + this.constant = constant; + } + + @Override + public float getFloat(int i) { + return (float) constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } + } + + private static class ConstantDecimalColumnVector implements DecimalColumnVector { + private final Object constant; + + private ConstantDecimalColumnVector(Object constant) { + this.constant = constant; + } + + @Override + public DecimalData getDecimal(int i, int precision, int scale) { + return (DecimalData) constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } + } + + private static class ConstantTimestampColumnVector implements TimestampColumnVector { + private final Object constant; + + private ConstantTimestampColumnVector(Object constant) { + this.constant = constant; + } + + @Override + public TimestampData getTimestamp(int i, int precision) { + return (TimestampData) constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } + } + + private static class ConstantBytesColumnVector implements BytesColumnVector { + private final Object constant; + + private ConstantBytesColumnVector(Object constant) { + this.constant = constant; + } + + @Override + public Bytes getBytes(int i) { + BinaryStringData str = (BinaryStringData) constant; + return new Bytes(str.toBytes(), 0, str.getSizeInBytes()); Review comment: `str.toBytes()` creates a new byte array on every call; for a constant value it is not worth allocating a new `byte[]` each time. I would suggest allocating the data bytes in the constructor and then always returning the same instance. (See the sketch below.) ########## File path: flink/src/main/java/org/apache/iceberg/flink/data/vectorized/ConstantColumnVectors.java ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
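
A minimal sketch of that suggestion (keeping the PR's current assumption that the constant arrives as a `BinaryStringData`; the point is the one-time allocation, not the exact type):

```java
private static class ConstantBytesColumnVector implements BytesColumnVector {
  private final Bytes bytes;  // materialized once, in the constructor

  private ConstantBytesColumnVector(Object constant) {
    if (constant == null) {
      this.bytes = null;
    } else {
      byte[] data = ((BinaryStringData) constant).toBytes();
      this.bytes = new Bytes(data, 0, data.length);
    }
  }

  @Override
  public boolean isNullAt(int i) {
    return bytes == null;
  }

  @Override
  public Bytes getBytes(int i) {
    return bytes;  // same instance on every call; assumes callers do not mutate it
  }
}
```
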
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data.vectorized; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.vector.BooleanColumnVector; +import org.apache.flink.table.data.vector.BytesColumnVector; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.DecimalColumnVector; +import org.apache.flink.table.data.vector.DoubleColumnVector; +import org.apache.flink.table.data.vector.FloatColumnVector; +import org.apache.flink.table.data.vector.IntColumnVector; +import org.apache.flink.table.data.vector.LongColumnVector; +import org.apache.flink.table.data.vector.TimestampColumnVector; + +class ConstantColumnVectors { + private ConstantColumnVectors() { + } + + static ColumnVector ints(Object constant) { + return new ConstantIntColumnVector(constant); + } + + static ColumnVector longs(Object constant) { + return new ConstantLongColumnVongector(constant); + } + + static ColumnVector booleans(Object constant) { + return new ConstantBooleanColumnVector(constant); + } + + static ColumnVector doubles(Object constant) { + return new ConstantDoubleColumnVector(constant); + } + + static ColumnVector floats(Object constant) { + return new ConstantFloatColumnVector(constant); + } + + static ColumnVector decimals(Object constant) { + return new ConstantDecimalColumnVector(constant); + } + + static ColumnVector timestamps(Object constant) { + return new ConstantTimestampColumnVector(constant); + } + + static ColumnVector bytes(Object constant) { + return new ConstantBytesColumnVector(constant); + } + + private static class ConstantIntColumnVector implements IntColumnVector { + + private final Object constant; + + private ConstantIntColumnVector(Object constant) { + this.constant = constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } + + @Override + public int getInt(int i) { + return (int) constant; + } + } + + private static class ConstantLongColumnVongector implements LongColumnVector { + + private final Object constant; + + private ConstantLongColumnVongector(Object constant) { + this.constant = constant; + } + + @Override + public long getLong(int i) { + return (long) constant; + } + + @Override + public boolean isNullAt(int i) { + return constant == null; + } Review comment: Nit: Could we keep the same method order for `isNullAt` and the getter across all of these ConstantXXColumnVector classes? I see `ConstantIntColumnVector` puts `isNullAt` ahead of `getInt`, but `ConstantLongColumnVongector` declares them in the reverse order. (A reordered sketch follows below.) ########## File path: flink/src/main/java/org/apache/iceberg/flink/data/vectorized/VectorizedFlinkOrcReaders.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
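
Putting the earlier typo fix and this ordering nit together, the long variant could read like this (a sketch combining the two review suggestions):

```java
private static class ConstantLongColumnVector implements LongColumnVector {
  private final Object constant;

  private ConstantLongColumnVector(Object constant) {
    this.constant = constant;
  }

  @Override
  public boolean isNullAt(int i) {  // isNullAt first, matching ConstantIntColumnVector
    return constant == null;
  }

  @Override
  public long getLong(int i) {
    return (long) constant;
  }
}
```
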
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data.vectorized; + +import java.util.List; +import java.util.Map; +import org.apache.flink.orc.nohive.vector.OrcNoHiveBytesVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDecimalVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDoubleVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveLongVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveTimestampVector; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.ColumnarArrayData; +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ArrayColumnVector; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.RowColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.orc.OrcBatchReader; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class VectorizedFlinkOrcReaders { + private VectorizedFlinkOrcReaders() { + } + + public static OrcBatchReader<VectorizedColumnBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema, + Map<Integer, ?> idToConstant) { + Converter converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, fileSchema, new ReadBuilder(idToConstant)); + + return new OrcBatchReader<VectorizedColumnBatch>() { + private long batchOffsetInFile; + + @Override + public VectorizedColumnBatch read(VectorizedRowBatch batch) { + FlinkRowColumnVector cv = (FlinkRowColumnVector) converter.convert( + new StructColumnVector(batch.size, batch.cols), batch.size, batchOffsetInFile); + + VectorizedColumnBatch columnarBatch = new VectorizedColumnBatch(cv.getFieldVectors()); + columnarBatch.setNumRows(batch.size); + return columnarBatch; + } + + @Override + public void setBatchContext(long batchOffsetInFile) { + this.batchOffsetInFile = batchOffsetInFile; + } + }; + } + + private interface Converter { + ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector, int batchSize, + long batchOffsetInFile); + } + + private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> { + private final Map<Integer, ?> idToConstant; + + private 
ReadBuilder(Map<Integer, ?> idToConstant) { + this.idToConstant = idToConstant; + } + + @Override + public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names, + List<Converter> fields) { + return new StructConverter(iStruct, fields, idToConstant); + } + + @Override + public Converter list(Types.ListType iList, TypeDescription array, Converter element) { + return new StructConverter.ArrayConverter(element); + } + + @Override + public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) { + throw new UnsupportedOperationException("Unsupported vectorized read for map type."); + } + + @Override + public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + return (vector, batchSize, batchOffsetInFile) -> { + if (vector instanceof LongColumnVector) { + return new OrcNoHiveLongVector((LongColumnVector) vector); + } else if (vector instanceof DoubleColumnVector) { + return new OrcNoHiveDoubleVector((DoubleColumnVector) vector); + } else if (vector instanceof BytesColumnVector) { + return new OrcNoHiveBytesVector((BytesColumnVector) vector); + } else if (vector instanceof DecimalColumnVector) { + return new OrcNoHiveDecimalVector((DecimalColumnVector) vector); + } else if (vector instanceof TimestampColumnVector) { + return new OrcNoHiveTimestampVector((TimestampColumnVector) vector); + } else { + throw new UnsupportedOperationException( + "Unsupported vector: " + vector.getClass().getName()); Review comment: Please add the Iceberg `iPrimitive` data type and the ORC `primitive` data type to this error message. (A sketch follows below.) ########## File path: flink/src/main/java/org/apache/iceberg/flink/data/vectorized/VectorizedFlinkOrcReaders.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
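
For illustration, the fallback branch could carry both types like this (a suggestion-style fragment, not the PR's code; `iPrimitive` and `primitive` are the arguments of the enclosing `primitive(...)` visitor method and remain in scope inside the lambda):

```java
} else {
  throw new UnsupportedOperationException(String.format(
      "Unsupported vector: %s for Iceberg type: %s, ORC type: %s",
      vector.getClass().getName(), iPrimitive, primitive));
}
```
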
+ */ + +package org.apache.iceberg.flink.data.vectorized; + +import java.util.List; +import java.util.Map; +import org.apache.flink.orc.nohive.vector.OrcNoHiveBytesVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDecimalVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveDoubleVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveLongVector; +import org.apache.flink.orc.nohive.vector.OrcNoHiveTimestampVector; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.ColumnarArrayData; +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ArrayColumnVector; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.RowColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.orc.OrcBatchReader; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class VectorizedFlinkOrcReaders { + private VectorizedFlinkOrcReaders() { + } + + public static OrcBatchReader<VectorizedColumnBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema, + Map<Integer, ?> idToConstant) { + Converter converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, fileSchema, new ReadBuilder(idToConstant)); + + return new OrcBatchReader<VectorizedColumnBatch>() { + private long batchOffsetInFile; + + @Override + public VectorizedColumnBatch read(VectorizedRowBatch batch) { + FlinkRowColumnVector cv = (FlinkRowColumnVector) converter.convert( + new StructColumnVector(batch.size, batch.cols), batch.size, batchOffsetInFile); + + VectorizedColumnBatch columnarBatch = new VectorizedColumnBatch(cv.getFieldVectors()); + columnarBatch.setNumRows(batch.size); + return columnarBatch; + } + + @Override + public void setBatchContext(long batchOffsetInFile) { + this.batchOffsetInFile = batchOffsetInFile; + } + }; + } + + private interface Converter { + ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector, int batchSize, + long batchOffsetInFile); + } + + private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> { + private final Map<Integer, ?> idToConstant; + + private ReadBuilder(Map<Integer, ?> idToConstant) { + this.idToConstant = idToConstant; + } + + @Override + public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names, + List<Converter> fields) { + return new StructConverter(iStruct, fields, idToConstant); + } + + @Override + public Converter list(Types.ListType iList, TypeDescription array, Converter element) { + return new StructConverter.ArrayConverter(element); + } + + @Override + public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) { + throw new 
UnsupportedOperationException("Unsupported vectorized read for map type."); + } + + @Override + public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + return (vector, batchSize, batchOffsetInFile) -> { + if (vector instanceof LongColumnVector) { + return new OrcNoHiveLongVector((LongColumnVector) vector); + } else if (vector instanceof DoubleColumnVector) { + return new OrcNoHiveDoubleVector((DoubleColumnVector) vector); + } else if (vector instanceof BytesColumnVector) { + return new OrcNoHiveBytesVector((BytesColumnVector) vector); + } else if (vector instanceof DecimalColumnVector) { + return new OrcNoHiveDecimalVector((DecimalColumnVector) vector); + } else if (vector instanceof TimestampColumnVector) { + return new OrcNoHiveTimestampVector((TimestampColumnVector) vector); + } else { + throw new UnsupportedOperationException( + "Unsupported vector: " + vector.getClass().getName()); + } + }; + } + } + + private static class RowPositionColumnVector implements org.apache.flink.table.data.vector.LongColumnVector { + private final long batchOffsetInFile; + + RowPositionColumnVector(long batchOffsetInFile) { + this.batchOffsetInFile = batchOffsetInFile; + } + + @Override + public boolean isNullAt(int i) { + return false; + } + + @Override + public long getLong(int i) { + return batchOffsetInFile + i; + } + } + + private static class StructConverter implements Converter { + private final Types.StructType structType; + private final List<Converter> fieldConverters; + private final Map<Integer, ?> idToConstant; + + private StructConverter(Types.StructType structType, List<Converter> fieldConverters, + Map<Integer, ?> idToConstant) { + this.structType = structType; + this.fieldConverters = fieldConverters; + this.idToConstant = idToConstant; + } + + @Override + public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize, + long batchOffsetInFile) { + StructColumnVector structVector = (StructColumnVector) vector; + List<Types.NestedField> fields = structType.fields(); + ColumnVector[] fieldVectors = new ColumnVector[fields.size()]; + for (int pos = 0; pos < fields.size(); pos++) { + Types.NestedField field = fields.get(pos); + if (idToConstant.containsKey(field.fieldId())) { + fieldVectors[pos] = toConstantColumnVector(field.type(), idToConstant.get(field.fieldId())); + } else if (field.equals(MetadataColumns.ROW_POSITION)) { + fieldVectors[pos] = new RowPositionColumnVector(batchOffsetInFile); + } else { + fieldVectors[pos] = fieldConverters.get(pos) + .convert(structVector.fields[pos], batchSize, batchOffsetInFile); + } + } + + return new FlinkRowColumnVector(fieldVectors, structVector); + } + + private ColumnVector toConstantColumnVector(Type type, Object constant) { + Type.TypeID typeID = type.typeId(); + switch (typeID) { + case INTEGER: + case DATE: + case TIME: + return ConstantColumnVectors.ints(constant); + + case LONG: + return ConstantColumnVectors.longs(constant); + + case BOOLEAN: + return ConstantColumnVectors.booleans(constant); + + case DOUBLE: + return ConstantColumnVectors.doubles(constant); + + case FLOAT: + return ConstantColumnVectors.floats(constant); + + case DECIMAL: + return ConstantColumnVectors.decimals(constant); + + case TIMESTAMP: + return ConstantColumnVectors.timestamps(constant); + + case FIXED: + case UUID: + case BINARY: + case STRING: + return ConstantColumnVectors.bytes(constant); + + default: + throw new UnsupportedOperationException("Unsupported data type for constant."); + } + } + + 
private static class ArrayConverter implements Converter { + private final Converter elementConverter; + + private ArrayConverter(Converter elementConverter) { + this.elementConverter = elementConverter; + } + + @Override + public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize, + long batchOffsetInFile) { + ListColumnVector listVector = (ListColumnVector) vector; + ColumnVector elementVector = elementConverter.convert(listVector.child, batchSize, batchOffsetInFile); + + return new ArrayColumnVector() { + @Override + public ArrayData getArray(int rowId) { + int index = getRowIndex(rowId); + return new ColumnarArrayData(elementVector, (int) listVector.offsets[index], + (int) listVector.lengths[index]); + } + + @Override + public boolean isNullAt(int rowId) { + return vector.isNull[getRowIndex(rowId)]; + } + + private int getRowIndex(int rowId) { + return vector.isRepeating ? 0 : rowId; + } + }; + } + } + } + + private static class FlinkRowColumnVector implements RowColumnVector { + + private final ColumnVector[] fieldVectors; + private final StructColumnVector structVector; + private final VectorizedColumnBatch vectorizedColumnBatch; + + FlinkRowColumnVector(ColumnVector[] fieldVectors, + StructColumnVector structVector) { + this.fieldVectors = fieldVectors; + this.structVector = structVector; + vectorizedColumnBatch = new VectorizedColumnBatch(fieldVectors); Review comment: Nit: It would be good to align the code style with the lines above, i.e. `this.vectorizedColumnBatch = new VectorizedColumnBatch(fieldVectors)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
