This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d928815d1a3bbab25eea7bfc40baa8913a9465f2
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Jan 21 18:44:35 2025 +0100

    IMPALA-13654: Tolerate missing data files of Iceberg tables
    
    Before this patch we got a TableLoadingException for missing data files.
    This means the IcebergTable will be in an incomplete state in Impala's
    memory, therefore we won't be able to do any operation on it.
    
    We should continue table loading in such cases, and only throw exception
    for queries that are about to read the missing data files.
    
    This way ROLLBACK / DROP PARTITION, and some SELECT statements should
    still work.
    
    If Impala is running in strict mode via CatalogD flag
    --iceberg_allow_datafiles_in_table_location_only, and an Iceberg table
    has data files outside of table location, we still raise an exception
    and leave the table in an unloaded state. To retain this behavior, the
    IOException we threw is substituted to TableLoadingException which fits
    better to logic errors anyway.
    
    Testing
     * added e2e tests
    
    Change-Id: If753619d8ee1b30f018e90157ff7bdbe5d7f1525
    Reviewed-on: http://gerrit.cloudera.org:8080/22367
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 common/thrift/CatalogObjects.thrift                |   1 +
 .../impala/catalog/IcebergContentFileStore.java    |  49 ++++---
 .../impala/catalog/IcebergFileMetadataLoader.java  |  39 ++++--
 .../apache/impala/planner/IcebergScanPlanner.java  |  12 +-
 .../QueryTest/iceberg-missing-data-files.test      | 145 +++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  52 ++++++++
 6 files changed, 269 insertions(+), 29 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift 
b/common/thrift/CatalogObjects.thrift
index 851ed3f1e..6e186d6b9 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -642,6 +642,7 @@ struct TIcebergContentFileStore {
   5: optional bool has_avro
   6: optional bool has_orc
   7: optional bool has_parquet
+  8: optional list<string> missing_files
 }
 
 // Represents a drop partition request for Iceberg tables
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
index 2a3caa5cf..10345f57a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
@@ -22,9 +22,12 @@ import com.google.common.collect.Lists;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -153,6 +156,7 @@ public class IcebergContentFileStore {
   private MapListContainer dataFilesWithDeletes_ = new MapListContainer();
   private MapListContainer positionDeleteFiles_ = new MapListContainer();
   private MapListContainer equalityDeleteFiles_ = new MapListContainer();
+  private Set<String> missingFiles_ = new HashSet<>();
 
   // Caches file descriptors loaded during time-travel queries.
   private final ConcurrentMap<String, EncodedFileDescriptor> oldFileDescMap_ =
@@ -173,31 +177,33 @@ public class IcebergContentFileStore {
     Preconditions.checkNotNull(icebergFiles);
 
     Map<String, IcebergFileDescriptor> fileDescMap = new HashMap<>();
-    for (FileDescriptor fileDesc : fileDescriptors) {
-      Preconditions.checkState(fileDesc instanceof IcebergFileDescriptor);
+    for (IcebergFileDescriptor fileDesc : fileDescriptors) {
       Path path = new Path(fileDesc.getAbsolutePath(iceApiTable.location()));
-      fileDescMap.put(path.toUri().getPath(), (IcebergFileDescriptor)fileDesc);
+      fileDescMap.put(path.toUri().getPath(), fileDesc);
     }
 
     for (DataFile dataFile : icebergFiles.dataFilesWithoutDeletes) {
-      Pair<String, EncodedFileDescriptor> pathHashAndFd =
-          getPathHashAndFd(dataFile, fileDescMap);
-      dataFilesWithoutDeletes_.add(pathHashAndFd.first, pathHashAndFd.second);
+      storeFile(dataFile, fileDescMap, dataFilesWithoutDeletes_);
     }
     for (DataFile dataFile : icebergFiles.dataFilesWithDeletes) {
-      Pair<String, EncodedFileDescriptor> pathHashAndFd =
-          getPathHashAndFd(dataFile, fileDescMap);
-      dataFilesWithDeletes_.add(pathHashAndFd.first, pathHashAndFd.second);
+      storeFile(dataFile, fileDescMap, dataFilesWithDeletes_);
     }
     for (DeleteFile deleteFile : icebergFiles.positionDeleteFiles) {
-      Pair<String, EncodedFileDescriptor> pathHashAndFd =
-          getPathHashAndFd(deleteFile, fileDescMap);
-      positionDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second);
+      storeFile(deleteFile, fileDescMap, positionDeleteFiles_);
     }
     for (DeleteFile deleteFile : icebergFiles.equalityDeleteFiles) {
-      Pair<String, EncodedFileDescriptor> pathHashAndFd =
-          getPathHashAndFd(deleteFile, fileDescMap);
-      equalityDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second);
+      storeFile(deleteFile, fileDescMap, equalityDeleteFiles_);
+    }
+  }
+
+  private void storeFile(ContentFile<?> contentFile,
+      Map<String, IcebergFileDescriptor> fileDescMap, MapListContainer 
container) {
+    Pair<String, EncodedFileDescriptor> pathHashAndFd =
+        getPathHashAndFd(contentFile, fileDescMap);
+    if (pathHashAndFd.second != null) {
+      container.add(pathHashAndFd.first, pathHashAndFd.second);
+    } else {
+      missingFiles_.add(contentFile.path().toString());
     }
   }
 
