This is an automated email from the ASF dual-hosted git repository. kerwinzhang pushed a commit to branch gluten in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 584a437eeae0d6aecd8fe5988347ca009a16e432 Author: xiyu.zk <[email protected]> AuthorDate: Tue Sep 5 11:49:23 2023 +0800 [CELEBORN-946][GLUTEN] Record read metric should be compatible with Gluten shuffle dependency --- .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 2 +- .../spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala index f07ed4989..ff07535f6 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala @@ -131,7 +131,7 @@ class CelebornShuffleReader[K, C]( serializerInstance.deserializeStream(_).asKeyValueIterator) val iterWithUpdatedRecordsRead = - if (GlutenColumnarBatchSerdeHelper.isGlutenSerde(serializerInstance.getClass.getName)) { + if (GlutenColumnarBatchSerdeHelper.isGlutenDep(dep.getClass.getName)) { GlutenColumnarBatchSerdeHelper.withUpdatedRecordsRead(recordIter, metrics) } else { recordIter.map { record => diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala index 259bb954d..942f5a2ed 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala @@ -25,12 +25,12 @@ import org.apache.spark.sql.vectorized.ColumnarBatch */ object GlutenColumnarBatchSerdeHelper { - def isGlutenSerde(serdeName: String): Boolean = { + def isGlutenDep(serdeName: String): Boolean = { // scalastyle:off // see Gluten - // https://github.com/oap-project/gluten/blob/main/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala + // https://github.com/oap-project/gluten/blob/main/gluten-core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleDependency.scala // scalastyle:on - "org.apache.spark.shuffle.CelebornColumnarBatchSerializer".equals(serdeName) + "org.apache.spark.shuffle.ColumnarShuffleDependency".equals(serdeName) } def withUpdatedRecordsRead(
