This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c16cefa501 Core: Move deleteRemovedMetadataFiles(..) to CatalogUtil (#11352)
c16cefa501 is described below
commit c16cefa5015dda417f80e0d59124cd92448787ab
Author: leesf <[email protected]>
AuthorDate: Mon Oct 21 20:35:03 2024 +0800
Core: Move deleteRemovedMetadataFiles(..) to CatalogUtil (#11352)
---
.../iceberg/BaseMetastoreTableOperations.java | 49 +---------------------
.../main/java/org/apache/iceberg/CatalogUtil.java | 45 ++++++++++++++++++++
.../iceberg/hadoop/HadoopTableOperations.java | 40 +-----------------
3 files changed, 48 insertions(+), 86 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 53f3250dc9..dbab9e8139 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -19,7 +19,6 @@
package org.apache.iceberg;
import java.util.Locale;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -32,11 +31,8 @@ import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
@@ -127,7 +123,7 @@ public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperatio
long start = System.currentTimeMillis();
doCommit(base, metadata);
- deleteRemovedMetadataFiles(base, metadata);
+ CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata);
requestRefresh();
LOG.info(
@@ -354,47 +350,4 @@ public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperatio
return -1;
}
}
-
- /**
- * Deletes the oldest metadata files if {@link
- * TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
- *
- * @param base table metadata on which previous versions were based
- * @param metadata new table metadata with updated previous versions
- */
- private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) {
- if (base == null) {
- return;
- }
-
- boolean deleteAfterCommit =
- metadata.propertyAsBoolean(
- TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
- TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
-
- if (deleteAfterCommit) {
- Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
- Sets.newHashSet(base.previousFiles());
- // TableMetadata#addPreviousFile builds up the metadata log and uses
- // TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in
- // the log, thus we don't include metadata.previousFiles() for deletion - everything else can
- // be removed
- removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
- if (io() instanceof SupportsBulkOperations) {
- ((SupportsBulkOperations) io())
- .deleteFiles(
- Iterables.transform(
- removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file));
- } else {
- Tasks.foreach(removedPreviousMetadataFiles)
- .noRetry()
- .suppressFailureWhenFinished()
- .onFailure(
- (previousMetadataFile, exc) ->
- LOG.warn(
- "Delete failed for previous metadata file: {}", previousMetadataFile, exc))
- .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
- }
- }
- }
}
diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index 70b10cbaeb..609e94b7b1 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -515,4 +515,49 @@ public class CatalogUtil {
return sb.toString();
}
+
+ /**
+ * Deletes the oldest metadata files if {@link
+ * TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
+ *
+ * @param io FileIO instance to use for deletes
+ * @param base table metadata on which previous versions were based
+ * @param metadata new table metadata with updated previous versions
+ */
+ public static void deleteRemovedMetadataFiles(
+ FileIO io, TableMetadata base, TableMetadata metadata) {
+ if (base == null) {
+ return;
+ }
+
+ boolean deleteAfterCommit =
+ metadata.propertyAsBoolean(
+ TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
+ TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
+
+ if (deleteAfterCommit) {
+ Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
+ Sets.newHashSet(base.previousFiles());
+ // TableMetadata#addPreviousFile builds up the metadata log and uses
+ // TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in
+ // the log, thus we don't include metadata.previousFiles() for deletion - everything else can
+ // be removed
+ removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
+ if (io instanceof SupportsBulkOperations) {
+ ((SupportsBulkOperations) io)
+ .deleteFiles(
+ Iterables.transform(
+ removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file));
+ } else {
+ Tasks.foreach(removedPreviousMetadataFiles)
+ .noRetry()
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (previousMetadataFile, exc) ->
+ LOG.warn(
+ "Delete failed for previous metadata file: {}", previousMetadataFile, exc))
+ .run(previousMetadataFile -> io.deleteFile(previousMetadataFile.file()));
+ }
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 1e0cf44221..2429937140 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -22,7 +22,6 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
-import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -31,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.LockManager;
import org.apache.iceberg.TableMetadata;
@@ -45,10 +45,7 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,7 +165,7 @@ public class HadoopTableOperations implements TableOperations {
// update the best-effort version pointer
writeVersionHint(nextVersion);
- deleteRemovedMetadataFiles(base, metadata);
+ CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata);
this.shouldRefresh = true;
}
@@ -414,39 +411,6 @@ public class HadoopTableOperations implements TableOperations {
return Util.getFs(path, hadoopConf);
}
- /**
- * Deletes the oldest metadata files if {@link
- * TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
- *
- * @param base table metadata on which previous versions were based
- * @param metadata new table metadata with updated previous versions
- */
- private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) {
- if (base == null) {
- return;
- }
-
- boolean deleteAfterCommit =
- metadata.propertyAsBoolean(
- TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
- TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
-
- if (deleteAfterCommit) {
- Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
- Sets.newHashSet(base.previousFiles());
- removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
- Tasks.foreach(removedPreviousMetadataFiles)
- .executeWith(ThreadPools.getWorkerPool())
- .noRetry()
- .suppressFailureWhenFinished()
- .onFailure(
- (previousMetadataFile, exc) ->
- LOG.warn(
- "Delete failed for previous metadata file: {}", previousMetadataFile, exc))
- .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
- }
- }
-
private static TableMetadata checkUUID(TableMetadata currentMetadata, TableMetadata newMetadata) {
String newUUID = newMetadata.uuid();
if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {