This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 68ab52f2c770c233bc1e287b6d3c40df1cdc8775
Author: Riza Suminto <riza.sumi...@cloudera.com>
AuthorDate: Sat Sep 13 22:04:13 2025 -0700

    IMPALA-14437: Fix regression in FileMetadataLoader.createFd()
    
    IMPALA-14349 caused a regression due to a change in
    FileMetadataLoader.createFd(). When the default FS is S3, files in S3
    should not have any FileBlock. However, after IMPALA-14349, a CTAS query
    that scans the functional.alltypes table in S3 hit the following
    Precondition in HdfsScanNode.java:
    
      if (!fsHasBlocks) {
        Preconditions.checkState(fileDesc.getNumFileBlocks() == 0);
    
    This is because FileMetadataLoader.createFd() skipped checking whether
    the originating FileSystem supports storage IDs
    (FileSystemUtil.supportsStorageIds()). S3 dataloading from an HDFS
    snapshot consistently failed due to this regression.
    
    This patch fixes the issue by restoring FileMetadataLoader.createFd() to
    its state before IMPALA-14349. It also makes FileMetadataLoader.createFd()
    calls more consistent by disallowing null parameters, except for
    'absPath', which is non-null only for Iceberg data files. Finally, it
    generalizes the numUnknownDiskIds parameter from Reference<Long> to
    AtomicLong so that it can be shared across parallel loading.
    
    Testing:
    Passed dataloading, FE_TEST, EE_TEST, and CLUSTER_TEST in S3.
    
    Change-Id: Ie16c5d7b020a59b5937b52dfbf66175ac94f60cd
    Reviewed-on: http://gerrit.cloudera.org:8080/23423
    Reviewed-by: Zoltan Borok-Nagy <borokna...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../org/apache/impala/catalog/FeIcebergTable.java   |  4 ++--
 .../java/org/apache/impala/catalog/FileBlock.java   | 11 ++++-------
 .../org/apache/impala/catalog/FileDescriptor.java   | 12 +++++++-----
 .../apache/impala/catalog/FileMetadataLoader.java   | 19 ++++++++++---------
 .../impala/catalog/IcebergFileMetadataLoader.java   | 21 ++++++++++-----------
 5 files changed, 33 insertions(+), 34 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 51d68ad5b..ed31ecbbd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
@@ -65,7 +66,6 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.PrintUtils;
-import org.apache.impala.common.Reference;
 import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TColumn;
@@ -826,7 +826,7 @@ public interface FeIcebergTable extends FeFsTable {
         FileStatus fileStatus, Table iceApiTable,
         boolean requiresDataFilesInTableLocation,
         ListMap<TNetworkAddress> hostIndex) throws IOException {
-      Reference<Long> numUnknownDiskIds = new Reference<>(0L);
+      AtomicLong numUnknownDiskIds = new AtomicLong(0);
 
       String absPath = null;
       Path tableLoc = new Path(iceApiTable.location());
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileBlock.java 
b/fe/src/main/java/org/apache/impala/catalog/FileBlock.java
index acdb3c6a0..7ccd58ce8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileBlock.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileBlock.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -29,7 +30,6 @@ import com.google.common.collect.Sets;
 import com.google.flatbuffers.FlatBufferBuilder;
 
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.impala.common.Reference;
 import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.util.ListMap;
@@ -57,7 +57,7 @@ public class FileBlock {
       FlatBufferBuilder fbb,
       BlockLocation loc,
       ListMap<TNetworkAddress> hostIndex,
-      Reference<Long> numUnknownDiskIds) throws IOException {
+      AtomicLong numUnknownDiskIds) throws IOException {
     Preconditions.checkNotNull(fbb);
     Preconditions.checkNotNull(loc);
     Preconditions.checkNotNull(hostIndex);
@@ -123,8 +123,7 @@ public class FileBlock {
    */
   private static short[] createDiskIds(
       BlockLocation location,
-      Reference<Long> numUnknownDiskIds) throws IOException {
-    long unknownDiskIdCount = 0;
+      AtomicLong numUnknownDiskIds) throws IOException {
     String[] storageIds = location.getStorageIds();
     String[] hosts = location.getHosts();
     if (storageIds.length != hosts.length) {
@@ -139,13 +138,11 @@ public class FileBlock {
     for (int i = 0; i < storageIds.length; ++i) {
       if (Strings.isNullOrEmpty(storageIds[i])) {
         diskIDs[i] = (short) -1;
-        ++unknownDiskIdCount;
+        numUnknownDiskIds.incrementAndGet();
       } else {
         diskIDs[i] = DiskIdMapper.INSTANCE.getDiskId(hosts[i], storageIds[i]);
       }
     }
-    long count = numUnknownDiskIds.getRef() + unknownDiskIdCount;
-    numUnknownDiskIds.setRef(Long.valueOf(count));
     return diskIDs;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileDescriptor.java 
b/fe/src/main/java/org/apache/impala/catalog/FileDescriptor.java
index 63a8db098..16b3009e7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileDescriptor.java
@@ -29,12 +29,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.impala.common.Reference;
 import org.apache.impala.fb.FbCompression;
 import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.fb.FbFileDesc;
@@ -128,8 +130,8 @@ public class FileDescriptor implements 
Comparable<FileDescriptor> {
       ListMap<TNetworkAddress> hostIndex,
       boolean isEncrypted,
       boolean isEc,
-      Reference<Long> numUnknownDiskIds,
-      String absPath) throws IOException {
+      AtomicLong numUnknownDiskIds,
+      @Nullable String absPath) throws IOException {
     FlatBufferBuilder fbb = new FlatBufferBuilder(1);
     int[] fbFileBlockOffsets;
     if (blockLocations == null) {
@@ -160,7 +162,7 @@ public class FileDescriptor implements 
Comparable<FileDescriptor> {
    * resides in a filesystem that doesn't support the BlockLocation API (e.g. 
S3).
    */
   public static FileDescriptor createWithNoBlocks(
-      FileStatus fileStatus, String relPath, String absPath) {
+      FileStatus fileStatus, String relPath, @Nullable String absPath) {
     FlatBufferBuilder fbb = new FlatBufferBuilder(1);
     return new FileDescriptor(
         createFbFileDesc(fbb, fileStatus, relPath, null, false, false, 
absPath));
@@ -179,7 +181,7 @@ public class FileDescriptor implements 
Comparable<FileDescriptor> {
       int[] fbFileBlockOffsets,
       boolean isEncrypted,
       boolean isEc,
-      String absPath) {
+      @Nullable String absPath) {
     int relPathOffset = fbb.createString(relPath == null ? StringUtils.EMPTY : 
relPath);
     // A negative block vector offset is used when no block offsets are 
specified.
     int blockVectorOffset = -1;
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index 749721e6a..8969aa321 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.impala.catalog.FeFsTable.FileMetadataStats;
 import org.apache.impala.common.FileSystemUtil;
-import org.apache.impala.common.Reference;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.HudiUtil;
@@ -45,6 +44,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
@@ -206,7 +206,7 @@ public class FileMetadataLoader {
       loadedFds_ = new ArrayList<>();
       if (fileStatuses == null) return;
 
-      Reference<Long> numUnknownDiskIds = new Reference<>(0L);
+      AtomicLong numUnknownDiskIds = new AtomicLong(0);
 
       if (writeIds_ != null) {
         fileStatuses = AcidUtils.filterFilesForAcidState(fileStatuses, 
partPath,
@@ -243,7 +243,7 @@ public class FileMetadataLoader {
           }
         }
       }
-      loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
+      loadStats_.unknownDiskIds += numUnknownDiskIds.get();
       if (LOG.isTraceEnabled()) {
         LOG.trace(loadStats_.debugString());
       }
@@ -258,7 +258,7 @@ public class FileMetadataLoader {
    * Return fd created by the given fileStatus or from the 
cache(oldFdsByPath_).
    */
   protected FileDescriptor getFileDescriptor(FileSystem fs, boolean 
listWithLocations,
-      Reference<Long> numUnknownDiskIds, FileStatus fileStatus, Path partPath)
+      AtomicLong numUnknownDiskIds, FileStatus fileStatus, Path partPath)
       throws IOException {
     String relPath = FileSystemUtil.relativizePath(fileStatus.getPath(), 
partPath);
     FileDescriptor fd = oldFdsByPath_.get(relPath);
@@ -305,15 +305,16 @@ public class FileMetadataLoader {
    * Iceberg tables may not be in the table location.
    */
   protected FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
-      String relPath, Reference<Long> numUnknownDiskIds, String absPath)
+      String relPath, AtomicLong numUnknownDiskIds, @Nullable String absPath)
       throws IOException {
+    if (!FileSystemUtil.supportsStorageIds(fs)) {
+      return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
+    }
     BlockLocation[] locations;
     if (fileStatus instanceof LocatedFileStatus) {
       locations = ((LocatedFileStatus) fileStatus).getBlockLocations();
-    } else if (FileSystemUtil.supportsStorageIds(fs)) {
-      locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
     } else {
-      return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
+      locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
     }
     return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_,
         fileStatus.isEncrypted(), fileStatus.isErasureCoded(), 
numUnknownDiskIds,
@@ -321,7 +322,7 @@ public class FileMetadataLoader {
   }
 
   private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
-      String relPath, Reference<Long> numUnknownDiskIds) throws IOException {
+      String relPath, AtomicLong numUnknownDiskIds) throws IOException {
     return createFd(fs, fileStatus, relPath, numUnknownDiskIds, null);
   }
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
index 742765ac7..b0bf86363 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -51,7 +51,6 @@ import org.apache.impala.catalog.FeIcebergTable.Utils;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.PrintUtils;
-import org.apache.impala.common.Reference;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TIcebergPartition;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -133,6 +132,7 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
     List<ContentFile<?>> filesSupportsStorageIds = Lists.newArrayList();
     FileSystem fsForTable = FileSystemUtil.getFileSystemForPath(tablePath_);
     FileSystem defaultFs = FileSystemUtil.getDefaultFileSystem();
+    AtomicLong numUnknownDiskIds = new AtomicLong();
     for (ContentFile<?> contentFile : newContentFiles) {
       FileSystem fsForPath = fsForTable;
       // If requiresDataFilesInTableLocation_ is true, we assume that the file 
system
@@ -147,11 +147,11 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
       if (FileSystemUtil.supportsStorageIds(fsForPath)) {
         filesSupportsStorageIds.add(contentFile);
       } else {
-        IcebergFileDescriptor fd = createNonLocatedFd(fsForPath, contentFile, 
tablePath_);
+        IcebergFileDescriptor fd =
+            createNonLocatedFd(fsForPath, contentFile, tablePath_, 
numUnknownDiskIds);
         registerNewlyLoadedFd(fd);
       }
     }
-    AtomicLong numUnknownDiskIds = new AtomicLong();
     List<IcebergFileDescriptor> newFds = 
parallelListing(filesSupportsStorageIds,
         numUnknownDiskIds);
     for (IcebergFileDescriptor fd : newFds) {
@@ -203,7 +203,8 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
   }
 
   private IcebergFileDescriptor createNonLocatedFd(FileSystem fs,
-      ContentFile<?> contentFile, Path partPath) throws CatalogException, 
IOException {
+      ContentFile<?> contentFile, Path partPath, AtomicLong numUnknownDiskIds)
+      throws CatalogException, IOException {
     Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
         new Path(contentFile.path().toString()));
     // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus 
ourselves.
@@ -215,12 +216,12 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
     int partitionId = addPartitionInfo(contentFile);
 
     return IcebergFileDescriptor.cloneWithFileMetadata(
-        createFd(fs, stat, relPath, null, absPath),
+        createFd(fs, stat, relPath, numUnknownDiskIds, absPath),
         IcebergUtil.createIcebergMetadata(iceTbl_, contentFile, partitionId));
   }
 
-  private IcebergFileDescriptor createLocatedFd(ContentFile<?> contentFile,
-      FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds)
+  private IcebergFileDescriptor createLocatedFd(FileSystem fs, ContentFile<?> 
contentFile,
+      FileStatus stat, Path partPath, AtomicLong numUnknownDiskIds)
       throws CatalogException, IOException {
     Preconditions.checkState(stat instanceof LocatedFileStatus);
 
@@ -230,7 +231,7 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
     int partitionId = addPartitionInfo(contentFile);
 
     return IcebergFileDescriptor.cloneWithFileMetadata(
-        createFd(null, stat, relPath, numUnknownDiskIds, absPath),
+        createFd(fs, stat, relPath, numUnknownDiskIds, absPath),
         IcebergUtil.createIcebergMetadata(iceTbl_, contentFile, partitionId));
   }
 
@@ -332,7 +333,6 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
       pathToFileStatus.put(status.getPath(), status);
     }
     List<IcebergFileDescriptor> ret = new ArrayList<>();
-    Reference<Long> localNumUnknownDiskIds = new Reference<>(0L);
     for (ContentFile<?> contentFile : contentFiles) {
       Path path = FileSystemUtil.createFullyQualifiedPath(
           new Path(contentFile.path().toString()));
@@ -343,9 +343,8 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
             contentFile.path().toString()));
         continue;
       }
-      ret.add(createLocatedFd(contentFile, stat, tablePath_, 
localNumUnknownDiskIds));
+      ret.add(createLocatedFd(fs, contentFile, stat, tablePath_, 
numUnknownDiskIds));
     }
-    numUnknownDiskIds.addAndGet(localNumUnknownDiskIds.getRef());
     return ret;
   }
 

Reply via email to