@@ -240,6 +246,14 @@ public class IcebergContentFileStore {
     return equalityDeleteFiles_.getList();
   }
 
+  public boolean hasMissingFile() {
+    return !missingFiles_.isEmpty();
+  }
+
+  public Set<String> getMissingFiles() {
+    return missingFiles_;
+  }
+
   public long getNumFiles() {
     return dataFilesWithoutDeletes_.getNumFiles() +
            dataFilesWithDeletes_.getNumFiles() +
@@ -291,6 +305,8 @@ public class IcebergContentFileStore {
     Path path = new Path(contentFile.path().toString());
     IcebergFileDescriptor fileDesc = fileDescMap.get(path.toUri().getPath());
 
+    if (fileDesc == null) return null;
+
     FbFileMetadata fileMetadata = fileDesc.getFbFileMetadata();
     Preconditions.checkState(fileMetadata != null);
     FbIcebergMetadata icebergMetadata = fileMetadata.icebergMetadata();
@@ -312,6 +328,7 @@ public class IcebergContentFileStore {
     ret.setHas_avro(hasAvro_);
     ret.setHas_orc(hasOrc_);
     ret.setHas_parquet(hasParquet_);
+    ret.setMissing_files(new ArrayList<>(missingFiles_));
     return ret;
   }
 
@@ -342,6 +359,8 @@ public class IcebergContentFileStore {
     ret.hasAvro_ = tFileStore.isSetHas_avro() ? tFileStore.isHas_avro() : 
false;
     ret.hasOrc_ = tFileStore.isSetHas_orc() ? tFileStore.isHas_orc() : false;
     ret.hasParquet_ = tFileStore.isSetHas_parquet() ? 
tFileStore.isHas_parquet() : false;
+    ret.missingFiles_ = tFileStore.isSetMissing_files() ?
+        new HashSet<>(tFileStore.getMissing_files()) : Collections.emptySet();
     return ret;
   }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
index c77766723..fbfb95fa9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -27,6 +27,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -172,18 +174,31 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
       Path path = FileSystemUtil.createFullyQualifiedPath(
           new Path(contentFileInfo.getSecond().path().toString()));
       FileStatus stat = nameToFileStatus.get(path);
-      IcebergFileDescriptor fd = createFd(contentFileInfo.getFirst(),
-          contentFileInfo.getSecond(), stat, partPath, numUnknownDiskIds);
-      loadedFds_.add(fd);
-      fileMetadataStats_.accumulate(fd);
+      loadFdFromStorage(contentFileInfo, stat, partPath, numUnknownDiskIds);
     }
-    loadStats_.loadedFiles += filesSupportsStorageIds.size();
     loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
     if (LOG.isTraceEnabled()) {
       LOG.trace(loadStats_.debugString());
     }
   }
 
