openinx commented on a change in pull request #1624: URL: https://github.com/apache/iceberg/pull/1624#discussion_r509912885
########## File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import 
org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class RewriteDataFilesActionBase<ThisT> Review comment: nit: we usually name it as `BaseRewriteDataFilesAction` if it's an abstract class. btw, seems we don't have to break `extends ..` into a new line because it does not exceed the max length . ########## File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.actions; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class RewriteDataFilesActionBase<ThisT> + extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> { + + private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class); + + private final Table table; + private final FileIO fileIO; + private PartitionSpec spec; + private boolean caseSensitive; + private Expression filter; + private long targetSizeInBytes; + private int splitLookback; + private long 
splitOpenFileCost; + private long rewriteScanLimit; + + public RewriteDataFilesActionBase(Table table) { + this.table = table; + this.fileIO = table.io(); + this.spec = table.spec(); + this.filter = Expressions.alwaysTrue(); + long splitSize = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.SPLIT_SIZE, + TableProperties.SPLIT_SIZE_DEFAULT); + long targetFileSize = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + this.targetSizeInBytes = Math.min(splitSize, targetFileSize); + this.splitLookback = PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.SPLIT_LOOKBACK, + TableProperties.SPLIT_LOOKBACK_DEFAULT); + this.splitOpenFileCost = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.SPLIT_OPEN_FILE_COST, + TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); + this.rewriteScanLimit = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.REWRITE_SCAN_LIMIT, + TableProperties.REWRITE_SCAN_LIMIT_DEFAULT); + } + + protected void setCaseSensitive(boolean caseSensitive) { + this.caseSensitive = caseSensitive; + } + + protected boolean isCaseSensitive() { Review comment: nit: rename the `isCaseSensitive` to `caseSensitive` pls. 
for example, we `ManifestGroup` has the method : ```java ManifestGroup caseSensitive(boolean newCaseSensitive) { this.caseSensitive = newCaseSensitive; deleteIndexBuilder.caseSensitive(newCaseSensitive); return this; } ``` ########## File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java ########## @@ -111,171 +49,12 @@ protected RewriteDataFilesAction self() { } @Override - protected Table table() { - return table; - } - - /** - * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite - * - * @param specId PartitionSpec id to rewrite - * @return this for method chaining - */ - public RewriteDataFilesAction outputSpecId(int specId) { - Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId); - this.spec = table.specs().get(specId); - return this; - } - - /** - * Specify the target rewrite data file size in bytes - * - * @param targetSize size in bytes of rewrite data file - * @return this for method chaining - */ - public RewriteDataFilesAction targetSizeInBytes(long targetSize) { - Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d", - targetSize); - this.targetSizeInBytes = targetSize; - return this; - } - - /** - * Specify the number of "bins" considered when trying to pack the next file split into a task. - * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single - * task with extra planning cost. - * <p> - * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file - * metadata, user can use a lookback of 1. - * - * @param lookback number of "bins" considered when trying to pack the next file split into a task. 
- * @return this for method chaining - */ - public RewriteDataFilesAction splitLookback(int lookback) { - Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback); - this.splitLookback = lookback; - return this; - } - - /** - * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified - * threshold, Iceberg will use this value to do count. - * <p> - * this configuration controls the number of files to compact for each task, small value would lead to a - * high compaction, the default value is 4MB. - * - * @param openFileCost minimum file size to count to pack into one "bin". - * @return this for method chaining - */ - public RewriteDataFilesAction splitOpenFileCost(long openFileCost) { - Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost); - this.splitOpenFileCost = openFileCost; - return this; - } - - /** - * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the - * filter may be rewritten. 
- * - * @param expr Expression to filter out DataFiles - * @return this for method chaining - */ - public RewriteDataFilesAction filter(Expression expr) { - this.filter = Expressions.and(filter, expr); - return this; - } - - @Override - public RewriteDataFilesActionResult execute() { - CloseableIterable<FileScanTask> fileScanTasks = null; - try { - fileScanTasks = table.newScan() - .caseSensitive(caseSensitive) - .ignoreResiduals() - .filter(filter) - .planFiles(); - } finally { - try { - if (fileScanTasks != null) { - fileScanTasks.close(); - } - } catch (IOException ioe) { - LOG.warn("Failed to close task iterable", ioe); - } - } - - Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator()); - Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream() - .filter(kv -> kv.getValue().size() > 1) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - // Nothing to rewrite if there's only one DataFile in each partition. 
- if (filteredGroupedTasks.isEmpty()) { - return RewriteDataFilesActionResult.empty(); - } - - // Split and combine tasks under each partition - List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream() - .map(scanTasks -> { - CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles( - CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes); - return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost); - }) - .flatMap(Streams::stream) - .collect(Collectors.toList()); - + protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) { JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size()); - - Broadcast<FileIO> io = sparkContext.broadcast(fileIO); - Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager); - - RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption); - - List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD); - List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream() - .flatMap(tasks -> tasks.stream().map(FileScanTask::file)) - .collect(Collectors.toList()); - replaceDataFiles(currentDataFiles, addedDataFiles); - - return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles); - } - - private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition( - CloseableIterator<FileScanTask> tasksIter) { - ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap( - Maps.newHashMap(), Lists::newArrayList); - - try { - tasksIter.forEachRemaining(task -> { - StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition()); - tasksGroupedByPartition.put(structLike, task); - }); - - } finally { - try { - tasksIter.close(); - } catch (IOException ioe) { - LOG.warn("Failed to close task iterator", 
ioe); - } - } - - return tasksGroupedByPartition.asMap(); - } - - private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) { - try { - RewriteFiles rewriteFiles = table.newRewrite(); - rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles)); - commit(rewriteFiles); - - } catch (Exception e) { - Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString())) - .noRetry() - .suppressFailureWhenFinished() - .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc)) - .run(fileIO::deleteFile); - - throw e; - } + Broadcast<FileIO> io = sparkContext.broadcast(SparkUtil.serializableFileIO(this.table())); Review comment: nit: we usually use `this` to assign value to a local field, so that we could distinguish it's a local field member assignment or normal field assignment. If call the private or protected methods, we don't use `this`. ########## File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.actions; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class RewriteDataFilesActionBase<ThisT> + extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> { + + private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class); + + private final Table table; + private final FileIO fileIO; + private PartitionSpec spec; + private boolean caseSensitive; + private Expression filter; + private long targetSizeInBytes; + private int splitLookback; + private long 
splitOpenFileCost; + private long rewriteScanLimit; + + public RewriteDataFilesActionBase(Table table) { + this.table = table; + this.fileIO = table.io(); + this.spec = table.spec(); + this.filter = Expressions.alwaysTrue(); + long splitSize = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.SPLIT_SIZE, + TableProperties.SPLIT_SIZE_DEFAULT); + long targetFileSize = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + this.targetSizeInBytes = Math.min(splitSize, targetFileSize); + this.splitLookback = PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.SPLIT_LOOKBACK, + TableProperties.SPLIT_LOOKBACK_DEFAULT); + this.splitOpenFileCost = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.SPLIT_OPEN_FILE_COST, + TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); + this.rewriteScanLimit = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.REWRITE_SCAN_LIMIT, + TableProperties.REWRITE_SCAN_LIMIT_DEFAULT); + } + + protected void setCaseSensitive(boolean caseSensitive) { + this.caseSensitive = caseSensitive; + } + + protected boolean isCaseSensitive() { + return caseSensitive; + } + + /** + * Set the size of the scanned file. If the file size is greater than this value, it will not be scanned and will not + * be compressed. + * + * @param limitSize the limit size of the scanned file ,default is 100M + * @return this for method chaining + */ + public RewriteDataFilesActionBase<ThisT> rewriteScanLimit(long limitSize) { + Preconditions.checkArgument(limitSize > 0L, "Invalid rewriteScanLimit size ."); + this.rewriteScanLimit = limitSize; + return this; + } + + /** + * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the + * filter may be rewritten. 
+ * + * @param expr Expression to filter out DataFiles + * @return this for method chaining + */ + + public RewriteDataFilesActionBase<ThisT> filter(Expression expr) { + this.filter = Expressions.and(filter, expr); + return this; + } + + /** + * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite + * + * @param specId PartitionSpec id to rewrite + * @return this for method chaining + */ + public RewriteDataFilesActionBase<ThisT> outputSpecId(int specId) { + Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId); + this.spec = table.specs().get(specId); + return this; + } + + /** + * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this + * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra + * planning cost. + * <p> + * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file + * metadata, user can use a lookback of 1. + * + * @param lookback number of "bins" considered when trying to pack the next file split into a task. + * @return this for method chaining + */ + public RewriteDataFilesActionBase<ThisT> splitLookback(int lookback) { + Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback); + this.splitLookback = lookback; + return this; + } + + /** + * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified + * threshold, Iceberg will use this value to do count. + * <p> + * this configuration controls the number of files to compact for each task, small value would lead to a high + * compaction, the default value is 4MB. + * + * @param openFileCost minimum file size to count to pack into one "bin". 
+ * @return this for method chaining + */ + public RewriteDataFilesActionBase<ThisT> splitOpenFileCost(long openFileCost) { + Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost); + this.splitOpenFileCost = openFileCost; + return this; + } + + + /** + * Specify the target rewrite data file size in bytes + * + * @param targetSize size in bytes of rewrite data file + * @return this for method chaining + */ + public RewriteDataFilesActionBase<ThisT> targetSizeInBytes(long targetSize) { + Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d", + targetSize); + this.targetSizeInBytes = targetSize; + return this; + } + + + private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) { + try { + RewriteFiles rewriteFiles = table.newRewrite(); + rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles)); + commit(rewriteFiles); + } catch (Exception e) { + Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString())) + .noRetry() + .suppressFailureWhenFinished() + .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc)) + .run(fileIO::deleteFile); + throw e; + } + } + + private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition( + CloseableIterator<FileScanTask> tasksIter) { + ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap( + Maps.newHashMap(), Lists::newArrayList); + try (CloseableIterator<FileScanTask> iterator = tasksIter) { + iterator.forEachRemaining(task -> { + if (task.length() < rewriteScanLimit) { + StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition()); + tasksGroupedByPartition.put(structLike, task); + } + }); + } catch (IOException e) { + LOG.warn("Failed to close task iterator", e); + } + return tasksGroupedByPartition.asMap(); + } + + private 
List<CombinedScanTask> getCombinedScanTasks( + Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks) { + // Split and combine tasks under each partition + List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream() + .map(scanTasks -> { + CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes); + return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost); + }) + .flatMap(Streams::stream) + .collect(Collectors.toList()); + return combinedScanTasks; + } + + @Override + public RewriteDataFilesActionResult execute() { + CloseableIterable<FileScanTask> fileScanTasks = null; + try { + fileScanTasks = table.newScan() + .caseSensitive(caseSensitive) + .ignoreResiduals() + .filter(filter) + .planFiles(); + } finally { + try { + if (fileScanTasks != null) { + fileScanTasks.close(); + } + } catch (IOException ioe) { + LOG.warn("Failed to close task iterable", ioe); + } + } + + Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator()); + Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream() + .filter(kv -> kv.getValue().size() > 1) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + // Nothing to rewrite if there's only one DataFile in each partition. + if (filteredGroupedTasks.isEmpty()) { + return RewriteDataFilesActionResult.empty(); + } + // Split and combine tasks under each partition + List<CombinedScanTask> combinedScanTasks = getCombinedScanTasks(filteredGroupedTasks); Review comment: Is it necessary to move the split & combine parts into a separate method ? I think the original codes is more clear and we don't have to change it, it break the steps into several code blocks and each block represent one step. Pls keep the code as it is if we don't have a strong reason to change it. 
########## File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import 
org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class RewriteDataFilesActionBase<ThisT> + extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> { + + private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class); + + private final Table table; + private final FileIO fileIO; + private PartitionSpec spec; + private boolean caseSensitive; + private Expression filter; + private long targetSizeInBytes; + private int splitLookback; + private long splitOpenFileCost; + private long rewriteScanLimit; + + public RewriteDataFilesActionBase(Table table) { + this.table = table; + this.fileIO = table.io(); + this.spec = table.spec(); + this.filter = Expressions.alwaysTrue(); + long splitSize = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.SPLIT_SIZE, + TableProperties.SPLIT_SIZE_DEFAULT); + long targetFileSize = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + this.targetSizeInBytes = Math.min(splitSize, targetFileSize); + this.splitLookback = PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.SPLIT_LOOKBACK, + TableProperties.SPLIT_LOOKBACK_DEFAULT); + this.splitOpenFileCost = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.SPLIT_OPEN_FILE_COST, + TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); + this.rewriteScanLimit = PropertyUtil.propertyAsLong( Review comment: This is a refactor PR 
while introducing `rewriteScanLimit` looks like a new feature, so we'd better not mix refactoring and new-feature development in a single PR. Keeping them separate makes reviewing easier and lets us provide full unit tests if we really need `rewriteScanLimit`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org