This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 48788c437 [GLUTEN-6470][CH]Fix Task not serializable error when
inserting mergetree data (#6473)
48788c437 is described below
commit 48788c437d626da3400791555976b5004beb154d
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Jul 17 09:30:50 2024 +0800
[GLUTEN-6470][CH]Fix Task not serializable error when inserting mergetree
data (#6473)
* [GLUTEN-6470][CH]Fix Task not serializable error when inserting mergetree
data
When inserting mergetree data, a Task not serializable error occurs in
some cases.
RC (root cause):
In Delta, the options held by `DeltaOptions` are a `CaseInsensitiveMap`.
Calling the `filterKeys()` API on that `CaseInsensitiveMap` may produce a
result that is not serializable, which leads to this error.
Close #6470.
* fix ut
* add ut
---
.../delta/ClickhouseOptimisticTransaction.scala | 12 ++-
.../delta/ClickhouseOptimisticTransaction.scala | 12 ++-
.../backendsapi/clickhouse/CHListenerApi.scala | 10 +-
.../utils/MergeTreePartsPartitionsUtil.scala | 1 +
...tenClickHouseMergeTreePathBasedWriteSuite.scala | 4 +-
...useMergeTreeWriteTaskNotSerializableSuite.scala | 120 +++++++++++++++++++++
.../apache/spark/softaffinity/SoftAffinity.scala | 2 +-
7 files changed, 143 insertions(+), 18 deletions(-)
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 0794b4515..4133b5c60 100644
---
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -116,11 +116,13 @@ class ClickhouseOptimisticTransaction(
var options = writeOptions match {
case None => Map.empty[String, String]
case Some(writeOptions) =>
- writeOptions.options.filterKeys {
- key =>
- key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
- key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
- }.toMap
+ writeOptions.options
+ .filterKeys {
+ key =>
+ key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+ key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+ }
+ .map(identity)
}
spark.conf.getAll.foreach(
diff --git
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 0794b4515..4133b5c60 100644
---
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -116,11 +116,13 @@ class ClickhouseOptimisticTransaction(
var options = writeOptions match {
case None => Map.empty[String, String]
case Some(writeOptions) =>
- writeOptions.options.filterKeys {
- key =>
- key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
- key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
- }.toMap
+ writeOptions.options
+ .filterKeys {
+ key =>
+ key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+ key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+ }
+ .map(identity)
}
spark.conf.getAll.foreach(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 43e0627df..0110d085b 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -84,11 +84,11 @@ class CHListenerApi extends ListenerApi with Logging {
s".max_bytes_before_external_sort"
if (conf.getLong(externalSortKey, -1) < 0) {
if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
- val memSize =
JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size")).toInt
- if (memSize > 0) {
- val cores = conf.getInt("spark.executor.cores", 1)
- val sortMemLimit = ((memSize / cores) * 0.8).toInt
- logInfo(s"max memory for sorting: $sortMemLimit")
+ val memSize =
JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))
+ if (memSize > 0L) {
+ val cores = conf.getInt("spark.executor.cores", 1).toLong
+ val sortMemLimit = ((memSize / cores) * 0.8).toLong
+ logDebug(s"max memory for sorting: $sortMemLimit")
conf.set(externalSortKey, sortMemLimit.toString)
}
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
index ac6ac959f..228dc9feb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
@@ -231,6 +231,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
size * part.size / part.marks)
}
}
+ .sortBy(_.bytesOnDisk)(implicitly[Ordering[Long]].reverse)
var currentSize = 0L
val currentFiles = new ArrayBuffer[MergeTreePartSplit]
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 6c3d7dea0..129f5405c 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -282,8 +282,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.format("clickhouse")
.load(dataPath)
.where("l_shipdate = date'1998-09-02'")
- .collect()
- assertResult(110501)(result.apply(0).get(0))
+ .count()
+ assertResult(183L)(result)
}
test("test mergetree path based insert overwrite partitioned table with
small table, static") {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
new file mode 100644
index 000000000..e8550fb32
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val needCopyParquetToTablePath = true
+
+ override protected val tablesPath: String = basePath + "/tpch-data"
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+
+ /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.sql.files.maxPartitionBytes", "20000000")
+ .set("spark.memory.offHeap.size", "4G")
+ }
+
+ override protected def createTPCHNotNullTables(): Unit = {
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ test("GLUTEN-6470: Fix Task not serializable error when inserting mergetree
data") {
+
+ val externalSortKey =
s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
+ s".max_bytes_before_external_sort"
+ assertResult(3435973836L)(spark.conf.get(externalSortKey).toLong)
+
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_task_not_serializable;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_task_not_serializable
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_task_not_serializable'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_task_not_serializable
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_task_not_serializable
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr)(_ => {})
+ }
+}
+// scalastyle:off line.size.limit
diff --git
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
index 86133542e..cfdce5360 100644
---
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -39,7 +39,7 @@ abstract class Affinity(val manager: AffinityManager) extends
LogLevelUtil with
filePaths: Array[String],
preferredLocations: Array[String]): Array[String] = {
if (shouldUseSoftAffinity(filePaths, preferredLocations)) {
- internalGetHostLocations(filePaths.min)
+ internalGetHostLocations(filePaths(0))
} else {
preferredLocations
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]