This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d53b6e53c [CELEBORN-946][GLUTEN] Record read metric should be
compatible with Gluten shuffle dependency
d53b6e53c is described below
commit d53b6e53c719d588da7f68185a2c5b8c453daa48
Author: xiyu.zk <[email protected]>
AuthorDate: Tue Sep 5 18:34:12 2023 +0800
[CELEBORN-946][GLUTEN] Record read metric should be compatible with Gluten
shuffle dependency
### What changes were proposed in this pull request?
Currently, the reader decides whether a shuffle is a Gluten shuffle by checking
the serializer class name (`CelebornColumnarBatchSerializer`), which only works
for the Velox backend. To support the ClickHouse backend as well, this change
instead checks whether the shuffle dependency is a `ColumnarShuffleDependency`,
which is more generic.
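### Why are the changes needed?
So that the records-read metric is recorded correctly for Gluten columnar
shuffles regardless of which Gluten backend (Velox or ClickHouse) is in use.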
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1878 from kerwin-zk/gluten.
Authored-by: xiyu.zk <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 4 ++--
...BatchSerdeHelper.scala => GlutenShuffleDependencyHelper.scala} | 8 ++++----
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index f07ed4989..43cb22efd 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -131,8 +131,8 @@ class CelebornShuffleReader[K, C](
serializerInstance.deserializeStream(_).asKeyValueIterator)
val iterWithUpdatedRecordsRead =
- if
(GlutenColumnarBatchSerdeHelper.isGlutenSerde(serializerInstance.getClass.getName))
{
- GlutenColumnarBatchSerdeHelper.withUpdatedRecordsRead(recordIter,
metrics)
+ if (GlutenShuffleDependencyHelper.isGlutenDep(dep.getClass.getName)) {
+ GlutenShuffleDependencyHelper.withUpdatedRecordsRead(recordIter,
metrics)
} else {
recordIter.map { record =>
metrics.incRecordsRead(1)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
similarity index 80%
rename from
client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
rename to
client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
index 259bb954d..4d743cda8 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
/**
* A helper class to be compatible with Gluten Celeborn.
*/
-object GlutenColumnarBatchSerdeHelper {
+object GlutenShuffleDependencyHelper {
- def isGlutenSerde(serdeName: String): Boolean = {
+ def isGlutenDep(depName: String): Boolean = {
// scalastyle:off
// see Gluten
- //
https://github.com/oap-project/gluten/blob/main/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala
+ //
https://github.com/oap-project/gluten/blob/main/gluten-core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleDependency.scala
// scalastyle:on
-
"org.apache.spark.shuffle.CelebornColumnarBatchSerializer".equals(serdeName)
+ "org.apache.spark.shuffle.ColumnarShuffleDependency".equals(depName)
}
def withUpdatedRecordsRead(