This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new f9cf5f3f6 [CELEBORN-917][GLUTEN] Record read metric should be
compatible with Gluten shuffle serde
f9cf5f3f6 is described below
commit f9cf5f3f61651796853eafe5ab5fc6006b1fdb9d
Author: youxiduo <[email protected]>
AuthorDate: Fri Aug 25 15:01:09 2023 +0800
[CELEBORN-917][GLUTEN] Record read metric should be compatible with Gluten
shuffle serde
### What changes were proposed in this pull request?
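When updating the records-read metric, we should consider whether the input
record is a `ColumnarBatch`. If the serde is the Gluten columnar batch
serializer, each record carries a whole batch, so we should add
`ColumnarBatch.numRows` to the metric instead of incrementing it by 1.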
### Why are the changes needed?
Make the shuffle record read metric correct.
### Does this PR introduce _any_ user-facing change?
Yes, the shuffle records-read metric changes when the Gluten columnar batch serde is used.
### How was this patch tested?
Manually tested.
before:
<img width="415" alt="image"
src="https://github.com/apache/incubator-celeborn/assets/12025282/221ab814-4b02-4688-80ab-31f21cd900a4">
after:
<img width="415" alt="image"
src="https://github.com/apache/incubator-celeborn/assets/12025282/1c7257c0-2f30-41c3-9ea8-6bc5cda3de85">
Closes #1838 from ulysses-you/gluten.
Lead-authored-by: youxiduo <[email protected]>
Co-authored-by: Xiduo You <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit b3700dd169198af3982ff3525624eec2c1a59597)
Signed-off-by: Cheng Pan <[email protected]>
---
.../shuffle/celeborn/CelebornShuffleReader.scala | 14 +++++--
.../celeborn/GlutenColumnarBatchSerdeHelper.scala | 44 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 4 deletions(-)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 5ce891907..36ee31448 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -97,11 +97,17 @@ class CelebornShuffleReader[K, C](
}).flatMap(
serializerInstance.deserializeStream(_).asKeyValueIterator)
+ val iterWithUpdatedRecordsRead =
+ if
(GlutenColumnarBatchSerdeHelper.isGlutenSerde(serializerInstance.getClass.getName))
{
+ GlutenColumnarBatchSerdeHelper.withUpdatedRecordsRead(recordIter,
metrics)
+ } else {
+ recordIter.map { record =>
+ metrics.incRecordsRead(1)
+ record
+ }
+ }
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
- recordIter.map { record =>
- metrics.incRecordsRead(1)
- record
- },
+ iterWithUpdatedRecordsRead,
context.taskMetrics().mergeShuffleReadMetrics())
// An interruptible iterator must be used here in order to support task
cancellation
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
new file mode 100644
index 000000000..259bb954d
--- /dev/null
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.celeborn
+
+import org.apache.spark.shuffle.ShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * A helper class to be compatible with Gluten Celeborn.
+ */
+object GlutenColumnarBatchSerdeHelper {
+
+ def isGlutenSerde(serdeName: String): Boolean = {
+ // scalastyle:off
+ // see Gluten
+ //
https://github.com/oap-project/gluten/blob/main/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala
+ // scalastyle:on
+
"org.apache.spark.shuffle.CelebornColumnarBatchSerializer".equals(serdeName)
+ }
+
+ def withUpdatedRecordsRead(
+ input: Iterator[(Any, Any)],
+ metrics: ShuffleReadMetricsReporter): Iterator[(Any, Any)] = {
+ input.map { record =>
+ metrics.incRecordsRead(record._2.asInstanceOf[ColumnarBatch].numRows())
+ record
+ }
+ }
+}