rdblue commented on a change in pull request #4060: URL: https://github.com/apache/iceberg/pull/4060#discussion_r810168566
########## File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayPoolDataIteratorBatcherRowData.java ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.IOException; +import java.util.List; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SourceReaderOptions; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.flink.source.DataIterator; +import org.apache.iceberg.flink.source.FlinkSplitPlanner; +import org.apache.iceberg.flink.source.RowDataFileScanTaskReader; +import 
org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.SplitHelpers; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestRule; + +public class TestArrayPoolDataIteratorBatcherRowData { + + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + public static final HadoopTableResource TABLE_RESOURCE = new HadoopTableResource(TEMPORARY_FOLDER, + TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA); + + @ClassRule + public static final TestRule chain = RuleChain + .outerRule(TEMPORARY_FOLDER) + .around(TABLE_RESOURCE); + + private static final ScanContext scanContext = ScanContext.builder().build(); + private static final FileFormat fileFormat = FileFormat.PARQUET; + + private static List<List<Record>> recordBatchList; + private static IcebergSourceSplit icebergSplit; + + @BeforeClass + public static void beforeClass() throws IOException { + GenericAppenderHelper dataAppender = new GenericAppenderHelper( + TABLE_RESOURCE.table(), fileFormat, TEMPORARY_FOLDER); + recordBatchList = Lists.newArrayListWithCapacity(3); + List<DataFile> dataFileList = Lists.newArrayListWithCapacity(2); + for (int i = 0; i < 2; ++i) { + List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 3, i); + recordBatchList.add(records); + DataFile dataFile = dataAppender.writeFile(null, records); + dataFileList.add(dataFile); + dataAppender.appendToTable(dataFile); + } + + List<IcebergSourceSplit> splits = FlinkSplitPlanner + .planIcebergSourceSplits(TABLE_RESOURCE.table(), scanContext); + Assert.assertEquals(1, splits.size()); + Assert.assertEquals(2, 
splits.get(0).task().files().size()); + icebergSplit = SplitHelpers.sortFilesAsAppendOrder(splits.get(0), dataFileList); + } + + private final DataIteratorBatcher<RowData> batcher; + + public TestArrayPoolDataIteratorBatcherRowData() { + Configuration config = new Configuration(); + config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1); + config.set(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 2); + this.batcher = new ArrayPoolDataIteratorBatcher<>(config, + new RowDataRecordFactory(TestFixtures.ROW_TYPE)); + } + + @Test + public void testBatcher() { Review comment: This test is extremely difficult to read because it uses names like `batch0` and `recordAndPosition00`. It isn't obvious to the reader why the test assertions are correct. It also appears to run an end-to-end test using a real table rather than constructing intermediate objects directly to do a minimal unit test. That leads to more code and more complexity, like the use of `SplitHelpers.sortFilesAsAppendOrder` above. I think this should be rewritten as a unit test rather than an integration test. In addition, this test is very long and only tests small files. I think it would be better to have separate tests for end-of-file behavior and files with multiple batches. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
