This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new a50ec923f3 Core: Interface changes for separating rewrite planner and
runner (#12306)
a50ec923f3 is described below
commit a50ec923f3d928f67e2a4a361c0d1162341aa084
Author: pvary <[email protected]>
AuthorDate: Thu Feb 27 02:22:56 2025 +0100
Core: Interface changes for separating rewrite planner and runner (#12306)
---
.../apache/iceberg/actions/FileRewritePlan.java | 71 ++++++++++++++++
.../apache/iceberg/actions/FileRewritePlanner.java | 88 ++++++++++++++++++++
.../apache/iceberg/actions/FileRewriteRunner.java | 76 +++++++++++++++++
.../apache/iceberg/actions/RewriteFileGroup.java | 70 +++++++++++-----
.../apache/iceberg/actions/RewriteGroupBase.java | 96 ++++++++++++++++++++++
.../actions/RewritePositionDeletesGroup.java | 65 +++++++++------
6 files changed, 421 insertions(+), 45 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java
b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java
new file mode 100644
index 0000000000..3ad112de77
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * Container class holding the output of a {@link FileRewritePlanner#plan()} call. Contains a list
+ * of groups (See: {@link RewriteFileGroup} and {@link RewritePositionDeletesGroup}) which are used
+ * by engines to rewrite a particular set of files.
+ *
+ * @param <I> the Java type of the plan info like {@link RewriteDataFiles.FileGroupInfo} or {@link
+ * RewritePositionDeleteFiles.FileGroupInfo}
+ * @param <T> the Java type of the input scan tasks (input)
+ * @param <F> the Java type of the content files (input and output)
+ * @param <G> the Java type of the rewrite file group like {@link RewriteFileGroup} or {@link
+ * RewritePositionDeletesGroup}
+ */
+public class FileRewritePlan<
+ I,
+ T extends ContentScanTask<F>,
+ F extends ContentFile<F>,
+ G extends RewriteGroupBase<I, T, F>> {
+ private final CloseableIterable<G> groups;
+ private final int totalGroupCount;
+ private final Map<StructLike, Integer> groupsInPartition;
+
+ FileRewritePlan(
+ CloseableIterable<G> groups,
+ int totalGroupCount,
+ Map<StructLike, Integer> groupsInPartition) {
+ this.groups = groups;
+ this.totalGroupCount = totalGroupCount;
+ this.groupsInPartition = groupsInPartition;
+ }
+
+ /** The stream of the generated {@link RewriteGroupBase}s. */
+ public CloseableIterable<G> groups() {
+ return groups;
+ }
+
+ /** The number of groups generated for the given partition (NPE on unboxing if it has no entry). */
+ public int groupsInPartition(StructLike partition) {
+ return groupsInPartition.get(partition);
+ }
+
+ /** The total number of the groups generated by this plan. */
+ public int totalGroupCount() {
+ return totalGroupCount;
+ }
+}
diff --git
a/core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java
b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java
new file mode 100644
index 0000000000..2e47f4dd91
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+
+/**
+ * An interface for planning a rewrite operation.
+ *
+ * <p>A Rewrite Operation is performed by several classes working in conjunction.
+ *
+ * <ul>
+ * <li>{@link FileRewritePlanner} - (this type) which scans the files in the table and determines
+ * which files should be rewritten and how they should be grouped. The grouping is based on
+ * partitioning and the planning could create multiple groups within a partition. This
+ * produces a {@link FileRewritePlan}.
+ * <li>{@link FileRewritePlan} - This immutable output of the planner contains a set of groups
+ * like {@link RewriteFileGroup} or {@link RewritePositionDeletesGroup}, each containing a set
+ * of files that are meant to be compacted together by the {@link FileRewriteRunner}.
+ * <li>{@link FileRewriteRunner} - The runner is the engine-specific implementation that can take
+ * a {@link FileRewritePlan} and perform a rewrite on each of the groups within. This is the
+ * actual implementation of the rewrite which generates new files.
+ * </ul>
+ *
+ * <p>The lifecycle for the planner looks like the following:
+ *
+ * <ul>
+ * <li>{@link #init(Map)} initializes the planner with the configuration parameters
+ * <li>{@link #plan()} called for generating the {@link FileRewritePlan} for the given parameters.
+ * </ul>
+ *
+ * @param <I> the Java type of the plan info like {@link RewriteDataFiles.FileGroupInfo} or {@link
+ * RewritePositionDeleteFiles.FileGroupInfo}
+ * @param <T> the Java type of the input scan tasks (input)
+ * @param <F> the Java type of the content files (input and output)
+ * @param <G> the Java type of the rewrite file group like {@link RewriteFileGroup} or {@link
+ * RewritePositionDeletesGroup}
+ */
+public interface FileRewritePlanner<
+ I,
+ T extends ContentScanTask<F>,
+ F extends ContentFile<F>,
+ G extends RewriteGroupBase<I, T, F>> {
+
+ /** Returns a description for this planner. */
+ default String description() {
+ return getClass().getName();
+ }
+
+ /**
+ * Returns a set of supported options for this planner. Only options specified in this set will
+ * be accepted at runtime. Any other options will be rejected.
+ */
+ Set<String> validOptions();
+
+ /**
+ * Initializes this planner using provided options.
+ *
+ * @param options options to initialize this planner
+ */
+ void init(Map<String, String> options);
+
+ /**
+ * Generates the plan for rewrite.
+ *
+ * @return the generated plan which could be executed during the compaction
+ */
+ FileRewritePlan<I, T, F, G> plan();
+}
diff --git
a/core/src/main/java/org/apache/iceberg/actions/FileRewriteRunner.java
b/core/src/main/java/org/apache/iceberg/actions/FileRewriteRunner.java
new file mode 100644
index 0000000000..4c7384861c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewriteRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+
+/**
+ * An interface for rewriting content file groups ({@link RewriteGroupBase}). The lifecycle for the
+ * runner looks like the following:
+ *
+ * <ul>
+ * <li>{@link #init(Map)} initializes the runner with the configuration parameters
+ * <li>{@link #rewrite(RewriteGroupBase)} called for every group in the plan to do the actual
+ * rewrite of the files, and returns the generated new files.
+ * </ul>
+ *
+ * @param <I> the Java type of the plan info like {@link RewriteDataFiles.FileGroupInfo} or {@link
+ * RewritePositionDeleteFiles.FileGroupInfo}
+ * @param <T> the Java type of the input scan tasks (input)
+ * @param <F> the Java type of the content files (input and output)
+ * @param <G> the Java type of the rewrite file group like {@link RewriteFileGroup} or {@link
+ * RewritePositionDeletesGroup}
+ */
+public interface FileRewriteRunner<
+ I,
+ T extends ContentScanTask<F>,
+ F extends ContentFile<F>,
+ G extends RewriteGroupBase<I, T, F>> {
+
+ /** Returns a description for this runner. */
+ default String description() {
+ return getClass().getName();
+ }
+
+ /**
+ * Returns a set of supported options for this runner. Only options specified in this set will be
+ * accepted at runtime. Any other options will be rejected.
+ */
+ Set<String> validOptions();
+
+ /**
+ * Initializes this runner using provided options.
+ *
+ * @param options options to initialize this runner
+ */
+ void init(Map<String, String> options);
+
+ /**
+ * Rewrite a group of files represented by the given list of scan tasks.
+ *
+ * <p>The implementation is supposed to be engine-specific (e.g. Spark, Flink, Trino).
+ *
+ * @param group of scan tasks for files to be rewritten together
+ * @return a set of newly written files
+ */
+ Set<F> rewrite(G group);
+}
diff --git
a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
index dfc9842780..c31ff327b6 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
@@ -34,23 +34,32 @@ import org.apache.iceberg.util.DataFileSet;
* Container class representing a set of files to be rewritten by a RewriteAction and the new files
* which have been written by the action.
*/
-public class RewriteFileGroup {
- private final FileGroupInfo info;
- private final List<FileScanTask> fileScanTasks;
-
+public class RewriteFileGroup extends RewriteGroupBase<FileGroupInfo,
FileScanTask, DataFile> {
+ private final int outputSpecId;
private DataFileSet addedFiles = DataFileSet.create();
+ @Deprecated
public RewriteFileGroup(FileGroupInfo info, List<FileScanTask>
fileScanTasks) {
- this.info = info;
- this.fileScanTasks = fileScanTasks;
+ this(info, fileScanTasks, 0, 0L, 0L, 0);
}
- public FileGroupInfo info() {
- return info;
+ public RewriteFileGroup(
+ FileGroupInfo info,
+ List<FileScanTask> fileScanTasks,
+ int outputSpecId,
+ long writeMaxFileSize,
+ long splitSize,
+ int expectedOutputFiles) {
+ super(info, fileScanTasks, writeMaxFileSize, splitSize,
expectedOutputFiles);
+ this.outputSpecId = outputSpecId;
}
+ /**
+ * @deprecated use {@link #fileScanTasks()}
+ */
+ @Deprecated
public List<FileScanTask> fileScans() {
- return fileScanTasks;
+ return fileScanTasks();
}
public void setOutputFiles(Set<DataFile> files) {
@@ -58,7 +67,7 @@ public class RewriteFileGroup {
}
public Set<DataFile> rewrittenFiles() {
- return fileScans().stream()
+ return fileScanTasks().stream()
.map(FileScanTask::file)
.collect(Collectors.toCollection(DataFileSet::create));
}
@@ -70,43 +79,60 @@ public class RewriteFileGroup {
public RewriteDataFiles.FileGroupRewriteResult asResult() {
Preconditions.checkState(addedFiles != null, "Cannot get result, Group was
never rewritten");
return ImmutableRewriteDataFiles.FileGroupRewriteResult.builder()
- .info(info)
+ .info(info())
.addedDataFilesCount(addedFiles.size())
- .rewrittenDataFilesCount(fileScanTasks.size())
- .rewrittenBytesCount(sizeInBytes())
+ .rewrittenDataFilesCount(fileScanTasks().size())
+ .rewrittenBytesCount(inputFilesSizeInBytes())
.build();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("info", info)
- .add("numRewrittenFiles", fileScanTasks.size())
+ .add("info", info())
+ .add("numRewrittenFiles", fileScanTasks().size())
.add(
"numAddedFiles",
addedFiles == null ? "Rewrite Incomplete" :
Integer.toString(addedFiles.size()))
- .add("numRewrittenBytes", sizeInBytes())
+ .add("numRewrittenBytes", inputFilesSizeInBytes())
+ .add("maxOutputFileSize", maxOutputFileSize())
+ .add("inputSplitSize", inputSplitSize())
+ .add("expectedOutputFiles", expectedOutputFiles())
+ .add("outputSpecId", outputSpecId)
.toString();
}
+ /**
+ * @deprecated use {@link #inputFilesSizeInBytes()}
+ */
+ @Deprecated
public long sizeInBytes() {
- return fileScanTasks.stream().mapToLong(FileScanTask::length).sum();
+ return inputFilesSizeInBytes();
}
+ /**
+ * @deprecated use {@link #inputFileNum()}
+ */
+ @Deprecated
public int numFiles() {
- return fileScanTasks.size();
+ return inputFileNum();
+ }
+
+ public int outputSpecId() {
+ return outputSpecId;
}
public static Comparator<RewriteFileGroup> comparator(RewriteJobOrder
rewriteJobOrder) {
switch (rewriteJobOrder) {
case BYTES_ASC:
- return Comparator.comparing(RewriteFileGroup::sizeInBytes);
+ return Comparator.comparing(RewriteFileGroup::inputFilesSizeInBytes);
case BYTES_DESC:
- return Comparator.comparing(RewriteFileGroup::sizeInBytes,
Comparator.reverseOrder());
+ return Comparator.comparing(
+ RewriteFileGroup::inputFilesSizeInBytes,
Comparator.reverseOrder());
case FILES_ASC:
- return Comparator.comparing(RewriteFileGroup::numFiles);
+ return Comparator.comparing(RewriteFileGroup::inputFileNum);
case FILES_DESC:
- return Comparator.comparing(RewriteFileGroup::numFiles,
Comparator.reverseOrder());
+ return Comparator.comparing(RewriteFileGroup::inputFileNum,
Comparator.reverseOrder());
default:
return (unused, unused2) -> 0;
}
diff --git
a/core/src/main/java/org/apache/iceberg/actions/RewriteGroupBase.java
b/core/src/main/java/org/apache/iceberg/actions/RewriteGroupBase.java
new file mode 100644
index 0000000000..28d182bd28
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteGroupBase.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+
+/**
+ * Container class representing a set of files to be rewritten by a {@link FileRewriteRunner}.
+ *
+ * @param <I> the Java type of the plan info member like {@link RewriteDataFiles.FileGroupInfo} or
+ * {@link RewritePositionDeleteFiles.FileGroupInfo}
+ * @param <T> the Java type of the input scan tasks (input)
+ * @param <F> the Java type of the content files (input and output)
+ */
+public abstract class RewriteGroupBase<I, T extends ContentScanTask<F>, F
extends ContentFile<F>> {
+ private final I info;
+ private final List<T> fileScanTasks;
+ private final long maxOutputFileSize;
+ private final long inputSplitSize;
+ private final int expectedOutputFiles;
+
+ RewriteGroupBase(
+ I info,
+ List<T> fileScanTasks,
+ long maxOutputFileSize,
+ long inputSplitSize,
+ int expectedOutputFiles) {
+ this.info = info;
+ this.fileScanTasks = fileScanTasks;
+ this.maxOutputFileSize = maxOutputFileSize;
+ this.inputSplitSize = inputSplitSize;
+ this.expectedOutputFiles = expectedOutputFiles;
+ }
+
+ /** Identifiers and partition information about the group. */
+ public I info() {
+ return info;
+ }
+
+ /** Scan tasks for input files. */
+ public List<T> fileScanTasks() {
+ return fileScanTasks;
+ }
+
+ /** Accumulated size for the input files. */
+ public long inputFilesSizeInBytes() {
+ return fileScanTasks.stream().mapToLong(T::length).sum();
+ }
+
+ /** Number of the input files. */
+ public int inputFileNum() {
+ return fileScanTasks.size();
+ }
+
+ /**
+ * The target file size which should be used by the {@link FileRewriteRunner}. The {@link
+ * FileRewritePlanner} could choose different values than defined by the table properties.
+ *
+ * @return the target size that should be used by the runner
+ */
+ public long maxOutputFileSize() {
+ return maxOutputFileSize;
+ }
+
+ /**
+ * The amount of bytes of data the {@link FileRewriteRunner} should read from a single group in a
+ * single read task. The {@link FileRewritePlanner} chooses a value to allow parallelization for
+ * the runners, but prevent fragmentation of the output caused by too many readers.
+ */
+ public long inputSplitSize() {
+ return inputSplitSize;
+ }
+
+ /** The total number of files that should be produced by the rewrite of this entire file group. */
+ public int expectedOutputFiles() {
+ return expectedOutputFiles;
+ }
+}
diff --git
a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
index d1c688417a..168c63f66f 100644
---
a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
+++
b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
@@ -35,27 +35,35 @@ import org.apache.iceberg.util.DeleteFileSet;
* Container class representing a set of position delete files to be rewritten by a {@link
* RewritePositionDeleteFiles} and the new files which have been written by the action.
*/
-public class RewritePositionDeletesGroup {
- private final FileGroupInfo info;
- private final List<PositionDeletesScanTask> tasks;
+public class RewritePositionDeletesGroup
+ extends RewriteGroupBase<FileGroupInfo, PositionDeletesScanTask,
DeleteFile> {
private final long maxRewrittenDataSequenceNumber;
private DeleteFileSet addedDeleteFiles = DeleteFileSet.create();
+ @Deprecated
public RewritePositionDeletesGroup(FileGroupInfo info,
List<PositionDeletesScanTask> tasks) {
+ this(info, tasks, 0L, 0L, 0);
+ }
+
+ public RewritePositionDeletesGroup(
+ FileGroupInfo info,
+ List<PositionDeletesScanTask> tasks,
+ long writeMaxFileSize,
+ long splitSize,
+ int expectedOutputFiles) {
+ super(info, tasks, writeMaxFileSize, splitSize, expectedOutputFiles);
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty");
- this.info = info;
- this.tasks = tasks;
this.maxRewrittenDataSequenceNumber =
tasks.stream().mapToLong(t ->
t.file().dataSequenceNumber()).max().getAsLong();
}
- public FileGroupInfo info() {
- return info;
- }
-
+ /**
+ * @deprecated use {@link #fileScanTasks()}
+ */
+ @Deprecated
public List<PositionDeletesScanTask> tasks() {
- return tasks;
+ return fileScanTasks();
}
public void setOutputFiles(Set<DeleteFile> files) {
@@ -67,7 +75,7 @@ public class RewritePositionDeletesGroup {
}
public Set<DeleteFile> rewrittenDeleteFiles() {
- return tasks().stream()
+ return fileScanTasks().stream()
.map(PositionDeletesScanTask::file)
.collect(Collectors.toCollection(DeleteFileSet::create));
}
@@ -81,10 +89,10 @@ public class RewritePositionDeletesGroup {
addedDeleteFiles != null, "Cannot get result, Group was never
rewritten");
return ImmutableRewritePositionDeleteFiles.FileGroupRewriteResult.builder()
- .info(info)
+ .info(info())
.addedDeleteFilesCount(addedDeleteFiles.size())
- .rewrittenDeleteFilesCount(tasks.size())
- .rewrittenBytesCount(rewrittenBytes())
+ .rewrittenDeleteFilesCount(inputFileNum())
+ .rewrittenBytesCount(inputFilesSizeInBytes())
.addedBytesCount(addedBytes())
.build();
}
@@ -92,42 +100,53 @@ public class RewritePositionDeletesGroup {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("info", info)
- .add("numRewrittenPositionDeleteFiles", tasks.size())
+ .add("info", info())
+ .add("numRewrittenPositionDeleteFiles", fileScanTasks().size())
.add(
"numAddedPositionDeleteFiles",
addedDeleteFiles == null
? "Rewrite Incomplete"
: Integer.toString(addedDeleteFiles.size()))
.add("numAddedBytes", addedBytes())
- .add("numRewrittenBytes", rewrittenBytes())
+ .add("numRewrittenBytes", inputFilesSizeInBytes())
+ .add("maxOutputFileSize", maxOutputFileSize())
+ .add("inputSplitSize", inputSplitSize())
+ .add("expectedOutputFiles", expectedOutputFiles())
.toString();
}
+ /**
+ * @deprecated use {@link #inputFilesSizeInBytes()}
+ */
+ @Deprecated
public long rewrittenBytes() {
- return tasks.stream().mapToLong(PositionDeletesScanTask::length).sum();
+ return inputFilesSizeInBytes();
}
public long addedBytes() {
return
addedDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
}
+ /**
+ * @deprecated use {@link #inputFileNum()}
+ */
+ @Deprecated
public int numRewrittenDeleteFiles() {
- return tasks.size();
+ return inputFileNum();
}
public static Comparator<RewritePositionDeletesGroup>
comparator(RewriteJobOrder order) {
switch (order) {
case BYTES_ASC:
- return
Comparator.comparing(RewritePositionDeletesGroup::rewrittenBytes);
+ return
Comparator.comparing(RewritePositionDeletesGroup::inputFilesSizeInBytes);
case BYTES_DESC:
return Comparator.comparing(
- RewritePositionDeletesGroup::rewrittenBytes,
Comparator.reverseOrder());
+ RewritePositionDeletesGroup::inputFilesSizeInBytes,
Comparator.reverseOrder());
case FILES_ASC:
- return
Comparator.comparing(RewritePositionDeletesGroup::numRewrittenDeleteFiles);
+ return Comparator.comparing(RewritePositionDeletesGroup::inputFileNum);
case FILES_DESC:
return Comparator.comparing(
- RewritePositionDeletesGroup::numRewrittenDeleteFiles,
Comparator.reverseOrder());
+ RewritePositionDeletesGroup::inputFileNum,
Comparator.reverseOrder());
default:
return (unused, unused2) -> 0;
}