This is an automated email from the ASF dual-hosted git repository.

noemi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new fdad9d320 IMPALA-13725: Add Iceberg table repair functionalities
fdad9d320 is described below

commit fdad9d32041a736108b876704bd0354090a88d29
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Tue Oct 7 16:45:31 2025 +0200

    IMPALA-13725: Add Iceberg table repair functionalities
    
    In some cases users delete files directly from storage without
    going through the Iceberg API, e.g. they remove old partitions.
    
    This corrupts the table and makes queries that try to read the
    missing files fail.
    This change introduces the statement
      ALTER TABLE <table> EXECUTE REPAIR_METADATA()
    which deletes the dangling references of missing data files from
    the table metadata.
    Note that the table cannot be repaired if there are missing
    delete files, because Iceberg's DeleteFiles API, which is used
    to execute the operation, only allows removing data files.
    
    Testing:
     - E2E
       - HDFS
       - S3, Ozone
     - analysis
    
    Change-Id: I514403acaa3b8c0a7b2581d676b82474d846d38e
    Reviewed-on: http://gerrit.cloudera.org:8080/23512
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 common/thrift/JniCatalog.thrift                    |   7 ++
 .../AlterTableExecuteRepairMetadataStmt.java       |  63 ++++++++++++
 .../impala/analysis/AlterTableExecuteStmt.java     |  32 ++++--
 .../catalog/iceberg/ImpalaRepairIcebergTable.java  | 111 +++++++++++++++++++++
 .../apache/impala/service/CatalogOpExecutor.java   |  13 ++-
 .../impala/service/IcebergCatalogOpExecutor.java   |  11 ++
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  16 +++
 tests/query_test/test_iceberg.py                   |  40 ++++++++
 8 files changed, 283 insertions(+), 10 deletions(-)

diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 2fa27cd18..766f8cd11 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -449,6 +449,10 @@ struct TAlterTableExecuteRemoveOrphanFilesParams {
   1: required i64 older_than_millis
 }
 
