This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 80bb0cff3 [GLUTEN-3378][VL] Feat: Support read iceberg mor table for
Velox backend (#4779)
80bb0cff3 is described below
commit 80bb0cff3d953fef3c133040776a791ccab87dc5
Author: Joey <[email protected]>
AuthorDate: Fri Mar 15 11:48:11 2024 +0800
[GLUTEN-3378][VL] Feat: Support read iceberg mor table for Velox backend
(#4779)
Velox added Iceberg MOR table read support in facebookincubator/velox#7847.
This PR supports reading Iceberg MOR tables for the Velox backend.
---
cpp/velox/CMakeLists.txt | 1 +
cpp/velox/compute/VeloxPlanConverter.cc | 4 +
cpp/velox/compute/WholeStageResultIterator.cc | 44 +++++--
cpp/velox/compute/WholeStageResultIterator.h | 2 +
cpp/velox/compute/iceberg/IcebergPlanConverter.cc | 84 +++++++++++++
.../velox/compute/iceberg/IcebergPlanConverter.h | 37 +++---
cpp/velox/substrait/SubstraitToVeloxPlan.h | 3 +
.../substrait/rel/LocalFilesNode.java | 10 +-
.../substrait/proto/substrait/algebra.proto | 29 ++++-
.../substrait/rel/IcebergLocalFilesBuilder.java | 17 ++-
.../substrait/rel/IcebergLocalFilesNode.java | 110 +++++++++++-----
.../spark/source/GlutenIcebergSourceUtil.scala | 28 +++--
.../execution/VeloxIcebergSuite.scala | 140 +++++++++++++++++++--
13 files changed, 423 insertions(+), 86 deletions(-)
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index b15fd395f..35d05d442 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -296,6 +296,7 @@ set(VELOX_SRCS
compute/VeloxRuntime.cc
compute/WholeStageResultIterator.cc
compute/VeloxPlanConverter.cc
+ compute/iceberg/IcebergPlanConverter.cc
jni/VeloxJniWrapper.cc
jni/JniFileSystem.cc
jni/JniUdf.cc
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc
b/cpp/velox/compute/VeloxPlanConverter.cc
index 8ca9f85cd..370655c3b 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -20,6 +20,7 @@
#include "compute/ResultIterator.h"
#include "config/GlutenConfig.h"
+#include "iceberg/IcebergPlanConverter.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/common/file/FileSystems.h"
@@ -93,6 +94,9 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
case SubstraitFileFormatCase::kText:
splitInfo->format = dwio::common::FileFormat::TEXT;
break;
+ case SubstraitFileFormatCase::kIceberg:
+ splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file,
std::move(splitInfo));
+ break;
default:
splitInfo->format = dwio::common::FileFormat::UNKNOWN;
break;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 86431819b..f645661b7 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -154,19 +154,37 @@ WholeStageResultIterator::WholeStageResultIterator(
auto metadataColumn = metadataColumns[idx];
std::unordered_map<std::string, std::optional<std::string>>
partitionKeys;
constructPartitionColumns(partitionKeys, partitionColumn);
- auto split =
std::make_shared<velox::connector::hive::HiveConnectorSplit>(
- kHiveConnectorId,
- paths[idx],
- format,
- starts[idx],
- lengths[idx],
- partitionKeys,
- std::nullopt,
- std::unordered_map<std::string, std::string>(),
- nullptr,
- std::unordered_map<std::string, std::string>(),
- 0,
- metadataColumn);
+ std::shared_ptr<velox::connector::ConnectorSplit> split;
+ if (auto icebergSplitInfo =
std::dynamic_pointer_cast<IcebergSplitInfo>(scanInfo)) {
+ // Set Iceberg split.
+ std::unordered_map<std::string, std::string>
customSplitInfo{{"table_format", "hive-iceberg"}};
+ auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx];
+ split =
std::make_shared<velox::connector::hive::iceberg::HiveIcebergSplit>(
+ kHiveConnectorId,
+ paths[idx],
+ format,
+ starts[idx],
+ lengths[idx],
+ partitionKeys,
+ std::nullopt,
+ customSplitInfo,
+ nullptr,
+ deleteFiles);
+ } else {
+ split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
+ kHiveConnectorId,
+ paths[idx],
+ format,
+ starts[idx],
+ lengths[idx],
+ partitionKeys,
+ std::nullopt,
+ std::unordered_map<std::string, std::string>(),
+ nullptr,
+ std::unordered_map<std::string, std::string>(),
+ 0,
+ metadataColumn);
+ }
connectorSplits.emplace_back(split);
}
diff --git a/cpp/velox/compute/WholeStageResultIterator.h
b/cpp/velox/compute/WholeStageResultIterator.h
index 082cc6397..10c1937b7 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -17,11 +17,13 @@
#pragma once
#include "compute/Runtime.h"
+#include "iceberg/IcebergPlanConverter.h"
#include "memory/ColumnarBatchIterator.h"
#include "memory/VeloxColumnarBatch.h"
#include "substrait/SubstraitToVeloxPlan.h"
#include "substrait/plan.pb.h"
#include "utils/metrics.h"
+#include "velox/connectors/hive/iceberg/IcebergSplit.h"
#include "velox/core/Config.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/Task.h"
diff --git a/cpp/velox/compute/iceberg/IcebergPlanConverter.cc
b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc
new file mode 100644
index 000000000..07c40e6e1
--- /dev/null
+++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "IcebergPlanConverter.h"
+
+namespace gluten {
+
+std::shared_ptr<IcebergSplitInfo> IcebergPlanConverter::parseIcebergSplitInfo(
+ substrait::ReadRel_LocalFiles_FileOrFiles file,
+ std::shared_ptr<SplitInfo> splitInfo) {
+ using SubstraitFileFormatCase =
::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::FileFormatCase;
+ using SubstraitDeleteFileFormatCase =
+
::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::DeleteFile::FileFormatCase;
+ auto icebergSplitInfo =
std::dynamic_pointer_cast<IcebergSplitInfo>(splitInfo)
+ ? std::dynamic_pointer_cast<IcebergSplitInfo>(splitInfo)
+ : std::make_shared<IcebergSplitInfo>(*splitInfo);
+ auto icebergReadOption = file.iceberg();
+ switch (icebergReadOption.file_format_case()) {
+ case SubstraitFileFormatCase::kParquet:
+ icebergSplitInfo->format = dwio::common::FileFormat::PARQUET;
+ break;
+ case SubstraitFileFormatCase::kOrc:
+ icebergSplitInfo->format = dwio::common::FileFormat::ORC;
+ break;
+ default:
+ icebergSplitInfo->format = dwio::common::FileFormat::UNKNOWN;
+ break;
+ }
+ if (icebergReadOption.delete_files_size() > 0) {
+ auto deleteFiles = icebergReadOption.delete_files();
+ std::vector<IcebergDeleteFile> deletes;
+ deletes.reserve(icebergReadOption.delete_files_size());
+ for (auto i = 0; i < icebergReadOption.delete_files_size(); i++) {
+ auto deleteFile = icebergReadOption.delete_files().Get(i);
+ dwio::common::FileFormat format;
+ FileContent fileContent;
+ switch (deleteFile.file_format_case()) {
+ case SubstraitDeleteFileFormatCase::kParquet:
+ format = dwio::common::FileFormat::PARQUET;
+ break;
+ case SubstraitDeleteFileFormatCase::kOrc:
+ format = dwio::common::FileFormat::ORC;
+ break;
+ default:
+ format = dwio::common::FileFormat::UNKNOWN;
+ }
+ switch (deleteFile.filecontent()) {
+ case
::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_POSITION_DELETES:
+ fileContent = FileContent::kPositionalDeletes;
+ break;
+ case
::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_EQUALITY_DELETES:
+ fileContent = FileContent::kEqualityDeletes;
+ break;
+ default:
+ fileContent = FileContent::kData;
+ break;
+ }
+ deletes.emplace_back(IcebergDeleteFile(
+ fileContent, deleteFile.filepath(), format,
deleteFile.recordcount(), deleteFile.filesize()));
+ }
+ icebergSplitInfo->deleteFilesVec.emplace_back(deletes);
+ } else {
+ // Add an empty delete files vector to indicate that this data file has no
delete file.
+
icebergSplitInfo->deleteFilesVec.emplace_back(std::vector<IcebergDeleteFile>{});
+ }
+
+ return icebergSplitInfo;
+}
+
+} // namespace gluten
diff --git
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
b/cpp/velox/compute/iceberg/IcebergPlanConverter.h
similarity index 53%
copy from
gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
copy to cpp/velox/compute/iceberg/IcebergPlanConverter.h
index 3452836cf..d634a861f 100644
---
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
+++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.h
@@ -14,24 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.glutenproject.substrait.rel;
-import java.util.List;
-import java.util.Map;
+#pragma once
-public class IcebergLocalFilesBuilder {
+#include "substrait/SubstraitToVeloxPlan.h"
+#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
- // TODO: Add makeIcebergLocalFiles for MOR iceberg table
+using namespace facebook::velox::connector::hive::iceberg;
- public static IcebergLocalFilesNode makeIcebergLocalFiles(
- Integer index,
- List<String> paths,
- List<Long> starts,
- List<Long> lengths,
- List<Map<String, String>> partitionColumns,
- LocalFilesNode.ReadFileFormat fileFormat,
- List<String> preferredLocations) {
- return new IcebergLocalFilesNode(
- index, paths, starts, lengths, partitionColumns, fileFormat,
preferredLocations);
+namespace gluten {
+struct IcebergSplitInfo : SplitInfo {
+ std::vector<std::vector<IcebergDeleteFile>> deleteFilesVec;
+
+ IcebergSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) {
+ // Reserve the actual size of the deleteFilesVec.
+ deleteFilesVec.reserve(splitInfo.paths.capacity());
}
-}
+};
+
+class IcebergPlanConverter {
+ public:
+ static std::shared_ptr<IcebergSplitInfo> parseIcebergSplitInfo(
+ substrait::ReadRel_LocalFiles_FileOrFiles file,
+ std::shared_ptr<SplitInfo> splitInfo);
+};
+
+} // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 895c1d24e..59d3312cb 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -50,6 +50,9 @@ struct SplitInfo {
/// The file format of the files to be scanned.
dwio::common::FileFormat format;
+
+ /// Make SplitInfo polymorphic
+ virtual ~SplitInfo() = default;
};
/// This class is used to convert the Substrait plan into Velox plan.
diff --git
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
index e0700ded2..852d7558e 100644
---
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
+++
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
@@ -50,7 +50,7 @@ public class LocalFilesNode implements SplitInfo {
UnknownFormat()
}
- private ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat;
+ protected ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat;
private Boolean iterAsInput = false;
private StructType fileSchema;
private Map<String, String> fileReadProperties;
@@ -112,6 +112,13 @@ public class LocalFilesNode implements SplitInfo {
return this.preferredLocations;
}
+ /**
+ * Data Lake formats require some additional processing to be done on the
FileBuilder, such as
+ * inserting delete files information. Different lake formats should
override this method to
+ * implement their corresponding logic.
+ */
+ protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder
fileBuilder) {}
+
public ReadRel.LocalFiles toProtobuf() {
ReadRel.LocalFiles.Builder localFilesBuilder =
ReadRel.LocalFiles.newBuilder();
// The input is iterator, and the path is in the format of: Iterator:index.
@@ -210,6 +217,7 @@ public class LocalFilesNode implements SplitInfo {
default:
break;
}
+ processFileBuilder(fileBuilder);
localFilesBuilder.addItems(fileBuilder.build());
}
return localFilesBuilder.build();
diff --git
a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
index 63a0f36ea..e9ed0f5ef 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -149,6 +149,28 @@ message ReadRel {
uint64 max_block_size = 1;
NamedStruct schema = 2 [deprecated=true];
}
+ message IcebergReadOptions {
+ enum FileContent {
+ DATA = 0;
+ POSITION_DELETES = 1;
+ EQUALITY_DELETES = 2;
+ }
+ message DeleteFile {
+ FileContent fileContent = 1;
+ string filePath = 2;
+ uint64 fileSize = 3;
+ uint64 recordCount = 4;
+ oneof file_format {
+ ParquetReadOptions parquet = 5;
+ OrcReadOptions orc = 6;
+ }
+ }
+ oneof file_format {
+ ParquetReadOptions parquet = 1;
+ OrcReadOptions orc = 2;
+ }
+ repeated DeleteFile delete_files = 3;
+ }
// File reading options
oneof file_format {
@@ -159,22 +181,23 @@ message ReadRel {
DwrfReadOptions dwrf = 13;
TextReadOptions text = 14;
JsonReadOptions json = 15;
+ IcebergReadOptions iceberg = 16;
}
message partitionColumn {
string key = 1;
string value = 2;
}
- repeated partitionColumn partition_columns = 16;
+ repeated partitionColumn partition_columns = 17;
/// File schema
- NamedStruct schema = 17;
+ NamedStruct schema = 18;
message metadataColumn {
string key = 1;
string value = 2;
}
- repeated metadataColumn metadata_columns = 18;
+ repeated metadataColumn metadata_columns = 19;
}
}
}
diff --git
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
index 3452836cf..773f3073b 100644
---
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
+++
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java
@@ -16,13 +16,12 @@
*/
package io.glutenproject.substrait.rel;
+import org.apache.iceberg.DeleteFile;
+
import java.util.List;
import java.util.Map;
public class IcebergLocalFilesBuilder {
-
- // TODO: Add makeIcebergLocalFiles for MOR iceberg table
-
public static IcebergLocalFilesNode makeIcebergLocalFiles(
Integer index,
List<String> paths,
@@ -30,8 +29,16 @@ public class IcebergLocalFilesBuilder {
List<Long> lengths,
List<Map<String, String>> partitionColumns,
LocalFilesNode.ReadFileFormat fileFormat,
- List<String> preferredLocations) {
+ List<String> preferredLocations,
+ Map<String, List<DeleteFile>> deleteFilesMap) {
return new IcebergLocalFilesNode(
- index, paths, starts, lengths, partitionColumns, fileFormat,
preferredLocations);
+ index,
+ paths,
+ starts,
+ lengths,
+ partitionColumns,
+ fileFormat,
+ preferredLocations,
+ deleteFilesMap);
}
}
diff --git
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
index 98cc0d90e..903bd198a 100644
---
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
+++
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
@@ -16,40 +16,18 @@
*/
package io.glutenproject.substrait.rel;
+import io.glutenproject.GlutenConfig;
+
+import io.substrait.proto.ReadRel;
+import org.apache.iceberg.DeleteFile;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
public class IcebergLocalFilesNode extends LocalFilesNode {
-
- class DeleteFile {
- private final String path;
- private final Integer fileContent;
- private final ReadFileFormat fileFormat;
- private final Long fileSize;
- private final Long recordCount;
- private final Map<Integer, String> lowerBounds;
- private final Map<Integer, String> upperBounds;
-
- DeleteFile(
- String path,
- Integer fileContent,
- ReadFileFormat fileFormat,
- Long fileSize,
- Long recordCount,
- Map<Integer, String> lowerBounds,
- Map<Integer, String> upperBounds) {
- this.path = path;
- this.fileContent = fileContent;
- this.fileFormat = fileFormat;
- this.fileSize = fileSize;
- this.recordCount = recordCount;
- this.lowerBounds = lowerBounds;
- this.upperBounds = upperBounds;
- }
- }
-
- // TODO: Add delete file support for MOR iceberg table
+ private final Map<String, List<DeleteFile>> deleteFilesMap;
IcebergLocalFilesNode(
Integer index,
@@ -58,7 +36,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
List<Long> lengths,
List<Map<String, String>> partitionColumns,
ReadFileFormat fileFormat,
- List<String> preferredLocations) {
+ List<String> preferredLocations,
+ Map<String, List<DeleteFile>> deleteFilesMap) {
super(
index,
paths,
@@ -68,5 +47,76 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
new ArrayList<>(),
fileFormat,
preferredLocations);
+ this.deleteFilesMap = deleteFilesMap;
+ }
+
+ @Override
+ protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder
fileBuilder) {
+ List<DeleteFile> deleteFiles =
+ deleteFilesMap.getOrDefault(fileBuilder.getUriFile(),
Collections.emptyList());
+ ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.Builder icebergBuilder =
+ ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder();
+
+ switch (fileFormat) {
+ case ParquetReadFormat:
+ ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions =
+ ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder()
+ .setEnableRowGroupMaxminIndex(
+ GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex())
+ .build();
+ icebergBuilder.setParquet(parquetReadOptions);
+ break;
+ case OrcReadFormat:
+ ReadRel.LocalFiles.FileOrFiles.OrcReadOptions orcReadOptions =
+ ReadRel.LocalFiles.FileOrFiles.OrcReadOptions.newBuilder().build();
+ icebergBuilder.setOrc(orcReadOptions);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported file format " + fileFormat.name() + " for iceberg
data file.");
+ }
+
+ for (DeleteFile delete : deleteFiles) {
+ ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.DeleteFile.Builder
deleteFileBuilder =
+
ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.DeleteFile.newBuilder();
+ ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent
fileContent;
+ switch (delete.content()) {
+ case EQUALITY_DELETES:
+ fileContent =
+
ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent.EQUALITY_DELETES;
+ break;
+ case POSITION_DELETES:
+ fileContent =
+
ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent.POSITION_DELETES;
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported FileCount " + delete.content().name() + " for
delete file.");
+ }
+ deleteFileBuilder.setFileContent(fileContent);
+ deleteFileBuilder.setFilePath(delete.path().toString());
+ deleteFileBuilder.setFileSize(delete.fileSizeInBytes());
+ deleteFileBuilder.setRecordCount(delete.recordCount());
+ switch (delete.format()) {
+ case PARQUET:
+ ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions
=
+ ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder()
+ .setEnableRowGroupMaxminIndex(
+
GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex())
+ .build();
+ deleteFileBuilder.setParquet(parquetReadOptions);
+ break;
+ case ORC:
+ ReadRel.LocalFiles.FileOrFiles.OrcReadOptions orcReadOptions =
+
ReadRel.LocalFiles.FileOrFiles.OrcReadOptions.newBuilder().build();
+ deleteFileBuilder.setOrc(orcReadOptions);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported format " + delete.format().name() + " for delete
file.");
+ }
+ icebergBuilder.addDeleteFiles(deleteFileBuilder);
+ }
+ fileBuilder.setIceberg(icebergBuilder);
}
}
diff --git
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 6607ff3b9..74add7a9a 100644
---
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -24,10 +24,10 @@ import
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.types.StructType
-import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask,
ScanTask}
+import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat,
FileScanTask, ScanTask}
import java.lang.{Long => JLong}
-import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList,
Map => JMap}
import scala.collection.JavaConverters._
@@ -39,22 +39,21 @@ object GlutenIcebergSourceUtil {
val starts = new JArrayList[JLong]()
val lengths = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]()
+ val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]()
var fileFormat = ReadFileFormat.UnknownFormat
val tasks = partition.taskGroup[ScanTask]().tasks().asScala
asFileScanTask(tasks.toList).foreach {
task =>
- paths.add(task.file().path().toString)
+ val filePath = task.file().path().toString
+ paths.add(filePath)
starts.add(task.start())
lengths.add(task.length())
partitionColumns.add(getPartitionColumns(task))
- val currentFileFormat = task.file().format() match {
- case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
- case FileFormat.ORC => ReadFileFormat.OrcReadFormat
- case _ =>
- throw new UnsupportedOperationException(
- "Iceberg Only support parquet and orc file format.")
+ if (!task.deletes().isEmpty) {
+ deleteFilesMap.put(filePath, task.deletes())
}
+ val currentFileFormat = convertFileFormat(task.file().format())
if (fileFormat == ReadFileFormat.UnknownFormat) {
fileFormat = currentFileFormat
} else if (fileFormat != currentFileFormat) {
@@ -73,7 +72,8 @@ object GlutenIcebergSourceUtil {
lengths,
partitionColumns,
fileFormat,
- preferredLoc.toList.asJava
+ preferredLoc.toList.asJava,
+ deleteFilesMap
)
case _ =>
throw new UnsupportedOperationException("Only support iceberg
SparkInputPartition.")
@@ -152,4 +152,12 @@ object GlutenIcebergSourceUtil {
}
partitionColumns
}
+
+ def convertFileFormat(icebergFileFormat: FileFormat): ReadFileFormat =
+ icebergFileFormat match {
+ case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
+ case FileFormat.ORC => ReadFileFormat.OrcReadFormat
+ case _ =>
+ throw new UnsupportedOperationException("Iceberg Only support parquet
and orc file format.")
+ }
}
diff --git
a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
index 019c60295..6b332641e 100644
---
a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
+++
b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
@@ -59,15 +59,17 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
}
test("iceberg transformer exists") {
- spark.sql("""
- |create table iceberg_tb using iceberg as
- |(select 1 as col1, 2 as col2, 3 as col3)
- |""".stripMargin)
+ withTable("iceberg_tb") {
+ spark.sql("""
+ |create table iceberg_tb using iceberg as
+ |(select 1 as col1, 2 as col2, 3 as col3)
+ |""".stripMargin)
- runQueryAndCompare("""
- |select * from iceberg_tb;
- |""".stripMargin) {
- checkOperatorMatch[IcebergScanTransformer]
+ runQueryAndCompare("""
+ |select * from iceberg_tb;
+ |""".stripMargin) {
+ checkOperatorMatch[IcebergScanTransformer]
+ }
}
}
@@ -314,4 +316,126 @@ class VeloxIcebergSuite extends
WholeStageTransformerSuite {
}
}
}
+
+ test("iceberg read mor table - delete and update") {
+ withTable("iceberg_mor_tb") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+ spark.sql("""
+ |create table iceberg_mor_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'merge-on-read',
+ | 'write.update.mode' = 'merge-on-read',
+ | 'write.merge.mode' = 'merge-on-read'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql("""
+ |insert into table iceberg_mor_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+ | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+ |""".stripMargin)
+
+ // Delete row.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where name = 'a1';
+ |""".stripMargin
+ )
+ // Update row.
+ spark.sql(
+ """
+ |update iceberg_mor_tb set name = 'new_a2' where id = 'a2';
+ |""".stripMargin
+ )
+ // Delete row again.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where id = 6;
+ |""".stripMargin
+ )
+ }
+ runQueryAndCompare("""
+ |select * from iceberg_mor_tb;
+ |""".stripMargin) {
+ checkOperatorMatch[IcebergScanTransformer]
+ }
+ }
+ }
+
+ test("iceberg read mor table - merge into") {
+ withTable("iceberg_mor_tb", "merge_into_source_tb") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+ spark.sql("""
+ |create table iceberg_mor_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'merge-on-read',
+ | 'write.update.mode' = 'merge-on-read',
+ | 'write.merge.mode' = 'merge-on-read'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+ spark.sql("""
+ |create table merge_into_source_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg;
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql("""
+ |insert into table iceberg_mor_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+ |""".stripMargin)
+ spark.sql("""
+ |insert into table merge_into_source_tb
+ |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1',
'p1'),
+ | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+ |""".stripMargin)
+
+ // Delete row.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where name = 'a1';
+ |""".stripMargin
+ )
+ // Update row.
+ spark.sql(
+ """
+ |update iceberg_mor_tb set name = 'new_a2' where id = 'a2';
+ |""".stripMargin
+ )
+
+ // Merge into.
+ spark.sql(
+ """
+ |merge into iceberg_mor_tb t
+ |using (select * from merge_into_source_tb) s
+ |on t.id = s.id
+ |when matched then
+ | update set t.name = s.name, t.p = s.p
+ |when not matched then
+ | insert (id, name, p) values (s.id, s.name, s.p);
+ |""".stripMargin
+ )
+ }
+ runQueryAndCompare("""
+ |select * from iceberg_mor_tb;
+ |""".stripMargin) {
+ checkOperatorMatch[IcebergScanTransformer]
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]