abmo-x commented on code in PR #7363: URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169158840
########## spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java: ########## @@ -996,4 +1027,134 @@ public String unknown( return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder); } } + + /** + * Implement our own index in-memory index which will only list directories to avoid unnecessary + * file listings. Should ONLY be used to get partition directory paths. Uses table's schema to + * only visit partition dirs using number of partition columns depth recursively. Does NOT return + * files within leaf dir. + */ + private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex { + + private final Path rootPath; + private final FileStatusCache fileStatusCache; + private final SparkSession sparkSession; + private final StructType userSpecifiedSchema; + private LinkedHashMap<Path, FileStatus> cachedLeafFiles; + private scala.collection.immutable.Map<Path, FileStatus[]> cachedLeafDirToChildFiles; + private org.apache.spark.sql.execution.datasources.PartitionSpec cachedPartitionSpec; + + public InMemoryLeafDirOnlyIndex( + SparkSession sparkSession, + scala.collection.immutable.Map<String, String> parameters, + StructType userSpecifiedSchema, + FileStatusCache fileStatusCache, + Path rootPath) { + super(sparkSession, parameters, Option.apply(userSpecifiedSchema), fileStatusCache); + this.fileStatusCache = fileStatusCache; + this.rootPath = rootPath; + this.sparkSession = sparkSession; + this.userSpecifiedSchema = userSpecifiedSchema; + } + + @Override + public scala.collection.Seq<Path> rootPaths() { + return JavaConverters.asScalaBuffer(Arrays.asList(rootPath)).seq(); + } + + @Override + public void refresh() { + fileStatusCache.invalidateAll(); + cachedLeafFiles = null; + cachedLeafDirToChildFiles = null; + cachedPartitionSpec = null; + } + + @Override + public org.apache.spark.sql.execution.datasources.PartitionSpec partitionSpec() { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = 
inferPartitioning(); + } + log.trace("Partition spec: {}", cachedPartitionSpec); + return cachedPartitionSpec; + } + + @Override + public LinkedHashMap<Path, FileStatus> leafFiles() { + if (cachedLeafFiles == null) { + try { + List<FileStatus> fileStatuses = + listLeafDirs(sparkSession, rootPath, userSpecifiedSchema, 0); + LinkedHashMap<Path, FileStatus> map = new LinkedHashMap<>(); + for (FileStatus fs : fileStatuses) { + map.put(fs.getPath(), fs); + } + cachedLeafFiles = map; + } catch (IOException e) { + throw new RuntimeException("error listing files for path=" + rootPath, e); + } + } + return cachedLeafFiles; + } + + static List<FileStatus> listLeafDirs( + SparkSession spark, Path path, StructType partitionSpec, int level) throws IOException { + List<FileStatus> leafDirs = new ArrayList<>(); + int numPartitionCols = partitionSpec.fields().length; + if (level < numPartitionCols) { + try (FileSystem fs = path.getFileSystem(spark.sparkContext().hadoopConfiguration())) { + List<FileStatus> dirs = + Stream.of(fs.listStatus(path)) + .filter(FileStatus::isDirectory) + .collect(Collectors.toList()); + for (FileStatus dir : dirs) { + // stop recursive call once we reach the expected end of partitions as per table schema + if (level == numPartitionCols - 1) { + leafDirs.add(dir); + } else { + leafDirs.addAll(listLeafDirs(spark, dir.getPath(), partitionSpec, level + 1)); + } + } + } + } + return leafDirs; + } + + @Override + public scala.collection.immutable.Map<Path, FileStatus[]> leafDirToChildrenFiles() { + if (cachedLeafDirToChildFiles == null) { + List<Tuple2<Path, FileStatus[]>> tuple2s = + JavaConverters.seqAsJavaList(leafFiles().values().toSeq()).stream() + .map( + fileStatus -> { + // Create an empty data file in the leaf dir. 
+ // As this index is only used to list partition directories, + // we can stop listing the leaf dir to avoid unnecessary listing which can + // take a while on folders with 1000s of files + return new Tuple2<>( + fileStatus.getPath(), + new FileStatus[] {createEmptyChildDataFileStatus(fileStatus)}); + }) + .collect(Collectors.toList()); + cachedLeafDirToChildFiles = + (scala.collection.immutable.Map<Path, FileStatus[]>) + Map$.MODULE$.apply(JavaConverters.asScalaBuffer(tuple2s)); + } + return cachedLeafDirToChildFiles; + } + + private FileStatus createEmptyChildDataFileStatus(FileStatus fileStatus) { + return new FileStatus( + 1L, + false, + fileStatus.getReplication(), + 1L, + fileStatus.getModificationTime(), + fileStatus.getAccessTime(), + fileStatus.getPermission(), + fileStatus.getOwner(), + fileStatus.getGroup(), + new Path(fileStatus.getPath(), fileStatus.getPath().toString() + "/dummyDataFile")); + } Review Comment: Yeah, IntelliJ does that automatically now, so it is not obvious anymore. Will add it so it's useful for other IDEs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org