+// Parameters for ALTER TABLE EXECUTE REPAIR_METADATA operations.
+struct TAlterTableExecuteRepairMetadataParams {
+}
+
 // Parameters for ALTER TABLE EXECUTE ... operations.
 struct TAlterTableExecuteParams {
   // Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS
@@ -459,6 +463,9 @@ struct TAlterTableExecuteParams {
 
   // Parameters for ALTER TABLE EXECUTE REMOVE_ORPHAN_FILES
   3: optional TAlterTableExecuteRemoveOrphanFilesParams remove_orphan_files_params
+
+  // Parameters for ALTER TABLE EXECUTE REPAIR_METADATA
+  4: optional TAlterTableExecuteRepairMetadataParams repair_metadata_params
 }
 
 // Parameters for all ALTER TABLE commands.
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRepairMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRepairMetadataStmt.java
new file mode 100644
index 000000000..10102d9a4
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRepairMetadataStmt.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteRepairMetadataParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+
+/**
+ * ALTER TABLE <table> EXECUTE REPAIR_METADATA(): Iceberg tables only, no parameters.
+ */
+public class AlterTableExecuteRepairMetadataStmt extends AlterTableExecuteStmt {
+  protected final static String USAGE = "EXECUTE REPAIR_METADATA()";
+
+  protected AlterTableExecuteRepairMetadataStmt(TableName tableName, Expr fnCallExpr) {
+    super(tableName, fnCallExpr);
+  }
+
+  @Override
+  public String getOperation() { return "EXECUTE REPAIR_METADATA"; }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    FeTable table = getTargetTable();
+    if (!(table instanceof FeIcebergTable)) {
+      throw new AnalysisException(
+          "ALTER TABLE EXECUTE REPAIR_METADATA is only supported "
+          + "for Iceberg tables: " + table.getTableName());
+    }
+    analyzeFunctionCallExpr(analyzer, USAGE);
+  }
+
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    executeParams.setRepair_metadata_params(
+        new TAlterTableExecuteRepairMetadataParams());
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
index 9a65dddcc..501aded9f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
@@ -19,7 +19,6 @@ package org.apache.impala.analysis;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.impala.common.AnalysisException;
 
 /**
@@ -70,12 +69,15 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
       case "ROLLBACK": return new AlterTableExecuteRollbackStmt(tableName, fnCallExpr);
       case "REMOVE_ORPHAN_FILES":
         return new AlterTableExecuteRemoveOrphanFilesStmt(tableName, fnCallExpr);
+      case "REPAIR_METADATA":
+        return new AlterTableExecuteRepairMetadataStmt(tableName, fnCallExpr);
       default:
         throw new AnalysisException(String.format("'%s' is not supported by ALTER "
                 + "TABLE <table> EXECUTE. Supported operations are: "
                 + "EXPIRE_SNAPSHOTS(<expression>), "
                 + "ROLLBACK(<expression>), "
-                + "REMOVE_ORPHAN_FILES(<expression>).",
+                + "REMOVE_ORPHAN_FILES(<expression>), "
+                + "REPAIR_METADATA().",
             functionNameOrig));
     }
   }
@@ -84,14 +86,26 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
       throws AnalysisException {
     // fnCallExpr_ analyzed here manually, because it is not an actual function but a
     // catalog operation.
-    String fnName = fnCallExpr_.getFnName().toString();
-    Preconditions.checkState(StringUtils.equalsAnyIgnoreCase(
-        fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK", "REMOVE_ORPHAN_FILES"));
-    if (fnCallExpr_.getParams().size() != 1) {
-      throw new AnalysisException(
-          usage + " must have one parameter: " + fnCallExpr_.toSql());
+    String fnName = fnCallExpr_.getFnName().toString().toUpperCase();
+    switch (fnName) {
+      case "EXPIRE_SNAPSHOTS":
+      case "ROLLBACK":
+      case "REMOVE_ORPHAN_FILES":
+        if (fnCallExpr_.getParams().size() != 1) {
+          throw new AnalysisException(
+              usage + " must have one parameter: " + fnCallExpr_.toSql());
+        }
+        fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
+        break;
+      case "REPAIR_METADATA":
+        if (fnCallExpr_.getParams().size() != 0) {
+          throw new AnalysisException(
+              usage + " should have no parameter: " + fnCallExpr_.toSql());
+        }
+        break;
+      default:
+        Preconditions.checkState(false, "Invalid function call in ALTER TABLE EXECUTE.");
     }
-    fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
   }
 
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/ImpalaRepairIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/ImpalaRepairIcebergTable.java
new file mode 100644
index 000000000..4ed950cb3
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/ImpalaRepairIcebergTable.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.Transaction;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergContentFileStore;
+import org.apache.impala.service.BackendConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+/**
+ * Repairs an Iceberg table by deleting the metadata references of data files that are
+ * missing from storage. The deletion is committed into the transaction of execute().
+ */
+public class ImpalaRepairIcebergTable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ImpalaRepairIcebergTable.class);
+  private final FeIcebergTable table_;
+  public ImpalaRepairIcebergTable(FeIcebergTable table) {
+    table_ = table;
+  }
+
+  /** Deletes missing files via 'iceTxn'. Returns the number of removed references. */
+  public int execute(Transaction iceTxn) throws CatalogException {
+    IcebergContentFileStore fileStore = table_.getContentFileStore();
+    Collection<String> missingFiles = fileStore.hasMissingFile()
+        ? fileStore.getMissingFiles() : getMissingFiles(fileStore);
+    // Delete all files from the missing files collection
+    int numRemovedReferences = missingFiles.size();
+    if (numRemovedReferences > 0) {
+      DeleteFiles deleteFiles = iceTxn.newDelete();
+      for (String path : missingFiles) {
+        deleteFiles.deleteFile(path);
+      }
+      deleteFiles.commit();
+      LOG.info("Removed {} files during table repair: {}",
+          numRemovedReferences, getFilesLog(missingFiles));
+    } else {
+      LOG.info("No files were removed during table repair.");
+    }
+    return numRemovedReferences;
+  }
+
+  private List<String> getMissingFiles(IcebergContentFileStore fileStore)
+      throws CatalogException {
+    // Check all data files in parallel and create a list of dangling references.
+    ForkJoinPool forkJoinPool =
+        new ForkJoinPool(BackendConfig.INSTANCE.icebergCatalogNumThreads());
+    try {
+      return forkJoinPool.submit(() ->
+          StreamSupport.stream(
+              fileStore.getAllDataFiles().spliterator(), /*parallel=*/true)
+            .map(fileDesc -> fileDesc.getAbsolutePath(table_.getLocation()))
+            .filter(path -> !table_.getIcebergApiTable().io().newInputFile(path).exists())
+            .collect(Collectors.toList())
+      ).get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers can still observe it.
+      Thread.currentThread().interrupt();
+      throw new CatalogException("Interrupted while checking for missing data files", e);
+    } catch (ExecutionException e) {
+      throw new CatalogException(e.getMessage(), e);
+    } finally {
+      forkJoinPool.shutdown();
+    }
+  }
+
+  private static String getFilesLog(Collection<String> missingFiles) {
+    int numPathsToLog = 3;
+    if (LOG.isTraceEnabled()) {
+      numPathsToLog = 1000;
+    }
+    int numRemoved = missingFiles.size();
+    String fileNames = Joiner.on(", ").join(
+        Iterables.limit(missingFiles, numPathsToLog));
+    if (numRemoved > numPathsToLog) {
+      int remaining = numRemoved - numPathsToLog;
+      fileNames += String.format(", and %d more.", remaining);
+    }
+    return fileNames;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 163cca702..8cf929946 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1651,7 +1651,18 @@ public class CatalogOpExecutor {
           } else if (setExecuteParams.isSetRemove_orphan_files_params()) {
             throw new IllegalStateException(
                 "Alter table execute REMOVE_ORPHAN_FILES should not use "
-                + "Iceberg Transaction.");
+                    + "Iceberg Transaction.");
+          } else if (setExecuteParams.isSetRepair_metadata_params()) {
+            int numRemovedReferences =
+                IcebergCatalogOpExecutor.alterTableExecuteRepair(tbl, iceTxn);
+            // Do not commit empty transaction if there were no missing files to remove.
+            if (numRemovedReferences == 0) {
+              addSummary(response, "No missing data files detected.");
+              catalogTimeline.markEvent("Abandoned empty Iceberg transaction");
+              return false;
+            }
+            addSummary(response, "Iceberg table repaired by deleting "
+                + numRemovedReferences + " manifest entries of missing data files.");
           } else {
             // Cannot happen, but throw just in case.
             throw new IllegalStateException(
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 90e159bf9..3904dc5c0 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -60,6 +60,7 @@ import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
 import org.apache.impala.catalog.iceberg.ImpalaIcebergDeleteOrphanFiles;
+import org.apache.impala.catalog.iceberg.ImpalaRepairIcebergTable;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
@@ -268,6 +269,16 @@ public class IcebergCatalogOpExecutor {
     return "Remove orphan files executed.";
   }
 
+  /**
+   * Executes ALTER TABLE EXECUTE REPAIR_METADATA() within the Iceberg transaction.
+   * @return the number of missing data file references removed from the metadata.
+   */
+  public static int alterTableExecuteRepair(FeIcebergTable tbl, Transaction iceTxn)
+      throws CatalogException {
+    ImpalaRepairIcebergTable repair = new ImpalaRepairIcebergTable(tbl);
+    return repair.execute(iceTxn);
+  }
+
   /**
    * Deletes files related to specific set of partitions
    */
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 876265e3b..6fac45273 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -4883,6 +4883,22 @@ public class AnalyzeDDLTest extends FrontendTestBase {
             + "'1111' cannot be converted to a TIMESTAMP");
   }
 
+  @Test
+  public void TestAlterExecuteRepairMetadata() {
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute "
+        + "repair_metadata();");
+
+    // Negative tests
+    AnalysisError("alter table nodb.alltypes execute repair_metadata();",
+        "Could not resolve table reference: 'nodb.alltypes'");
+    AnalysisError("alter table functional.alltypes execute repair_metadata();",
+        "ALTER TABLE EXECUTE REPAIR_METADATA is only supported for Iceberg tables: "
+            + "functional.alltypes");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+            + "repair_metadata('2024-02-11 10:00:00');",
+        "EXECUTE REPAIR_METADATA() should have no parameter");
+  }
+
   private static String buildLongOwnerName() {
     StringBuilder comment = new StringBuilder();
     for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index d5683198c..762fb4d50 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -2254,6 +2254,46 @@ class TestIcebergV2Table(IcebergTestSuite):
             tbl_name, second_snapshot.get_snapshot_id()))
     assert "partitions=2/unknown" in selective_time_travel_data.runtime_profile
 
