This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new bb0464797 [GLUTEN-3378][VL][FOLLOWUP] Use List to store Iceberg delete files (#4971)
bb0464797 is described below
commit bb0464797a2fb129b345cc5e0b815ddf53da9b2f
Author: Joey <[email protected]>
AuthorDate: Wed Mar 20 15:25:46 2024 +0800
[GLUTEN-3378][VL][FOLLOWUP] Use List to store Iceberg delete files (#4971)
---
.../java/io/glutenproject/substrait/rel/LocalFilesNode.java | 5 +++--
.../substrait/rel/IcebergLocalFilesBuilder.java | 4 ++--
.../glutenproject/substrait/rel/IcebergLocalFilesNode.java | 12 +++++-------
.../iceberg/spark/source/GlutenIcebergSourceUtil.scala | 8 +++-----
4 files changed, 13 insertions(+), 16 deletions(-)
diff --git
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
index 852d7558e..2a4866709 100644
---
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
+++
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
@@ -117,7 +117,8 @@ public class LocalFilesNode implements SplitInfo {
* inserting delete files information. Different lake formats should override this method to
* implement their corresponding logic.
*/
- protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder
fileBuilder) {}
+ protected void processFileBuilder(
+ ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) {}
public ReadRel.LocalFiles toProtobuf() {
ReadRel.LocalFiles.Builder localFilesBuilder =
ReadRel.LocalFiles.newBuilder();
@@ -217,7 +218,7 @@ public class LocalFilesNode implements SplitInfo {
default:
break;
}
- processFileBuilder(fileBuilder);
+ processFileBuilder(fileBuilder, i);
localFilesBuilder.addItems(fileBuilder.build());
}
return localFilesBuilder.build();
diff --git
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
index 773f3073b..b6ee70580 100644
---
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
+++
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
@@ -30,7 +30,7 @@ public class IcebergLocalFilesBuilder {
List<Map<String, String>> partitionColumns,
LocalFilesNode.ReadFileFormat fileFormat,
List<String> preferredLocations,
- Map<String, List<DeleteFile>> deleteFilesMap) {
+ List<List<DeleteFile>> deleteFilesList) {
return new IcebergLocalFilesNode(
index,
paths,
@@ -39,6 +39,6 @@ public class IcebergLocalFilesBuilder {
partitionColumns,
fileFormat,
preferredLocations,
- deleteFilesMap);
+ deleteFilesList);
}
}
diff --git
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
index 903bd198a..e25cd5a98 100644
---
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
+++
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
@@ -22,12 +22,11 @@ import io.substrait.proto.ReadRel;
import org.apache.iceberg.DeleteFile;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
public class IcebergLocalFilesNode extends LocalFilesNode {
- private final Map<String, List<DeleteFile>> deleteFilesMap;
+ private final List<List<DeleteFile>> deleteFilesList;
IcebergLocalFilesNode(
Integer index,
@@ -37,7 +36,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
List<Map<String, String>> partitionColumns,
ReadFileFormat fileFormat,
List<String> preferredLocations,
- Map<String, List<DeleteFile>> deleteFilesMap) {
+ List<List<DeleteFile>> deleteFilesList) {
super(
index,
paths,
@@ -47,13 +46,12 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
new ArrayList<>(),
fileFormat,
preferredLocations);
- this.deleteFilesMap = deleteFilesMap;
+ this.deleteFilesList = deleteFilesList;
}
@Override
- protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder
fileBuilder) {
- List<DeleteFile> deleteFiles =
- deleteFilesMap.getOrDefault(fileBuilder.getUriFile(),
Collections.emptyList());
+ protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder
fileBuilder, int index) {
+ List<DeleteFile> deleteFiles = deleteFilesList.get(index);
ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.Builder icebergBuilder =
ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder();
diff --git
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 74add7a9a..e85c9f58a 100644
---
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -39,7 +39,7 @@ object GlutenIcebergSourceUtil {
val starts = new JArrayList[JLong]()
val lengths = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]()
- val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]()
+ val deleteFilesList = new JArrayList[JList[DeleteFile]]()
var fileFormat = ReadFileFormat.UnknownFormat
val tasks = partition.taskGroup[ScanTask]().tasks().asScala
@@ -50,9 +50,7 @@ object GlutenIcebergSourceUtil {
starts.add(task.start())
lengths.add(task.length())
partitionColumns.add(getPartitionColumns(task))
- if (!task.deletes().isEmpty) {
- deleteFilesMap.put(filePath, task.deletes())
- }
+ deleteFilesList.add(task.deletes());
val currentFileFormat = convertFileFormat(task.file().format())
if (fileFormat == ReadFileFormat.UnknownFormat) {
fileFormat = currentFileFormat
@@ -73,7 +71,7 @@ object GlutenIcebergSourceUtil {
partitionColumns,
fileFormat,
preferredLoc.toList.asJava,
- deleteFilesMap
+ deleteFilesList
)
case _ =>
throw new UnsupportedOperationException("Only support iceberg
SparkInputPartition.")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]