This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 80bb0cff3 [GLUTEN-3378][VL] Feat: Support read iceberg mor table for Velox backend (#4779)
80bb0cff3 is described below

commit 80bb0cff3d953fef3c133040776a791ccab87dc5
Author: Joey <[email protected]>
AuthorDate: Fri Mar 15 11:48:11 2024 +0800

    [GLUTEN-3378][VL] Feat: Support read iceberg mor table for Velox backend (#4779)
    
    Velox added Iceberg MOR table read support in facebookincubator/velox#7847. This PR supports reading Iceberg MOR tables with the Velox backend.
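    
    For context, a rough usage sketch (assuming a Spark session with the
    Gluten plugin and the Iceberg Spark extensions enabled; the table and
    column names below are illustrative, not part of this patch):
    
        // Create a v2 Iceberg table that writes delete files on DELETE.
        spark.sql("""
          create table mor_demo (id int, name string) using iceberg
          tblproperties (
            'format-version' = '2',
            'write.delete.mode' = 'merge-on-read'
          )
        """)
        spark.sql("insert into mor_demo values (1, 'a'), (2, 'b')")
        // Merge-on-read: this writes a delete file instead of rewriting data.
        spark.sql("delete from mor_demo where id = 1")
        // With this patch the scan below offloads to Velox, which applies the
        // delete files (IcebergScanTransformer appears in the query plan).
        spark.sql("select * from mor_demo").show()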
---
 cpp/velox/CMakeLists.txt                           |   1 +
 cpp/velox/compute/VeloxPlanConverter.cc            |   4 +
 cpp/velox/compute/WholeStageResultIterator.cc      |  44 +++++--
 cpp/velox/compute/WholeStageResultIterator.h       |   2 +
 cpp/velox/compute/iceberg/IcebergPlanConverter.cc  |  84 +++++++++++++
 .../velox/compute/iceberg/IcebergPlanConverter.h   |  37 +++---
 cpp/velox/substrait/SubstraitToVeloxPlan.h         |   3 +
 .../substrait/rel/LocalFilesNode.java              |  10 +-
 .../substrait/proto/substrait/algebra.proto        |  29 ++++-
 .../substrait/rel/IcebergLocalFilesBuilder.java    |  17 ++-
 .../substrait/rel/IcebergLocalFilesNode.java       | 110 +++++++++++-----
 .../spark/source/GlutenIcebergSourceUtil.scala     |  28 +++--
 .../execution/VeloxIcebergSuite.scala              | 140 +++++++++++++++++++--
 13 files changed, 423 insertions(+), 86 deletions(-)

diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index b15fd395f..35d05d442 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -296,6 +296,7 @@ set(VELOX_SRCS
     compute/VeloxRuntime.cc
     compute/WholeStageResultIterator.cc
     compute/VeloxPlanConverter.cc
+    compute/iceberg/IcebergPlanConverter.cc
     jni/VeloxJniWrapper.cc
     jni/JniFileSystem.cc
     jni/JniUdf.cc
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc
index 8ca9f85cd..370655c3b 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -20,6 +20,7 @@
 
 #include "compute/ResultIterator.h"
 #include "config/GlutenConfig.h"
+#include "iceberg/IcebergPlanConverter.h"
 #include "operators/plannodes/RowVectorStream.h"
 #include "velox/common/file/FileSystems.h"
 
@@ -93,6 +94,9 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
       case SubstraitFileFormatCase::kText:
         splitInfo->format = dwio::common::FileFormat::TEXT;
         break;
+      case SubstraitFileFormatCase::kIceberg:
+        splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file, std::move(splitInfo));
+        break;
       default:
         splitInfo->format = dwio::common::FileFormat::UNKNOWN;
         break;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 86431819b..f645661b7 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -154,19 +154,37 @@ WholeStageResultIterator::WholeStageResultIterator(
       auto metadataColumn = metadataColumns[idx];
       std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
       constructPartitionColumns(partitionKeys, partitionColumn);
-      auto split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
-          kHiveConnectorId,
-          paths[idx],
-          format,
-          starts[idx],
-          lengths[idx],
-          partitionKeys,
-          std::nullopt,
-          std::unordered_map<std::string, std::string>(),
-          nullptr,
-          std::unordered_map<std::string, std::string>(),
-          0,
-          metadataColumn);
+      std::shared_ptr<velox::connector::ConnectorSplit> split;
+      if (auto icebergSplitInfo = std::dynamic_pointer_cast<IcebergSplitInfo>(scanInfo)) {
+        // Set Iceberg split.
+        std::unordered_map<std::string, std::string> customSplitInfo{{"table_format", "hive-iceberg"}};
+        auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx];
+        split = std::make_shared<velox::connector::hive::iceberg::HiveIcebergSplit>(
+            kHiveConnectorId,
+            paths[idx],
+            format,
+            starts[idx],
+            lengths[idx],
+            partitionKeys,
+            std::nullopt,
+            customSplitInfo,
+            nullptr,
+            deleteFiles);
+      } else {
+        split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
+            kHiveConnectorId,
+            paths[idx],
+            format,
+            starts[idx],
+            lengths[idx],
+            partitionKeys,
+            std::nullopt,
+            std::unordered_map<std::string, std::string>(),
+            nullptr,
+            std::unordered_map<std::string, std::string>(),
+            0,
+            metadataColumn);
+      }
       connectorSplits.emplace_back(split);
     }
 
diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h
index 082cc6397..10c1937b7 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -17,11 +17,13 @@
 #pragma once
 
 #include "compute/Runtime.h"
+#include "iceberg/IcebergPlanConverter.h"
 #include "memory/ColumnarBatchIterator.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "substrait/SubstraitToVeloxPlan.h"
 #include "substrait/plan.pb.h"
 #include "utils/metrics.h"
+#include "velox/connectors/hive/iceberg/IcebergSplit.h"
 #include "velox/core/Config.h"
 #include "velox/core/PlanNode.h"
 #include "velox/exec/Task.h"
diff --git a/cpp/velox/compute/iceberg/IcebergPlanConverter.cc b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc
new file mode 100644
index 000000000..07c40e6e1
--- /dev/null
+++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "IcebergPlanConverter.h"
+
+namespace gluten {
+
+std::shared_ptr<IcebergSplitInfo> IcebergPlanConverter::parseIcebergSplitInfo(
+    substrait::ReadRel_LocalFiles_FileOrFiles file,
+    std::shared_ptr<SplitInfo> splitInfo) {
+  using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::FileFormatCase;
+  using SubstraitDeleteFileFormatCase =
+      ::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::DeleteFile::FileFormatCase;
+  auto icebergSplitInfo = std::dynamic_pointer_cast<IcebergSplitInfo>(splitInfo)
+      ? std::dynamic_pointer_cast<IcebergSplitInfo>(splitInfo)
+      : std::make_shared<IcebergSplitInfo>(*splitInfo);
+  auto icebergReadOption = file.iceberg();
+  switch (icebergReadOption.file_format_case()) {
+    case SubstraitFileFormatCase::kParquet:
+      icebergSplitInfo->format = dwio::common::FileFormat::PARQUET;
+      break;
+    case SubstraitFileFormatCase::kOrc:
+      icebergSplitInfo->format = dwio::common::FileFormat::ORC;
+      break;
+    default:
+      icebergSplitInfo->format = dwio::common::FileFormat::UNKNOWN;
+      break;
+  }
+  if (icebergReadOption.delete_files_size() > 0) {
+    auto deleteFiles = icebergReadOption.delete_files();
+    std::vector<IcebergDeleteFile> deletes;
+    deletes.reserve(icebergReadOption.delete_files_size());
+    for (auto i = 0; i < icebergReadOption.delete_files_size(); i++) {
+      auto deleteFile = icebergReadOption.delete_files().Get(i);
+      dwio::common::FileFormat format;
+      FileContent fileContent;
+      switch (deleteFile.file_format_case()) {
+        case SubstraitDeleteFileFormatCase::kParquet:
+          format = dwio::common::FileFormat::PARQUET;
+          break;
+        case SubstraitDeleteFileFormatCase::kOrc:
+          format = dwio::common::FileFormat::ORC;
+          break;
+        default:
+          format = dwio::common::FileFormat::UNKNOWN;
+      }
+      switch (deleteFile.filecontent()) {
+        case ::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_POSITION_DELETES:
+          fileContent = FileContent::kPositionalDeletes;
+          break;
+        case ::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_EQUALITY_DELETES:
+          fileContent = FileContent::kEqualityDeletes;
+          break;
+        default:
+          fileContent = FileContent::kData;
+          break;
+      }
+      deletes.emplace_back(IcebergDeleteFile(
+          fileContent, deleteFile.filepath(), format, deleteFile.recordcount(), deleteFile.filesize()));
+    }
+    icebergSplitInfo->deleteFilesVec.emplace_back(deletes);
+  } else {
+    // Add an empty delete file list to indicate that this data file has no delete files.
+    icebergSplitInfo->deleteFilesVec.emplace_back(std::vector<IcebergDeleteFile>{});
+  }
+
+  return icebergSplitInfo;
+}
+
+} // namespace gluten
diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java b/cpp/velox/compute/iceberg/IcebergPlanConverter.h
similarity index 53%
copy from gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
copy to cpp/velox/compute/iceberg/IcebergPlanConverter.h
index 3452836cf..d634a861f 100644
--- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
+++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.h
@@ -14,24 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.glutenproject.substrait.rel;
 
-import java.util.List;
-import java.util.Map;
+#pragma once
 
-public class IcebergLocalFilesBuilder {
+#include "substrait/SubstraitToVeloxPlan.h"
+#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
 
-  // TODO: Add makeIcebergLocalFiles for MOR iceberg table
+using namespace facebook::velox::connector::hive::iceberg;
 
-  public static IcebergLocalFilesNode makeIcebergLocalFiles(
-      Integer index,
-      List<String> paths,
-      List<Long> starts,
-      List<Long> lengths,
-      List<Map<String, String>> partitionColumns,
-      LocalFilesNode.ReadFileFormat fileFormat,
-      List<String> preferredLocations) {
-    return new IcebergLocalFilesNode(
-        index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations);
+namespace gluten {
+struct IcebergSplitInfo : SplitInfo {
+  std::vector<std::vector<IcebergDeleteFile>> deleteFilesVec;
+
+  IcebergSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) {
+    // Reserve one delete-file list slot per data file path.
+    deleteFilesVec.reserve(splitInfo.paths.capacity());
   }
-}
+};
+
+class IcebergPlanConverter {
+ public:
+  static std::shared_ptr<IcebergSplitInfo> parseIcebergSplitInfo(
+      substrait::ReadRel_LocalFiles_FileOrFiles file,
+      std::shared_ptr<SplitInfo> splitInfo);
+};
+
+} // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 895c1d24e..59d3312cb 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -50,6 +50,9 @@ struct SplitInfo {
 
   /// The file format of the files to be scanned.
   dwio::common::FileFormat format;
+
+  /// Make SplitInfo polymorphic
+  virtual ~SplitInfo() = default;
 };
 
 /// This class is used to convert the Substrait plan into Velox plan.
diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
index e0700ded2..852d7558e 100644
--- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
+++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
@@ -50,7 +50,7 @@ public class LocalFilesNode implements SplitInfo {
     UnknownFormat()
   }
 
-  private ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat;
+  protected ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat;
   private Boolean iterAsInput = false;
   private StructType fileSchema;
   private Map<String, String> fileReadProperties;
@@ -112,6 +112,13 @@ public class LocalFilesNode implements SplitInfo {
     return this.preferredLocations;
   }
 
+  /**
+   * Data lake formats require some additional processing to be done on the FileBuilder, such as
+   * inserting delete file information. Different lake formats should override this method to
+   * implement their corresponding logic.
+   */
+  protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {}
+
   public ReadRel.LocalFiles toProtobuf() {
     ReadRel.LocalFiles.Builder localFilesBuilder = ReadRel.LocalFiles.newBuilder();
     // The input is iterator, and the path is in the format of: Iterator:index.
@@ -210,6 +217,7 @@ public class LocalFilesNode implements SplitInfo {
         default:
           break;
       }
+      processFileBuilder(fileBuilder);
       localFilesBuilder.addItems(fileBuilder.build());
     }
     return localFilesBuilder.build();
diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
index 63a0f36ea..e9ed0f5ef 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -149,6 +149,28 @@ message ReadRel {
         uint64 max_block_size = 1;
         NamedStruct schema = 2 [deprecated=true];
       }
+      message IcebergReadOptions {
+        enum FileContent {
+          DATA = 0;
+          POSITION_DELETES = 1;
+          EQUALITY_DELETES = 2;
+        }
+        message DeleteFile {
+          FileContent fileContent = 1;
+          string filePath = 2;
+          uint64 fileSize = 3;
+          uint64 recordCount = 4;
+          oneof file_format {
+            ParquetReadOptions parquet = 5;
+            OrcReadOptions orc = 6;
+          }
+        }
+        oneof file_format {
+          ParquetReadOptions parquet = 1;
+          OrcReadOptions orc = 2;
+        }
+        repeated DeleteFile delete_files = 3;
+      }
 
       // File reading options
       oneof file_format {
@@ -159,22 +181,23 @@ message ReadRel {
         DwrfReadOptions dwrf = 13;
         TextReadOptions text = 14;
         JsonReadOptions json = 15;
+        IcebergReadOptions iceberg = 16;
       }
 
       message partitionColumn {
         string key = 1;
         string value = 2;
       }
-      repeated partitionColumn partition_columns = 16;
+      repeated partitionColumn partition_columns = 17;
 
       /// File schema
-      NamedStruct schema = 17;
+      NamedStruct schema = 18;
 
       message metadataColumn {
         string key = 1;
         string value = 2;
       }
-      repeated metadataColumn metadata_columns = 18;
+      repeated metadataColumn metadata_columns = 19;
     }
   }
 }
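
The new IcebergReadOptions message records the data file's format and the
delete files that must be applied to it. Note the tag renumbering: iceberg
takes 16 in the file_format oneof, so partition_columns, schema and
metadata_columns shift to 17, 18 and 19. As a minimal sketch, the message can
be populated directly through the generated protobuf API (values below are
illustrative; the real population happens in IcebergLocalFilesNode further
down):

    import io.substrait.proto.ReadRel

    val deleteFile =
      ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.DeleteFile.newBuilder()
        .setFileContent(
          ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent.POSITION_DELETES)
        .setFilePath("s3://bucket/tbl/data/delete-00001.parquet")
        .setFileSize(1024L)
        .setRecordCount(2L)
        .setParquet(ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder())
        .build()

    val icebergOptions =
      ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder()
        .setParquet(ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder())
        .addDeleteFiles(deleteFile)
        .build()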
diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
index 3452836cf..773f3073b 100644
--- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
+++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
@@ -16,13 +16,12 @@
  */
 package io.glutenproject.substrait.rel;
 
+import org.apache.iceberg.DeleteFile;
+
 import java.util.List;
 import java.util.Map;
 
 public class IcebergLocalFilesBuilder {
-
-  // TODO: Add makeIcebergLocalFiles for MOR iceberg table
-
   public static IcebergLocalFilesNode makeIcebergLocalFiles(
       Integer index,
       List<String> paths,
@@ -30,8 +29,16 @@ public class IcebergLocalFilesBuilder {
       List<Long> lengths,
       List<Map<String, String>> partitionColumns,
       LocalFilesNode.ReadFileFormat fileFormat,
-      List<String> preferredLocations) {
+      List<String> preferredLocations,
+      Map<String, List<DeleteFile>> deleteFilesMap) {
     return new IcebergLocalFilesNode(
-        index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations);
+        index,
+        paths,
+        starts,
+        lengths,
+        partitionColumns,
+        fileFormat,
+        preferredLocations,
+        deleteFilesMap);
   }
 }
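
A hypothetical call site for the extended builder, mirroring what
GlutenIcebergSourceUtil does further down (the path, offsets and sizes are
made up):

    import io.glutenproject.substrait.rel.{IcebergLocalFilesBuilder, LocalFilesNode}
    import org.apache.iceberg.DeleteFile

    import java.lang.{Long => JLong}
    import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

    val paths = new JArrayList[String]()
    paths.add("s3://bucket/tbl/data/00000.parquet")
    val starts = new JArrayList[JLong]()
    starts.add(0L)
    val lengths = new JArrayList[JLong]()
    lengths.add(4096L)
    val partitionColumns = new JArrayList[JMap[String, String]]()
    partitionColumns.add(new JHashMap[String, String]())
    // Keyed by data file path; files without pending deletes have no entry.
    val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]()

    val localFiles = IcebergLocalFilesBuilder.makeIcebergLocalFiles(
      0,
      paths,
      starts,
      lengths,
      partitionColumns,
      LocalFilesNode.ReadFileFormat.ParquetReadFormat,
      new JArrayList[String](), // preferred locations
      deleteFilesMap)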
diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
index 98cc0d90e..903bd198a 100644
--- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
+++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
@@ -16,40 +16,18 @@
  */
 package io.glutenproject.substrait.rel;
 
+import io.glutenproject.GlutenConfig;
+
+import io.substrait.proto.ReadRel;
+import org.apache.iceberg.DeleteFile;
+
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 public class IcebergLocalFilesNode extends LocalFilesNode {
-
-  class DeleteFile {
-    private final String path;
-    private final Integer fileContent;
-    private final ReadFileFormat fileFormat;
-    private final Long fileSize;
-    private final Long recordCount;
-    private final Map<Integer, String> lowerBounds;
-    private final Map<Integer, String> upperBounds;
-
-    DeleteFile(
-        String path,
-        Integer fileContent,
-        ReadFileFormat fileFormat,
-        Long fileSize,
-        Long recordCount,
-        Map<Integer, String> lowerBounds,
-        Map<Integer, String> upperBounds) {
-      this.path = path;
-      this.fileContent = fileContent;
-      this.fileFormat = fileFormat;
-      this.fileSize = fileSize;
-      this.recordCount = recordCount;
-      this.lowerBounds = lowerBounds;
-      this.upperBounds = upperBounds;
-    }
-  }
-
-  // TODO: Add delete file support for MOR iceberg table
+  private final Map<String, List<DeleteFile>> deleteFilesMap;
 
   IcebergLocalFilesNode(
       Integer index,
@@ -58,7 +36,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
       List<Long> lengths,
       List<Map<String, String>> partitionColumns,
       ReadFileFormat fileFormat,
-      List<String> preferredLocations) {
+      List<String> preferredLocations,
+      Map<String, List<DeleteFile>> deleteFilesMap) {
     super(
         index,
         paths,
@@ -68,5 +47,76 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
         new ArrayList<>(),
         fileFormat,
         preferredLocations);
+    this.deleteFilesMap = deleteFilesMap;
+  }
+
+  @Override
+  protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {
+    List<DeleteFile> deleteFiles =
+        deleteFilesMap.getOrDefault(fileBuilder.getUriFile(), Collections.emptyList());
+    ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.Builder icebergBuilder =
+        ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder();
+
+    switch (fileFormat) {
+      case ParquetReadFormat:
+        ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions =
+            ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder()
+                .setEnableRowGroupMaxminIndex(
+                    GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex())
+                .build();
+        icebergBuilder.setParquet(parquetReadOptions);
+        break;
+      case OrcReadFormat:
+        ReadRel.LocalFiles.FileOrFiles.OrcReadOptions orcReadOptions =
+            ReadRel.LocalFiles.FileOrFiles.OrcReadOptions.newBuilder().build();
+        icebergBuilder.setOrc(orcReadOptions);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported file format " + fileFormat.name() + " for iceberg 
data file.");
+    }
+
+    for (DeleteFile delete : deleteFiles) {
+      ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.DeleteFile.Builder deleteFileBuilder =
+          ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.DeleteFile.newBuilder();
+      ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent fileContent;
+      switch (delete.content()) {
+        case EQUALITY_DELETES:
+          fileContent =
+              ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent.EQUALITY_DELETES;
+          break;
+        case POSITION_DELETES:
+          fileContent =
+              ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent.POSITION_DELETES;
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported FileCount " + delete.content().name() + " for 
delete file.");
+      }
+      deleteFileBuilder.setFileContent(fileContent);
+      deleteFileBuilder.setFilePath(delete.path().toString());
+      deleteFileBuilder.setFileSize(delete.fileSizeInBytes());
+      deleteFileBuilder.setRecordCount(delete.recordCount());
+      switch (delete.format()) {
+        case PARQUET:
+          ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions =
+              ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder()
+                  .setEnableRowGroupMaxminIndex(
+                      GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex())
+                  .build();
+          deleteFileBuilder.setParquet(parquetReadOptions);
+          break;
+        case ORC:
+          ReadRel.LocalFiles.FileOrFiles.OrcReadOptions orcReadOptions =
+              ReadRel.LocalFiles.FileOrFiles.OrcReadOptions.newBuilder().build();
+          deleteFileBuilder.setOrc(orcReadOptions);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported format " + delete.format().name() + " for delete 
file.");
+      }
+      icebergBuilder.addDeleteFiles(deleteFileBuilder);
+    }
+    fileBuilder.setIceberg(icebergBuilder);
   }
 }
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 6607ff3b9..74add7a9a 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -24,10 +24,10 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.types.StructType
 
-import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask}
+import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat, FileScanTask, ScanTask}
 
 import java.lang.{Long => JLong}
-import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 
@@ -39,22 +39,21 @@ object GlutenIcebergSourceUtil {
       val starts = new JArrayList[JLong]()
       val lengths = new JArrayList[JLong]()
       val partitionColumns = new JArrayList[JMap[String, String]]()
+      val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]()
       var fileFormat = ReadFileFormat.UnknownFormat
 
       val tasks = partition.taskGroup[ScanTask]().tasks().asScala
       asFileScanTask(tasks.toList).foreach {
         task =>
-          paths.add(task.file().path().toString)
+          val filePath = task.file().path().toString
+          paths.add(filePath)
           starts.add(task.start())
           lengths.add(task.length())
           partitionColumns.add(getPartitionColumns(task))
-          val currentFileFormat = task.file().format() match {
-            case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
-            case FileFormat.ORC => ReadFileFormat.OrcReadFormat
-            case _ =>
-              throw new UnsupportedOperationException(
-                "Iceberg Only support parquet and orc file format.")
+          if (!task.deletes().isEmpty) {
+            deleteFilesMap.put(filePath, task.deletes())
           }
+          val currentFileFormat = convertFileFormat(task.file().format())
           if (fileFormat == ReadFileFormat.UnknownFormat) {
             fileFormat = currentFileFormat
           } else if (fileFormat != currentFileFormat) {
@@ -73,7 +72,8 @@ object GlutenIcebergSourceUtil {
         lengths,
         partitionColumns,
         fileFormat,
-        preferredLoc.toList.asJava
+        preferredLoc.toList.asJava,
+        deleteFilesMap
       )
     case _ =>
       throw new UnsupportedOperationException("Only support iceberg 
SparkInputPartition.")
@@ -152,4 +152,12 @@ object GlutenIcebergSourceUtil {
     }
     partitionColumns
   }
+
+  def convertFileFormat(icebergFileFormat: FileFormat): ReadFileFormat =
+    icebergFileFormat match {
+      case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
+      case FileFormat.ORC => ReadFileFormat.OrcReadFormat
+      case _ =>
+        throw new UnsupportedOperationException("Iceberg Only support parquet 
and orc file format.")
+    }
 }
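
The Scala side keys delete files by data file path; on the C++ side,
parseIcebergSplitInfo then appends one (possibly empty) delete-file list per
data file, so deleteFilesVec stays index-aligned with paths when
WholeStageResultIterator builds each HiveIcebergSplit. A standalone sketch of
that alignment invariant (plain Scala, all names hypothetical):

    // Each data file contributes exactly one delete-file list, which is
    // empty when the file has no pending deletes.
    case class FakeSplitInfo(
        paths: Vector[String],
        deleteFilesVec: Vector[Vector[String]])

    def addFile(
        split: FakeSplitInfo,
        path: String,
        deletes: Map[String, Vector[String]]): FakeSplitInfo =
      FakeSplitInfo(
        split.paths :+ path,
        split.deleteFilesVec :+ deletes.getOrElse(path, Vector.empty))

    val deletes = Map("f1.parquet" -> Vector("f1-delete.parquet"))
    var split = FakeSplitInfo(Vector.empty, Vector.empty)
    split = addFile(split, "f1.parquet", deletes)
    split = addFile(split, "f2.parquet", deletes) // no deletes -> empty list
    assert(split.paths.length == split.deleteFilesVec.length)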
diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
index 019c60295..6b332641e 100644
--- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
+++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
@@ -59,15 +59,17 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
   }
 
   test("iceberg transformer exists") {
-    spark.sql("""
-                |create table iceberg_tb using iceberg as
-                |(select 1 as col1, 2 as col2, 3 as col3)
-                |""".stripMargin)
+    withTable("iceberg_tb") {
+      spark.sql("""
+                  |create table iceberg_tb using iceberg as
+                  |(select 1 as col1, 2 as col2, 3 as col3)
+                  |""".stripMargin)
 
-    runQueryAndCompare("""
-                         |select * from iceberg_tb;
-                         |""".stripMargin) {
-      checkOperatorMatch[IcebergScanTransformer]
+      runQueryAndCompare("""
+                           |select * from iceberg_tb;
+                           |""".stripMargin) {
+        checkOperatorMatch[IcebergScanTransformer]
+      }
     }
   }
 
@@ -314,4 +316,126 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
       }
     }
   }
+
+  test("iceberg read mor table - delete and update") {
+    withTable("iceberg_mor_tb") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+        spark.sql("""
+                    |create table iceberg_mor_tb (
+                    |  id int,
+                    |  name string,
+                    |  p string
+                    |) using iceberg
+                    |tblproperties (
+                    |  'format-version' = '2',
+                    |  'write.delete.mode' = 'merge-on-read',
+                    |  'write.update.mode' = 'merge-on-read',
+                    |  'write.merge.mode' = 'merge-on-read'
+                    |)
+                    |partitioned by (p);
+                    |""".stripMargin)
+
+        // Insert some test rows.
+        spark.sql("""
+                    |insert into table iceberg_mor_tb
+                    |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+                    |       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+                    |""".stripMargin)
+
+        // Delete row.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where name = 'a1';
+            |""".stripMargin
+        )
+        // Update row.
+        spark.sql(
+          """
+            |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+            |""".stripMargin
+        )
+        // Delete row again.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where id = 6;
+            |""".stripMargin
+        )
+      }
+      runQueryAndCompare("""
+                           |select * from iceberg_mor_tb;
+                           |""".stripMargin) {
+        checkOperatorMatch[IcebergScanTransformer]
+      }
+    }
+  }
+
+  test("iceberg read mor table - merge into") {
+    withTable("iceberg_mor_tb", "merge_into_source_tb") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+        spark.sql("""
+                    |create table iceberg_mor_tb (
+                    |  id int,
+                    |  name string,
+                    |  p string
+                    |) using iceberg
+                    |tblproperties (
+                    |  'format-version' = '2',
+                    |  'write.delete.mode' = 'merge-on-read',
+                    |  'write.update.mode' = 'merge-on-read',
+                    |  'write.merge.mode' = 'merge-on-read'
+                    |)
+                    |partitioned by (p);
+                    |""".stripMargin)
+        spark.sql("""
+                    |create table merge_into_source_tb (
+                    |  id int,
+                    |  name string,
+                    |  p string
+                    |) using iceberg;
+                    |""".stripMargin)
+
+        // Insert some test rows.
+        spark.sql("""
+                    |insert into table iceberg_mor_tb
+                    |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+                    |""".stripMargin)
+        spark.sql("""
+                    |insert into table merge_into_source_tb
+                    |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
+                    |       (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+                    |""".stripMargin)
+
+        // Delete row.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where name = 'a1';
+            |""".stripMargin
+        )
+        // Update row.
+        spark.sql(
+          """
+            |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+            |""".stripMargin
+        )
+
+        // Merge into.
+        spark.sql(
+          """
+            |merge into iceberg_mor_tb t
+            |using (select * from merge_into_source_tb) s
+            |on t.id = s.id
+            |when matched then
+            | update set t.name = s.name, t.p = s.p
+            |when not matched then
+            | insert (id, name, p) values (s.id, s.name, s.p);
+            |""".stripMargin
+        )
+      }
+      runQueryAndCompare("""
+                           |select * from iceberg_mor_tb;
+                           |""".stripMargin) {
+        checkOperatorMatch[IcebergScanTransformer]
+      }
+    }
+  }
 }

