This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 48788c437 [GLUTEN-6470][CH]Fix Task not serializable error when inserting mergetree data (#6473)
48788c437 is described below

commit 48788c437d626da3400791555976b5004beb154d
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Jul 17 09:30:50 2024 +0800

    [GLUTEN-6470][CH]Fix Task not serializable error when inserting mergetree data (#6473)
    
    * [GLUTEN-6470][CH]Fix Task not serializable error when inserting mergetree data
    
    When inserting mergetree data, a Task not serializable error occurs in some cases.
    
    Root cause:
    In Delta, the options of `DeltaOptions` are a `CaseInsensitiveMap`. Calling `filterKeys()` on a `CaseInsensitiveMap` returns a lazy wrapper map that may not be serializable, which leads to this error.
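    
    A minimal standalone sketch of the failure and the fix (assuming Scala
    2.12 collection semantics; the ObjectOutputStream round trip stands in
    for Spark's task serialization):
    
        import java.io.{ByteArrayOutputStream, ObjectOutputStream}
        import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
    
        val opts: Map[String, String] =
          CaseInsensitiveMap(Map("compression" -> "snappy"))
    
        // filterKeys returns a lazy wrapper; on an immutable.Map, toMap is a
        // no-op that hands back the same non-serializable wrapper instance.
        val broken = opts.filterKeys(_ => true).toMap
    
        // map(identity) rebuilds a plain immutable.Map, which serializes fine.
        val fixed = opts.filterKeys(_ => true).map(identity)
    
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        out.writeObject(fixed)  // ok
        out.writeObject(broken) // throws java.io.NotSerializableException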
    
    Close #6470.
    
    * fix ut
    
    * add ut
---
 .../delta/ClickhouseOptimisticTransaction.scala    |  12 ++-
 .../delta/ClickhouseOptimisticTransaction.scala    |  12 ++-
 .../backendsapi/clickhouse/CHListenerApi.scala     |  10 +-
 .../utils/MergeTreePartsPartitionsUtil.scala       |   1 +
 ...tenClickHouseMergeTreePathBasedWriteSuite.scala |   4 +-
 ...useMergeTreeWriteTaskNotSerializableSuite.scala | 120 +++++++++++++++++++++
 .../apache/spark/softaffinity/SoftAffinity.scala   |   2 +-
 7 files changed, 143 insertions(+), 18 deletions(-)

diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 0794b4515..4133b5c60 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -116,11 +116,13 @@ class ClickhouseOptimisticTransaction(
         var options = writeOptions match {
           case None => Map.empty[String, String]
           case Some(writeOptions) =>
-            writeOptions.options.filterKeys {
-              key =>
-                key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
-                key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
-            }.toMap
+            writeOptions.options
+              .filterKeys {
+                key =>
+                  key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+                  key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+              }
+              .map(identity)
         }
 
         spark.conf.getAll.foreach(
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 0794b4515..4133b5c60 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -116,11 +116,13 @@ class ClickhouseOptimisticTransaction(
         var options = writeOptions match {
           case None => Map.empty[String, String]
           case Some(writeOptions) =>
-            writeOptions.options.filterKeys {
-              key =>
-                key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
-                key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
-            }.toMap
+            writeOptions.options
+              .filterKeys {
+                key =>
+                  key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+                  key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+              }
+              .map(identity)
         }
 
         spark.conf.getAll.foreach(
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 43e0627df..0110d085b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -84,11 +84,11 @@ class CHListenerApi extends ListenerApi with Logging {
       s".max_bytes_before_external_sort"
     if (conf.getLong(externalSortKey, -1) < 0) {
       if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
-        val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size")).toInt
-        if (memSize > 0) {
-          val cores = conf.getInt("spark.executor.cores", 1)
-          val sortMemLimit = ((memSize / cores) * 0.8).toInt
-          logInfo(s"max memory for sorting: $sortMemLimit")
+        val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))
+        if (memSize > 0L) {
+          val cores = conf.getInt("spark.executor.cores", 1).toLong
+          val sortMemLimit = ((memSize / cores) * 0.8).toLong
+          logDebug(s"max memory for sorting: $sortMemLimit")
           conf.set(externalSortKey, sortMemLimit.toString)
         }
       }
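
The Long-based change above matters because JavaUtils.byteStringAsBytes
returns a byte count as a Long, which the old code truncated with .toInt.
A minimal sketch of the overflow (plain Scala; the 4G size and 0.8 factor
mirror the configuration exercised by the new test suite below):

    // JavaUtils.byteStringAsBytes("4G") == 4294967296L, above Int.MaxValue.
    val memSize: Long = 4L * 1024 * 1024 * 1024

    // The old .toInt truncated 2^32 to 0, so the memSize > 0 check failed
    // and max_bytes_before_external_sort was never set.
    assert(memSize.toInt == 0)

    // With Long arithmetic the limit survives intact.
    val cores = 1L
    val sortMemLimit = ((memSize / cores) * 0.8).toLong
    assert(sortMemLimit == 3435973836L) // the value asserted in the new UT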
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
index ac6ac959f..228dc9feb 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
@@ -231,6 +231,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
                 size * part.size / part.marks)
           }
       }
+      .sortBy(_.bytesOnDisk)(implicitly[Ordering[Long]].reverse)
 
     var currentSize = 0L
     val currentFiles = new ArrayBuffer[MergeTreePartSplit]
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 6c3d7dea0..129f5405c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -282,8 +282,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       .format("clickhouse")
       .load(dataPath)
       .where("l_shipdate = date'1998-09-02'")
-      .collect()
-    assertResult(110501)(result.apply(0).get(0))
+      .count()
+    assertResult(183L)(result)
   }
 
   test("test mergetree path based insert overwrite partitioned table with 
small table, static") {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
new file mode 100644
index 000000000..e8550fb32
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+// Some SQL statements' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val needCopyParquetToTablePath = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + "mergetree-queries-output"
+
+  /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.files.maxPartitionBytes", "20000000")
+      .set("spark.memory.offHeap.size", "4G")
+  }
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    createNotNullTPCHTablesInParquet(tablesPath)
+  }
+
+  test("GLUTEN-6470: Fix Task not serializable error when inserting mergetree 
data") {
+
+    val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
+      s".max_bytes_before_external_sort"
+    assertResult(3435973836L)(spark.conf.get(externalSortKey).toLong)
+
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_task_not_serializable;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_task_not_serializable
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_task_not_serializable'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_task_not_serializable
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_task_not_serializable
+         |WHERE
+         |    l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr)(_ => {})
+  }
+}
+// scalastyle:on line.size.limit
diff --git a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
index 86133542e..cfdce5360 100644
--- a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -39,7 +39,7 @@ abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with
       filePaths: Array[String],
       preferredLocations: Array[String]): Array[String] = {
     if (shouldUseSoftAffinity(filePaths, preferredLocations)) {
-      internalGetHostLocations(filePaths.min)
+      internalGetHostLocations(filePaths(0))
     } else {
       preferredLocations
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
