weiqingy commented on code in PR #2063: URL: https://github.com/apache/auron/pull/2063#discussion_r2879742893
########## auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java: ########## @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import 
org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +/** Unit tests for {@link FlinkArrowReader}. 
*/ +public class FlinkArrowReaderTest { + + @Test + public void testBooleanVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBoolean", 0, Long.MAX_VALUE)) { + BitVector bitVector = new BitVector("col", allocator); + bitVector.allocateNew(3); + bitVector.setSafe(0, 1); + bitVector.setNull(1); + bitVector.setSafe(2, 0); + bitVector.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(bitVector)); + RowType rowType = RowType.of(new BooleanType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(3, reader.getRowCount()); + assertTrue(reader.read(0).getBoolean(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertFalse(reader.read(2).getBoolean(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testTinyIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTinyInt", 0, Long.MAX_VALUE)) { + TinyIntVector vec = new TinyIntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 1); + vec.setNull(1); + vec.setSafe(2, -1); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TinyIntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals((byte) 1, reader.read(0).getByte(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals((byte) -1, reader.read(2).getByte(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testSmallIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testSmallInt", 0, Long.MAX_VALUE)) { + SmallIntVector vec = new SmallIntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 100); + vec.setNull(1); + vec.setSafe(2, -100); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType 
rowType = RowType.of(new SmallIntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals((short) 100, reader.read(0).getShort(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals((short) -100, reader.read(2).getShort(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testInt", 0, Long.MAX_VALUE)) { + IntVector vec = new IntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 42); + vec.setNull(1); + vec.setSafe(2, Integer.MAX_VALUE); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new IntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(42, reader.read(0).getInt(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(Integer.MAX_VALUE, reader.read(2).getInt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testBigIntVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testBigInt", 0, Long.MAX_VALUE)) { + BigIntVector vec = new BigIntVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, Long.MAX_VALUE); + vec.setNull(1); + vec.setSafe(2, -1L); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new BigIntType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(Long.MAX_VALUE, reader.read(0).getLong(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(-1L, reader.read(2).getLong(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testFloatVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testFloat", 0, Long.MAX_VALUE)) { + Float4Vector vec = new Float4Vector("col", 
allocator); + vec.allocateNew(3); + vec.setSafe(0, 3.14f); + vec.setNull(1); + vec.setSafe(2, Float.NaN); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new FloatType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(3.14f, reader.read(0).getFloat(0), 0.001f); + assertTrue(reader.read(1).isNullAt(0)); + assertTrue(Float.isNaN(reader.read(2).getFloat(0))); + + reader.close(); + root.close(); + } + } + + @Test + public void testDoubleVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDouble", 0, Long.MAX_VALUE)) { + Float8Vector vec = new Float8Vector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 2.718); + vec.setNull(1); + vec.setSafe(2, Double.MAX_VALUE); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new DoubleType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(2.718, reader.read(0).getDouble(0), 0.001); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(Double.MAX_VALUE, reader.read(2).getDouble(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testVarCharVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarChar", 0, Long.MAX_VALUE)) { + VarCharVector vec = new VarCharVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8)); + vec.setNull(1); + vec.setSafe(2, "".getBytes(StandardCharsets.UTF_8)); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new VarCharType(100)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + RowData row0 = reader.read(0); + assertArrayEquals( + 
"hello".getBytes(StandardCharsets.UTF_8), row0.getString(0).toBytes()); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(0, reader.read(2).getString(0).toBytes().length); + + reader.close(); + root.close(); + } + } + + @Test + public void testVarBinaryVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testVarBinary", 0, Long.MAX_VALUE)) { + VarBinaryVector vec = new VarBinaryVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, new byte[] {0x01, 0x02}); + vec.setNull(1); + vec.setSafe(2, new byte[] {}); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new VarBinaryType(100)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertArrayEquals(new byte[] {0x01, 0x02}, reader.read(0).getBinary(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertArrayEquals(new byte[] {}, reader.read(2).getBinary(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testDecimalVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDecimal", 0, Long.MAX_VALUE)) { + // Compact path: precision 10 (<= 18) + DecimalVector compactVec = new DecimalVector("compact", allocator, 10, 2); + compactVec.allocateNew(2); + compactVec.setSafe(0, new BigDecimal("123.45")); + compactVec.setNull(1); + compactVec.setValueCount(2); + + VectorSchemaRoot compactRoot = new VectorSchemaRoot(Collections.singletonList(compactVec)); + RowType compactType = RowType.of(new DecimalType(10, 2)); + FlinkArrowReader compactReader = FlinkArrowReader.create(compactRoot, compactType); + + DecimalData compactVal = compactReader.read(0).getDecimal(0, 10, 2); + assertEquals(new BigDecimal("123.45"), compactVal.toBigDecimal()); + assertTrue(compactReader.read(1).isNullAt(0)); + + compactReader.close(); + compactRoot.close(); + + // Wide path: precision 20 (> 18) + DecimalVector wideVec 
= new DecimalVector("wide", allocator, 20, 2); + wideVec.allocateNew(1); + wideVec.setSafe(0, new BigDecimal("123456789012345678.90")); + wideVec.setValueCount(1); + + VectorSchemaRoot wideRoot = new VectorSchemaRoot(Collections.singletonList(wideVec)); + RowType wideType = RowType.of(new DecimalType(20, 2)); + FlinkArrowReader wideReader = FlinkArrowReader.create(wideRoot, wideType); + + DecimalData wideVal = wideReader.read(0).getDecimal(0, 20, 2); + assertEquals(0, new BigDecimal("123456789012345678.90").compareTo(wideVal.toBigDecimal())); + + wideReader.close(); + wideRoot.close(); + } + } + + @Test + public void testDateVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testDate", 0, Long.MAX_VALUE)) { + DateDayVector vec = new DateDayVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 18000); + vec.setNull(1); + vec.setSafe(2, 0); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new DateType()); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + assertEquals(18000, reader.read(0).getInt(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(0, reader.read(2).getInt(0)); + + reader.close(); + root.close(); + } + } + + @Test + public void testTimeVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTime", 0, Long.MAX_VALUE)) { + TimeMicroVector vec = new TimeMicroVector("col", allocator); + vec.allocateNew(3); + vec.setSafe(0, 45_296_000_000L); // 45296000000 micros -> 45296000 millis + vec.setNull(1); + vec.setSafe(2, 0L); + vec.setValueCount(3); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TimeType(6)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + // ArrowTimeColumnVector divides micros by 1000 to get millis + assertEquals(45_296_000, 
reader.read(0).getInt(0)); + assertTrue(reader.read(1).isNullAt(0)); + assertEquals(0, reader.read(2).getInt(0)); + + reader.close(); + root.close(); + } + } Review Comment: The tests cover microsecond precision because that's what the writer (PR #1930) produces. The reader is the inverse of the writer, which normalizes all TIME/TIMESTAMP values to microseconds. Testing other precisions would require vectors that the writer never generates. If the writer's normalization approach changes, the reader and its tests will need to be updated together. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
