rymurr commented on a change in pull request #2286: URL: https://github.com/apache/iceberg/pull/2286#discussion_r614783252
########## File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java ##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Vectorized reader that returns an iterator of {@link ColumnarBatch}.
+ * See {@link #open(CloseableIterable)} to learn about the
+ * behavior of the iterator.
+ *
+ * <p>The following Iceberg data types are supported and have been tested:
+ * <ul>
+ * <li>Iceberg: {@link Types.BooleanType}, Arrow: {@link MinorType#BIT}</li>
+ * <li>Iceberg: {@link Types.IntegerType}, Arrow: {@link MinorType#INT}</li>
+ * <li>Iceberg: {@link Types.LongType}, Arrow: {@link MinorType#BIGINT}</li>
+ * <li>Iceberg: {@link Types.FloatType}, Arrow: {@link MinorType#FLOAT4}</li>
+ * <li>Iceberg: {@link Types.DoubleType}, Arrow: {@link MinorType#FLOAT8}</li>
+ * <li>Iceberg: {@link Types.StringType}, Arrow: {@link MinorType#VARCHAR}</li>
+ * <li>Iceberg: {@link Types.TimestampType} (both with and without timezone),
+ * Arrow: {@link MinorType#TIMESTAMPMICRO} / {@link MinorType#TIMESTAMPMICROTZ}</li>
+ * <li>Iceberg: {@link Types.BinaryType}, Arrow: {@link MinorType#VARBINARY}</li>
+ * <li>Iceberg: {@link Types.DateType}, Arrow: {@link MinorType#DATEDAY}</li>
+ * </ul>
+ *
+ * <p>Features that don't work in this implementation:

Review comment:
   can you raise a set of issues to document limitations and follow-ups from this PR? Also, is this javadoc still up to date?
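   For anyone following along, this is roughly the consumption pattern the javadoc describes, pieced together from the tests in this PR. It is a sketch only: the `VectorizedTableScanIterable(scan, batchSize, reuseContainers)` constructor is taken from the test code below, the table location is hypothetical, and `numRows()` on `ColumnarBatch` is an assumption about the final API.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.arrow.vectorized.ColumnarBatch;
import org.apache.iceberg.arrow.vectorized.VectorizedTableScanIterable;
import org.apache.iceberg.hadoop.HadoopTables;

public class ArrowReadSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical table location; any path resolvable by HadoopTables works.
    Table table = new HadoopTables().load("/tmp/arrow_read_example");
    TableScan scan = table.newScan().select("timestamp", "int", "string");
    long totalRows = 0;
    // Each iteration yields a ColumnarBatch of up to 1024 rows; 'false' mirrors
    // the reuseContainers flag used by the tests below.
    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1024, false)) {
      for (ColumnarBatch batch : itr) {
        totalRows += batch.numRows(); // assumed accessor, see note above
      }
    }
    System.out.println("read " + totalRows + " rows");
  }
}
```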
########## File path: arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java ##########
@@ -0,0 +1,904 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link ArrowReader}.
+ * <p>All tests create a table with monthly partitions and write 1 year of data to the table.
+ */
+public class ArrowReaderTest {
+
+  private static final int NUM_ROWS_PER_MONTH = 20;
+  private static final ImmutableList<String> ALL_COLUMNS =
+      ImmutableList.of(
+          "timestamp",
+          "timestamp_nullable",
+          "boolean",
+          "boolean_nullable",
+          "int",
+          "int_nullable",
+          "long",
+          "long_nullable",
+          "float",
+          "float_nullable",
+          "double",
+          "double_nullable",
+          "timestamp_tz",
+          "timestamp_tz_nullable",
+          "string",
+          "string_nullable",
+          "bytes",
+          "bytes_nullable",
+          "date",
+          "date_nullable",
+          "int_promotion"
+      );
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  private HadoopTables tables;
+
+  private String tableLocation;
+  private List<GenericRecord> rowsWritten;
+
+  /**
+   * Read all rows and columns from the table without any filter.
+   * The test asserts that the Arrow {@link
+   * VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the vectors
+   * contain expected values. The test also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadAll() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    readAndCheckVectorSchemaRoots(table.newScan(), NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * This test writes each partition with constant-value rows. The Arrow vectors returned are mostly of type int32,
+   * which is unexpected. This happens because of dictionary encoding at the storage level.
+   * <p>
+   * Following are the expected and actual Arrow schemas:
+   * <pre>
+   * Expected Arrow Schema:
+   * timestamp: Timestamp(MICROSECOND, null) not null,
+   * timestamp_nullable: Timestamp(MICROSECOND, null),
+   * boolean: Bool not null,
+   * boolean_nullable: Bool,
+   * int: Int(32, true) not null,
+   * int_nullable: Int(32, true),
+   * long: Int(64, true) not null,
+   * long_nullable: Int(64, true),
+   * float: FloatingPoint(SINGLE) not null,
+   * float_nullable: FloatingPoint(SINGLE),
+   * double: FloatingPoint(DOUBLE) not null,
+   * double_nullable: FloatingPoint(DOUBLE),
+   * timestamp_tz: Timestamp(MICROSECOND, UTC) not null,
+   * timestamp_tz_nullable: Timestamp(MICROSECOND, UTC),
+   * string: Utf8 not null,
+   * string_nullable: Utf8,
+   * bytes: Binary not null,
+   * bytes_nullable: Binary,
+   * date: Date(DAY) not null,
+   * date_nullable: Date(DAY),
+   * int_promotion: Int(32, true) not null
+   *
+   * Actual Arrow Schema:
+   * timestamp: Int(32, true) not null,
+   * timestamp_nullable: Int(32, true),
+   * boolean: Bool not null,
+   * boolean_nullable: Bool,
+   * int: Int(32, true) not null,
+   * int_nullable: Int(32, true),
+   * long: Int(32, true) not null,
+   * long_nullable: Int(32, true),
+   * float: Int(32, true) not null,
+   * float_nullable: Int(32, true),
+   * double: Int(32, true) not null,
+   * double_nullable: Int(32, true),
+   * timestamp_tz: Int(32, true) not null,
+   * timestamp_tz_nullable: Int(32, true),
+   * string: Int(32, true) not null,
+   * string_nullable: Int(32, true),
+   * bytes: Int(32, true) not null,
+   * bytes_nullable: Int(32, true),
+   * date: Date(DAY) not null,
+   * date_nullable: Date(DAY),
+   * int_promotion: Int(32, true) not null
+   * </pre>
+   * <p>
+   * TODO: fix the returned Arrow vectors to have vector types consistent with Iceberg types.
+   * <p>
+   * Read all rows and columns from the table without any filter. The test asserts that the Arrow {@link
+   * VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the vectors
+   * contain expected values. The test also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  @Ignore
+  public void testReadAllWithConstantRecords() throws Exception {
+    writeTableWithConstantRecords();
+    Table table = tables.load(tableLocation);
+    readAndCheckVectorSchemaRoots(table.newScan(), NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * Read all rows and columns from the table without any filter. The test uses a batch size smaller than the number of
+   * rows in a partition. The test asserts that the Arrow {@link VectorSchemaRoot} contains the expected schema and
+   * expected vector types. Then the test asserts that the vectors contain expected values. The test also asserts that
+   * the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadAllWithSmallerBatchSize() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan();
+    readAndCheckVectorSchemaRoots(scan, 10, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * Read selected rows and all columns from the table using a time-range row filter. The test asserts that the Arrow
+   * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the
+   * vectors contain expected values. The test also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadRangeFilter() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    LocalDateTime beginTime = LocalDateTime.of(2020, 1, 1, 0, 0, 0);
+    LocalDateTime endTime = LocalDateTime.of(2020, 2, 1, 0, 0, 0);
+    TableScan scan = table.newScan()
+        .filter(Expressions.and(
+            Expressions.greaterThanOrEqual("timestamp", timestampToMicros(beginTime)),
+            Expressions.lessThan("timestamp", timestampToMicros(endTime))));
+    readAndCheckVectorSchemaRoots(scan, NUM_ROWS_PER_MONTH, NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * Read selected rows and all columns from the table using a time-range row filter.
+   * The test asserts that the result is empty.
+   */
+  @Test
+  public void testReadRangeFilterEmptyResult() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    LocalDateTime beginTime = LocalDateTime.of(2021, 1, 1, 0, 0, 0);
+    LocalDateTime endTime = LocalDateTime.of(2021, 2, 1, 0, 0, 0);
+    TableScan scan = table.newScan()
+        .filter(Expressions.and(
+            Expressions.greaterThanOrEqual("timestamp", timestampToMicros(beginTime)),
+            Expressions.lessThan("timestamp", timestampToMicros(endTime))));
+    int numRoots = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, NUM_ROWS_PER_MONTH, false)) {
+      for (ColumnarBatch batch : itr) {
+        numRoots++;
+      }
+    }
+    assertEquals(0, numRoots);
+  }
+
+  /**
+   * Read all rows and selected columns from the table with a column selection filter. The test asserts that the Arrow
+   * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the
+   * vectors contain expected values. The test also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadColumnFilter1() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan()
+        .select("timestamp", "int", "string");
+    readAndCheckVectorSchemaRoots(
+        scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH,
+        ImmutableList.of("timestamp", "int", "string"));
+  }
+
+  /**
+   * Read all rows and a single column from the table with a column selection filter. The test asserts that the Arrow
+   * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test asserts that the
+   * vectors contain expected values. The test also asserts that
+   * the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadColumnFilter2() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan()
+        .select("timestamp");
+    readAndCheckVectorSchemaRoots(
+        scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH,
+        ImmutableList.of("timestamp"));
+  }
+
+  private void readAndCheckVectorSchemaRoots(
+      TableScan scan,
+      int numRowsPerRoot,
+      int expectedTotalRows,
+      List<String> columns) throws IOException {
+    Set<String> columnSet = ImmutableSet.copyOf(columns);
+    int rowIndex = 0;
+    int totalRows = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
+      for (ColumnarBatch batch : itr) {
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + numRowsPerRoot);
+        checkColumnarBatch(numRowsPerRoot, expectedRows, batch, columns);

Review comment:
   nit: can you split this up a bit and add some docs as to why the assertion is happening twice? Just a simple javadoc explaining the two use cases would be good
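   Something along these lines is what I have in mind. This is only a sketch: the split and the `checkBatch` name are mine, `numRows()` is an assumed `ColumnarBatch` accessor, and the commented-out `checkVectorSchemaRoot` call stands in for the second assertion site, which sits below the fold of this diff.

```java
/**
 * Drives the scan and hands each batch to the per-batch checks; also verifies
 * the total row count once the iterator is exhausted.
 */
private void readAndCheckVectorSchemaRoots(
    TableScan scan, int numRowsPerRoot, int expectedTotalRows, List<String> columns) throws IOException {
  int rowIndex = 0;
  int totalRows = 0;
  try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
    for (ColumnarBatch batch : itr) {
      List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + numRowsPerRoot);
      checkBatch(numRowsPerRoot, expectedRows, batch, columns);
      rowIndex += numRowsPerRoot;
      totalRows += batch.numRows(); // assumed accessor
    }
  }
  assertEquals(expectedTotalRows, totalRows);
}

/**
 * Asserts one batch two ways, which is why the same values appear to be checked
 * twice: once through the ColumnarBatch accessors (how engine integrations read
 * the data) and once through the backing Arrow VectorSchemaRoot (how native
 * Arrow consumers read it).
 */
private void checkBatch(int expectedNumRows, List<GenericRecord> expectedRows,
                        ColumnarBatch batch, List<String> columns) {
  checkColumnarBatch(expectedNumRows, expectedRows, batch, columns);
  // checkVectorSchemaRoot(expectedNumRows, expectedRows, batch, columns); // hypothetical second check
}
```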