+  private void loadFdFromStorage(Pair<FileSystem, ContentFile<?>> 
contentFileInfo,
+      FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds)
+      throws CatalogException {
+    try {
+      IcebergFileDescriptor fd = createFd(contentFileInfo.getFirst(),
+          contentFileInfo.getSecond(), stat, partPath, numUnknownDiskIds);
+      loadedFds_.add(fd);
+      ++loadStats_.loadedFiles;
+      fileMetadataStats_.accumulate(fd);
+    } catch (IOException e) {
+      StringWriter w = new StringWriter();
+      e.printStackTrace(new PrintWriter(w));
+      LOG.warn(String.format("Failed to load Iceberg content file: '%s' Caused 
by: %s",
+          contentFileInfo.getSecond().path().toString(), w));
+    }
+  }
+
   @VisibleForTesting
   boolean useParallelListing() {
     return useParallelListing_;
@@ -194,7 +209,7 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
    *  which means we can safely reuse the old FDs.
    */
   private Iterable<ContentFile<?>> loadContentFilesWithOldFds(Path partPath)
-      throws IOException {
+      throws TableLoadingException {
     if (forceRefreshLocations || oldFdsByPath_.isEmpty()) {
       return icebergFiles_.getAllContentFiles();
     }
@@ -214,7 +229,7 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
 
   private IcebergFileDescriptor createFd(FileSystem fs, ContentFile<?> 
contentFile,
       FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds)
-      throws IOException {
+      throws CatalogException, IOException {
     if (stat == null) {
       Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
           new Path(contentFile.path().toString()));
@@ -230,8 +245,8 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
     String relPath = FileSystemUtil.relativizePathNoThrow(stat.getPath(), 
partPath);
     if (relPath == null) {
       if (requiresDataFilesInTableLocation_) {
-        throw new IOException(String.format("Failed to load Iceberg datafile 
%s, because "
-            + "it's outside of the table location", stat.getPath().toUri()));
+        throw new TableLoadingException(String.format("Failed to load Iceberg 
datafile " +
+            "%s, because it's outside of the table location", 
stat.getPath().toUri()));
       } else {
         absPath = stat.getPath().toString();
       }
@@ -303,14 +318,14 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
   }
 
   IcebergFileDescriptor getOldFd(ContentFile<?> contentFile, Path partPath)
-      throws IOException {
+      throws TableLoadingException {
     Path contentFilePath = FileSystemUtil.createFullyQualifiedPath(
         new Path(contentFile.path().toString()));
     String lookupPath = FileSystemUtil.relativizePathNoThrow(contentFilePath, 
partPath);
     if (lookupPath == null) {
       if (requiresDataFilesInTableLocation_) {
-        throw new IOException(String.format("Failed to load Iceberg datafile 
%s, because "
-            + "it's outside of the table location", contentFilePath));
+        throw new TableLoadingException(String.format("Failed to load Iceberg 
datafile " +
+            "%s, because it's outside of the table location", 
contentFilePath));
       } else {
         lookupPath = contentFilePath.toString();
       }
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 1a20f487f..83cb0b555 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -177,6 +177,11 @@ public class IcebergScanPlanner {
 
   private void setFileDescriptorsBasedOnFileStore() throws ImpalaException {
     IcebergContentFileStore fileStore = getIceTable().getContentFileStore();
+    if (fileStore.hasMissingFile()) {
+      throw new ImpalaRuntimeException(String.format("Iceberg table '%s' 
cannot be" +
+          " fully loaded due to unavailable files: %s Check ImpalaD/CatalogD 
logs for" +
+          " details.", getIceTable().getFullName(), 
fileStore.getMissingFiles()));
+    }
     if (tblRef_.getSelectedDataFilesForOptimize() != null) {
       dataFilesWithoutDeletes_ = tblRef_.getSelectedDataFilesForOptimize();
     } else {
@@ -729,8 +734,11 @@ public class IcebergScanPlanner {
 
     if (tblRef_.getTimeTravelSpec() == null) {
       // We should always find the data files in the cache when not doing time 
travel.
-      throw new ImpalaRuntimeException("Cannot find file in cache: " + 
cf.path()
-          + " with snapshot id: " + getIceTable().snapshotId());
+      throw new ImpalaRuntimeException(String.format("Cannot find file: %s in" 
+
+          " Iceberg table %s (snapshot id: %d) It's possibly missing from 
storage." +
+          " Check ImpalaD/CatalogD logs for details. List of all missing 
files: %s",
+          cf.path(), getIceTable().getFullName(), getIceTable().snapshotId(),
+          fileStore.getMissingFiles()));
     }
     // We can still find the file descriptor among the old file descriptors.
     iceFileDesc = fileStore.getOldFileDescriptor(pathHash);
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-missing-data-files.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-missing-data-files.test
new file mode 100644
index 000000000..c518f6611
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-missing-data-files.test
@@ -0,0 +1,145 @@
+====
+---- QUERY
+select * from missing_files_nopart
+---- CATCH
+unavailable files
+====
+---- QUERY
+select * from missing_files_part
+---- CATCH
+unavailable files
+====
+---- QUERY
+select * from missing_files_part
+where p=2
+---- CATCH
+Cannot find file
+====
+---- QUERY
+# Time travel queries should work
+select * from missing_files_nopart
+for system_version as of $NOPART_FIRST_SNAPSHOT
+---- RESULTS
+1,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+select * from missing_files_part
+for system_version as of $PART_FIRST_SNAPSHOT
+---- RESULTS
+1,1
+---- TYPES
+INT,INT
+====
+====
+---- QUERY
+# We can still query partitions without missing files
+select * from missing_files_part
+where p=1
+---- RESULTS
+1,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+describe missing_files_part
+---- RESULTS
+'i','int','','true'
+'p','int','','true'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+describe formatted missing_files_part
+---- RESULTS: VERIFY_IS_SUBSET
+'# col_name            ','data_type           ','comment             '
+'','NULL','NULL'
+'i','int','NULL'
+'p','int','NULL'
+'','NULL','NULL'
+'# Partition Transform Information','NULL','NULL'
+'# col_name            ','transform_type      ','NULL'
+'','NULL','NULL'
+'p','IDENTITY','NULL'
+'','NULL','NULL'
+'# Detailed Table Information','NULL','NULL'
+'OwnerType:          ','USER                ','NULL'
+'Erasure Coding Policy:','NONE                ','NULL'
+'Table Type:         ','EXTERNAL_TABLE      ','NULL'
+'Table Parameters:','NULL','NULL'
+'','EXTERNAL            ','TRUE                '
+'','OBJCAPABILITIES     ','EXTREAD,EXTWRITE    '
+'','default-partition-spec','{\\"spec-id\\":0,\\"fields\\":[{\\"name\\":\\"p\\",\\"transform\\":\\"identity\\",\\"source-id\\":2,\\"field-id\\":1000}]}'
+'','engine.hive.enabled ','true                '
+'','external.table.purge','TRUE                '
+'','numFiles            ','2                   '
+'','numRows             ','2                   '
+'','snapshot-count      ','2                   '
+'','storage_handler     
','org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+'','table_type          ','ICEBERG             '
+'','write.delete.mode   ','merge-on-read       '
+'','write.format.default','parquet             '
+'','write.merge.mode    ','merge-on-read       '
+'','write.update.mode   ','merge-on-read       '
+'','NULL','NULL'
+'# Storage Information','NULL','NULL'
+'SerDe Library:      ','org.apache.iceberg.mr.hive.HiveIcebergSerDe','NULL'
+'InputFormat:        
','org.apache.iceberg.mr.hive.HiveIcebergInputFormat','NULL'
+'OutputFormat:       
','org.apache.iceberg.mr.hive.HiveIcebergOutputFormat','NULL'
+'Compressed:         ','No                  ','NULL'
+'Sort Columns:       ','[]                  ','NULL'
+'','NULL','NULL'
+'# Constraints','NULL','NULL'
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Metadata queries work
+SELECT snapshot_id, operation from $DATABASE.missing_files_part.snapshots;
+---- RESULTS
+regex:\d+,'append'
+regex:\d+,'append'
+---- TYPES
+BIGINT,STRING
+====
+---- QUERY
+ALTER TABLE missing_files_part DROP PARTITION (p=2);
+SELECT * FROM missing_files_part;
+---- RESULTS
+1,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+ALTER TABLE missing_files_nopart EXECUTE ROLLBACK($NOPART_FIRST_SNAPSHOT);
+SELECT * FROM missing_files_nopart;
+---- RESULTS
+1,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+ALTER TABLE missing_files_part EXECUTE ROLLBACK($PART_FIRST_SNAPSHOT);
+SELECT * FROM missing_files_part;
+---- RESULTS
+1,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+INSERT INTO missing_files_nopart VALUES (3,3);
+---- DML_RESULTS: missing_files_nopart
+1,1
+3,3
+---- TYPES
+INT,INT
+====
+---- QUERY
+INSERT INTO missing_files_part VALUES (3,3);
+---- DML_RESULTS: missing_files_part
+1,1
+3,3
+---- TYPES
+INT,INT
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index ac6c11669..de7f772ff 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1606,6 +1606,58 @@ class TestIcebergV2Table(IcebergTestSuite):
           test_file_vars={'$OVERWRITE_SNAPSHOT_ID': 
str(overwrite_snapshot_id.data[0]),
                           '$OVERWRITE_SNAPSHOT_TS': 
str(overwrite_snapshot_ts.data[0])})
 
+  @SkipIf.not_hdfs
+  def test_missing_data_files(self, vector, unique_database):
+    def list_files(tbl):
+      query_result = self.execute_query("select file_path from 
{}.`files`".format(tbl))
+      return query_result.data
+
+    def first_snapshot(tbl):
+      query_result = self.execute_query(
+          "select snapshot_id from {}.`snapshots` order by 
committed_at".format(tbl))
+      return query_result.data[0]
+
+    def insert_values(tbl, values):
+      self.execute_query("insert into {} values {}".format(tbl, values))
+
+    missing_files_nopart = unique_database + ".missing_files_nopart"
+    missing_files_part = unique_database + ".missing_files_part"
+    self.execute_query("""CREATE TABLE {} (i int, p int)
+        STORED BY ICEBERG
+        TBLPROPERTIES('format-version'='2')""".format(missing_files_nopart))
+    insert_values(missing_files_nopart, "(1, 1)")
+    first_file = set(list_files(missing_files_nopart))
+    insert_values(missing_files_nopart, "(2, 2)")
+
+    all_files = set(list_files(missing_files_nopart))
+    assert len(all_files) == 2
+    second_file = next(iter(all_files - first_file))
+    check_output(["hdfs", "dfs", "-rm", second_file])
+
+    self.execute_query("""CREATE TABLE {} (i int, p int)
+        PARTITIONED BY SPEC (p)
+        STORED BY ICEBERG
+        TBLPROPERTIES('format-version'='2')""".format(missing_files_part))
+    insert_values(missing_files_part, "(1, 1)")
+    insert_values(missing_files_part, "(2, 2)")
+    files = list_files(missing_files_part)
+    part_2_f = None
+    for f in files:
+      if "p=2" in f:
+        part_2_f = f
+        break
+    assert part_2_f is not None
+    check_output(["hdfs", "dfs", "-rm", part_2_f])
+
+    self.execute_query("invalidate metadata {}".format(missing_files_nopart))
+    self.execute_query("invalidate metadata {}".format(missing_files_part))
+
+    self.run_test_case('QueryTest/iceberg-missing-data-files', vector,
+        unique_database,
+        test_file_vars={
+            '$NOPART_FIRST_SNAPSHOT': first_snapshot(missing_files_nopart),
+            '$PART_FIRST_SNAPSHOT': first_snapshot(missing_files_part)})
+
   def test_delete(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-delete', vector,
         unique_database)

Reply via email to