+  def test_table_repair(self, unique_database):
+    tbl_name = 'tbl_with_removed_files'
+    db_tbl = unique_database + "." + tbl_name
+    repair_query = "alter table {0} execute repair_metadata()"
+    with self.create_impala_client() as impalad_client:
+      impalad_client.execute(
+          "create table {0} (i int) stored as iceberg tblproperties('format-version'='2')"
+          .format(db_tbl))
+      insert_q = "insert into {0} values ({1})"
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 4))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 5))
+      result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
+      assert result.data == ['1', '2', '3', '4', '5']
+
+      table_path = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name)
+      data_path = os.path.join(table_path, "data")
+
+      # Check that table remains intact if there are no missing files
+      result = self.execute_query_expect_success(
+          impalad_client, repair_query.format(db_tbl))
+      assert result.data[0] == "No missing data files detected."
+      result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
+      assert result.data == ['1', '2', '3', '4', '5']
+
+      # Delete 2 data files from the file system directly to corrupt the table.
+      data_files = self.filesystem_client.ls(data_path)
+      self.filesystem_client.delete_file_dir(data_path + "/" + data_files[0])
+      self.filesystem_client.delete_file_dir(data_path + "/" + data_files[1])
+      self.execute_query_expect_success(impalad_client, "invalidate metadata " + db_tbl)
+      result = self.execute_query_expect_success(
+          impalad_client, repair_query.format(db_tbl))
+      assert result.data[0] == \
+          "Iceberg table repaired by deleting 2 manifest entries of missing data files."
+      result = impalad_client.execute('select * from {0} order by i'.format(db_tbl))
+      assert len(result.data) == 3
+
+
 # Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note, that most
 # of the test coverage is in TestIcebergV2Table.test_read_position_deletes but since it
 # runs also with the V2 optimizations setting turned off, some tests were moved here.

Reply via email to