This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new c518d3c81 IMPALA-13501: Clean up uncommitted Iceberg files after 
validation check failure
c518d3c81 is described below

commit c518d3c8182749b83f938105c605c1b67755513c
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Fri Nov 22 11:43:21 2024 +0100

    IMPALA-13501: Clean up uncommitted Iceberg files after validation check 
failure
    
    Iceberg supports multiple writers with optimistic concurrency.
    Each writer can write new files which are then added to the table
    after a validation check to ensure that the commit does not conflict
    with other modifications made during the execution.
    
    When there is a conflicting change which cannot be resolved, it
    means that the newly written files cannot be committed to the table,
    so they used to become orphan files on the file system. Orphan files
    can accumulate over time, taking up a lot of storage space. They do
    not belong to the table because they are not referenced by any snapshot
    and therefore they can't be removed by expiring snapshots.
    
    This change introduces automatic cleanup of uncommitted files
    after an unsuccessful DML operation to prevent creating orphan files.
    No cleanup is done if Iceberg throws CommitStateUnknownException
    because the update success or failure is unknown in this case.
    
    Testing:
    - E2E test: Injected ValidationException with debug option.
    - stress test: Added a method to check that no orphan files were
      created after failed conflicting commits.
    
    Change-Id: Ibe59546ebf3c639b75b53dfa1daba37cef50eb21
    Reviewed-on: http://gerrit.cloudera.org:8080/22189
    Reviewed-by: Daniel Becker <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 47 +++++++----
 .../impala/service/IcebergCatalogOpExecutor.java   | 93 ++++++++++++----------
 .../java/org/apache/impala/util/DebugUtils.java    |  7 ++
 tests/query_test/test_iceberg.py                   | 40 +++++++++-
 tests/stress/test_update_stress.py                 | 33 +++++++-
 5 files changed, 157 insertions(+), 63 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 32e2e33cd..33eb0d8f3 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.impala.analysis.AlterTableSortByStmt;
@@ -7570,25 +7571,37 @@ public class CatalogOpExecutor {
       throws ImpalaException {
     FeIcebergTable iceTbl = (FeIcebergTable)table;
     org.apache.iceberg.Transaction iceTxn = 
IcebergUtil.getIcebergTransaction(iceTbl);
-    IcebergCatalogOpExecutor.execute(iceTbl, iceTxn,
-        update.getIceberg_operation());
-    catalogTimeline.markEvent("Executed Iceberg operation " +
-        update.getIceberg_operation().getOperation());
-    if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) {
-      // Add catalog service id and the 'newCatalogVersion' to the table 
parameters.
-      // This way we can avoid reloading the table on self-events (Iceberg 
generates
-      // an ALTER TABLE statement to set the new metadata_location).
-      modification.registerInflightEvent();
-      IcebergCatalogOpExecutor.addCatalogVersionToTxn(
-          iceTxn, catalog_.getCatalogServiceId(), 
modification.newVersionNumber());
-      catalogTimeline.markEvent("Updated table properties");
-    }
+    try {
+      DebugUtils.executeDebugAction(
+          update.getDebug_action(), DebugUtils.ICEBERG_CONFLICT);
+      IcebergCatalogOpExecutor.execute(iceTbl, iceTxn, 
update.getIceberg_operation());
+      catalogTimeline.markEvent("Executed Iceberg operation " +
+          update.getIceberg_operation().getOperation());
+      if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) {
+        // Add catalog service id and the 'newCatalogVersion' to the table 
parameters.
+        // This way we can avoid reloading the table on self-events (Iceberg 
generates
+        // an ALTER TABLE statement to set the new metadata_location).
+        modification.registerInflightEvent();
+        IcebergCatalogOpExecutor.addCatalogVersionToTxn(
+            iceTxn, catalog_.getCatalogServiceId(), 
modification.newVersionNumber());
+        catalogTimeline.markEvent("Updated table properties");
+      }
 
-    if (update.isSetDebug_action()) {
-      String debugAction = update.getDebug_action();
-      DebugUtils.executeDebugAction(debugAction, DebugUtils.ICEBERG_COMMIT);
+      DebugUtils.executeDebugAction(update.getDebug_action(), 
DebugUtils.ICEBERG_COMMIT);
+      iceTxn.commitTransaction();
+    // If we have no information about the success of the commit, we should 
not delete
+    // anything.
+    } catch (CommitStateUnknownException u) {
+      throw new ImpalaRuntimeException(u.getMessage(), u);
+    // If the commit failed, the newly written files should be deleted to 
avoid creating
+    // orphan files in the table. Only data/delete files need cleanup from 
Impala, Iceberg
+    // deletes the metadata files created for this update.
+    } catch (Exception e) {
+      
IcebergCatalogOpExecutor.cleanupUncommittedFiles(update.getIceberg_operation());
+      LOG.info("Cleaned up uncommitted data files after failing to commit them 
to " +
+          "table {}", table.getFullName());
+      throw new ImpalaRuntimeException(e.getMessage(), e);
     }
-    iceTxn.commitTransaction();
     modification.markInflightEventRegistrationComplete();
   }
 
