szehon-ho commented on code in PR #7029:
URL: https://github.com/apache/iceberg/pull/7029#discussion_r1153631227
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java:
##########
@@ -18,79 +18,23 @@
*/
package org.apache.iceberg.spark;
-import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
-import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class FileRewriteCoordinator {
+public class FileRewriteCoordinator extends
BaseFileRewriteCoordinator<DataFile> {
- private static final Logger LOG =
LoggerFactory.getLogger(FileRewriteCoordinator.class);
private static final FileRewriteCoordinator INSTANCE = new
FileRewriteCoordinator();
- private final Map<Pair<String, String>, Set<DataFile>> resultMap =
Maps.newConcurrentMap();
-
private FileRewriteCoordinator() {}
public static FileRewriteCoordinator get() {
return INSTANCE;
}
- /**
- * Called to persist the output of a rewrite action for a specific group.
Since the write is done
- * via a Spark Datasource, we have to propagate the result through this
side-effect call.
- *
- * @param table table where the rewrite is occurring
- * @param fileSetID the id used to identify the source set of files being
rewritten
- * @param newDataFiles the new files which have been written
- */
- public void stageRewrite(Table table, String fileSetID, Set<DataFile>
newDataFiles) {
- LOG.debug(
- "Staging the output for {} - fileset {} with {} files",
- table.name(),
- fileSetID,
- newDataFiles.size());
- Pair<String, String> id = toID(table, fileSetID);
- resultMap.put(id, newDataFiles);
- }
-
- public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) {
- Pair<String, String> id = toID(table, fileSetID);
- Set<DataFile> result = resultMap.get(id);
- ValidationException.check(
- result != null, "No results for rewrite of file set %s in table %s",
fileSetID, table);
-
- return result;
- }
-
- public void clearRewrite(Table table, String fileSetID) {
- LOG.debug("Removing entry from RewriteCoordinator for {} - id {}",
table.name(), fileSetID);
- Pair<String, String> id = toID(table, fileSetID);
- resultMap.remove(id);
- }
-
- public Set<String> fetchSetIDs(Table table) {
Review Comment:
Not sure about this one: isn't it inherited directly from the base class (so
no change)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]