This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a31df44bd [CH][CELEBORN] CHCelebornColumnarBatchSerializer uses
AtomicBoolean to identify whether to call close() to avoid calling close()
twice situation (#6455)
a31df44bd is described below
commit a31df44bd6833ee5d10960bfe319b41c9f0c9691
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Jul 15 17:40:37 2024 +0800
[CH][CELEBORN] CHCelebornColumnarBatchSerializer uses AtomicBoolean to
identify whether to call close() to avoid calling close() twice situation
(#6455)
[CH][CELEBORN] CHCelebornColumnarBatchSerializer uses AtomicBoolean to
identify whether to call close() to avoid calling close() twice situation
---
.../CHCelebornColumnarBatchSerializer.scala | 26 ++++++++++++----------
1 file changed, 14 insertions(+), 12 deletions(-)
diff --git
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
index 39aefb01c..3619855f7 100644
---
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
+++
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
@@ -32,6 +32,7 @@ import org.apache.celeborn.client.read.CelebornInputStream
import java.io._
import java.nio.ByteBuffer
import java.util.Locale
+import java.util.concurrent.atomic.AtomicBoolean
import scala.reflect.ClassTag
@@ -74,7 +75,8 @@ private class CHCelebornColumnarBatchSerializerInstance(
private var numBatchesTotal: Long = _
private var numRowsTotal: Long = _
- private var isClosed: Boolean = false
+ // Otherwise calling close() twice would cause replication of metrics.
+ private val closeCalled: AtomicBoolean = new AtomicBoolean(false)
override def asIterator: Iterator[Any] = {
// This method is never called by shuffle code.
@@ -153,18 +155,18 @@ private class CHCelebornColumnarBatchSerializerInstance(
}
override def close(): Unit = {
- if (!isClosed) {
- if (numBatchesTotal > 0) {
- readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
- }
- numOutputRows += numRowsTotal
- if (cb != null) {
- cb.close()
- cb = null
- }
- closeReader()
- isClosed = true
+ if (!closeCalled.compareAndSet(false, true)) {
+ return
+ }
+ if (numBatchesTotal > 0) {
+ readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
+ }
+ numOutputRows += numRowsTotal
+ if (cb != null) {
+ cb.close()
+ cb = null
}
+ closeReader()
}
def getReader: CHStreamReader = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]