This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new bb0464797 [GLUTEN-3378][VL][FOLLOWUP] Use List to store Iceberg delete files (#4971)
bb0464797 is described below

commit bb0464797a2fb129b345cc5e0b815ddf53da9b2f
Author: Joey <[email protected]>
AuthorDate: Wed Mar 20 15:25:46 2024 +0800

    [GLUTEN-3378][VL][FOLLOWUP] Use List to store Iceberg delete files (#4971)
---
 .../java/io/glutenproject/substrait/rel/LocalFilesNode.java  |  5 +++--
 .../substrait/rel/IcebergLocalFilesBuilder.java              |  4 ++--
 .../glutenproject/substrait/rel/IcebergLocalFilesNode.java   | 12 +++++-------
 .../iceberg/spark/source/GlutenIcebergSourceUtil.scala       |  8 +++-----
 4 files changed, 13 insertions(+), 16 deletions(-)

diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
index 852d7558e..2a4866709 100644
--- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
+++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
@@ -117,7 +117,8 @@ public class LocalFilesNode implements SplitInfo {
    * inserting delete files information. Different lake formats should override this method to
    * implement their corresponding logic.
    */
-  protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {}
+  protected void processFileBuilder(
+      ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) {}
 
   public ReadRel.LocalFiles toProtobuf() {
     ReadRel.LocalFiles.Builder localFilesBuilder = ReadRel.LocalFiles.newBuilder();
@@ -217,7 +218,7 @@ public class LocalFilesNode implements SplitInfo {
         default:
           break;
       }
-      processFileBuilder(fileBuilder);
+      processFileBuilder(fileBuilder, i);
       localFilesBuilder.addItems(fileBuilder.build());
     }
     return localFilesBuilder.build();
diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
index 773f3073b..b6ee70580 100644
--- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
+++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
@@ -30,7 +30,7 @@ public class IcebergLocalFilesBuilder {
       List<Map<String, String>> partitionColumns,
       LocalFilesNode.ReadFileFormat fileFormat,
       List<String> preferredLocations,
-      Map<String, List<DeleteFile>> deleteFilesMap) {
+      List<List<DeleteFile>> deleteFilesList) {
     return new IcebergLocalFilesNode(
         index,
         paths,
@@ -39,6 +39,6 @@ public class IcebergLocalFilesBuilder {
         partitionColumns,
         fileFormat,
         preferredLocations,
-        deleteFilesMap);
+        deleteFilesList);
   }
 }
diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
index 903bd198a..e25cd5a98 100644
--- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
+++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
@@ -22,12 +22,11 @@ import io.substrait.proto.ReadRel;
 import org.apache.iceberg.DeleteFile;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 public class IcebergLocalFilesNode extends LocalFilesNode {
-  private final Map<String, List<DeleteFile>> deleteFilesMap;
+  private final List<List<DeleteFile>> deleteFilesList;
 
   IcebergLocalFilesNode(
       Integer index,
@@ -37,7 +36,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
       List<Map<String, String>> partitionColumns,
       ReadFileFormat fileFormat,
       List<String> preferredLocations,
-      Map<String, List<DeleteFile>> deleteFilesMap) {
+      List<List<DeleteFile>> deleteFilesList) {
     super(
         index,
         paths,
@@ -47,13 +46,12 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
         new ArrayList<>(),
         fileFormat,
         preferredLocations);
-    this.deleteFilesMap = deleteFilesMap;
+    this.deleteFilesList = deleteFilesList;
   }
 
   @Override
-  protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {
-    List<DeleteFile> deleteFiles =
-        deleteFilesMap.getOrDefault(fileBuilder.getUriFile(), Collections.emptyList());
+  protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) {
+    List<DeleteFile> deleteFiles = deleteFilesList.get(index);
     ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.Builder icebergBuilder =
         ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder();
 
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 74add7a9a..e85c9f58a 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -39,7 +39,7 @@ object GlutenIcebergSourceUtil {
       val starts = new JArrayList[JLong]()
       val lengths = new JArrayList[JLong]()
       val partitionColumns = new JArrayList[JMap[String, String]]()
-      val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]()
+      val deleteFilesList = new JArrayList[JList[DeleteFile]]()
       var fileFormat = ReadFileFormat.UnknownFormat
 
       val tasks = partition.taskGroup[ScanTask]().tasks().asScala
@@ -50,9 +50,7 @@ object GlutenIcebergSourceUtil {
           starts.add(task.start())
           lengths.add(task.length())
           partitionColumns.add(getPartitionColumns(task))
-          if (!task.deletes().isEmpty) {
-            deleteFilesMap.put(filePath, task.deletes())
-          }
+          deleteFilesList.add(task.deletes());
           val currentFileFormat = convertFileFormat(task.file().format())
           if (fileFormat == ReadFileFormat.UnknownFormat) {
             fileFormat = currentFileFormat
@@ -73,7 +71,7 @@ object GlutenIcebergSourceUtil {
         partitionColumns,
         fileFormat,
         preferredLoc.toList.asJava,
-        deleteFilesMap
+        deleteFilesList
       )
     case _ =>
       throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to