Copilot commented on code in PR #6551:
URL: https://github.com/apache/paimon/pull/6551#discussion_r2538737698
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -18,22 +18,37 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.orphan.CombinedFlinkOrphanFilesClean;
+import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
-import static
org.apache.paimon.flink.orphan.FlinkOrphanFilesClean.executeDatabaseOrphanFiles;
+import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
/** Action to remove the orphan data files and metadata files. */
public class RemoveOrphanFilesAction extends ActionBase {
+ protected static final Logger LOG =
LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
+
private final String databaseName;
- @Nullable private final String tableName;
+ @Nullable private final List<Identifier> tableIdentifiers;
@Nullable private final String parallelism;
private String olderThan = null;
private boolean dryRun = false;
+ private MultiTablesSinkMode mode = COMBINED;
Review Comment:
The default mode is set to `COMBINED` in the code, but the PR description
and the help text in `RemoveOrphanFilesActionFactory` state the default is
`combined`. However, `MultiTablesSinkMode.fromString(null)` returns `DIVIDED`,
not `COMBINED`. This creates an inconsistency: when the `--mode` parameter is
not provided, `fromString(null)` will return `DIVIDED`, but the field default
is `COMBINED`. This means the actual default depends on whether the parameter
is provided. Consider either: (1) removing the field initializer and letting
`mode` be null by default, then using `mode == null || mode == COMBINED` in the
condition, or (2) changing the help text and PR description to accurately
reflect that when `--mode` is not specified, the behavior defaults to `DIVIDED`
per the `fromString` method.
```suggestion
@Nullable private MultiTablesSinkMode mode;
```
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java:
##########
@@ -79,7 +127,23 @@ public void printHelp() {
System.out.println();
System.out.println(
- "If the table is null or *, all orphan files in all tables
under the db will be cleaned up.");
+ "If neither '--table' nor '--tables' is specified, all orphan
files in all tables under the db will be cleaned up.");
+ System.out.println();
+
+ System.out.println(
+ "Use '--table' to specify a single table, or '--tables'
multiple times to specify multiple tables "
+ + "(e.g., '--tables table1 --tables table2 --tables
table3'). "
+ + "These two parameters cannot be used together.");
+ System.out.println();
+
+ System.out.println(
+ "When '--mode combined', multiple tables will be processed
within a single DataStream "
+ + "during job graph construction, instead of creating
one dataStream per table. "
+ + "This significantly reduces job graph construction
time, when processing "
+ + "thousands of tables (jobs may fail to start within
timeout limits). "
+ + "It also reduces JobGraph complexity and avoids
stack over flow issue and resource allocation failures during job running. "
Review Comment:
Corrected spelling of 'dataStream' to 'DataStream' for consistency with
Flink terminology and corrected 'stack over flow' to 'stack overflow'.
```suggestion
+ "during job graph construction, instead of
creating one DataStream per table. "
+ "This significantly reduces job graph construction
time, when processing "
+ "thousands of tables (jobs may fail to start
within timeout limits). "
+ "It also reduces JobGraph complexity and avoids
stack overflow issue and resource allocation failures during job running. "
```
##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java:
##########
@@ -98,6 +102,67 @@ private Path getOrphanFilePath(FileStoreTable table, String
orphanFile) {
return new Path(table.location(), orphanFile);
}
+ private List<String> readTableData(FileStoreTable table) throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"k", "v"});
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ List<String> result =
+ getResultLocal(
+ readBuilder.newRead(),
+ plan == null ? Collections.emptyList() : plan.splits(),
+ rowType);
+ return result;
+ }
+
+ private List<String> getResultLocal(
+ org.apache.paimon.table.source.TableRead read,
+ List<org.apache.paimon.table.source.Split> splits,
+ RowType rowType)
+ throws Exception {
+ try
(org.apache.paimon.reader.RecordReader<org.apache.paimon.data.InternalRow>
+ recordReader = read.createReader(splits)) {
+ List<String> result = new ArrayList<>();
+ recordReader.forEachRemaining(
+ row -> result.add(internalRowToStringLocal(row, rowType)));
+ return result;
+ }
+ }
+
+ /**
+ * Stringify the given {@link InternalRow}. This is a simplified version
that handles basic
+ * types. For complex types (Array, Map, Row), it falls back to toString().
+ *
+ * <p>This method is implemented locally to avoid dependency on
paimon-common's test-jar, which
+ * may not be available in CI environments.
+ */
Review Comment:
The documentation states the method handles complex types by falling back to
toString(), but the actual implementation doesn't handle complex types at
all—it only handles basic types using FieldGetter. The comment is misleading
and should be updated to accurately reflect what the method actually does:
'This is a simplified version that handles basic types only. Complex types
(Array, Map, Row) are not explicitly handled and rely on the default object
representation from FieldGetter.'
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java:
##########
@@ -79,7 +127,23 @@ public void printHelp() {
System.out.println();
System.out.println(
- "If the table is null or *, all orphan files in all tables
under the db will be cleaned up.");
+ "If neither '--table' nor '--tables' is specified, all orphan
files in all tables under the db will be cleaned up.");
+ System.out.println();
+
+ System.out.println(
+ "Use '--table' to specify a single table, or '--tables'
multiple times to specify multiple tables "
+ + "(e.g., '--tables table1 --tables table2 --tables
table3'). "
+ + "These two parameters cannot be used together.");
+ System.out.println();
+
+ System.out.println(
+ "When '--mode combined', multiple tables will be processed
within a single DataStream "
+ + "during job graph construction, instead of creating
one dataStream per table. "
+ + "This significantly reduces job graph construction
time, when processing "
+ + "thousands of tables (jobs may fail to start within
timeout limits). "
+ + "It also reduces JobGraph complexity and avoids
stack over flow issue and resource allocation failures during job running. "
Review Comment:
Corrected spelling of 'stack over flow' to 'stack overflow'.
```suggestion
+ "It also reduces JobGraph complexity and avoids
stack overflow issue and resource allocation failures during job running. "
```
##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java:
##########
@@ -392,6 +457,466 @@ public void testRunWithMode(boolean isNamedArgument)
throws Exception {
.hasMessageContaining("Unknown mode");
}
+ @org.junit.jupiter.api.Test
+ public void testCombinedMode() throws Exception {
+ long fileCreationTime = System.currentTimeMillis();
+ FileStoreTable table1 = createTableAndWriteData("batchTable1");
+ FileStoreTable table2 = createTableAndWriteData("batchTable2");
+ FileStoreTable table3 = createTableAndWriteData("batchTable3");
+
+ FileIO fileIO1 = table1.fileIO();
+ FileIO fileIO2 = table2.fileIO();
+ FileIO fileIO3 = table3.fileIO();
+
+ Path orphanFile1Table1 = getOrphanFilePath(table1, ORPHAN_FILE_1);
+ Path orphanFile2Table1 = getOrphanFilePath(table1, ORPHAN_FILE_2);
+ Path orphanFile1Table2 = getOrphanFilePath(table2, ORPHAN_FILE_1);
+ Path orphanFile2Table2 = getOrphanFilePath(table2, ORPHAN_FILE_2);
+ Path orphanFile1Table3 = getOrphanFilePath(table3, ORPHAN_FILE_1);
+ Path orphanFile2Table3 = getOrphanFilePath(table3, ORPHAN_FILE_2);
+
+ Path[] orphanFiles = {
+ orphanFile1Table1, orphanFile2Table1,
+ orphanFile1Table2, orphanFile2Table2,
+ orphanFile1Table3, orphanFile2Table3
+ };
+ FileIO[] fileIOs = {fileIO1, fileIO1, fileIO2, fileIO2, fileIO3,
fileIO3};
+
+ Thread.sleep(2000);
+
+ long currentTime = System.currentTimeMillis();
+ long olderThanMillis = Math.max(fileCreationTime + 1000, currentTime -
1000);
+ String olderThan =
+ DateTimeUtils.formatLocalDateTime(
+ DateTimeUtils.toLocalDateTime(olderThanMillis), 3);
+
+ long expectedFileCount = 6;
+ long expectedTotalSize = 0;
+ for (int i = 0; i < orphanFiles.length; i++) {
+ if (fileIOs[i].exists(orphanFiles[i])) {
+ expectedTotalSize += fileIOs[i].getFileSize(orphanFiles[i]);
+ }
+ }
+
+ // Test divided mode
+ String dividedMode =
+ String.format(
+ "CALL sys.remove_orphan_files('%s.%s', '%s', false)",
+ database, "*", olderThan);
+ ImmutableList<Row> dividedModeResult =
ImmutableList.copyOf(executeSQL(dividedMode));
+ assertThat(dividedModeResult).hasSize(2);
+ long deletedFileCountWithDivided =
+
Long.parseLong(dividedModeResult.get(0).getField(0).toString());
+ long deletedFileTotalLenInBytesWithDivided =
+
Long.parseLong(dividedModeResult.get(1).getField(0).toString());
+ assertThat(deletedFileCountWithDivided)
+ .as("divided mode should delete 6 orphan files")
+ .isEqualTo(expectedFileCount);
+ assertThat(deletedFileTotalLenInBytesWithDivided)
+ .as("divided mode should delete files with expected total
size")
+ .isEqualTo(expectedTotalSize);
+
+ // Verify files are deleted by divided mode
+ for (int i = 0; i < orphanFiles.length; i++) {
+ assertThat(fileIOs[i].exists(orphanFiles[i]))
+ .as("Orphan file should be deleted by divided mode")
+ .isFalse();
+ }
+
+ // Recreate orphan files for combined mode test
+ long combinedFileCreationTime = System.currentTimeMillis();
+ for (int i = 0; i < orphanFiles.length; i++) {
+ fileIOs[i].writeFile(orphanFiles[i], "orphan", true);
+ }
+ Thread.sleep(2000);
+
+ long combinedCurrentTime = System.currentTimeMillis();
+ long combinedOlderThanMillis =
+ Math.max(combinedFileCreationTime + 1000, combinedCurrentTime
- 1000);
+ String combinedOlderThan =
+ DateTimeUtils.formatLocalDateTime(
+
DateTimeUtils.toLocalDateTime(combinedOlderThanMillis), 3);
+
+ // Test combined mode
+ List<String> args =
+ new ArrayList<>(
+ Arrays.asList(
+ "remove_orphan_files",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ "*",
+ "--mode",
+ "combined",
+ "--dry_run",
+ "false",
+ "--older_than",
+ combinedOlderThan));
+ RemoveOrphanFilesAction action1 =
createAction(RemoveOrphanFilesAction.class, args);
+ assertThatCode(action1::run).doesNotThrowAnyException();
+
+ // Verify files are deleted by combined mode (same result as divided
mode)
+ for (int i = 0; i < orphanFiles.length; i++) {
+ assertThat(fileIOs[i].exists(orphanFiles[i]))
+ .as("Orphan file should be deleted by combined mode (same
as divided mode)")
+ .isFalse();
+ }
+
+ // Verify that normal data in tables can still be read after combined
mode deletion
+ List<String> table1Data = readTableData(table1);
+ assertThat(table1Data)
+ .as("Table1 should still contain normal data after combined
mode deletion")
+ .containsExactly("+I[1, Hi]");
+
+ List<String> table2Data = readTableData(table2);
+ assertThat(table2Data)
+ .as("Table2 should still contain normal data after combined
mode deletion")
+ .containsExactly("+I[1, Hi]");
+
+ List<String> table3Data = readTableData(table3);
+ assertThat(table3Data)
+ .as("Table3 should still contain normal data after combined
mode deletion")
+ .containsExactly("+I[1, Hi]");
+ }
+
+ @org.junit.jupiter.api.Test
+ public void testCombinedModeWithBranch() throws Exception {
+ long fileCreationTime = System.currentTimeMillis();
+
+ FileStoreTable table = createTableAndWriteData("combinedBranchTable");
+
+ // Create first branch and write data
+ table.createBranch("br1");
+ FileStoreTable branchTable1 = createBranchTable(table, "br1");
+ writeToBranch(branchTable1, GenericRow.of(2L,
BinaryString.fromString("Hello"), 20));
+
+ // Create second branch and write data
+ table.createBranch("br2");
+ FileStoreTable branchTable2 = createBranchTable(table, "br2");
+ writeToBranch(branchTable2, GenericRow.of(3L,
BinaryString.fromString("World"), 30));
+
+ // Create orphan files in both branch snapshot directories
+ // This is key: same table, multiple branches - will trigger bug in
Review Comment:
The comment 'will trigger bug in' is incomplete and unclear. It appears to
be referring to a bug that was fixed, but the comment doesn't explain what bug
or what the expected behavior is. This should either be completed or removed,
e.g., 'This tests that combined mode correctly handles multiple branches within
the same table'.
```suggestion
// This tests that combined mode correctly handles multiple branches
within the same table and removes orphan files from each branch.
```
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/CombinedFlinkOrphanFilesClean.java:
##########
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.orphan;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.flink.utils.BoundedTwoInputOperator;
+import org.apache.paimon.flink.utils.OrphanFilesCleanUtil;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
+import org.apache.paimon.operation.OrphanFilesClean;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.paimon.flink.orphan.FlinkOrphanFilesClean.sum;
+
+/**
+ * Flink {@link OrphanFilesClean}, it will submit a job for multiple tables in
a combined
+ * DataStream.
+ */
+public class CombinedFlinkOrphanFilesClean implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(CombinedFlinkOrphanFilesClean.class);
+
+ protected final List<Identifier> tableIdentifiers;
+ protected final long olderThanMillis;
+ protected final boolean dryRun;
+ @Nullable protected final Integer parallelism;
+ // Map to store cleaners by their full identifier name, for quick lookup
in ProcessFunctions
+ protected final Map<String, FlinkOrphanFilesClean> cleanerMap;
+ // Map from table location to cleaner for quick lookup in
BoundedTwoInputOperator
+ protected final Map<String, FlinkOrphanFilesClean> locationToCleanerMap;
+
+ public CombinedFlinkOrphanFilesClean(
+ List<Identifier> tableIdentifiers,
+ Map<String, FlinkOrphanFilesClean> cleanerMap,
+ long olderThanMillis,
+ boolean dryRun,
+ @Nullable Integer parallelism) {
+ this.olderThanMillis = olderThanMillis;
+ this.dryRun = dryRun;
+ this.tableIdentifiers = tableIdentifiers;
+ this.parallelism = parallelism;
+ this.cleanerMap = cleanerMap;
+ this.locationToCleanerMap = new HashMap<>();
+ for (Identifier tableIdentifier : tableIdentifiers) {
+ FlinkOrphanFilesClean cleaner =
cleanerMap.get(tableIdentifier.getFullName());
+ FileStoreTable table = cleaner.getTable();
+ if (Objects.nonNull(table)) {
+ // Add table location
+ String tableLocation = table.location().toUri().getPath();
+ this.locationToCleanerMap.put(tableLocation, cleaner);
+ // Add external paths if they exist
+ String externalPaths =
table.coreOptions().dataFileExternalPaths();
+ if (externalPaths != null && !externalPaths.isEmpty()) {
+ String[] externalPathArr = externalPaths.split(",");
+ for (String externalPathStr : externalPathArr) {
+ String externalPath = new
Path(externalPathStr.trim()).toUri().getPath();
+ this.locationToCleanerMap.put(externalPath, cleaner);
+ }
+ }
+ }
+ }
+ }
+
+ protected DataStream<CleanOrphanFilesResult>
buildBranchSnapshotDirDeletedStream(
+ StreamExecutionEnvironment env, List<Identifier>
branchIdentifiers) {
+ return env.fromCollection(branchIdentifiers)
+ .process(
+ new ProcessFunction<Identifier, Tuple2<Long, Long>>() {
+ @Override
+ public void processElement(
+ Identifier identifier,
+ ProcessFunction<Identifier, Tuple2<Long,
Long>>.Context ctx,
+ Collector<Tuple2<Long, Long>> out) {
+ FlinkOrphanFilesClean cleaner =
getCleanerForTable(identifier);
+ cleaner.processForBranchSnapshotDirDeleted(
+ identifier.getBranchNameOrDefault(),
out);
+ }
+ })
+ .keyBy(tuple -> 1)
+ .reduce(
+ (ReduceFunction<Tuple2<Long, Long>>)
+ (value1, value2) ->
+ new Tuple2<>(value1.f0 + value2.f0,
value1.f1 + value2.f1))
+ .setParallelism(1)
+ .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1));
+ }
+
+ @Nullable
+ public DataStream<CleanOrphanFilesResult>
doOrphanClean(StreamExecutionEnvironment env) {
+ OrphanFilesCleanUtil.configureFlinkEnvironment(env, parallelism);
+ LOG.info(
+ "Starting orphan files clean for {} tables: {}",
+ tableIdentifiers.size(),
+ tableIdentifiers);
+ List<Identifier> branchIdentifiers = new ArrayList<>();
+ long start = System.currentTimeMillis();
+ for (Identifier tableIdentifier : tableIdentifiers) {
+ FlinkOrphanFilesClean cleaner =
cleanerMap.get(tableIdentifier.getFullName());
+ if (Objects.isNull(cleaner)) {
+ LOG.warn("Table {} does not have cleaner, skip it",
tableIdentifier);
+ continue;
+ }
+ List<String> branches = cleaner.validBranches();
+ branches.forEach(
+ branch ->
+ branchIdentifiers.add(
+ new Identifier(
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getTableName(),
+ branch)));
+ }
+ LOG.info(
+ "End orphan files validBranches for {} tables: spend [{}] ms",
+ cleanerMap.size(),
+ System.currentTimeMillis() - start);
+
+ // snapshot and changelog files are the root of everything, so they
are handled specially
+ // here, and subsequently, we will not count their orphan files.
+ DataStream<CleanOrphanFilesResult> branchSnapshotDirDeleted =
+ buildBranchSnapshotDirDeletedStream(env, branchIdentifiers);
+
+ // branch and manifest file
+ // f0: Identifier (table identifier without branch)
+ // f1: Tuple2<String, String> (branch, manifest)
+ final OutputTag<Tuple2<Identifier, Tuple2<String, String>>>
manifestOutputTag =
+ new OutputTag<Tuple2<Identifier, Tuple2<String,
String>>>("manifest-output") {};
+
+ SingleOutputStreamOperator<String> usedManifestFiles =
+ env.fromCollection(branchIdentifiers)
+ .process(
+ new ProcessFunction<Identifier,
Tuple2<Identifier, String>>() {
+
+ @Override
+ public void processElement(
+ Identifier identifier,
+ ProcessFunction<Identifier,
Tuple2<Identifier, String>>
+ .Context
+ ctx,
+ Collector<Tuple2<Identifier,
String>> out)
+ throws Exception {
+ FlinkOrphanFilesClean cleaner =
+ getCleanerForTable(identifier);
+ for (Snapshot snapshot :
+ cleaner.safelyGetAllSnapshots(
+
identifier.getBranchNameOrDefault())) {
+ out.collect(
+ new Tuple2<>(identifier,
snapshot.toJson()));
+ }
+ }
+ })
+ .rebalance()
+ .process(
+ new ProcessFunction<Tuple2<Identifier,
String>, String>() {
+
+ @Override
+ public void processElement(
+ Tuple2<Identifier, String>
branchAndSnapshot,
+ ProcessFunction<Tuple2<Identifier,
String>, String>
+ .Context
+ ctx,
+ Collector<String> out)
+ throws Exception {
+ Identifier identifier =
branchAndSnapshot.f0;
+ Snapshot snapshot =
Snapshot.fromJson(branchAndSnapshot.f1);
+ FlinkOrphanFilesClean cleaner =
+ getCleanerForTable(identifier);
+ String branch =
identifier.getBranchNameOrDefault();
+ Identifier tableIdentifier =
+ Identifier.create(
+
identifier.getDatabaseName(),
+
identifier.getTableName());
+ Consumer<String> manifestConsumer =
+ manifest -> {
+ Tuple2<Identifier,
Tuple2<String, String>>
+ tuple2 =
+ new
Tuple2<>(
+
tableIdentifier,
+
new Tuple2<>(
+
branch,
+
manifest));
+ LOG.trace(
+
"[COMBINED_ORPHAN_CLEAN] Outputting manifest to side output: identifier={},
branch={}, manifest={}",
+ tableIdentifier,
+ branch,
+ manifest);
+
ctx.output(manifestOutputTag, tuple2);
+ };
+ cleaner.collectWithoutDataFile(
+ branch, snapshot,
out::collect, manifestConsumer);
+ }
+ });
+
+ DataStream<String> usedFiles =
+ usedManifestFiles
+ .getSideOutput(manifestOutputTag)
+ .keyBy(tuple2 -> tuple2.f0) // Use Identifier object
directly as key
Review Comment:
[nitpick] The comment 'Use Identifier object directly as key' is misleading.
The code uses `tuple2.f0` which is indeed an Identifier, but the comment should
clarify that Identifier objects are used as keys for grouping by table.
Consider: 'Group by table identifier to process manifests per table'.
```suggestion
.keyBy(tuple2 -> tuple2.f0) // Group by table
identifier to process manifests per table
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]