taiyang-li commented on code in PR #6432:
URL: https://github.com/apache/incubator-gluten/pull/6432#discussion_r1675644857


##########
gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala:
##########
@@ -55,58 +56,57 @@ class CHCelebornColumnarShuffleWriter[K, V](
 
   private var splitResult: CHSplitResult = _
 
-  private val nativeBufferSize: Int = 
GlutenConfig.getConf.shuffleWriterBufferSize
-
   @throws[IOException]
   override def internalWrite(records: Iterator[Product2[K, V]]): Unit = {
     if (!records.hasNext) {
       handleEmptyIterator()
       return
     }
 
-    if (nativeShuffleWriter == -1L) {
-      nativeShuffleWriter = jniWrapper.makeForRSS(
-        dep.nativePartitioning,
-        shuffleId,
-        mapId,
-        nativeBufferSize,
-        customizedCompressCodec,
-        GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
-        CHBackendSettings.shuffleHashAlgorithm,
-        celebornPartitionPusher,
-        GlutenConfig.getConf.chColumnarThrowIfMemoryExceed,
-        GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict,
-        GlutenConfig.getConf.chColumnarForceExternalSortShuffle,
-        GlutenConfig.getConf.chColumnarForceMemorySortShuffle
-      )
-      CHNativeMemoryAllocators.createSpillable(
-        "CelebornShuffleWriter",
-        new Spiller() {
-          override def spill(self: MemoryTarget, phase: Spiller.Phase, size: 
Long): Long = {
-            if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
-              return 0L
-            }
-            if (nativeShuffleWriter == -1L) {
-              throw new IllegalStateException(
-                "Fatal: spill() called before a celeborn shuffle writer " +
-                  "is created. This behavior should be" +
-                  "optimized by moving memory " +
-                  "allocations from make() to split()")
-            }
-            logInfo(s"Gluten shuffle writer: Trying to push $size bytes of 
data")
-            val spilled = jniWrapper.evict(nativeShuffleWriter)
-            logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of 
data")
-            spilled
-          }
-        }
-      )
-    }
     while (records.hasNext) {
       val cb = records.next()._2.asInstanceOf[ColumnarBatch]
       if (cb.numRows == 0 || cb.numCols == 0) {
         logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} 
cols")
       } else {
         val col = cb.column(0).asInstanceOf[CHColumnVector]
+        if (nativeShuffleWriter == -1L) {

Review Comment:
   It would be better to extract the initialization code below into a separate 
function to keep the current loop cleaner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to