stevenzwu commented on code in PR #11497: URL: https://github.com/apache/iceberg/pull/11497#discussion_r2072517185
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.RewriteFileGroup; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup; +import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.TaskWriterFactory; +import org.apache.iceberg.flink.source.DataIterator; +import org.apache.iceberg.flink.source.FileScanTaskReader; +import org.apache.iceberg.flink.source.RowDataFileScanTaskReader; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.PropertyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Executes a rewrite for a single {@link PlannedGroup}. Reads the files with the standard {@link + * FileScanTaskReader}, so the delete files are considered, and writes using the {@link + * TaskWriterFactory}. The output is an {@link ExecutedGroup}. 
+ */ +@Internal +public class DataFileRewriteRunner + extends ProcessFunction<PlannedGroup, DataFileRewriteRunner.ExecutedGroup> { + private static final Logger LOG = LoggerFactory.getLogger(DataFileRewriteRunner.class); + + private final String tableName; + private final String taskName; + private final int taskIndex; + + private transient int subTaskId; + private transient int attemptId; + private transient Counter errorCounter; + + public DataFileRewriteRunner(String tableName, String taskName, int taskIndex) { + Preconditions.checkNotNull(tableName, "Table name should no be null"); + Preconditions.checkNotNull(taskName, "Task name should no be null"); + this.tableName = tableName; + this.taskName = taskName; + this.taskIndex = taskIndex; + } + + @Override + public void open(Configuration parameters) { + this.errorCounter = + TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex) + .counter(TableMaintenanceMetrics.ERROR_COUNTER); + + this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber(); + } + + @Override + public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGroup> out) + throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug( + DataFileRewritePlanner.MESSAGE_PREFIX + "Rewriting files for group {} with {} files", Review Comment: nit: `{} files"` -> `files: {}`, as the files can be a very long string. It is clearer to put them at the very end; a sketch follows below. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
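For illustration, a minimal sketch of the reordered log message the nit asks for; it would sit inside `processElement`, and the argument expressions are assumptions, since the actual arguments of the `LOG.debug` call are cut off in the quoted hunk:

```java
// Hedged sketch only: keep the (potentially very long) file list as the last placeholder.
LOG.debug(
    DataFileRewritePlanner.MESSAGE_PREFIX + "Rewriting files for group {}, files: {}",
    value.group().info(),
    value.group().fileScanTasks());
```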
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.api.Test; + +class TestDataFileRewritePlanner extends OperatorTestBase { + @Test + void testUnpartitioned() throws Exception { + Set<DataFile> expected = Sets.newHashSetWithExpectedSize(3); + Table table = createTable(); + insert(table, 1, "a"); + expected.addAll(newDataFiles(table)); + insert(table, 2, "b"); + expected.addAll(newDataFiles(table)); + insert(table, 3, "c"); + expected.addAll(newDataFiles(table)); + + List<DataFileRewritePlanner.PlannedGroup> actual = planDataFileRewrite(tableLoader()); + + assertThat(actual).hasSize(1); + assertRewriteFileGroup(actual.get(0), table, expected); + } + + @Test + void testPartitioned() throws Exception { + Set<DataFile> expectedP1 = Sets.newHashSetWithExpectedSize(2); + Set<DataFile> expectedP2 = Sets.newHashSetWithExpectedSize(2); + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + expectedP1.addAll(newDataFiles(table)); + insertPartitioned(table, 2, "p1"); + expectedP1.addAll(newDataFiles(table)); + + insertPartitioned(table, 3, "p2"); + expectedP2.addAll(newDataFiles(table)); + insertPartitioned(table, 4, "p2"); + expectedP2.addAll(newDataFiles(table)); + + // This should not participate in compaction, as there is no more files in the partition + insertPartitioned(table, 5, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> actual = planDataFileRewrite(tableLoader()); + + assertThat(actual).hasSize(2); + if (actual.get(0).group().info().partition().get(0, String.class).equals("p1")) { + assertRewriteFileGroup(actual.get(0), table, expectedP1); + assertRewriteFileGroup(actual.get(1), table, expectedP2); + } else { + assertRewriteFileGroup(actual.get(0), table, expectedP2); + assertRewriteFileGroup(actual.get(1), table, expectedP1); + } + } + + @Test + void testError() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + + try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader(), + 11, + 1L, + ImmutableMap.of(MIN_INPUT_FILES, "2")))) { + testHarness.open(); + + // Cause an exception + dropTable(); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + trigger(testHarness); + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1); + assertThat( + testHarness + 
.getSideOutput(TaskResultAggregator.ERROR_STREAM) + .poll() + .getValue() + .getMessage()) + .contains("Table does not exist: "); + } + } + + @Test + void testV2Table() throws Exception { + Table table = createTableWithDelete(); + update(table, 1, null, "a", "b"); + update(table, 1, "b", "c"); + + List<DataFileRewritePlanner.PlannedGroup> actual = planDataFileRewrite(tableLoader()); + + assertThat(actual).hasSize(1); + List<FileScanTask> tasks = actual.get(0).group().fileScanTasks(); + assertThat(tasks).hasSize(2); + // Find the task with the deletes + FileScanTask withDelete = tasks.get(0).deletes().isEmpty() ? tasks.get(1) : tasks.get(0); + assertThat(withDelete.deletes()).hasSize(2); + // Find the equality delete and the positional delete + if (withDelete.deletes().get(0).content() == FileContent.EQUALITY_DELETES) { Review Comment: just extract the content types into a collection and use `containsExactlyInAnyOrderElementsOf` to assert that there is one position delete and one equality delete (see the sketch below). ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
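One possible shape for the assertion suggested in the comment above, sketched under the assumption that the test keeps the `withDelete` task; AssertJ's `containsExactlyInAnyOrderElementsOf` takes any `Iterable`, and `ImmutableList` would need the relocated Guava import in this test:

```java
// Sketch: collect the delete content types and assert both kinds are present, in any order,
// instead of branching on the order of withDelete.deletes().
List<FileContent> deleteTypes =
    withDelete.deletes().stream().map(ContentFile::content).collect(Collectors.toList());
assertThat(deleteTypes)
    .containsExactlyInAnyOrderElementsOf(
        ImmutableList.of(FileContent.EQUALITY_DELETES, FileContent.POSITION_DELETES));
```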
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.actions.RewriteDataFiles.TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.executeRewrite; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class TestDataFileRewriteRunner extends OperatorTestBase { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testExecute(boolean partitioned) throws Exception { + Table table; + PartitionData partition; + if (partitioned) { + table = createPartitionedTable(); + partition = new PartitionData(table.spec().partitionType()); + partition.set(0, "p1"); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p1"); + } else { + table = createTable(); + partition = new PartitionData(PartitionSpec.unpartitioned().partitionType()); + insert(table, 1, "p1"); + insert(table, 2, "p1"); + insert(table, 3, "p1"); + } + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned); + assertThat(actual).hasSize(1); + + assertRewriteFileGroup( + actual.get(0), + table, + records( + table.schema(), + ImmutableSet.of( + ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"), ImmutableList.of(3, "p1"))), + 1, + ImmutableSet.of(partition)); + } + + @Test + void testPartitionSpecChange() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + PartitionData oldPartition = new PartitionData(table.spec().partitionType()); + oldPartition.set(0, "p1"); + + try (OneInputStreamOperatorTestHarness< + DataFileRewritePlanner.PlannedGroup, 
DataFileRewriteRunner.ExecutedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewriteRunner( + OperatorTestBase.DUMMY_TABLE_NAME, OperatorTestBase.DUMMY_TABLE_NAME, 0))) { + testHarness.open(); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + + testHarness.processElement(planned.get(0), System.currentTimeMillis()); + List<DataFileRewriteRunner.ExecutedGroup> actual = testHarness.extractOutputValues(); + assertThat(actual).hasSize(1); + assertRewriteFileGroup( + actual.get(0), + table, + records( + table.schema(), + ImmutableSet.of(ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"))), + 1, + ImmutableSet.of(oldPartition)); + + insertPartitioned(table, 3, "p1"); + + planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + + testHarness.processElement(planned.get(0), System.currentTimeMillis()); + actual = testHarness.extractOutputValues(); + assertThat(actual).hasSize(2); + assertRewriteFileGroup( + actual.get(1), + table, + records( + table.schema(), + ImmutableSet.of( + ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"), ImmutableList.of(3, "p1"))), + 1, + ImmutableSet.of(oldPartition)); + + // Alter the table schema + table.updateSpec().addField("id").commit(); + // Insert some now data + insertFullPartitioned(table, 4, "p1"); + insertFullPartitioned(table, 4, "p1"); + PartitionData newPartition = new PartitionData(table.spec().partitionType()); + newPartition.set(0, "p1"); + newPartition.set(1, 4); + PartitionData[] transformedPartitions = { + newPartition.copy(), newPartition.copy(), newPartition.copy() + }; + transformedPartitions[0].set(1, 1); + transformedPartitions[1].set(1, 2); + transformedPartitions[2].set(1, 3); + table.refresh(); + + planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(2); + DataFileRewritePlanner.PlannedGroup oldCompact = planned.get(0); + DataFileRewritePlanner.PlannedGroup newCompact = planned.get(1); + if (oldCompact.group().inputFileNum() == 2) { + newCompact = planned.get(0); + oldCompact = planned.get(1); + } + + testHarness.processElement(newCompact, System.currentTimeMillis()); + actual = testHarness.extractOutputValues(); + assertThat(actual).hasSize(3); + assertRewriteFileGroup( + actual.get(2), + table, + records( + table.schema(), + ImmutableList.of(ImmutableList.of(4, "p1"), ImmutableList.of(4, "p1"))), + 1, + ImmutableSet.of(newPartition)); + + testHarness.processElement(oldCompact, System.currentTimeMillis()); + actual = testHarness.extractOutputValues(); + assertThat(actual).hasSize(4); + assertRewriteFileGroup( + actual.get(3), + table, + records( + table.schema(), + ImmutableSet.of( + ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"), ImmutableList.of(3, "p1"))), + 3, + Sets.newHashSet(transformedPartitions)); + } + } + + @Test + void testError() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + + try (OneInputStreamOperatorTestHarness< + DataFileRewritePlanner.PlannedGroup, DataFileRewriteRunner.ExecutedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewriteRunner( + OperatorTestBase.DUMMY_TABLE_NAME, OperatorTestBase.DUMMY_TABLE_NAME, 0))) { + testHarness.open(); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + // Cause an exception + dropTable(); + + 
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + testHarness.processElement(planned.get(0), System.currentTimeMillis()); + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1); + assertThat( + testHarness + .getSideOutput(TaskResultAggregator.ERROR_STREAM) + .poll() + .getValue() + .getMessage()) + .contains("File does not exist: "); + } + } + + @Test + void testV2Table() throws Exception { + Table table = createTableWithDelete(); + update(table, 1, null, "a", "b"); + update(table, 1, "b", "c"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + + List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned); + assertThat(actual).hasSize(1); + + assertRewriteFileGroup( + actual.get(0), + table, + records(table.schema(), ImmutableSet.of(ImmutableList.of(1, "c"))), + 1, + ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType()))); + } + + @Test + void testSplitSize() throws Exception { + Table table = createTable(); + + File dataDir = new File(new Path(table.location(), "data").toUri().getPath()); + dataDir.mkdir(); + GenericAppenderHelper dataAppender = + new GenericAppenderHelper(table, FileFormat.PARQUET, dataDir.toPath()); + List<Record> expected = Lists.newArrayListWithExpectedSize(4000); + for (int i = 0; i < 4; ++i) { + List<Record> batch = RandomGenericData.generate(table.schema(), 1000, 10 + i); + dataAppender.appendToTable(batch); + expected.addAll(batch); + } + + // First run with high limit + List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit = planDataFileRewrite(tableLoader()); + assertThat(planWithNoLimit).hasSize(1); + + // Second run with limit + long limit = + planWithNoLimit.get(0).group().fileScanTasks().get(0).sizeBytes() + + planWithNoLimit.get(0).group().fileScanTasks().get(1).sizeBytes(); + List<DataFileRewritePlanner.PlannedGroup> planned; + try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader(), + 11, + 10_000_000, + ImmutableMap.of( + MIN_INPUT_FILES, "2", TARGET_FILE_SIZE_BYTES, String.valueOf(limit))))) { + testHarness.open(); + + OperatorTestBase.trigger(testHarness); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + planned = testHarness.extractOutputValues(); + assertThat(planned).hasSize(1); Review Comment: I know `limit` is set to the size of the first 2 files. but there are 4 files in the table, shouldn't planner generate at least 2 groups? I am missing the connection here. similar to another comment. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.executeRewrite; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.api.Test; + +class TestDataFileRewriteCommitter extends OperatorTestBase { + @Test + void testUnpartitioned() throws Exception { + Table table = createTable(); + insert(table, 1, "p1"); + insert(table, 2, "p2"); + insert(table, 3, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(1); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(rewritten.get(0), EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processWatermark(EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + } + + assertDataFiles( + table, rewritten.get(0).group().addedFiles(), rewritten.get(0).group().rewrittenFiles()); + } + + @Test + void testPartitioned() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(2); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(2); + assertThat(rewritten.get(0).groupsPerCommit()).isEqualTo(1); + assertThat(rewritten.get(1).groupsPerCommit()).isEqualTo(1); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(rewritten.get(0), EVENT_TIME); + // This should be committed synchronously + assertDataFiles( + table, rewritten.get(0).group().addedFiles(), rewritten.get(0).group().rewrittenFiles()); + + testHarness.processElement(rewritten.get(1), EVENT_TIME); + // This should be committed synchronously + assertDataFiles( Review Comment: `assertDataFiles` checks the added and deleted files from the last snapshot. What if `rewritten.get(0)` and `rewritten.get(1)` are identical? I still think it is clearer to assert the conditions separately: 1.
rewritten: directly assert on expected numbers of added and delete files 2. table state: directly assert on how many live files before and after the rewrite. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.executeRewrite; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.api.Test; + +class TestDataFileRewriteCommitter extends OperatorTestBase { + @Test + void testUnpartitioned() throws Exception { + Table table = createTable(); + insert(table, 1, "p1"); + insert(table, 2, "p2"); + insert(table, 3, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(1); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(rewritten.get(0), EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processWatermark(EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + } + + assertDataFiles( + table, rewritten.get(0).group().addedFiles(), rewritten.get(0).group().rewrittenFiles()); + } + + @Test + void testPartitioned() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(2); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(2); + assertThat(rewritten.get(0).groupsPerCommit()).isEqualTo(1); + assertThat(rewritten.get(1).groupsPerCommit()).isEqualTo(1); + + try 
(OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(rewritten.get(0), EVENT_TIME); + // This should be committed synchronously + assertDataFiles( + table, rewritten.get(0).group().addedFiles(), rewritten.get(0).group().rewrittenFiles()); + + testHarness.processElement(rewritten.get(1), EVENT_TIME); + // This should be committed synchronously + assertDataFiles( + table, rewritten.get(1).group().addedFiles(), rewritten.get(1).group().rewrittenFiles()); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processWatermark(EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + } + } + + @Test + void testBatchSize() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + insertPartitioned(table, 5, "p3"); + insertPartitioned(table, 6, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(3); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(3); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(updateBatchSize(rewritten.get(0)), EVENT_TIME); + assertNoChange(table); + testHarness.processElement(updateBatchSize(rewritten.get(1)), EVENT_TIME); Review Comment: actually, maybe it is cleaner to add a `copyWithBatchSize` method to the `ExecutedGroup` class. That would be easier to read; a rough sketch is below. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
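A rough sketch of the helper suggested in the comment above; the field names and constructor used here are assumptions, since `ExecutedGroup`'s body is not part of the quoted hunk:

```java
// Hypothetical helper on DataFileRewriteRunner.ExecutedGroup (assumed fields: snapshotId,
// groupsPerCommit, group): return a copy that only changes the commit batch size.
ExecutedGroup copyWithBatchSize(int newGroupsPerCommit) {
  return new ExecutedGroup(snapshotId, newGroupsPerCommit, group);
}
```

The test could then read `testHarness.processElement(rewritten.get(0).copyWithBatchSize(2), EVENT_TIME)` instead of going through `updateBatchSize`.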
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.executeRewrite; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.api.Test; + +class TestDataFileRewriteCommitter extends OperatorTestBase { + @Test + void testUnpartitioned() throws Exception { + Table table = createTable(); + insert(table, 1, "p1"); + insert(table, 2, "p2"); + insert(table, 3, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(1); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(rewritten.get(0), EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processWatermark(EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + } + + assertDataFiles( + table, rewritten.get(0).group().addedFiles(), rewritten.get(0).group().rewrittenFiles()); + } + + @Test + void testPartitioned() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(2); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(2); + assertThat(rewritten.get(0).groupsPerCommit()).isEqualTo(1); + assertThat(rewritten.get(1).groupsPerCommit()).isEqualTo(1); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(rewritten.get(0), EVENT_TIME); + // This should be committed synchronously + assertDataFiles( + table, rewritten.get(0).group().addedFiles(), rewritten.get(0).group().rewrittenFiles()); + + testHarness.processElement(rewritten.get(1), EVENT_TIME); + // This should be committed synchronously + assertDataFiles( + table, rewritten.get(1).group().addedFiles(), rewritten.get(1).group().rewrittenFiles()); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processWatermark(EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + } + } + + @Test + void testBatchSize() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + insertPartitioned(table, 5, "p3"); + insertPartitioned(table, 6, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(3); + 
List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(3); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(updateBatchSize(rewritten.get(0)), EVENT_TIME); + assertNoChange(table); + testHarness.processElement(updateBatchSize(rewritten.get(1)), EVENT_TIME); Review Comment: it is difficult to read when `updateBatchSize` set the size to 2. either make the name explicitly like `setBatchSizeToTwo` or just move away from that util method. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.api.Test; + +class TestDataFileRewritePlanner extends OperatorTestBase { + @Test + void testUnpartitioned() throws Exception { + Set<DataFile> expected = Sets.newHashSetWithExpectedSize(3); + Table table = createTable(); + insert(table, 1, "a"); + expected.addAll(newDataFiles(table)); + insert(table, 2, "b"); + expected.addAll(newDataFiles(table)); + insert(table, 3, "c"); + expected.addAll(newDataFiles(table)); + + List<DataFileRewritePlanner.PlannedGroup> actual = planDataFileRewrite(tableLoader()); + + assertThat(actual).hasSize(1); + assertRewriteFileGroup(actual.get(0), table, expected); + } + + @Test + void testPartitioned() throws Exception { + Set<DataFile> expectedP1 = Sets.newHashSetWithExpectedSize(2); + Set<DataFile> expectedP2 = Sets.newHashSetWithExpectedSize(2); + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + expectedP1.addAll(newDataFiles(table)); + 
insertPartitioned(table, 2, "p1"); + expectedP1.addAll(newDataFiles(table)); + + insertPartitioned(table, 3, "p2"); + expectedP2.addAll(newDataFiles(table)); + insertPartitioned(table, 4, "p2"); + expectedP2.addAll(newDataFiles(table)); + + // This should not participate in compaction, as there is no more files in the partition + insertPartitioned(table, 5, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> actual = planDataFileRewrite(tableLoader()); + + assertThat(actual).hasSize(2); + if (actual.get(0).group().info().partition().get(0, String.class).equals("p1")) { + assertRewriteFileGroup(actual.get(0), table, expectedP1); + assertRewriteFileGroup(actual.get(1), table, expectedP2); + } else { + assertRewriteFileGroup(actual.get(0), table, expectedP2); + assertRewriteFileGroup(actual.get(1), table, expectedP1); + } + } + + @Test + void testError() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + + try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader(), + 11, + 1L, + ImmutableMap.of(MIN_INPUT_FILES, "2")))) { + testHarness.open(); + + // Cause an exception + dropTable(); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + trigger(testHarness); + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1); + assertThat( + testHarness + .getSideOutput(TaskResultAggregator.ERROR_STREAM) + .poll() + .getValue() + .getMessage()) + .contains("Table does not exist: "); + } + } + + @Test + void testV2Table() throws Exception { + Table table = createTableWithDelete(); + update(table, 1, null, "a", "b"); + update(table, 1, "b", "c"); + + List<DataFileRewritePlanner.PlannedGroup> actual = planDataFileRewrite(tableLoader()); + + assertThat(actual).hasSize(1); + List<FileScanTask> tasks = actual.get(0).group().fileScanTasks(); + assertThat(tasks).hasSize(2); + // Find the task with the deletes + FileScanTask withDelete = tasks.get(0).deletes().isEmpty() ? tasks.get(1) : tasks.get(0); + assertThat(withDelete.deletes()).hasSize(2); + // Find the equality delete and the positional delete + if (withDelete.deletes().get(0).content() == FileContent.EQUALITY_DELETES) { + assertThat(withDelete.deletes().get(1).content()).isEqualTo(FileContent.POSITION_DELETES); + } else { + assertThat(withDelete.deletes().get(0).content()).isEqualTo(FileContent.POSITION_DELETES); + assertThat(withDelete.deletes().get(1).content()).isEqualTo(FileContent.EQUALITY_DELETES); + } + } + + @Test + void testMaxRewriteBytes() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + + // First run with high limit + List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit = planDataFileRewrite(tableLoader()); + assertThat(planWithNoLimit).hasSize(2); + + // Second run with limit + long limit = Review Comment: previous `planWithNoLimit` with 10 MB limit has two groups planned. this `limit` is smaller and planner would generate one group. this is the part I didn't quite follow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org