diff --git 
a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 4435ad9e1..1d1a50e8d 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.impala.service;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,7 +44,6 @@ import org.apache.iceberg.UpdatePartitionSpec;
 import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Term;
 import org.apache.iceberg.hive.HiveCatalog;
@@ -77,6 +75,8 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.impala.common.FileSystemUtil.deleteIfExists;
+
 /**
  * This is a helper for the CatalogOpExecutor to provide Iceberg related DDL 
functionality
  * such as creating and dropping tables from Iceberg.
@@ -381,19 +381,15 @@ public class IcebergCatalogOpExecutor {
       DeleteFile deleteFile = createDeleteFile(feIcebergTable, buf);
       rowDelta.addDeletes(deleteFile);
     }
-    try {
-      // Validate that there are no conflicting data files, because if data 
files are
-      // added in the meantime, they potentially contain records that should 
have been
-      // affected by this DELETE operation.
-      rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
-      rowDelta.validateNoConflictingDataFiles();
-      rowDelta.validateDataFilesExist(
-          icebergOp.getData_files_referenced_by_position_deletes());
-      rowDelta.validateDeletedFiles();
-      rowDelta.commit();
-    } catch (ValidationException e) {
-      throw new ImpalaRuntimeException(e.getMessage(), e);
-    }
+    // Validate that there are no conflicting data files, because if data 
files are
+    // added in the meantime, they potentially contain records that should 
have been
+    // affected by this DELETE operation.
+    rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
+    rowDelta.validateNoConflictingDataFiles();
+    rowDelta.validateDataFilesExist(
+        icebergOp.getData_files_referenced_by_position_deletes());
+    rowDelta.validateDeletedFiles();
+    rowDelta.commit();
   }
 
   private static void updateRows(FeIcebergTable feIcebergTable, Transaction 
txn,
@@ -409,22 +405,18 @@ public class IcebergCatalogOpExecutor {
       DataFile dataFile = createDataFile(feIcebergTable, buf);
       rowDelta.addRows(dataFile);
     }
-    try {
-      // Validate that there are no conflicting data files, because if data 
files are
-      // added in the meantime, they potentially contain records that should 
have been
-      // affected by this UPDATE operation. Also validate that there are no 
conflicting
-      // delete files, because we don't want to revive records that have been 
deleted
-      // in the meantime.
-      rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
-      rowDelta.validateNoConflictingDataFiles();
-      rowDelta.validateNoConflictingDeleteFiles();
-      rowDelta.validateDataFilesExist(
-          icebergOp.getData_files_referenced_by_position_deletes());
-      rowDelta.validateDeletedFiles();
-      rowDelta.commit();
-    } catch (ValidationException e) {
-      throw new ImpalaRuntimeException(e.getMessage(), e);
-    }
+    // Validate that there are no conflicting data files, because if data 
files are
+    // added in the meantime, they potentially contain records that should 
have been
+    // affected by this UPDATE operation. Also validate that there are no 
conflicting
+    // delete files, because we don't want to revive records that have been 
deleted
+    // in the meantime.
+    rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
+    rowDelta.validateNoConflictingDataFiles();
+    rowDelta.validateNoConflictingDeleteFiles();
+    rowDelta.validateDataFilesExist(
+        icebergOp.getData_files_referenced_by_position_deletes());
+    rowDelta.validateDeletedFiles();
+    rowDelta.commit();
   }
 
   private static DataFile createDataFile(FeIcebergTable feIcebergTable, 
ByteBuffer buf)
@@ -488,11 +480,7 @@ public class IcebergCatalogOpExecutor {
       DataFile dataFile = createDataFile(feIcebergTable, buf);
       batchWrite.addFile(dataFile);
     }
-    try {
-      batchWrite.commit();
-    } catch (ValidationException e) {
-      throw new ImpalaRuntimeException(e.getMessage(), e);
-    }
+    batchWrite.commit();
   }
 
   private static Metrics buildDataFileMetrics(FbIcebergDataFile dataFile) {
@@ -564,12 +552,8 @@ public class IcebergCatalogOpExecutor {
       DataFile dataFile = createDataFile(feIcebergTable, buf);
       rewrite.addFile(dataFile);
     }
-    try {
-      rewrite.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
-      rewrite.commit();
-    } catch (ValidationException e) {
-      throw new ImpalaRuntimeException(e.getMessage(), e);
-    }
+    rewrite.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
+    rewrite.commit();
   }
 
   /**
@@ -594,4 +578,29 @@ public class IcebergCatalogOpExecutor {
                     String.valueOf(version));
     updateProps.commit();
   }
+
+  /**
+   * When a write operation fails the validation check due to conflicts, the 
data/delete
+   * files created by it cannot be committed to the table: they would become 
orphan files.
+   * This method deletes these uncommitted data/delete files from the file 
system.
+   * @param icebergOp the failed DML operation that contains the list of newly 
written
+   *                  data and delete files which have to be cleaned up.
+   */
+  protected static void cleanupUncommittedFiles(TIcebergOperationParam 
icebergOp) {
+    if (icebergOp.isSetIceberg_data_files_fb()) {
+      deleteIcebergFiles(icebergOp.getIceberg_data_files_fb());
+    }
+    if (icebergOp.isSetIceberg_delete_files_fb()) {
+      deleteIcebergFiles(icebergOp.getIceberg_delete_files_fb());
+    }
+  }
+
+  private static void deleteIcebergFiles(Iterable<ByteBuffer> icebergFiles) {
+    for (ByteBuffer buf : icebergFiles) {
+      String pathString = 
FbIcebergDataFile.getRootAsFbIcebergDataFile(buf).path();
+      if (pathString != null) {
+        deleteIfExists(new org.apache.hadoop.fs.Path((pathString)));
+      }
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 781acd766..9f7750d8f 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +60,9 @@ public class DebugUtils {
   // debug action label for Iceberg transaction commit.
   public static final String ICEBERG_COMMIT = "catalogd_iceberg_commit";
 
+  // debug action label for Iceberg validation check failure.
+  public static final String ICEBERG_CONFLICT = "catalogd_iceberg_conflict";
+
   // debug action label for Iceberg create table.
   public static final String ICEBERG_CREATE = "catalogd_iceberg_create";
 
@@ -196,6 +200,9 @@ public class DebugUtils {
             case "commitfailedexception":
               exceptionToThrow = new CommitFailedException(param);
               break;
+            case "validationexception":
+              exceptionToThrow = new ValidationException(param);
+              break;
             case "icebergalreadyexistsexception":
               exceptionToThrow = new org.apache.iceberg.exceptions.
                   AlreadyExistsException("Table already exists");
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 91e08723f..ab51bab13 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -44,7 +44,7 @@ from tests.common.file_utils import (
   create_iceberg_table_from_directory,
   create_table_from_parquet)
 from tests.shell.util import run_impala_shell_cmd
-from tests.util.filesystem_utils import get_fs_path, IS_HDFS, WAREHOUSE
+from tests.util.filesystem_utils import get_fs_path, IS_HDFS, WAREHOUSE, 
FILESYSTEM_PREFIX
 from tests.util.get_parquet_metadata import get_parquet_metadata
 from tests.util.iceberg_util import cast_ts, quote, get_snapshots, 
IcebergCatalogs
 
@@ -2001,6 +2001,44 @@ class TestIcebergV2Table(IcebergTestSuite):
   def test_merge_star(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database)
 
+  def test_cleanup(self, unique_database):
+      """Test that all uncommitted files written by Impala are removed from 
the file
+      system when a DML commit to an Iceberg table fails, and that the effects 
of the
+      failed operation are not visible."""
+      table_name = "iceberg_cleanup_failure"
+      fq_tbl_name = unique_database + "." + table_name
+      # The query options that inject an iceberg validation check failure.
+      fail_ice_commit_options = {'debug_action':
+                        'CATALOGD_ICEBERG_CONFLICT:EXCEPTION@'
+                        'ValidationException@'
+                        'simulated validation check failure'}
+      # Create an iceberg table and insert a row.
+      self.execute_query_expect_success(self.client, """CREATE TABLE {0} (i 
int)
+          STORED BY ICEBERG TBLPROPERTIES 
('format-version'='2')""".format(fq_tbl_name))
+      self.execute_query_expect_success(self.client,
+          "insert into {0} values (1)".format(fq_tbl_name))
+
+      # Run a query that would update a row, but pass the query options that
+      # will cause the iceberg validation check to fail.
+      err = self.execute_query_expect_failure(self.client,
+          "update {0} set i=2 where i=1".format(fq_tbl_name),
+          query_options=fail_ice_commit_options)
+      # Check that we get the error message.
+      assert error_msg_expected(
+          str(err), "ValidationException: simulated validation check failure")
+      # Check that the table content was not updated.
+      data = self.execute_query_expect_success(self.client,
+          "select * from {0}".format(fq_tbl_name))
+      assert len(data.data) == 1
+      assert data.data[0] == '1'
+
+      # Check that the uncommitted data and delete files are removed from the 
file system
+      # and only the first data file remains.
+      table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
+          FILESYSTEM_PREFIX, unique_database, table_name)
+      files_result = check_output(["hdfs", "dfs", "-ls", table_location])
+      assert "Found 1 items" in files_result
+
 
 # Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. 
Note, that most
 # of the test coverage is in TestIcebergV2Table.test_read_position_deletes but 
since it
diff --git a/tests/stress/test_update_stress.py 
b/tests/stress/test_update_stress.py
index 1148f7ce4..bb966c864 100644
--- a/tests/stress/test_update_stress.py
+++ b/tests/stress/test_update_stress.py
@@ -21,12 +21,13 @@ import pytest
 import random
 import time
 from multiprocessing import Value
+from subprocess import check_output
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.stress.stress_util import run_tasks, Task
-from tests.util.filesystem_utils import IS_HDFS
+from tests.util.filesystem_utils import FILESYSTEM_PREFIX, IS_HDFS
 
 
 # Longer-running UPDATE tests are executed here
@@ -238,7 +239,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
     """Issues DELETE and UPDATE statements in parallel in a way that some
     invariants must be true when a spectator process inspects the table."""
 
-    tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
+    tbl_suffix = "test_concurrent_deletes_and_updates"
+    tbl_name = unique_database + "." + tbl_suffix
     self.client.set_configuration_option("SYNC_DDL", "true")
     self.client.execute("""create table {0} (id int, j bigint)
         stored as iceberg
@@ -259,6 +261,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
     result = self.client.execute("select count(*) from {}".format(tbl_name))
     assert result.data == ['0']
 
+    self.check_no_orphan_files(unique_database, tbl_suffix)
+
   @pytest.mark.execute_serially
   @UniqueDatabase.parametrize(sync_ddl=True)
   def test_iceberg_deletes_and_updates_and_optimize(self, unique_database):
@@ -266,7 +270,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
     invariants must be true when a spectator process inspects the table.
     An optimizer thread also invokes OPTIMIZE regularly on the table."""
 
-    tbl_name = "%s.test_concurrent_write_and_optimize" % unique_database
+    tbl_suffix = "test_concurrent_write_and_optimize"
+    tbl_name = unique_database + "." + tbl_suffix
     self.client.set_configuration_option("SYNC_DDL", "true")
     self.client.execute("""create table {0} (id int, j bigint)
         stored as iceberg
@@ -293,3 +298,25 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
 
     result = self.client.execute("select count(*) from {}".format(tbl_name))
     assert result.data == ['0']
+
+    self.check_no_orphan_files(unique_database, tbl_suffix)
+
+  def check_no_orphan_files(self, unique_database, table_name):
+    # Check that the uncommitted data and delete files are removed from the 
file system
+    # and only those files remain that are reachable through valid snapshots.
+    data_files_in_tbl_result = self.client.execute(
+        "select file_path from {}.{}.all_files;".format(unique_database, 
table_name))
+    data_files_in_tbl = [row.split('/test-warehouse')[1]
+        for row in data_files_in_tbl_result.data]
+
+    table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
+        FILESYSTEM_PREFIX, unique_database, table_name)
+    data_files_on_fs_result = check_output(["hdfs", "dfs", "-ls", 
table_location])
+    # The first row of the HDFS result is a summary, the following lines 
contain
+    # 1 file each.
+    data_files_on_fs_rows = data_files_on_fs_result.strip().split('\n')[1:]
+    data_files_on_fs = [row.split()[-1].split('/test-warehouse')[1]
+        for row in data_files_on_fs_rows]
+
+    assert len(data_files_in_tbl) == len(data_files_on_fs_rows)
+    assert set(data_files_on_fs) == set(data_files_in_tbl)

Reply via email to