openinx commented on a change in pull request #1346: URL: https://github.com/apache/iceberg/pull/1346#discussion_r471985446
########## File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.hadoop.HadoopCatalog; +import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.required; + +@RunWith(Parameterized.class) +public abstract class TestFlinkScan extends AbstractTestBase { + + private static final Schema SCHEMA = new Schema( + required(1, "data", Types.StringType.get()), + required(2, "id", Types.LongType.get()), + required(3, "dt", Types.StringType.get())); + + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + .identity("dt") + .bucket("id", 1) + .build(); + + // before variables + private Configuration conf; + String warehouse; + private HadoopCatalog catalog; + + // parametrized variables + private final FileFormat fileFormat; + + @Parameterized.Parameters + public static Object[] parameters() { + // TODO add orc and parquet + return new Object[] {"avro"}; + } + + TestFlinkScan(String fileFormat) { + this.fileFormat = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + } + + @Before + public void before() throws IOException { + File warehouseFile = TEMPORARY_FOLDER.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + conf = new Configuration(); + warehouse = "file:" + warehouseFile; + catalog = new HadoopCatalog(conf, warehouse); + } + + private List<Row> execute(Table table) throws IOException { + return executeWithOptions(table, null, null, null, null, null, null, null, null); + } + + private List<Row> execute(Table table, List<String> projectFields) throws IOException { + return executeWithOptions(table, projectFields, null, null, null, null, null, null, null); + } + + protected abstract List<Row> executeWithOptions( + Table table, List<String> projectFields, CatalogLoader loader, Long 
snapshotId, + Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp, List<Expression> filters, String sqlFilter) + throws IOException; + + protected abstract void assertResiduals(List<Row> results, List<Record> writeRecords, List<Record> filteredRecords) + throws IOException; + + @Test + public void testUnpartitionedTable() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + List<Record> expectedRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords); + assertRecords(execute(table), expectedRecords); + } + + @Test + public void testPartitionedTable() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + List<Record> expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( + org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + assertRecords(execute(table), expectedRecords); + } + + @Test + public void testProjection() throws Exception { Review comment: I'm not sure whether Flink supports projection of complex data types; if it does, we may need more unit tests to cover those projection cases, such as projecting a nested struct, map, or list (similar to Spark's `TestReadProjection`). ########## File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.flink.data.FlinkAvroReader; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PartitionUtil; + +class RowDataIterator extends DataIterator<RowData> { + + private final String nameMapping; + + RowDataIterator(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption, Schema tableSchema, + List<String> projectedFields, String nameMapping) { + super(task, fileIo, encryption, tableSchema, projectedFields); + this.nameMapping = nameMapping; + } + + @Override + protected CloseableIterator<RowData> nextTaskIterator(FileScanTask task) { + // schema or rows returned by readers Review comment: nit: is this comment still valuable? I did not get its point. 
########## File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; 
+import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.DateTimeUtil; + +/** + * Base class of Flink iterators. + * + * @param <T> is the Java class returned by this iterator whose objects contain one or more rows. + */ +abstract class DataIterator<T> implements CloseableIterator<T> { + + private final Iterator<FileScanTask> tasks; + private final FileIO fileIo; + private final EncryptionManager encryption; + private final Schema projectedSchema; + private final int[] fieldsReorder; + + private CloseableIterator<T> currentIterator; + + DataIterator(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption, Schema tableSchema, + List<String> projectedFields) { + this.fileIo = fileIo; + this.tasks = task.files().iterator(); + this.encryption = encryption; + this.currentIterator = CloseableIterator.empty(); + + this.projectedSchema = FlinkSchemaUtil.pruneWithoutReordering(tableSchema, projectedFields); + + // The projected schema is the schema without reordering, but Flink wants its own order, so we need to reorder the + // output row. + List<String> projectedNames = projectedSchema.asStruct().fields().stream() + .map(Types.NestedField::name).collect(Collectors.toList()); + this.fieldsReorder = projectedFields == null ? 
+ null : projectedFields.stream().mapToInt(projectedNames::indexOf).toArray(); + } + + Schema projectedSchema() { + return projectedSchema; + } + + int[] fieldsReorder() { + return fieldsReorder; + } + + InputFile getInputFile(FileScanTask task) { + Preconditions.checkArgument(!task.isDataTask(), "Invalid task type"); + return encryption.decrypt(EncryptedFiles.encryptedInput( + this.fileIo.newInputFile(task.file().path().toString()), + task.file().keyMetadata())); + } + + @Override + public boolean hasNext() { + updateCurrentIterator(); + return currentIterator.hasNext(); + } + + @Override + public T next() { + updateCurrentIterator(); + return currentIterator.next(); + } + + /** + * Updates the current iterator field to ensure that the current Iterator + * is not exhausted. + */ + private void updateCurrentIterator() { + try { + while (!currentIterator.hasNext() && tasks.hasNext()) { + currentIterator.close(); + currentIterator = nextTaskIterator(tasks.next()); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + abstract CloseableIterator<T> nextTaskIterator(FileScanTask scanTask) throws IOException; Review comment: How about renaming this method to `openTaskIterator` ? ########## File path: flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.required; + +@RunWith(Parameterized.class) +public abstract class TestFlinkScan extends AbstractTestBase { + + private static final 
Schema SCHEMA = new Schema( + required(1, "data", Types.StringType.get()), + required(2, "id", Types.LongType.get()), + required(3, "dt", Types.StringType.get())); + + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + .identity("dt") + .bucket("id", 1) + .build(); + + // before variables + private Configuration conf; + String warehouse; + private HadoopCatalog catalog; + + // parametrized variables + private final FileFormat fileFormat; + + @Parameterized.Parameters + public static Object[] parameters() { + // TODO add orc and parquet + return new Object[] {"avro"}; + } + + TestFlinkScan(String fileFormat) { + this.fileFormat = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + } + + @Before + public void before() throws IOException { + File warehouseFile = TEMPORARY_FOLDER.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + conf = new Configuration(); + warehouse = "file:" + warehouseFile; + catalog = new HadoopCatalog(conf, warehouse); + } + + private List<Row> execute(Table table) throws IOException { + return executeWithOptions(table, null, null, null, null, null, null, null, null); + } + + private List<Row> execute(Table table, List<String> projectFields) throws IOException { + return executeWithOptions(table, projectFields, null, null, null, null, null, null, null); + } + + protected abstract List<Row> executeWithOptions( + Table table, List<String> projectFields, CatalogLoader loader, Long snapshotId, + Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp, List<Expression> filters, String sqlFilter) + throws IOException; + + protected abstract void assertResiduals(List<Row> results, List<Record> writeRecords, List<Record> filteredRecords) + throws IOException; + + @Test + public void testUnpartitionedTable() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + List<Record> expectedRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + new 
GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords); + assertRecords(execute(table), expectedRecords); + } + + @Test + public void testPartitionedTable() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + List<Record> expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( + org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + assertRecords(execute(table), expectedRecords); + } + + @Test + public void testProjection() throws Exception { Review comment: Another case to cover: projecting with a renamed schema. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
