This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d5e55446f0 Add compression codec extension to velox written parquet file (#8000)
d5e55446f0 is described below

commit d5e55446f0c57173d0a3b5004bf25d824ae54de2
Author: Joey <[email protected]>
AuthorDate: Thu Nov 21 14:30:52 2024 +0800

    Add compression codec extension to velox written parquet file (#8000)
    
    To align with Vanilla's parquet file name. Like gluten-part-b35dab49-3eb9-434b-abb9-7de2bc180a06.snappy.parquet
---
 .../sql/execution/VeloxParquetWriteSuite.scala     | 21 ++++++++++--
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        | 38 ++++++++++++++++++++--
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index 4c76c753b9..d19d279fbb 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -31,6 +31,22 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"
 
+  // The parquet compression codec extensions
+  private val parquetCompressionCodecExtensions = Map(
+    "none" -> "",
+    "uncompressed" -> "",
+    "snappy" -> ".snappy",
+    "gzip" -> ".gz",
+    "lzo" -> ".lzo",
+    "lz4" -> ".lz4",
+    "brotli" -> ".br",
+    "zstd" -> ".zstd"
+  )
+
+  private def getParquetFileExtension(codec: String): String = {
+    s"${parquetCompressionCodecExtensions(codec)}.parquet"
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     createTPCHNotNullTables()
@@ -49,8 +65,8 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
           spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
         }
         assert(
-          testAppender.loggingEvents.exists(
-            _.getMessage.toString.contains("Use Gluten parquet write for hive")) == false)
+          !testAppender.loggingEvents.exists(
+            _.getMessage.toString.contains("Use Gluten parquet write for hive")))
     }
   }
 
@@ -77,6 +93,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
                     parquetFiles.forall {
                       file =>
                         val path = new Path(f.getCanonicalPath, file)
+                        assert(file.endsWith(getParquetFileExtension(codec)))
                         val in = HadoopInputFile.fromPath(path, spark.sessionState.newHadoopConf())
                         Utils.tryWithResource(ParquetFileReader.open(in)) {
                           reader =>
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index cdd9269e14..1efa733879 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -493,13 +493,43 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   }
 }
 
+std::string makeUuid() {
+  return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
+}
+
+std::string compressionFileNameSuffix(common::CompressionKind kind) {
+  switch (static_cast<int32_t>(kind)) {
+    case common::CompressionKind_ZLIB:
+      return ".zlib";
+    case common::CompressionKind_SNAPPY:
+      return ".snappy";
+    case common::CompressionKind_LZO:
+      return ".lzo";
+    case common::CompressionKind_ZSTD:
+      return ".zstd";
+    case common::CompressionKind_LZ4:
+      return ".lz4";
+    case common::CompressionKind_GZIP:
+      return ".gz";
+    case common::CompressionKind_NONE:
+    default:
+      return "";
+  }
+}
+
 std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
     const std::string& targetDirectory,
+    dwio::common::FileFormat fileFormat,
+    common::CompressionKind compression,
     const std::optional<std::string>& writeDirectory = std::nullopt,
     const connector::hive::LocationHandle::TableType& tableType =
         connector::hive::LocationHandle::TableType::kExisting) {
+  std::string targetFileName = "";
+  if (fileFormat == dwio::common::FileFormat::PARQUET) {
+    targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(), compressionFileNameSuffix(compression), ".parquet");
+  }
   return std::make_shared<connector::hive::LocationHandle>(
-      targetDirectory, writeDirectory.value_or(targetDirectory), tableType);
+      targetDirectory, writeDirectory.value_or(targetDirectory), tableType, targetFileName);
 }
 
 std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandle(
@@ -615,6 +645,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
 
   // Do not hard-code connector ID and allow for connectors other than Hive.
   static const std::string kHiveConnectorId = "test-hive";
+  // Currently only support parquet format.
+  dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;
 
   return std::make_shared<core::TableWriteNode>(
       nextPlanNodeId(),
@@ -628,8 +660,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
               inputType->children(),
               partitionedKey,
               nullptr /*bucketProperty*/,
-              makeLocationHandle(writePath),
-              dwio::common::FileFormat::PARQUET, // Currently only support parquet format.
+              makeLocationHandle(writePath, fileFormat, compressionCodec),
+              fileFormat,
               compressionCodec)),
       (!partitionedKey.empty()),
       exec::TableWriteTraits::outputType(nullptr),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to