This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
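The RewriteDataFiles task introduced by this commit is wired into a job through the existing TableMaintenance builder, the same way ExpireSnapshots is. The sketch below is illustrative only: the builder options mirror those exercised by TestMaintenanceE2E in this commit, while the JdbcLockFactory settings and the Hadoop table path are placeholders, not part of this change.

import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.JdbcLockFactory;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
import org.apache.iceberg.flink.maintenance.api.TableMaintenance;

public class RewriteDataFilesMaintenanceExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder table location; any TableLoader pointing at the target table works.
    TableLoader tableLoader =
        TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/tbl");

    // Lock factory preventing overlapping maintenance runs (JDBC URI, lock id and
    // connection properties below are placeholder values).
    JdbcLockFactory lockFactory =
        new JdbcLockFactory(
            "jdbc:postgresql://db:5432/locks",
            "db.tbl",
            Map.of("jdbc.user", "flink", "jdbc.password", "flink"));

    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .add(
            RewriteDataFiles.builder()
                // Plan a rewrite once at least 10 new data files have been committed.
                .scheduleOnDataFileCount(10)
                // Commit compacted groups in batches instead of a single commit.
                .partialProgressEnabled(true)
                .partialProgressMaxCommits(10)
                // Cap the total bytes rewritten per triggered run.
                .maxRewriteBytes(512L * 1024 * 1024)
                .targetFileSizeBytes(128L * 1024 * 1024))
        .append();

    env.execute("RewriteDataFiles maintenance");
  }
}
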
commit 4bb60aa0bc873c15e4446a0960d381d24fcbe8be Author: Gyula Fora <g_f...@apple.com> AuthorDate: Tue May 6 14:34:00 2025 +0200 Flink: Backport Maintenance - RewriteDataFiles to Flink 1.19 Backports #11497 --- .../flink/maintenance/api/ExpireSnapshots.java | 2 +- .../maintenance/api/MaintenanceTaskBuilder.java | 4 +- .../flink/maintenance/api/RewriteDataFiles.java | 234 ++++++++++++ .../flink/maintenance/api/TableMaintenance.java | 7 +- .../operator/DataFileRewriteCommitter.java | 199 ++++++++++ .../operator/DataFileRewritePlanner.java | 206 ++++++++++ .../operator/DataFileRewriteRunner.java | 253 +++++++++++++ .../maintenance/operator/DeleteFilesProcessor.java | 29 +- .../operator/ExpireSnapshotsProcessor.java | 2 +- .../flink/maintenance/operator/LockRemover.java | 28 +- .../flink/maintenance/operator/LogUtil.java | 26 ++ .../operator/TableMaintenanceMetrics.java | 30 ++ .../maintenance/operator/TaskResultAggregator.java | 101 +++++ .../flink/maintenance/operator/TriggerManager.java | 29 +- .../api/MaintenanceTaskInfraExtension.java | 8 +- .../maintenance/api/MaintenanceTaskTestBase.java | 39 +- .../flink/maintenance/api/TestExpireSnapshots.java | 43 +-- .../flink/maintenance/api/TestMaintenanceE2E.java | 13 + .../maintenance/api/TestRewriteDataFiles.java | 417 +++++++++++++++++++++ .../maintenance/operator/OperatorTestBase.java | 113 ++++++ .../flink/maintenance/operator/RewriteUtil.java | 83 ++++ .../operator/TestDataFileRewriteCommitter.java | 278 ++++++++++++++ .../operator/TestDataFileRewritePlanner.java | 193 ++++++++++ .../operator/TestDataFileRewriteRunner.java | 355 ++++++++++++++++++ .../operator/TestDeleteFilesProcessor.java | 3 +- 25 files changed, 2587 insertions(+), 108 deletions(-) diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java index 9cde5cb173..2bac5163ca 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java @@ -113,7 +113,7 @@ public class ExpireSnapshots { operatorName(DELETE_FILES_OPERATOR_NAME), TypeInformation.of(Void.class), new DeleteFilesProcessor( - index(), taskName(), tableLoader().loadTable(), deleteBatchSize)) + tableLoader().loadTable(), taskName(), index(), deleteBatchSize)) .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) .setParallelism(parallelism()); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java index 3fc431d025..c884f13b3e 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java @@ -195,9 +195,9 @@ public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?> DataStream<TaskResult> append( DataStream<Trigger> sourceStream, - int taskIndex, - String newTaskName, String newTableName, + String newTaskName, + int taskIndex, TableLoader newTableLoader, String defaultUidSuffix, String defaultSlotSharingGroup, diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java new file mode 100644 index 0000000000..b96635e6ab --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.util.Map; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.iceberg.actions.BinPackRewriteFilePlanner; +import org.apache.iceberg.actions.SizeBasedFileRewritePlanner; +import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteCommitter; +import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner; +import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteRunner; +import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Creates the data file rewriter data stream. Which runs a single iteration of the task for every + * {@link Trigger} event. + */ +public class RewriteDataFiles { + static final String PLANNER_TASK_NAME = "RDF Planner"; + static final String REWRITE_TASK_NAME = "Rewrite"; + static final String COMMIT_TASK_NAME = "Rewrite commit"; + static final String AGGREGATOR_TASK_NAME = "Rewrite aggregator"; + + private RewriteDataFiles() {} + + /** Creates the builder for a stream which rewrites data files for the table. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<RewriteDataFiles.Builder> { + private boolean partialProgressEnabled = false; + private int partialProgressMaxCommits = 10; + private final Map<String, String> rewriteOptions = Maps.newHashMapWithExpectedSize(6); + private long maxRewriteBytes = Long.MAX_VALUE; + + /** + * Allows committing compacted data files in batches. See {@link + * org.apache.iceberg.actions.RewriteDataFiles#PARTIAL_PROGRESS_ENABLED} for more details. + * + * @param newPartialProgressEnabled to enable partial commits + */ + public Builder partialProgressEnabled(boolean newPartialProgressEnabled) { + this.partialProgressEnabled = newPartialProgressEnabled; + return this; + } + + /** + * Configures the size of batches if {@link #partialProgressEnabled}. See {@link + * org.apache.iceberg.actions.RewriteDataFiles#PARTIAL_PROGRESS_MAX_COMMITS} for more details. 
+ * + * @param newPartialProgressMaxCommits to target number of the commits per run + */ + public Builder partialProgressMaxCommits(int newPartialProgressMaxCommits) { + this.partialProgressMaxCommits = newPartialProgressMaxCommits; + return this; + } + + /** + * Configures the maximum byte size of the rewrites for one scheduled compaction. This could be + * used to limit the resources used by the compaction. + * + * @param newMaxRewriteBytes to limit the size of the rewrites + */ + public Builder maxRewriteBytes(long newMaxRewriteBytes) { + this.maxRewriteBytes = newMaxRewriteBytes; + return this; + } + + /** + * Configures the target file size. See {@link + * org.apache.iceberg.actions.RewriteDataFiles#TARGET_FILE_SIZE_BYTES} for more details. + * + * @param targetFileSizeBytes target file size + */ + public Builder targetFileSizeBytes(long targetFileSizeBytes) { + this.rewriteOptions.put( + SizeBasedFileRewritePlanner.TARGET_FILE_SIZE_BYTES, String.valueOf(targetFileSizeBytes)); + return this; + } + + /** + * Configures the min file size considered for rewriting. See {@link + * SizeBasedFileRewritePlanner#MIN_FILE_SIZE_BYTES} for more details. + * + * @param minFileSizeBytes min file size + */ + public Builder minFileSizeBytes(long minFileSizeBytes) { + this.rewriteOptions.put( + SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, String.valueOf(minFileSizeBytes)); + return this; + } + + /** + * Configures the max file size considered for rewriting. See {@link + * SizeBasedFileRewritePlanner#MAX_FILE_SIZE_BYTES} for more details. + * + * @param maxFileSizeBytes max file size + */ + public Builder maxFileSizeBytes(long maxFileSizeBytes) { + this.rewriteOptions.put( + SizeBasedFileRewritePlanner.MAX_FILE_SIZE_BYTES, String.valueOf(maxFileSizeBytes)); + return this; + } + + /** + * Configures the minimum file number after a rewrite is always initiated. See description see + * {@link SizeBasedFileRewritePlanner#MIN_INPUT_FILES} for more details. + * + * @param minInputFiles min file number + */ + public Builder minInputFiles(int minInputFiles) { + this.rewriteOptions.put( + SizeBasedFileRewritePlanner.MIN_INPUT_FILES, String.valueOf(minInputFiles)); + return this; + } + + /** + * Configures the minimum delete file number for a file after a rewrite is always initiated. See + * {@link BinPackRewriteFilePlanner#DELETE_FILE_THRESHOLD} for more details. + * + * @param deleteFileThreshold min delete file number + */ + public Builder deleteFileThreshold(int deleteFileThreshold) { + this.rewriteOptions.put( + BinPackRewriteFilePlanner.DELETE_FILE_THRESHOLD, String.valueOf(deleteFileThreshold)); + return this; + } + + /** + * Overrides other options and forces rewriting of all provided files. + * + * @param rewriteAll enables a full rewrite + */ + public Builder rewriteAll(boolean rewriteAll) { + this.rewriteOptions.put(SizeBasedFileRewritePlanner.REWRITE_ALL, String.valueOf(rewriteAll)); + return this; + } + + /** + * Configures the group size for rewriting. See {@link + * SizeBasedFileRewritePlanner#MAX_FILE_GROUP_SIZE_BYTES} for more details. 
+ * + * @param maxFileGroupSizeBytes file group size for rewrite + */ + public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) { + this.rewriteOptions.put( + SizeBasedFileRewritePlanner.MAX_FILE_GROUP_SIZE_BYTES, + String.valueOf(maxFileGroupSizeBytes)); + return this; + } + + /** + * The input is a {@link DataStream} with {@link Trigger} events and every event should be + * immediately followed by a {@link Watermark} with the same timestamp as the event. + * + * <p>The output is a {@link DataStream} with the {@link TaskResult} of the run followed by the + * {@link Watermark}. + */ + @Override + DataStream<TaskResult> append(DataStream<Trigger> trigger) { + SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned = + trigger + .process( + new DataFileRewritePlanner( + tableName(), + taskName(), + index(), + tableLoader(), + partialProgressEnabled ? partialProgressMaxCommits : 1, + maxRewriteBytes, + rewriteOptions)) + .name(operatorName(PLANNER_TASK_NAME)) + .uid(PLANNER_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + SingleOutputStreamOperator<DataFileRewriteRunner.ExecutedGroup> rewritten = + planned + .rebalance() + .process(new DataFileRewriteRunner(tableName(), taskName(), index())) + .name(operatorName(REWRITE_TASK_NAME)) + .uid(REWRITE_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .setParallelism(parallelism()); + + SingleOutputStreamOperator<Trigger> updated = + rewritten + .transform( + operatorName(COMMIT_TASK_NAME), + TypeInformation.of(Trigger.class), + new DataFileRewriteCommitter(tableName(), taskName(), index(), tableLoader())) + .uid(COMMIT_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + return trigger + .union(updated) + .connect( + planned + .getSideOutput(TaskResultAggregator.ERROR_STREAM) + .union( + rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM), + updated.getSideOutput(TaskResultAggregator.ERROR_STREAM))) + .transform( + operatorName(AGGREGATOR_TASK_NAME), + TypeInformation.of(TaskResult.class), + new TaskResultAggregator(tableName(), taskName(), index())) + .uid(AGGREGATOR_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + } + } +} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java index 25d0325abb..12230891ac 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -21,6 +21,7 @@ package org.apache.iceberg.flink.maintenance.api; import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.Locale; import java.util.UUID; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; @@ -257,9 +258,9 @@ public class TableMaintenance { DataStream<TaskResult> result = builder.append( filtered, - taskIndex, - taskNames.get(taskIndex), tableName, + taskNames.get(taskIndex), + taskIndex, loader, uidSuffix, slotSharingGroup, @@ -301,7 +302,7 @@ public class TableMaintenance { private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskIndex) { return String.format( - "%s [%s]", streamBuilder.getClass().getSimpleName(), String.valueOf(taskIndex)); + Locale.ROOT, "%s [%d]", 
streamBuilder.getClass().getSimpleName(), taskIndex); } } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java new file mode 100644 index 0000000000..135d3d9b42 --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.Set; +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteDataFilesCommitManager; +import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService; +import org.apache.iceberg.actions.RewriteFileGroup; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. The input is a {@link + * DataFileRewriteRunner.ExecutedGroup}. Only {@link Watermark} is emitted which is chained to + * {@link TaskResultAggregator} input 1. 
+ */ +@Internal +public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger> + implements OneInputStreamOperator<DataFileRewriteRunner.ExecutedGroup, Trigger> { + private static final Logger LOG = LoggerFactory.getLogger(DataFileRewriteCommitter.class); + + private final String tableName; + private final String taskName; + private final int taskIndex; + private final TableLoader tableLoader; + + private transient Table table; + private transient CommitService commitService; + private transient Counter errorCounter; + private transient Counter addedDataFileNumCounter; + private transient Counter addedDataFileSizeCounter; + private transient Counter removedDataFileNumCounter; + private transient Counter removedDataFileSizeCounter; + + public DataFileRewriteCommitter( + String tableName, String taskName, int taskIndex, TableLoader tableLoader) { + Preconditions.checkNotNull(tableName, "Table name should no be null"); + Preconditions.checkNotNull(taskName, "Task name should no be null"); + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + + this.tableName = tableName; + this.taskName = taskName; + this.taskIndex = taskIndex; + this.tableLoader = tableLoader; + } + + @Override + public void open() throws Exception { + super.open(); + + tableLoader.open(); + this.table = tableLoader.loadTable(); + + MetricGroup taskMetricGroup = + TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex); + this.errorCounter = taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER); + this.addedDataFileNumCounter = + taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC); + this.addedDataFileSizeCounter = + taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC); + this.removedDataFileNumCounter = + taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC); + this.removedDataFileSizeCounter = + taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC); + } + + @Override + public void processElement(StreamRecord<DataFileRewriteRunner.ExecutedGroup> streamRecord) { + DataFileRewriteRunner.ExecutedGroup executedGroup = streamRecord.getValue(); + try { + if (commitService == null) { + // Refresh the table to get the latest snapshot for the committer + table.refresh(); + + FlinkRewriteDataFilesCommitManager commitManager = + new FlinkRewriteDataFilesCommitManager( + table, executedGroup.snapshotId(), streamRecord.getTimestamp()); + this.commitService = commitManager.service(executedGroup.groupsPerCommit()); + commitService.start(); + } + + commitService.offer(executedGroup.group()); + } catch (Exception e) { + LOG.warn( + DataFileRewritePlanner.MESSAGE_PREFIX + "Exception processing {}", + tableName, + taskName, + taskIndex, + streamRecord.getTimestamp(), + executedGroup, + e); + output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e)); + errorCounter.inc(); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + try { + if (commitService != null) { + commitService.close(); + } + + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + "Successfully completed data file compaction", + tableName, + taskName, + taskIndex, + mark.getTimestamp()); + } catch (Exception e) { + LOG.warn( + DataFileRewritePlanner.MESSAGE_PREFIX + "Exception closing commit service", + tableName, + taskName, + taskIndex, + mark.getTimestamp(), + e); + output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e)); + 
errorCounter.inc(); + } + + // Cleanup + this.commitService = null; + + super.processWatermark(mark); + } + + @Override + public void close() throws IOException { + if (commitService != null) { + commitService.close(); + } + } + + private class FlinkRewriteDataFilesCommitManager extends RewriteDataFilesCommitManager { + private final long timestamp; + + FlinkRewriteDataFilesCommitManager(Table table, long startingSnapshotId, long timestamp) { + super(table, startingSnapshotId); + this.timestamp = timestamp; + } + + @Override + public void commitFileGroups(Set<RewriteFileGroup> fileGroups) { + super.commitFileGroups(fileGroups); + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + "Committed {}", + tableName, + taskName, + taskIndex, + timestamp, + fileGroups); + updateMetrics(fileGroups); + } + + private void updateMetrics(Set<RewriteFileGroup> fileGroups) { + for (RewriteFileGroup fileGroup : fileGroups) { + for (DataFile added : fileGroup.addedFiles()) { + addedDataFileNumCounter.inc(); + addedDataFileSizeCounter.inc(added.fileSizeInBytes()); + } + + for (DataFile rewritten : fileGroup.rewrittenFiles()) { + removedDataFileNumCounter.inc(); + removedDataFileSizeCounter.inc(rewritten.fileSizeInBytes()); + } + } + } + } +} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java new file mode 100644 index 0000000000..e36c9c6610 --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.math.RoundingMode; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.actions.BinPackRewriteFilePlanner; +import org.apache.iceberg.actions.FileRewritePlan; +import org.apache.iceberg.actions.RewriteDataFiles; +import org.apache.iceberg.actions.RewriteFileGroup; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.math.IntMath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Plans the rewrite groups using the {@link BinPackRewriteFilePlanner}. The input is the {@link + * Trigger}, the output is zero, or some {@link PlannedGroup}s. + */ +@Internal +public class DataFileRewritePlanner + extends ProcessFunction<Trigger, DataFileRewritePlanner.PlannedGroup> { + static final String MESSAGE_PREFIX = "[For table {} with {}[{}] at {}]: "; + private static final Logger LOG = LoggerFactory.getLogger(DataFileRewritePlanner.class); + + private final String tableName; + private final String taskName; + private final int taskIndex; + private final TableLoader tableLoader; + private final int partialProgressMaxCommits; + private final long maxRewriteBytes; + private final Map<String, String> rewriterOptions; + private transient Counter errorCounter; + + public DataFileRewritePlanner( + String tableName, + String taskName, + int taskIndex, + TableLoader tableLoader, + int newPartialProgressMaxCommits, + long maxRewriteBytes, + Map<String, String> rewriterOptions) { + Preconditions.checkNotNull(tableName, "Table name should no be null"); + Preconditions.checkNotNull(taskName, "Task name should no be null"); + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(rewriterOptions, "Options map should no be null"); + + this.tableName = tableName; + this.taskName = taskName; + this.taskIndex = taskIndex; + this.tableLoader = tableLoader; + this.partialProgressMaxCommits = newPartialProgressMaxCommits; + this.maxRewriteBytes = maxRewriteBytes; + this.rewriterOptions = rewriterOptions; + } + + @Override + public void open(Configuration parameters) throws Exception { + tableLoader.open(); + this.errorCounter = + TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex) + .counter(TableMaintenanceMetrics.ERROR_COUNTER); + } + + @Override + public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> out) + throws Exception { + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + "Creating rewrite plan", + tableName, + taskName, + taskIndex, + ctx.timestamp()); + try { + SerializableTable table = + (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()); + if (table.currentSnapshot() == null) { + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an empty table", + tableName, + taskName, + taskIndex, + 
ctx.timestamp()); + return; + } + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriterOptions); + + FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> + plan = planner.plan(); + + long rewriteBytes = 0; + List<RewriteFileGroup> groups = Lists.newArrayList(); + for (CloseableIterator<RewriteFileGroup> groupIterator = plan.groups().iterator(); + groupIterator.hasNext(); ) { + RewriteFileGroup group = groupIterator.next(); + if (rewriteBytes + group.inputFilesSizeInBytes() > maxRewriteBytes) { + // Keep going, maybe some other group might fit in + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + + "Skipping group as max rewrite size reached {}", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + group); + } else { + rewriteBytes += group.inputFilesSizeInBytes(); + groups.add(group); + } + } + + int groupsPerCommit = + IntMath.divide(groups.size(), partialProgressMaxCommits, RoundingMode.CEILING); + + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + "Rewrite plan created {}", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + groups); + + for (RewriteFileGroup group : groups) { + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + "Emitting {}", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + group); + out.collect(new PlannedGroup(table, groupsPerCommit, group)); + } + } catch (Exception e) { + LOG.warn( + DataFileRewritePlanner.MESSAGE_PREFIX + "Failed to plan data file rewrite groups", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + e); + ctx.output(TaskResultAggregator.ERROR_STREAM, e); + errorCounter.inc(); + } + } + + @Override + public void close() throws Exception { + super.close(); + tableLoader.close(); + } + + public static class PlannedGroup { + private final SerializableTable table; + private final int groupsPerCommit; + private final RewriteFileGroup group; + + private PlannedGroup(SerializableTable table, int groupsPerCommit, RewriteFileGroup group) { + this.table = table; + this.groupsPerCommit = groupsPerCommit; + this.group = group; + } + + SerializableTable table() { + return table; + } + + int groupsPerCommit() { + return groupsPerCommit; + } + + RewriteFileGroup group() { + return group; + } + } +} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java new file mode 100644 index 0000000000..c03b5cc1c8 --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.RewriteFileGroup; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup; +import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.TaskWriterFactory; +import org.apache.iceberg.flink.source.DataIterator; +import org.apache.iceberg.flink.source.FileScanTaskReader; +import org.apache.iceberg.flink.source.RowDataFileScanTaskReader; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.PropertyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Executes a rewrite for a single {@link PlannedGroup}. Reads the files with the standard {@link + * FileScanTaskReader}, so the delete files are considered, and writes using the {@link + * TaskWriterFactory}. The output is an {@link ExecutedGroup}. + */ +@Internal +public class DataFileRewriteRunner + extends ProcessFunction<PlannedGroup, DataFileRewriteRunner.ExecutedGroup> { + private static final Logger LOG = LoggerFactory.getLogger(DataFileRewriteRunner.class); + + private final String tableName; + private final String taskName; + private final int taskIndex; + + private transient int subTaskId; + private transient int attemptId; + private transient Counter errorCounter; + + public DataFileRewriteRunner(String tableName, String taskName, int taskIndex) { + Preconditions.checkNotNull(tableName, "Table name should no be null"); + Preconditions.checkNotNull(taskName, "Task name should no be null"); + this.tableName = tableName; + this.taskName = taskName; + this.taskIndex = taskIndex; + } + + @Override + public void open(Configuration parameters) { + this.errorCounter = + TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex) + .counter(TableMaintenanceMetrics.ERROR_COUNTER); + + this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber(); + } + + @Override + public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGroup> out) + throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug( + DataFileRewritePlanner.MESSAGE_PREFIX + "Rewriting files for group {} with files: {}", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + value.group().info(), + value.group().rewrittenFiles()); + } else { + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + + "Rewriting files for group {} with {} number of files", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + value.group().info(), + value.group().rewrittenFiles().size()); + } + + try (TaskWriter<RowData> writer = 
writerFor(value)) { + try (DataIterator<RowData> iterator = readerFor(value)) { + while (iterator.hasNext()) { + writer.write(iterator.next()); + } + + Set<DataFile> dataFiles = Sets.newHashSet(writer.dataFiles()); + value.group().setOutputFiles(dataFiles); + out.collect( + new ExecutedGroup( + value.table().currentSnapshot().snapshotId(), + value.groupsPerCommit(), + value.group())); + if (LOG.isDebugEnabled()) { + LOG.debug( + DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten files {} from {} to {}", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + value.group().info(), + value.group().rewrittenFiles(), + value.group().addedFiles()); + } else { + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten {} files to {} files", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + value.group().rewrittenFiles().size(), + value.group().addedFiles().size()); + } + } catch (Exception ex) { + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + "Exception rewriting datafile group {}", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + value.group(), + ex); + ctx.output(TaskResultAggregator.ERROR_STREAM, ex); + errorCounter.inc(); + abort(writer, ctx.timestamp()); + } + } catch (Exception ex) { + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + + "Exception creating compaction writer for group {}", + tableName, + taskName, + taskIndex, + ctx.timestamp(), + value.group(), + ex); + ctx.output(TaskResultAggregator.ERROR_STREAM, ex); + errorCounter.inc(); + } + } + + private TaskWriter<RowData> writerFor(PlannedGroup value) { + String formatString = + PropertyUtil.propertyAsString( + value.table().properties(), + TableProperties.DEFAULT_FILE_FORMAT, + TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + RowDataTaskWriterFactory factory = + new RowDataTaskWriterFactory( + value.table(), + FlinkSchemaUtil.convert(value.table().schema()), + value.group().inputSplitSize(), + FileFormat.fromString(formatString), + value.table().properties(), + null, + false); + factory.initialize(subTaskId, attemptId); + return factory.create(); + } + + private DataIterator<RowData> readerFor(PlannedGroup value) { + RowDataFileScanTaskReader reader = + new RowDataFileScanTaskReader( + value.table().schema(), + value.table().schema(), + PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null), + false, + Collections.emptyList()); + return new DataIterator<>( + reader, + new BaseCombinedScanTask(value.group().fileScanTasks()), + value.table().io(), + value.table().encryption()); + } + + private void abort(TaskWriter<RowData> writer, long timestamp) { + try { + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + + "Aborting rewrite for (subTaskId {}, attemptId {})", + tableName, + taskName, + taskIndex, + timestamp, + subTaskId, + attemptId); + writer.abort(); + } catch (Exception e) { + LOG.info( + DataFileRewritePlanner.MESSAGE_PREFIX + "Exception in abort", + tableName, + taskName, + taskIndex, + timestamp, + e); + } + } + + public static class ExecutedGroup { + private final long snapshotId; + private final int groupsPerCommit; + private final RewriteFileGroup group; + + @VisibleForTesting + ExecutedGroup(long snapshotId, int groupsPerCommit, RewriteFileGroup group) { + this.snapshotId = snapshotId; + this.groupsPerCommit = groupsPerCommit; + this.group = group; + } + + long snapshotId() { + return snapshotId; + } + + int groupsPerCommit() { + return groupsPerCommit; + } + + RewriteFileGroup group() { + return group; + } + } +} diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java index dc7846c4c4..9189f5f018 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java @@ -21,6 +21,7 @@ package org.apache.iceberg.flink.maintenance.operator; import java.util.Set; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -40,17 +41,17 @@ public class DeleteFilesProcessor extends AbstractStreamOperator<Void> implements OneInputStreamOperator<String, Void> { private static final Logger LOG = LoggerFactory.getLogger(DeleteFilesProcessor.class); - private final String taskIndex; + private final String tableName; private final String taskName; + private final int taskIndex; private final SupportsBulkOperations io; - private final String tableName; private final Set<String> filesToDelete = Sets.newHashSet(); private final int batchSize; private transient Counter failedCounter; private transient Counter succeededCounter; - public DeleteFilesProcessor(int taskIndex, String taskName, Table table, int batchSize) { + public DeleteFilesProcessor(Table table, String taskName, int taskIndex, int batchSize) { Preconditions.checkNotNull(taskName, "Task name should no be null"); Preconditions.checkNotNull(table, "Table should no be null"); @@ -60,31 +61,21 @@ public class DeleteFilesProcessor extends AbstractStreamOperator<Void> "%s doesn't support bulk delete", fileIO.getClass().getSimpleName()); - this.taskIndex = String.valueOf(taskIndex); + this.tableName = table.name(); this.taskName = taskName; + this.taskIndex = taskIndex; this.io = (SupportsBulkOperations) fileIO; - this.tableName = table.name(); this.batchSize = batchSize; } @Override public void open() throws Exception { + MetricGroup taskMetricGroup = + TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex); this.failedCounter = - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY) - .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) - .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName) - .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex) - .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER); + taskMetricGroup.counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER); this.succeededCounter = - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY) - .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) - .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName) - .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex) - .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER); + taskMetricGroup.counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER); } @Override diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java index a09d0244e9..098d32e3b6 
100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java @@ -74,7 +74,7 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul this.table = tableLoader.loadTable(); this.plannerPool = plannerPoolSize != null - ? ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize) + ? ThreadPools.newFixedThreadPool(table.name() + "-table--planner", plannerPoolSize) : ThreadPools.getWorkerPool(); } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java index 49fa69b487..2066ca8e01 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -98,31 +99,16 @@ public class LockRemover extends AbstractStreamOperator<Void> this.failedTaskResultCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); this.taskLastRunDurationMs = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) { + MetricGroup taskMetricGroup = + TableMaintenanceMetrics.groupFor( + getRuntimeContext(), tableName, maintenanceTaskNames.get(taskIndex), taskIndex); succeededTaskResultCounters.add( - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY) - .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) - .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) - .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) - .counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER)); + taskMetricGroup.counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER)); failedTaskResultCounters.add( - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY) - .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) - .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) - .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) - .counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER)); + taskMetricGroup.counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER)); AtomicLong duration = new AtomicLong(0); taskLastRunDurationMs.add(duration); - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY) - .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) - .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) - .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) - .gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get); + taskMetricGroup.gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get); } lockFactory.open(); diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LogUtil.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LogUtil.java new file mode 100644 index 0000000000..8bdcd7ba2b --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LogUtil.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +class LogUtil { + static final String MESSAGE_PREFIX = "[For table {} with {}[{}] at {}]: "; + static final String MESSAGE_FORMAT_PREFIX = "[For table %s with {%s}[{%d}] at {%d}]: "; + + private LogUtil() {} +} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java index 6147c3a5fd..897760caaa 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java @@ -18,12 +18,18 @@ */ package org.apache.iceberg.flink.maintenance.operator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.MetricGroup; + public class TableMaintenanceMetrics { public static final String GROUP_KEY = "maintenance"; public static final String TASK_NAME_KEY = "taskName"; public static final String TASK_INDEX_KEY = "taskIndex"; public static final String TABLE_NAME_KEY = "tableName"; + // Operator error counter + public static final String ERROR_COUNTER = "error"; + // TriggerManager metrics public static final String RATE_LIMITER_TRIGGERED = "rateLimiterTriggered"; public static final String CONCURRENT_RUN_THROTTLED = "concurrentRunThrottled"; @@ -39,6 +45,30 @@ public class TableMaintenanceMetrics { public static final String DELETE_FILE_FAILED_COUNTER = "deleteFailed"; public static final String DELETE_FILE_SUCCEEDED_COUNTER = "deleteSucceeded"; + // DataFileUpdater metrics + public static final String ADDED_DATA_FILE_NUM_METRIC = "addedDataFileNum"; + public static final String ADDED_DATA_FILE_SIZE_METRIC = "addedDataFileSize"; + public static final String REMOVED_DATA_FILE_NUM_METRIC = "removedDataFileNum"; + public static final String REMOVED_DATA_FILE_SIZE_METRIC = "removedDataFileSize"; + + static MetricGroup groupFor( + RuntimeContext context, String tableName, String taskName, int taskIndex) { + return groupFor(groupFor(context, tableName), taskName, taskIndex); + } + + static MetricGroup groupFor(RuntimeContext context, String tableName) { + return context + .getMetricGroup() + 
.addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName); + } + + static MetricGroup groupFor(MetricGroup mainGroup, String taskName, int taskIndex) { + return mainGroup + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)); + } + private TableMaintenanceMetrics() { // do not instantiate } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java new file mode 100644 index 0000000000..cceb043a26 --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.flink.maintenance.api.TaskResult; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Aggregates results of the operators for a given maintenance task. + * + * <ul> + * <li>Input 1 is used: + * <ul> + * <li>To provide the {@link TaskResult#startEpoch()} - should be chained to the task input + * <li>To mark that the task is finished - should be chained at the end of the task, so an + * incoming watermark will signal that the task is finished + * </ul> + * <li>Input 2 expects an {@link Exception} which caused the failure - should be chained to the + * {@link #ERROR_STREAM} of the operators + * </ul> + * + * The operator emits a {@link TaskResult} with the overall result on {@link Watermark}. 
+ */ +@Internal +public class TaskResultAggregator extends AbstractStreamOperator<TaskResult> + implements TwoInputStreamOperator<Trigger, Exception, TaskResult> { + public static final OutputTag<Exception> ERROR_STREAM = + new OutputTag<>("error-stream", TypeInformation.of(Exception.class)); + + private static final Logger LOG = LoggerFactory.getLogger(TaskResultAggregator.class); + + private final String tableName; + private final String taskName; + private final int taskIndex; + private final List<Exception> exceptions; + private transient Long startTime; + + public TaskResultAggregator(String tableName, String taskName, int taskIndex) { + Preconditions.checkNotNull(tableName, "Table name should no be null"); + Preconditions.checkNotNull(taskName, "Task name should no be null"); + + this.tableName = tableName; + this.taskName = taskName; + this.taskIndex = taskIndex; + this.exceptions = Lists.newArrayList(); + this.startTime = 0L; + } + + @Override + public void processElement1(StreamRecord<Trigger> streamRecord) { + startTime = streamRecord.getValue().timestamp(); + } + + @Override + public void processElement2(StreamRecord<Exception> streamRecord) { + Preconditions.checkNotNull(streamRecord.getValue(), "Exception could not be `null`."); + exceptions.add(streamRecord.getValue()); + } + + @Override + public void processWatermark(Watermark mark) { + TaskResult response = new TaskResult(taskIndex, startTime, exceptions.isEmpty(), exceptions); + output.collect(new StreamRecord<>(response)); + LOG.info( + "Aggregated result for table {}, task {}[{}] is {}", + tableName, + taskName, + taskIndex, + response); + exceptions.clear(); + startTime = 0L; + } +} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java index d6d1ebe397..bd8424d726 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.TimerService; @@ -121,33 +122,17 @@ public class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, T @Override public void open(Configuration parameters) throws Exception { + MetricGroup mainGroup = TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName); this.rateLimiterTriggeredCounter = - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY) - .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) - .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + mainGroup.counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); this.concurrentRunThrottledCounter = - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY) - .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) - .counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED); - this.nothingToTriggerCounter = - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY) - 
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) - .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + mainGroup.counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED); + this.nothingToTriggerCounter = mainGroup.counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); this.triggerCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) { triggerCounters.add( - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY) - .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) - .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) - .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) + TableMaintenanceMetrics.groupFor( + mainGroup, maintenanceTaskNames.get(taskIndex), taskIndex) .counter(TableMaintenanceMetrics.TRIGGERED)); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java index 1b69be1fa3..10efb9120c 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java @@ -48,16 +48,16 @@ class MaintenanceTaskInfraExtension implements BeforeEachCallback { @Override public void beforeEach(ExtensionContext context) { - env = StreamExecutionEnvironment.getExecutionEnvironment(); - source = new ManualSource<>(env, TypeInformation.of(Trigger.class)); + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + this.source = new ManualSource<>(env, TypeInformation.of(Trigger.class)); // Adds the watermark to mimic the behaviour expected for the input of the maintenance tasks - triggerStream = + this.triggerStream = source .dataStream() .assignTimestampsAndWatermarks(new TableMaintenance.PunctuatedWatermarkStrategy()) .name(IGNORED_OPERATOR_NAME) .forceNonParallel(); - sink = new CollectingSink<>(); + this.sink = new CollectingSink<>(); } StreamExecutionEnvironment env() { diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java index 36041d9c38..6d3a10c4b1 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java @@ -37,16 +37,42 @@ class MaintenanceTaskTestBase extends OperatorTestBase { @RegisterExtension MaintenanceTaskInfraExtension infra = new MaintenanceTaskInfraExtension(); void runAndWaitForSuccess( + StreamExecutionEnvironment env, + ManualSource<Trigger> triggerSource, + CollectingSink<TaskResult> collectingSink) + throws Exception { + runAndWaitForResult(env, triggerSource, collectingSink, false, () -> true); + } + + void runAndWaitForSuccess( + StreamExecutionEnvironment env, + ManualSource<Trigger> triggerSource, + CollectingSink<TaskResult> collectingSink, + Supplier<Boolean> waitForCondition) + throws Exception { + runAndWaitForResult(env, triggerSource, collectingSink, false, waitForCondition); + } + + void runAndWaitForFailure( + StreamExecutionEnvironment env, + ManualSource<Trigger> triggerSource, + 
CollectingSink<TaskResult> collectingSink) + throws Exception { + runAndWaitForResult(env, triggerSource, collectingSink, true, () -> true); + } + + void runAndWaitForResult( StreamExecutionEnvironment env, ManualSource<Trigger> triggerSource, CollectingSink<TaskResult> collectingSink, + boolean generateFailure, Supplier<Boolean> waitForCondition) throws Exception { JobClient jobClient = null; try { jobClient = env.executeAsync(); - // Do a single task run + // Do a single successful task run long time = System.currentTimeMillis(); triggerSource.sendRecord(Trigger.create(time, TESTING_TASK_ID), time); @@ -56,6 +82,17 @@ class MaintenanceTaskTestBase extends OperatorTestBase { assertThat(result.success()).isTrue(); assertThat(result.taskIndex()).isEqualTo(TESTING_TASK_ID); + if (generateFailure) { + dropTable(); + time = System.currentTimeMillis(); + triggerSource.sendRecord(Trigger.create(time, TESTING_TASK_ID), time); + result = collectingSink.poll(POLL_DURATION); + + assertThat(result.startEpoch()).isEqualTo(time); + assertThat(result.success()).isFalse(); + assertThat(result.taskIndex()).isEqualTo(TESTING_TASK_ID); + } + Awaitility.await().until(waitForCondition::get); } finally { closeJobClient(jobClient); diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java index f80129f966..b8aa259e2f 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java @@ -27,7 +27,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; import java.util.List; import java.util.Set; -import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -69,9 +68,9 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase { .uidSuffix(UID_SUFFIX) .append( infra.triggerStream(), - 0, - DUMMY_TASK_NAME, DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + 0, tableLoader(), "OTHER", StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, @@ -101,36 +100,16 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase { ExpireSnapshots.builder() .append( infra.triggerStream(), - 0, - DUMMY_TASK_NAME, DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + 0, tableLoader(), UID_SUFFIX, StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, 1) .sinkTo(infra.sink()); - JobClient jobClient = null; - try { - jobClient = infra.env().executeAsync(); - - // Do a single task run - long time = System.currentTimeMillis(); - infra.source().sendRecord(Trigger.create(time, 1), time); - - // First successful run (ensure that the operators are loaded/opened etc.) - assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue(); - - // Drop the table, so it will cause an exception - dropTable(); - - // Failed run - infra.source().sendRecord(Trigger.create(time + 1, 1), time + 1); - - assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isFalse(); - } finally { - closeJobClient(jobClient); - } + runAndWaitForFailure(infra.env(), infra.source(), infra.sink()); // Check the metrics. There are no expired snapshots or data files because ExpireSnapshots has // no max age of number of snapshots set, so no files are removed. 
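For contrast, a configuration under which ExpireSnapshots would actually remove snapshots and their files could look roughly like the sketch below; retainLast and deleteBatchSize mirror the builder calls used in TestMaintenanceE2E in this commit, while maxSnapshotAge(Duration) is an assumption about the builder API rather than something shown in this patch.

    // Sketch only: ExpireSnapshots configured so that old snapshots (and their files) are removed.
    // maxSnapshotAge(Duration) is assumed; retainLast/deleteBatchSize follow TestMaintenanceE2E.
    ExpireSnapshots.builder()
        .maxSnapshotAge(Duration.ofDays(7)) // expire snapshots older than one week
        .retainLast(5) // but always keep the five most recent snapshots
        .deleteBatchSize(10) // delete the orphaned files in batches of ten
        .append(
            infra.triggerStream(),
            DUMMY_TABLE_NAME,
            DUMMY_TASK_NAME,
            0,
            tableLoader(),
            UID_SUFFIX,
            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
            1)
        .sinkTo(infra.sink());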
@@ -162,9 +141,9 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase { .uidSuffix(UID_SUFFIX) .append( infra.triggerStream(), - 0, - DUMMY_TASK_NAME, DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + 0, tableLoader(), UID_SUFFIX, StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, @@ -180,9 +159,9 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase { ExpireSnapshots.builder() .append( infra.triggerStream(), - 0, - DUMMY_TASK_NAME, DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + 0, tableLoader(), UID_SUFFIX, StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, @@ -204,9 +183,9 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase { .parallelism(1) .append( infra.triggerStream(), - 0, - DUMMY_TASK_NAME, DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + 0, tableLoader(), UID_SUFFIX, StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java index 467ad2d8ce..0a860fec47 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java @@ -52,6 +52,19 @@ class TestMaintenanceE2E extends OperatorTestBase { .retainLast(5) .deleteBatchSize(5) .parallelism(8)) + .add( + RewriteDataFiles.builder() + .scheduleOnDataFileCount(10) + .partialProgressEnabled(true) + .partialProgressMaxCommits(10) + .maxRewriteBytes(1000L) + .targetFileSizeBytes(1000L) + .minFileSizeBytes(1000L) + .maxFileSizeBytes(1000L) + .minInputFiles(10) + .deleteFileThreshold(10) + .rewriteAll(false) + .maxFileGroupSizeBytes(1000L)) .append(); JobClient jobClient = null; diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java new file mode 100644 index 0000000000..618647259b --- /dev/null +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.api; + +import static org.apache.iceberg.flink.SimpleDataUtil.createRecord; +import static org.apache.iceberg.flink.maintenance.api.RewriteDataFiles.COMMIT_TASK_NAME; +import static org.apache.iceberg.flink.maintenance.api.RewriteDataFiles.PLANNER_TASK_NAME; +import static org.apache.iceberg.flink.maintenance.api.RewriteDataFiles.REWRITE_TASK_NAME; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.ERROR_COUNTER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.stream.StreamSupport; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +class TestRewriteDataFiles extends MaintenanceTaskTestBase { + @Test + void testRewriteUnpartitioned() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + insert(table, 3, "c"); + insert(table, 4, "d"); + + assertFileNum(table, 4, 0); + + appendRewriteDataFiles( + RewriteDataFiles.builder() + .parallelism(2) + .deleteFileThreshold(10) + .targetFileSizeBytes(1_000_000L) + .maxFileGroupSizeBytes(10_000_000L) + .maxFileSizeBytes(2_000_000L) + .minFileSizeBytes(500_000L) + .minInputFiles(2) + .partialProgressEnabled(true) + .partialProgressMaxCommits(1) + .maxRewriteBytes(100_000L) + .rewriteAll(false)); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + assertFileNum(table, 1, 0); + + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + createRecord(1, "a"), + createRecord(2, "b"), + createRecord(3, "c"), + createRecord(4, "d"))); + } + + @Test + void testRewritePartitioned() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + + assertFileNum(table, 4, 0); + + appendRewriteDataFiles(); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + assertFileNum(table, 2, 0); + + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + createRecord(1, "p1"), + createRecord(2, "p1"), + createRecord(3, "p2"), + createRecord(4, "p2"))); + } + + @Test + void testPlannerFailure() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + + assertFileNum(table, 2, 0); + + appendRewriteDataFiles(); + + runAndWaitForFailure(infra.env(), infra.source(), infra.sink()); + + // Check the metrics. The first task should be successful, but the second one should fail. This + // should be represented in the counters. 
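    // Note on the assertions that follow: each key appears to be composed as
    //   {operator name + "[subtask index]", table name, task name, task index, metric name},
    // for example
    //   ImmutableList.of(PLANNER_TASK_NAME + "[0]", DUMMY_TABLE_NAME, DUMMY_TASK_NAME, "0", ERROR_COUNTER),
    // and the -1L values seem to mean "do not assert an exact count", used here for the
    // non-deterministic file-size counters.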
+ MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder<List<String>, Long>() + .put( + ImmutableList.of( + PLANNER_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ERROR_COUNTER), + 1L) + .put( + ImmutableList.of( + REWRITE_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ERROR_COUNTER), + 0L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ERROR_COUNTER), + 0L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ADDED_DATA_FILE_NUM_METRIC), + 1L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ADDED_DATA_FILE_SIZE_METRIC), + -1L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + REMOVED_DATA_FILE_NUM_METRIC), + 2L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + REMOVED_DATA_FILE_SIZE_METRIC), + -1L) + .build()); + } + + @Test + void testUidAndSlotSharingGroup() { + createTable(); + + RewriteDataFiles.builder() + .slotSharingGroup(SLOT_SHARING_GROUP) + .uidSuffix(UID_SUFFIX) + .append( + infra.triggerStream(), + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + 0, + tableLoader(), + "OTHER", + "OTHER", + 1) + .sinkTo(infra.sink()); + + checkUidsAreSet(infra.env(), UID_SUFFIX); + checkSlotSharingGroupsAreSet(infra.env(), SLOT_SHARING_GROUP); + } + + @Test + void testUidAndSlotSharingGroupUnset() { + createTable(); + + RewriteDataFiles.builder() + .append( + infra.triggerStream(), + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + 0, + tableLoader(), + UID_SUFFIX, + StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, + 1) + .sinkTo(infra.sink()); + + checkUidsAreSet(infra.env(), null); + checkSlotSharingGroupsAreSet(infra.env(), null); + } + + @Test + void testMetrics() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + + assertFileNum(table, 2, 0); + + appendRewriteDataFiles(); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + // Check the metrics + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder<List<String>, Long>() + .put( + ImmutableList.of( + PLANNER_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ERROR_COUNTER), + 0L) + .put( + ImmutableList.of( + REWRITE_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ERROR_COUNTER), + 0L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ERROR_COUNTER), + 0L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ADDED_DATA_FILE_NUM_METRIC), + 1L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ADDED_DATA_FILE_SIZE_METRIC), + -1L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + REMOVED_DATA_FILE_NUM_METRIC), + 2L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + REMOVED_DATA_FILE_SIZE_METRIC), + -1L) + .build()); + } + + @Test + void testV2Table() throws Exception { + Table table = createTableWithDelete(); + update(table, 1, null, "a", "b"); + update(table, 1, "b", "c"); + + assertFileNum(table, 2, 3); + SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1, "c"))); + + appendRewriteDataFiles(); + + 
runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + // After #11131 we don't remove the delete files + assertFileNum(table, 1, 3); + + SimpleDataUtil.assertTableRecords(table, ImmutableList.of(createRecord(1, "c"))); + + // Check the metrics + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder<List<String>, Long>() + .put( + ImmutableList.of( + PLANNER_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ERROR_COUNTER), + 0L) + .put( + ImmutableList.of( + REWRITE_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ERROR_COUNTER), + 0L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ERROR_COUNTER), + 0L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ADDED_DATA_FILE_NUM_METRIC), + 1L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + ADDED_DATA_FILE_SIZE_METRIC), + -1L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + REMOVED_DATA_FILE_NUM_METRIC), + 2L) + .put( + ImmutableList.of( + COMMIT_TASK_NAME + "[0]", + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + "0", + REMOVED_DATA_FILE_SIZE_METRIC), + -1L) + .build()); + } + + private void appendRewriteDataFiles() { + appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true)); + } + + private void appendRewriteDataFiles(RewriteDataFiles.Builder builder) { + builder + .append( + infra.triggerStream(), + DUMMY_TABLE_NAME, + DUMMY_TASK_NAME, + 0, + tableLoader(), + UID_SUFFIX, + StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, + 1) + .sinkTo(infra.sink()); + } + + private static void assertFileNum( + Table table, int expectedDataFileNum, int expectedDeleteFileNum) { + table.refresh(); + assertThat( + table.currentSnapshot().dataManifests(table.io()).stream() + .flatMap( + m -> + StreamSupport.stream( + ManifestFiles.read(m, table.io(), table.specs()).spliterator(), false)) + .count()) + .isEqualTo(expectedDataFileNum); + assertThat( + table.currentSnapshot().deleteManifests(table.io()).stream() + .flatMap( + m -> + StreamSupport.stream( + ManifestFiles.readDeleteManifest(m, table.io(), table.specs()) + .spliterator(), + false)) + .count()) + .isEqualTo(expectedDeleteFileNum); + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index a577e40b2c..4a4bceffb1 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -33,16 +33,26 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import 
org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -126,12 +136,85 @@ public class OperatorTestBase { ImmutableMap.of("format-version", "2", "write.upsert.enabled", "true")); } + + protected static Table createPartitionedTable() { + return CATALOG_EXTENSION + .catalog() + .createTable( + TestFixtures.TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(), + null, + ImmutableMap.of("flink.max-continuous-empty-commits", "100000")); + } + protected void insert(Table table, Integer id, String data) throws IOException { new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data))); table.refresh(); } + /** + * For the same identifier column id this method simulates the following row operations: <tr> + * <li>add an equality delete on oldData + * <li>insert newData </tr> + * + * @param table to modify + * @param id the identifier column id + * @param oldData the old data to be deleted + * @param newData the new data to be inserted + */ + protected void update(Table table, Integer id, String oldData, String newData) + throws IOException { + DataFile dataFile = + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .writeFile(Lists.newArrayList(SimpleDataUtil.createRecord(id, newData))); + DeleteFile eqDelete = writeEqualityDelete(table, id, oldData); + + table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).commit(); + } + + /** + * For the same identifier column id this method simulates the following row operations: <tr> + * <li>add an equality delete on oldData + * <li>insert tempData + * <li>add a position delete on tempData + * <li>insert newData </tr> + * + * @param table to modify + * @param id the identifier column id + * @param oldData the old data to be deleted + * @param tempData the temp data to be inserted and deleted with a position delete + * @param newData the new data to be inserted + */ + protected void update(Table table, Integer id, String oldData, String tempData, String newData) + throws IOException { + DataFile dataFile = + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .writeFile( + Lists.newArrayList( + SimpleDataUtil.createRecord(id, tempData), + SimpleDataUtil.createRecord(id, newData))); + DeleteFile eqDelete = writeEqualityDelete(table, id, oldData); + DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData); + + table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit(); + } + + protected void insertPartitioned(Table table, Integer id, String data) throws IOException { + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .appendToTable( + TestHelpers.Row.of(data), Lists.newArrayList(SimpleDataUtil.createRecord(id, data))); + table.refresh(); + } 
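  // Illustrative sketch only (hypothetical example method, not used by the tests): how the
  // update() helpers above are typically driven, following the testV2Table cases in this commit.
  protected void exampleUpdateUsage() throws IOException {
    Table table = createTableWithDelete();
    update(table, 1, null, "a", "b"); // write "a" and "b", position-delete "a", equality-delete the (null) old value
    update(table, 1, "b", "c"); // equality-delete "b", insert "c"

    // The table now holds the single live row (1, "c"), backed by 2 data files and 3 delete files.
    SimpleDataUtil.assertTableRecords(
        table, Lists.newArrayList(SimpleDataUtil.createRecord(1, "c")));
  }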
+ + protected void insertFullPartitioned(Table table, Integer id, String data) throws IOException { + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .appendToTable( + TestHelpers.Row.of(data, id), + Lists.newArrayList(SimpleDataUtil.createRecord(id, data))); + table.refresh(); + } + protected void dropTable() { CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); } @@ -214,6 +297,36 @@ public class OperatorTestBase { return config; } + private DeleteFile writeEqualityDelete(Table table, Integer id, String oldData) + throws IOException { + File file = File.createTempFile("junit", null, warehouseDir.toFile()); + assertThat(file.delete()).isTrue(); + return FileHelpers.writeDeleteFile( + table, + Files.localOutput(file), + new PartitionData(PartitionSpec.unpartitioned().partitionType()), + Lists.newArrayList(SimpleDataUtil.createRecord(id, oldData)), + SCHEMA_WITH_PRIMARY_KEY); + } + + private DeleteFile writePosDelete( + Table table, CharSequence path, Integer pos, Integer id, String oldData) throws IOException { + File file = File.createTempFile("junit", null, warehouseDir.toFile()); + assertThat(file.delete()).isTrue(); + PositionDelete<GenericRecord> posDelete = PositionDelete.create(); + GenericRecord nested = GenericRecord.create(table.schema()); + nested.set(0, id); + nested.set(1, oldData); + posDelete.set(path, pos, nested); + return FileHelpers.writePosDeleteFile( + table, Files.localOutput(file), null, Lists.newArrayList(posDelete)); + } + + static void trigger(OneInputStreamOperatorTestHarness<Trigger, ?> harness) throws Exception { + long time = System.currentTimeMillis(); + harness.processElement(Trigger.create(time, 0), time); + } + private static class MemoryLock implements TriggerLockFactory.Lock { volatile boolean locked = false; diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java new file mode 100644 index 0000000000..6e43009e08 --- /dev/null +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Set; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; + +class RewriteUtil { + private RewriteUtil() {} + + static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(TableLoader tableLoader) + throws Exception { + try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader, + 11, + 10_000_000L, + ImmutableMap.of(MIN_INPUT_FILES, "2")))) { + testHarness.open(); + + OperatorTestBase.trigger(testHarness); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + return testHarness.extractOutputValues(); + } + } + + static List<DataFileRewriteRunner.ExecutedGroup> executeRewrite( + List<DataFileRewritePlanner.PlannedGroup> elements) throws Exception { + try (OneInputStreamOperatorTestHarness< + DataFileRewritePlanner.PlannedGroup, DataFileRewriteRunner.ExecutedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewriteRunner( + OperatorTestBase.DUMMY_TABLE_NAME, OperatorTestBase.DUMMY_TABLE_NAME, 0))) { + testHarness.open(); + + for (DataFileRewritePlanner.PlannedGroup element : elements) { + testHarness.processElement(element, System.currentTimeMillis()); + } + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + return testHarness.extractOutputValues(); + } + } + + static Set<DataFile> newDataFiles(Table table) { + table.refresh(); + return Sets.newHashSet(table.currentSnapshot().addedDataFiles(table.io())); + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java new file mode 100644 index 0000000000..9e8f2ec921 --- /dev/null +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.executeRewrite; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; +import static org.apache.iceberg.metrics.CommitMetricsResult.TOTAL_DATA_FILES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.api.Test; + +class TestDataFileRewriteCommitter extends OperatorTestBase { + @Test + void testUnpartitioned() throws Exception { + Table table = createTable(); + insert(table, 1, "p1"); + insert(table, 2, "p2"); + insert(table, 3, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(1); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(rewritten.get(0), EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processWatermark(EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + } + + assertDataFiles( + table, rewritten.get(0).group().addedFiles(), rewritten.get(0).group().rewrittenFiles(), 1); + } + + @Test + void testPartitioned() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(2); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(2); + assertThat(rewritten.get(0).groupsPerCommit()).isEqualTo(1); + assertThat(rewritten.get(1).groupsPerCommit()).isEqualTo(1); + ensureDifferentGroups(rewritten); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(rewritten.get(0), EVENT_TIME); + assertDataFiles( + table, + rewritten.get(0).group().addedFiles(), + rewritten.get(0).group().rewrittenFiles(), + 3); + + testHarness.processElement(rewritten.get(1), EVENT_TIME); + assertDataFiles( + table, + rewritten.get(1).group().addedFiles(), + rewritten.get(1).group().rewrittenFiles(), + 2); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processWatermark(EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + } + } + + @Test + void testNewTable() throws Exception { + Table table = createTable(); + List<DataFileRewriteRunner.ExecutedGroup> rewritten; + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + 
testHarness.open(); + + insert(table, 1, "p1"); + insert(table, 2, "p2"); + insert(table, 3, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(1); + + testHarness.processElement(rewritten.get(0), EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processWatermark(EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + } + + assertDataFiles( + table, rewritten.get(0).group().addedFiles(), rewritten.get(0).group().rewrittenFiles(), 1); + } + + @Test + void testBatchSize() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + insertPartitioned(table, 5, "p3"); + insertPartitioned(table, 6, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(3); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(3); + ensureDifferentGroups(rewritten); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(setBatchSizeToTwo(rewritten.get(0)), EVENT_TIME); + assertNoChange(table); + testHarness.processElement(setBatchSizeToTwo(rewritten.get(1)), EVENT_TIME); + + Set<DataFile> added = Sets.newHashSet(rewritten.get(0).group().addedFiles()); + added.addAll(rewritten.get(1).group().addedFiles()); + Set<DataFile> removed = Sets.newHashSet(rewritten.get(0).group().rewrittenFiles()); + removed.addAll(rewritten.get(1).group().rewrittenFiles()); + assertDataFiles(table, added, removed, 4); + + testHarness.processElement(setBatchSizeToTwo(rewritten.get(2)), EVENT_TIME); + assertNoChange(table); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processWatermark(EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + } + + // This should be committed on close + assertDataFiles( + table, rewritten.get(2).group().addedFiles(), rewritten.get(2).group().rewrittenFiles(), 3); + } + + @Test + void testError() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + insertPartitioned(table, 5, "p3"); + insertPartitioned(table, 6, "p3"); + insertPartitioned(table, 7, "p4"); + insertPartitioned(table, 8, "p4"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(4); + List<DataFileRewriteRunner.ExecutedGroup> rewritten = executeRewrite(planned); + assertThat(rewritten).hasSize(4); + + try (OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> + testHarness = harness()) { + testHarness.open(); + + testHarness.processElement(setBatchSizeToTwo(rewritten.get(0)), EVENT_TIME); + assertNoChange(table); + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + + DataFileRewriteRunner.ExecutedGroup group = spy(setBatchSizeToTwo(rewritten.get(1))); + when(group.group()).thenThrow(new RuntimeException("Testing error")); + testHarness.processElement(group, EVENT_TIME); + + 
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1); + assertThat( + testHarness + .getSideOutput(TaskResultAggregator.ERROR_STREAM) + .poll() + .getValue() + .getMessage()) + .contains("Testing error"); + } + } + + private OneInputStreamOperatorTestHarness<DataFileRewriteRunner.ExecutedGroup, Trigger> harness() + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new DataFileRewriteCommitter( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader())); + } + + private static DataFileRewriteRunner.ExecutedGroup setBatchSizeToTwo( + DataFileRewriteRunner.ExecutedGroup from) { + return new DataFileRewriteRunner.ExecutedGroup(from.snapshotId(), 2, from.group()); + } + + // Ensure that the groups are different, so the tests are not accidentally passing + private static void ensureDifferentGroups(List<DataFileRewriteRunner.ExecutedGroup> rewritten) { + List<String> resultFiles = + rewritten.stream() + .flatMap(task -> task.group().addedFiles().stream().map(ContentFile::location)) + .collect(Collectors.toList()); + assertThat(resultFiles).hasSize(Set.copyOf(resultFiles).size()); + } + + /** + * Assert that the number of the data files in the table is as expected. Additionally, tests that + * the last commit contains the expected added and removed files. + * + * @param table the table to check + * @param expectedAdded the expected added data files + * @param expectedRemoved the expected removed data files + * @param expectedCurrent the expected current data files count + */ + private static void assertDataFiles( + Table table, + Set<DataFile> expectedAdded, + Set<DataFile> expectedRemoved, + long expectedCurrent) { + table.refresh(); + + assertThat(table.currentSnapshot().summary().get(TOTAL_DATA_FILES)) + .isEqualTo(String.valueOf(expectedCurrent)); + Set<DataFile> actualAdded = Sets.newHashSet(table.currentSnapshot().addedDataFiles(table.io())); + Set<DataFile> actualRemoved = + Sets.newHashSet(table.currentSnapshot().removedDataFiles(table.io())); + assertThat(actualAdded.stream().map(DataFile::location).collect(Collectors.toSet())) + .isEqualTo(expectedAdded.stream().map(DataFile::location).collect(Collectors.toSet())); + assertThat(actualRemoved.stream().map(DataFile::location).collect(Collectors.toSet())) + .isEqualTo(expectedRemoved.stream().map(DataFile::location).collect(Collectors.toSet())); + } + + private static void assertNoChange(Table table) { + long original = table.currentSnapshot().snapshotId(); + table.refresh(); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(original); + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java new file mode 100644 index 0000000000..1d7e29813a --- /dev/null +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.api.Test; + +class TestDataFileRewritePlanner extends OperatorTestBase { + @Test + void testUnpartitioned() throws Exception { + Set<DataFile> expected = Sets.newHashSetWithExpectedSize(3); + Table table = createTable(); + insert(table, 1, "a"); + expected.addAll(newDataFiles(table)); + insert(table, 2, "b"); + expected.addAll(newDataFiles(table)); + insert(table, 3, "c"); + expected.addAll(newDataFiles(table)); + + List<DataFileRewritePlanner.PlannedGroup> actual = planDataFileRewrite(tableLoader()); + + assertThat(actual).hasSize(1); + assertRewriteFileGroup(actual.get(0), table, expected); + } + + @Test + void testPartitioned() throws Exception { + Set<DataFile> expectedP1 = Sets.newHashSetWithExpectedSize(2); + Set<DataFile> expectedP2 = Sets.newHashSetWithExpectedSize(2); + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + expectedP1.addAll(newDataFiles(table)); + insertPartitioned(table, 2, "p1"); + expectedP1.addAll(newDataFiles(table)); + + insertPartitioned(table, 3, "p2"); + expectedP2.addAll(newDataFiles(table)); + insertPartitioned(table, 4, "p2"); + expectedP2.addAll(newDataFiles(table)); + + // This should not participate in compaction, as there is no more files in the partition + insertPartitioned(table, 5, "p3"); + + List<DataFileRewritePlanner.PlannedGroup> actual = planDataFileRewrite(tableLoader()); + + assertThat(actual).hasSize(2); + if (actual.get(0).group().info().partition().get(0, String.class).equals("p1")) { + assertRewriteFileGroup(actual.get(0), table, expectedP1); + assertRewriteFileGroup(actual.get(1), table, expectedP2); + } else { + assertRewriteFileGroup(actual.get(0), table, expectedP2); + assertRewriteFileGroup(actual.get(1), table, expectedP1); + } + } + + @Test + void testError() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + + try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + 
OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader(), + 11, + 1L, + ImmutableMap.of(MIN_INPUT_FILES, "2")))) { + testHarness.open(); + + // Cause an exception + dropTable(); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + trigger(testHarness); + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1); + assertThat( + testHarness + .getSideOutput(TaskResultAggregator.ERROR_STREAM) + .poll() + .getValue() + .getMessage()) + .contains("Table does not exist: "); + } + } + + @Test + void testV2Table() throws Exception { + Table table = createTableWithDelete(); + update(table, 1, null, "a", "b"); + update(table, 1, "b", "c"); + + List<DataFileRewritePlanner.PlannedGroup> actual = planDataFileRewrite(tableLoader()); + + assertThat(actual).hasSize(1); + List<FileScanTask> tasks = actual.get(0).group().fileScanTasks(); + assertThat(tasks).hasSize(2); + // Find the task with the deletes + FileScanTask withDelete = tasks.get(0).deletes().isEmpty() ? tasks.get(1) : tasks.get(0); + assertThat(withDelete.deletes()).hasSize(2); + assertThat(withDelete.deletes().stream().map(ContentFile::content).collect(Collectors.toList())) + .containsExactlyInAnyOrder(FileContent.POSITION_DELETES, FileContent.EQUALITY_DELETES); + } + + @Test + void testMaxRewriteBytes() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + + // First run with high maxRewriteBytes + List<DataFileRewritePlanner.PlannedGroup> planWithNoMaxRewriteBytes = + planDataFileRewrite(tableLoader()); + assertThat(planWithNoMaxRewriteBytes).hasSize(2); + + // Second run with low maxRewriteBytes, the 2nd group should be removed from the plan + long maxRewriteBytes = + planWithNoMaxRewriteBytes.get(0).group().fileScanTasks().get(0).sizeBytes() + + planWithNoMaxRewriteBytes.get(1).group().fileScanTasks().get(0).sizeBytes() + + 1; + try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader(), + 11, + maxRewriteBytes, + ImmutableMap.of(MIN_INPUT_FILES, "2")))) { + testHarness.open(); + + OperatorTestBase.trigger(testHarness); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + // Only a single group is planned + assertThat(testHarness.extractOutputValues()).hasSize(1); + } + } + + void assertRewriteFileGroup( + DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set<DataFile> files) { + assertThat(plannedGroup.table().currentSnapshot().snapshotId()) + .isEqualTo(table.currentSnapshot().snapshotId()); + assertThat(plannedGroup.groupsPerCommit()).isEqualTo(1); + assertThat( + plannedGroup.group().fileScanTasks().stream() + .map(s -> s.file().location()) + .collect(Collectors.toSet())) + .containsExactlyInAnyOrderElementsOf( + files.stream().map(ContentFile::location).collect(Collectors.toList())); + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java new file mode 100644 index 0000000000..6dc8fb3c02 --- /dev/null +++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.actions.RewriteDataFiles.TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.executeRewrite; +import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class TestDataFileRewriteRunner extends OperatorTestBase { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testExecute(boolean partitioned) throws Exception { + Table table; + PartitionData partition; + if (partitioned) { + table = createPartitionedTable(); + partition = new PartitionData(table.spec().partitionType()); + partition.set(0, "p1"); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p1"); + } else { + table = createTable(); + partition = new 
PartitionData(PartitionSpec.unpartitioned().partitionType()); + insert(table, 1, "p1"); + insert(table, 2, "p1"); + insert(table, 3, "p1"); + } + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned); + assertThat(actual).hasSize(1); + + assertRewriteFileGroup( + actual.get(0), + table, + records( + table.schema(), + ImmutableSet.of( + ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"), ImmutableList.of(3, "p1"))), + 1, + ImmutableSet.of(partition)); + } + + @Test + void testPartitionSpecChange() throws Exception { + Table table = createPartitionedTable(); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + PartitionData oldPartition = new PartitionData(table.spec().partitionType()); + oldPartition.set(0, "p1"); + + try (OneInputStreamOperatorTestHarness< + DataFileRewritePlanner.PlannedGroup, DataFileRewriteRunner.ExecutedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewriteRunner( + OperatorTestBase.DUMMY_TABLE_NAME, OperatorTestBase.DUMMY_TABLE_NAME, 0))) { + testHarness.open(); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + + testHarness.processElement(planned.get(0), System.currentTimeMillis()); + List<DataFileRewriteRunner.ExecutedGroup> actual = testHarness.extractOutputValues(); + assertThat(actual).hasSize(1); + assertRewriteFileGroup( + actual.get(0), + table, + records( + table.schema(), + ImmutableSet.of(ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"))), + 1, + ImmutableSet.of(oldPartition)); + + insertPartitioned(table, 3, "p1"); + + planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + + testHarness.processElement(planned.get(0), System.currentTimeMillis()); + actual = testHarness.extractOutputValues(); + assertThat(actual).hasSize(2); + assertRewriteFileGroup( + actual.get(1), + table, + records( + table.schema(), + ImmutableSet.of( + ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"), ImmutableList.of(3, "p1"))), + 1, + ImmutableSet.of(oldPartition)); + + // Alter the table schema + table.updateSpec().addField("id").commit(); + // Insert some now data + insertFullPartitioned(table, 4, "p1"); + insertFullPartitioned(table, 4, "p1"); + PartitionData newPartition = new PartitionData(table.spec().partitionType()); + newPartition.set(0, "p1"); + newPartition.set(1, 4); + table.refresh(); + + planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(2); + DataFileRewritePlanner.PlannedGroup oldCompact = planned.get(0); + DataFileRewritePlanner.PlannedGroup newCompact = planned.get(1); + if (oldCompact.group().inputFileNum() == 2) { + newCompact = planned.get(0); + oldCompact = planned.get(1); + } + + testHarness.processElement(newCompact, System.currentTimeMillis()); + actual = testHarness.extractOutputValues(); + assertThat(actual).hasSize(3); + assertRewriteFileGroup( + actual.get(2), + table, + records( + table.schema(), + ImmutableList.of(ImmutableList.of(4, "p1"), ImmutableList.of(4, "p1"))), + 1, + ImmutableSet.of(newPartition)); + + testHarness.processElement(oldCompact, System.currentTimeMillis()); + actual = testHarness.extractOutputValues(); + assertThat(actual).hasSize(4); + PartitionData[] transformedPartitions = { + newPartition.copy(), newPartition.copy(), newPartition.copy() + }; + transformedPartitions[0].set(1, 1); + 
transformedPartitions[1].set(1, 2); + transformedPartitions[2].set(1, 3); + assertRewriteFileGroup( + actual.get(3), + table, + records( + table.schema(), + ImmutableSet.of( + ImmutableList.of(1, "p1"), ImmutableList.of(2, "p1"), ImmutableList.of(3, "p1"))), + 3, + Sets.newHashSet(transformedPartitions)); + } + } + + @Test + void testError() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + + try (OneInputStreamOperatorTestHarness< + DataFileRewritePlanner.PlannedGroup, DataFileRewriteRunner.ExecutedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewriteRunner( + OperatorTestBase.DUMMY_TABLE_NAME, OperatorTestBase.DUMMY_TABLE_NAME, 0))) { + testHarness.open(); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + // Cause an exception + dropTable(); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + testHarness.processElement(planned.get(0), System.currentTimeMillis()); + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).hasSize(1); + assertThat( + testHarness + .getSideOutput(TaskResultAggregator.ERROR_STREAM) + .poll() + .getValue() + .getMessage()) + .contains("File does not exist: "); + } + } + + @Test + void testV2Table() throws Exception { + Table table = createTableWithDelete(); + update(table, 1, null, "a", "b"); + update(table, 1, "b", "c"); + + List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + + List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned); + assertThat(actual).hasSize(1); + + assertRewriteFileGroup( + actual.get(0), + table, + records(table.schema(), ImmutableSet.of(ImmutableList.of(1, "c"))), + 1, + ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType()))); + } + + @Test + void testSplitSize() throws Exception { + Table table = createTable(); + + File dataDir = new File(new Path(table.location(), "data").toUri().getPath()); + dataDir.mkdir(); + GenericAppenderHelper dataAppender = + new GenericAppenderHelper(table, FileFormat.PARQUET, dataDir.toPath()); + List<Record> expected = Lists.newArrayListWithExpectedSize(4000); + for (int i = 0; i < 4; ++i) { + List<Record> batch = RandomGenericData.generate(table.schema(), 1000, 10 + i); + dataAppender.appendToTable(batch); + expected.addAll(batch); + } + + // First run with high target file size + List<DataFileRewritePlanner.PlannedGroup> planWithNoTargetFileSize = + planDataFileRewrite(tableLoader()); + assertThat(planWithNoTargetFileSize).hasSize(1); + + // Second run with low target file size + long targetFileSize = + planWithNoTargetFileSize.get(0).group().fileScanTasks().get(0).sizeBytes() + + planWithNoTargetFileSize.get(0).group().fileScanTasks().get(1).sizeBytes(); + List<DataFileRewritePlanner.PlannedGroup> planned; + try (OneInputStreamOperatorTestHarness<Trigger, DataFileRewritePlanner.PlannedGroup> + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader(), + 11, + 10_000_000, + ImmutableMap.of( + MIN_INPUT_FILES, + "2", + TARGET_FILE_SIZE_BYTES, + String.valueOf(targetFileSize))))) { + testHarness.open(); + + OperatorTestBase.trigger(testHarness); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + 
planned = testHarness.extractOutputValues(); + assertThat(planned).hasSize(1); + } + + List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned); + assertThat(actual).hasSize(1); + + assertRewriteFileGroup( + actual.get(0), + table, + expected, + 2, + ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType()))); + } + + void assertRewriteFileGroup( + DataFileRewriteRunner.ExecutedGroup actual, + Table table, + Collection<Record> expectedRecords, + int expectedFileNum, + Set<StructLike> expectedPartitions) + throws IOException { + assertThat(actual.snapshotId()).isEqualTo(table.currentSnapshot().snapshotId()); + assertThat(actual.groupsPerCommit()).isEqualTo(1); + assertThat(actual.group().addedFiles()).hasSize(expectedFileNum); + Collection<Record> writtenRecords = Lists.newArrayListWithExpectedSize(expectedRecords.size()); + Set<StructLike> writtenPartitions = Sets.newHashSetWithExpectedSize(expectedPartitions.size()); + for (DataFile newDataFile : actual.group().addedFiles()) { + assertThat(newDataFile.format()).isEqualTo(FileFormat.PARQUET); + assertThat(newDataFile.content()).isEqualTo(FileContent.DATA); + assertThat(newDataFile.keyMetadata()).isNull(); + writtenPartitions.add(newDataFile.partition()); + + try (CloseableIterable<Record> reader = + Parquet.read(table.io().newInputFile(newDataFile.location())) + .project(table.schema()) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema)) + .build()) { + List<Record> newRecords = Lists.newArrayList(reader); + assertThat(newRecords).hasSize((int) newDataFile.recordCount()); + writtenRecords.addAll(newRecords); + } + } + + assertThat(writtenRecords).containsExactlyInAnyOrderElementsOf(expectedRecords); + assertThat(writtenPartitions).isEqualTo(expectedPartitions); + } + + private List<Record> records(Schema schema, Collection<List<Object>> data) { + GenericRecord record = GenericRecord.create(schema); + + ImmutableList.Builder<Record> builder = ImmutableList.builder(); + data.forEach( + recordData -> + builder.add( + record.copy(ImmutableMap.of("id", recordData.get(0), "data", recordData.get(1))))); + + return builder.build(); + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java index d70c4aafd5..7511e1029b 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java @@ -90,8 +90,7 @@ class TestDeleteFilesProcessor extends OperatorTestBase { tableLoader().open(); try (OneInputStreamOperatorTestHarness<String, Void> testHarness = new OneInputStreamOperatorTestHarness<>( - new DeleteFilesProcessor(0, DUMMY_TASK_NAME, tableLoader.loadTable(), 10), - StringSerializer.INSTANCE)) { + new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10), StringSerializer.INSTANCE)) { testHarness.open(); testHarness.processElement(fileName, System.currentTimeMillis()); testHarness.processWatermark(EVENT_TIME);