This is an automated email from the ASF dual-hosted git repository.
felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d5e55446f0 Add compression codec extension to velox written parquet
file (#8000)
d5e55446f0 is described below
commit d5e55446f0c57173d0a3b5004bf25d824ae54de2
Author: Joey <[email protected]>
AuthorDate: Thu Nov 21 14:30:52 2024 +0800
Add compression codec extension to velox written parquet file (#8000)
This aligns the names of Velox-written Parquet files with vanilla Spark's naming, e.g.
gluten-part-b35dab49-3eb9-434b-abb9-7de2bc180a06.snappy.parquet
---
.../sql/execution/VeloxParquetWriteSuite.scala | 21 ++++++++++--
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 38 ++++++++++++++++++++--
2 files changed, 54 insertions(+), 5 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index 4c76c753b9..d19d279fbb 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -31,6 +31,22 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
override protected val resourcePath: String = "/tpch-data-parquet"
override protected val fileFormat: String = "parquet"
+ // The parquet compression codec extensions
+ private val parquetCompressionCodecExtensions = Map(
+ "none" -> "",
+ "uncompressed" -> "",
+ "snappy" -> ".snappy",
+ "gzip" -> ".gz",
+ "lzo" -> ".lzo",
+ "lz4" -> ".lz4",
+ "brotli" -> ".br",
+ "zstd" -> ".zstd"
+ )
+
+ private def getParquetFileExtension(codec: String): String = {
+ s"${parquetCompressionCodecExtensions(codec)}.parquet"
+ }
+
override def beforeAll(): Unit = {
super.beforeAll()
createTPCHNotNullTables()
@@ -49,8 +65,8 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
}
assert(
- testAppender.loggingEvents.exists(
- _.getMessage.toString.contains("Use Gluten parquet write for hive")) == false)
+ !testAppender.loggingEvents.exists(
+ _.getMessage.toString.contains("Use Gluten parquet write for hive")))
}
}
@@ -77,6 +93,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
parquetFiles.forall {
file =>
val path = new Path(f.getCanonicalPath, file)
+ assert(file.endsWith(getParquetFileExtension(codec)))
val in = HadoopInputFile.fromPath(path,
spark.sessionState.newHadoopConf())
Utils.tryWithResource(ParquetFileReader.open(in)) {
reader =>
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index cdd9269e14..1efa733879 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -493,13 +493,43 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}
}
+std::string makeUuid() {
+ return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
+}
+
+std::string compressionFileNameSuffix(common::CompressionKind kind) {
+ switch (static_cast<int32_t>(kind)) {
+ case common::CompressionKind_ZLIB:
+ return ".zlib";
+ case common::CompressionKind_SNAPPY:
+ return ".snappy";
+ case common::CompressionKind_LZO:
+ return ".lzo";
+ case common::CompressionKind_ZSTD:
+ return ".zstd";
+ case common::CompressionKind_LZ4:
+ return ".lz4";
+ case common::CompressionKind_GZIP:
+ return ".gz";
+ case common::CompressionKind_NONE:
+ default:
+ return "";
+ }
+}
+
std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
const std::string& targetDirectory,
+ dwio::common::FileFormat fileFormat,
+ common::CompressionKind compression,
const std::optional<std::string>& writeDirectory = std::nullopt,
const connector::hive::LocationHandle::TableType& tableType =
connector::hive::LocationHandle::TableType::kExisting) {
+ std::string targetFileName = "";
+ if (fileFormat == dwio::common::FileFormat::PARQUET) {
+ targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(), compressionFileNameSuffix(compression), ".parquet");
+ }
return std::make_shared<connector::hive::LocationHandle>(
- targetDirectory, writeDirectory.value_or(targetDirectory), tableType);
+ targetDirectory, writeDirectory.value_or(targetDirectory), tableType, targetFileName);
}
std::shared_ptr<connector::hive::HiveInsertTableHandle>
makeHiveInsertTableHandle(
@@ -615,6 +645,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
// Do not hard-code connector ID and allow for connectors other than Hive.
static const std::string kHiveConnectorId = "test-hive";
+ // Currently only support parquet format.
+ dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;
return std::make_shared<core::TableWriteNode>(
nextPlanNodeId(),
@@ -628,8 +660,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
inputType->children(),
partitionedKey,
nullptr /*bucketProperty*/,
- makeLocationHandle(writePath),
- dwio::common::FileFormat::PARQUET, // Currently only support parquet format.
+ makeLocationHandle(writePath, fileFormat, compressionCodec),
+ fileFormat,
compressionCodec)),
(!partitionedKey.empty()),
exec::TableWriteTraits::outputType(nullptr),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]