abmo-x commented on code in PR #7363: URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169168257
########## spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java: ########## @@ -996,4 +1027,134 @@ public String unknown( return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder); } } + + /** + * Implement our own index in-memory index which will only list directories to avoid unnecessary + * file listings. Should ONLY be used to get partition directory paths. Uses table's schema to + * only visit partition dirs using number of partition columns depth recursively. Does NOT return + * files within leaf dir. + */ + private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex { + + private final Path rootPath; + private final FileStatusCache fileStatusCache; + private final SparkSession sparkSession; + private final StructType userSpecifiedSchema; + private LinkedHashMap<Path, FileStatus> cachedLeafFiles; + private scala.collection.immutable.Map<Path, FileStatus[]> cachedLeafDirToChildFiles; + private org.apache.spark.sql.execution.datasources.PartitionSpec cachedPartitionSpec; + + public InMemoryLeafDirOnlyIndex( + SparkSession sparkSession, + scala.collection.immutable.Map<String, String> parameters, + StructType userSpecifiedSchema, + FileStatusCache fileStatusCache, + Path rootPath) { + super(sparkSession, parameters, Option.apply(userSpecifiedSchema), fileStatusCache); + this.fileStatusCache = fileStatusCache; + this.rootPath = rootPath; + this.sparkSession = sparkSession; + this.userSpecifiedSchema = userSpecifiedSchema; + } + + @Override + public scala.collection.Seq<Path> rootPaths() { + return JavaConverters.asScalaBuffer(Arrays.asList(rootPath)).seq(); + } + + @Override + public void refresh() { + fileStatusCache.invalidateAll(); + cachedLeafFiles = null; + cachedLeafDirToChildFiles = null; + cachedPartitionSpec = null; + } + + @Override + public org.apache.spark.sql.execution.datasources.PartitionSpec partitionSpec() { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = inferPartitioning(); + } + log.trace("Partition spec: {}", cachedPartitionSpec); + return cachedPartitionSpec; + } + + @Override + public LinkedHashMap<Path, FileStatus> leafFiles() { + if (cachedLeafFiles == null) { + try { + List<FileStatus> fileStatuses = + listLeafDirs(sparkSession, rootPath, userSpecifiedSchema, 0); + LinkedHashMap<Path, FileStatus> map = new LinkedHashMap<>(); + for (FileStatus fs : fileStatuses) { + map.put(fs.getPath(), fs); + } + cachedLeafFiles = map; + } catch (IOException e) { + throw new RuntimeException("error listing files for path=" + rootPath, e); + } + } + return cachedLeafFiles; + } + + static List<FileStatus> listLeafDirs( + SparkSession spark, Path path, StructType partitionSpec, int level) throws IOException { + List<FileStatus> leafDirs = new ArrayList<>(); Review Comment: addressed all checkstyle ########## spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java: ########## @@ -996,4 +1027,134 @@ public String unknown( return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder); } } + + /** + * Implement our own index in-memory index which will only list directories to avoid unnecessary + * file listings. Should ONLY be used to get partition directory paths. Uses table's schema to + * only visit partition dirs using number of partition columns depth recursively. Does NOT return + * files within leaf dir. + */ + private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex { + + private final Path rootPath; + private final FileStatusCache fileStatusCache; + private final SparkSession sparkSession; + private final StructType userSpecifiedSchema; + private LinkedHashMap<Path, FileStatus> cachedLeafFiles; + private scala.collection.immutable.Map<Path, FileStatus[]> cachedLeafDirToChildFiles; + private org.apache.spark.sql.execution.datasources.PartitionSpec cachedPartitionSpec; + + public InMemoryLeafDirOnlyIndex( Review Comment: addressed all checkstyle -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org