This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new de267858d [CH][CELEBORN] CHCelebornColumnarShuffleWriter supports
celeborn.client.spark.shuffle.writer to use memory sort shuffle in ClickHouse
backend (#6454)
de267858d is described below
commit de267858db6f3cdf09732f7d4402f51d40f16bf3
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Jul 15 15:49:34 2024 +0800
[CH][CELEBORN] CHCelebornColumnarShuffleWriter supports
celeborn.client.spark.shuffle.writer to use memory sort shuffle in ClickHouse
backend (#6454)
[CH][CELEBORN] CHCelebornColumnarShuffleWriter supports
celeborn.client.spark.shuffle.writer to use memory sort shuffle in ClickHouse
backend
---
.github/workflows/clickhouse_be_trigger.yml | 6 +++---
.../org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala | 5 +++--
2 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/clickhouse_be_trigger.yml b/.github/workflows/clickhouse_be_trigger.yml
index 11e754355..7f45f7ca7 100644
--- a/.github/workflows/clickhouse_be_trigger.yml
+++ b/.github/workflows/clickhouse_be_trigger.yml
@@ -22,9 +22,9 @@ on:
- '.github/workflows/clickhouse_be_trigger.yml'
- 'pom.xml'
- 'backends-clickhouse/**'
- - 'gluten-celeborn/common'
- - 'gluten-celeborn/package'
- - 'gluten-celeborn/clickhouse'
+ - 'gluten-celeborn/common/**'
+ - 'gluten-celeborn/package/**'
+ - 'gluten-celeborn/clickhouse/**'
- 'gluten-core/**'
- 'gluten-ut/**'
- 'shims/**'
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index e5cd3d22f..40390a715 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -63,7 +63,7 @@ class CHCelebornColumnarShuffleWriter[K, V](
if (cb.numRows == 0 || cb.numCols == 0) {
logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
} else {
- initShuffleWriter()
+ initShuffleWriter(cb)
val col = cb.column(0).asInstanceOf[CHColumnVector]
jniWrapper.split(nativeShuffleWriter, col.getBlockAddress)
dep.metrics("numInputRows").add(cb.numRows)
@@ -118,7 +118,8 @@ class CHCelebornColumnarShuffleWriter[K, V](
if (nativeShuffleWriter == -1L) {
throw new IllegalStateException(
"Fatal: spill() called before a celeborn shuffle writer is created. " +
- "This behavior should be optimized by moving memory allocations from make() to split()")
+ "This behavior should be optimized by moving memory allocations " +
+ "from make() to split()")
}
logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
val spilled = jniWrapper.evict(nativeShuffleWriter)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]