dramaticlly commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169153191


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
       return String.format("%s(%s) %s %s", transform, sourceName, direction, 
nullOrder);
     }
   }
+
+  /**
+   * Implement our own index in-memory index which will only list directories 
to avoid unnecessary
+   * file listings. Should ONLY be used to get partition directory paths. Uses 
table's schema to
+   * only visit partition dirs using number of partition columns depth 
recursively. Does NOT return
+   * files within leaf dir.
+   */
+  private static class InMemoryLeafDirOnlyIndex extends 
PartitioningAwareFileIndex {
+
+    private final Path rootPath;
+    private final FileStatusCache fileStatusCache;
+    private final SparkSession sparkSession;
+    private final StructType userSpecifiedSchema;
+    private LinkedHashMap<Path, FileStatus> cachedLeafFiles;
+    private scala.collection.immutable.Map<Path, FileStatus[]> 
cachedLeafDirToChildFiles;
+    private org.apache.spark.sql.execution.datasources.PartitionSpec 
cachedPartitionSpec;
+
+    public InMemoryLeafDirOnlyIndex(
+        SparkSession sparkSession,
+        scala.collection.immutable.Map<String, String> parameters,
+        StructType userSpecifiedSchema,
+        FileStatusCache fileStatusCache,
+        Path rootPath) {
+      super(sparkSession, parameters, Option.apply(userSpecifiedSchema), 
fileStatusCache);
+      this.fileStatusCache = fileStatusCache;
+      this.rootPath = rootPath;
+      this.sparkSession = sparkSession;
+      this.userSpecifiedSchema = userSpecifiedSchema;
+    }
+
+    @Override
+    public scala.collection.Seq<Path> rootPaths() {
+      return JavaConverters.asScalaBuffer(Arrays.asList(rootPath)).seq();
+    }
+
+    @Override
+    public void refresh() {
+      fileStatusCache.invalidateAll();
+      cachedLeafFiles = null;
+      cachedLeafDirToChildFiles = null;
+      cachedPartitionSpec = null;
+    }
+
+    @Override
+    public org.apache.spark.sql.execution.datasources.PartitionSpec 
partitionSpec() {
+      if (cachedPartitionSpec == null) {
+        cachedPartitionSpec = inferPartitioning();
+      }
+      log.trace("Partition spec: {}", cachedPartitionSpec);
+      return cachedPartitionSpec;
+    }
+
+    @Override
+    public LinkedHashMap<Path, FileStatus> leafFiles() {
+      if (cachedLeafFiles == null) {
+        try {
+          List<FileStatus> fileStatuses =
+              listLeafDirs(sparkSession, rootPath, userSpecifiedSchema, 0);
+          LinkedHashMap<Path, FileStatus> map = new LinkedHashMap<>();
+          for (FileStatus fs : fileStatuses) {
+            map.put(fs.getPath(), fs);
+          }
+          cachedLeafFiles = map;
+        } catch (IOException e) {
+          throw new RuntimeException("error listing files for path=" + 
rootPath, e);
+        }
+      }
+      return cachedLeafFiles;
+    }
+
+    static List<FileStatus> listLeafDirs(
+        SparkSession spark, Path path, StructType partitionSpec, int level) 
throws IOException {
+      List<FileStatus> leafDirs = new ArrayList<>();
+      int numPartitionCols = partitionSpec.fields().length;
+      if (level < numPartitionCols) {
+        try (FileSystem fs = 
path.getFileSystem(spark.sparkContext().hadoopConfiguration())) {
+          List<FileStatus> dirs =
+              Stream.of(fs.listStatus(path))
+                  .filter(FileStatus::isDirectory)
+                  .collect(Collectors.toList());
+          for (FileStatus dir : dirs) {
+            // stop recursive call once we reach the expected end of 
partitions as per table schema
+            if (level == numPartitionCols - 1) {
+              leafDirs.add(dir);
+            } else {
+              leafDirs.addAll(listLeafDirs(spark, dir.getPath(), 
partitionSpec, level + 1));
+            }
+          }
+        }
+      }
+      return leafDirs;
+    }
+
+    @Override
+    public scala.collection.immutable.Map<Path, FileStatus[]> 
leafDirToChildrenFiles() {
+      if (cachedLeafDirToChildFiles == null) {
+        List<Tuple2<Path, FileStatus[]>> tuple2s =
+            JavaConverters.seqAsJavaList(leafFiles().values().toSeq()).stream()
+                .map(
+                    fileStatus -> {
+                      // Create an empty data file in the leaf dir.
+                      // As this index is only used to list partition 
directories,
+                      // we can stop listing the leaf dir to avoid unnecessary 
listing which can
+                      // take a while on folders with 1000s of files
+                      return new Tuple2<>(
+                          fileStatus.getPath(),
+                          new FileStatus[] 
{createEmptyChildDataFileStatus(fileStatus)});
+                    })
+                .collect(Collectors.toList());
+        cachedLeafDirToChildFiles =
+            (scala.collection.immutable.Map<Path, FileStatus[]>)
+                Map$.MODULE$.apply(JavaConverters.asScalaBuffer(tuple2s));
+      }
+      return cachedLeafDirToChildFiles;
+    }
+
+    private FileStatus createEmptyChildDataFileStatus(FileStatus fileStatus) {
+      return new FileStatus(
+          1L,
+          false,
+          fileStatus.getReplication(),
+          1L,
+          fileStatus.getModificationTime(),
+          fileStatus.getAccessTime(),
+          fileStatus.getPermission(),
+          fileStatus.getOwner(),
+          fileStatus.getGroup(),
+          new Path(fileStatus.getPath(), fileStatus.getPath().toString() + 
"/dummyDataFile"));
+    }

Review Comment:
   maybe add a comment beside boolean parameter to help with reading? 
https://github.com/apache/iceberg/blob/master/CONTRIBUTING.md#boolean-arguments
   ```java
         return new FileStatus(
             1L,
             false, /* is directory */
             fileStatus.getReplication(),
             1L,
             fileStatus.getModificationTime(),
             fileStatus.getAccessTime(),
             fileStatus.getPermission(),
             fileStatus.getOwner(),
             fileStatus.getGroup(),
             new Path(fileStatus.getPath(), fileStatus.getPath().toString() + 
"/dummyDataFile"));
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to