This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a77a64b89 [CELEBORN-1835][CIP-8] Add tier writer base and memory tier writer
a77a64b89 is described below

commit a77a64b89a4e47d72a5482a8f924193f2f4ea654
Author: mingji <[email protected]>
AuthorDate: Thu Jan 23 09:48:14 2025 +0800

    [CELEBORN-1835][CIP-8] Add tier writer base and memory tier writer
    
    ### What changes were proposed in this pull request?
    1. Add tier writer base.
    2. Add memory tier writer.
    3. Add corresponding UTs.
    
    Code coverage report:
    <img width="739" alt="截屏2025-01-15 17 53 23" src="https://github.com/user-attachments/assets/722ea74e-d89d-4007-a105-8ffb0036cad2" />
    
    NOTE:
    The evict test needs the file tier writer, so branch coverage of the memory tier is not 100%.
    
    ### Why are the changes needed?
    Refactor partition data writer for CIP-8.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA.
    
    Closes #3065 from FMX/b1835.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../deploy/worker/storage/StorageManager.scala     |   2 +-
 .../service/deploy/worker/storage/TierWriter.scala | 332 +++++++++++++++++++++
 .../worker/storage/PartitionMetaHandlerSuite.scala |  33 +-
 .../deploy/worker/storage/TierWriterSuite.scala    | 171 +++++++++++
 .../deploy/worker/storage/WriterUtils.scala        |  58 ++++
 5 files changed, 563 insertions(+), 33 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index af32daa0b..5a5ab2506 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -208,7 +208,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 
   val activeTypes = conf.availableStorageTypes
 
-  def localOrDfsStorageAvailable(): Boolean = {
+  lazy val localOrDfsStorageAvailable: Boolean = {
     StorageInfo.OSSAvailable(activeTypes) || StorageInfo.HDFSAvailable(
       activeTypes) || StorageInfo.localDiskAvailable(
       activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
new file mode 100644
index 000000000..6723d0061
--- /dev/null
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.IOException
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+
+import io.netty.buffer.{ByteBuf, CompositeByteBuf}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.AlreadyClosedException
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.{FileInfo, MemoryFileInfo}
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.StorageInfo
+import org.apache.celeborn.common.unsafe.Platform
+import org.apache.celeborn.service.deploy.worker.WorkerSource
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
+
+abstract class TierWriterBase(
+    val conf: CelebornConf,
+    val metaHandler: PartitionMetaHandler,
+    val numPendingWrites: AtomicInteger,
+    val notifier: FlushNotifier,
+    val fileInfo: FileInfo,
+    val source: AbstractSource,
+    val storageType: StorageInfo.Type,
+    val filename: String,
+    val shuffleKey: String,
+    val storageManager: StorageManager) extends Logging {
+  val metricsCollectCriticalEnabled: Boolean = 
conf.metricsCollectCriticalEnabled
+  val flushLock = new AnyRef
+  val WAIT_INTERVAL_MS = 5
+
+  var flushBuffer: CompositeByteBuf = _
+  var writerCloseTimeoutMs: Long = conf.workerWriterCloseTimeoutMs
+  var flusherBufferSize = 0L
+  var chunkSize: Long = conf.shuffleChunkSize
+
+  @volatile var closed = false
+  @volatile var destroyed = false
+
+  takeBuffer()
+
+  def write(buf: ByteBuf): Unit = {
+    ensureNotClosed()
+    if (notifier.hasException) return
+
+    flushLock.synchronized {
+      metaHandler.beforeWrite(buf)
+      ensureNotClosed()
+      writerInternal(buf)
+    }
+
+    metaHandler.afterWrite(buf.readableBytes())
+    numPendingWrites.decrementAndGet()
+  }
+
+  protected def writerInternal(buf: ByteBuf): Unit
+
+  def needEvict(): Boolean
+
+  def evict(file: TierWriterBase): Unit
+
+  def swapFlushBuffer(inputBuffer: CompositeByteBuf): Unit = {
+    if (flushBuffer != null) {
+      returnBuffer(false)
+    }
+    flushBuffer = inputBuffer
+  }
+
+  def close(evict: Boolean = false): Long = {
+    try {
+      ensureNotClosed()
+      waitOnNoPending(numPendingWrites, false)
+      closed = true
+      finalFlush()
+
+      waitOnNoPending(notifier.numPendingFlushes, true)
+      metaHandler.afterClose()
+    } finally {
+      returnBuffer(false)
+      try {
+        closeStreams()
+      } catch {
+        case e: IOException =>
+          logWarning(s"close file writer ${this} failed", e)
+      }
+    }
+    if (!evict) {
+      notifyFileCommitted()
+    }
+
+    fileInfo.getFileLength
+  }
+
+  def ensureNotClosed(): Unit = {
+    if (closed) {
+      val msg = getFileAlreadyClosedMsg
+      logWarning(msg)
+      throw new AlreadyClosedException(msg)
+    }
+  }
+
+  def getFileAlreadyClosedMsg: String = {
+    s"PartitionDataWriter has already closed! File name:${filename}"
+  }
+
+  // this method is not used in memory tier writer
+  def notifyFileCommitted(): Unit = {}
+
+  // this method is not used in memory tier writer
+  def finalFlush(): Unit = {}
+
+  @throws[IOException]
+  protected def waitOnNoPending(counter: AtomicInteger, failWhenTimeout: 
Boolean): Unit = {
+    var waitTime = writerCloseTimeoutMs
+    while (counter.get > 0 && waitTime > 0) {
+      try {
+        notifier.checkException()
+        TimeUnit.MILLISECONDS.sleep(WAIT_INTERVAL_MS)
+      } catch {
+        case e: InterruptedException =>
+          val ioe = new IOException(e)
+          notifier.setException(ioe)
+          throw ioe
+      }
+      waitTime -= WAIT_INTERVAL_MS
+    }
+    if (counter.get > 0 && failWhenTimeout) {
+      val ioe = new IOException("Wait pending actions timeout.")
+      notifier.setException(ioe)
+      throw ioe
+    }
+    notifier.checkException()
+  }
+
+  def closeStreams(): Unit
+
+  def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): FlushTask
+
+  def flush(finalFlush: Boolean, rebuildChunkOffsets: Boolean = false): Unit = 
{
+    if (flushBuffer != null) {
+      val numBytes = flushBuffer.readableBytes()
+      var flushTask: FlushTask = null
+      if (numBytes != 0) {
+        if (rebuildChunkOffsets) {
+          val dupBuf = flushBuffer.retainedDuplicate()
+          // this flush buffer comes from the memory tier writer, so we cannot keep the buffer
+          flushTask = genFlushTask(finalFlush, false)
+          if (numBytes > chunkSize) {
+            val headerBuf = ByteBuffer.allocate(16)
+            while (dupBuf.isReadable) {
+              headerBuf.rewind
+              dupBuf.readBytes(headerBuf)
+              val batchHeader = headerBuf.array
+              val compressedSize = Platform.getInt(batchHeader, 
Platform.BYTE_ARRAY_OFFSET + 12)
+              dupBuf.skipBytes(compressedSize)
+            }
+            dupBuf.release
+          } else metaHandler.afterFlush(numBytes)
+        } else {
+          notifier.checkException()
+          // if a flush buffer is larger than the chunk size, it might contain 
data of multiple chunks
+          flushTask = genFlushTask(finalFlush, true)
+        }
+      }
+      // task won't be null in real workloads unless it is a memory file
+      // task will be null in UT to check chunk size and offset
+      if (flushTask != null) {
+        addFlushTask(flushTask)
+        flushBuffer = null
+        if (!rebuildChunkOffsets) {
+          metaHandler.afterFlush(numBytes)
+        }
+        if (!finalFlush) {
+          takeBuffer()
+        }
+      }
+    }
+
+  }
+
+  def takeBuffer() = {
+    var metricsName: String = null
+    var fileAbsPath: String = null
+    if (metricsCollectCriticalEnabled) {
+      metricsName = WorkerSource.TAKE_BUFFER_TIME
+      fileAbsPath = fileInfo.getFilePath
+      source.startTimer(metricsName, fileAbsPath)
+    }
+
+    flushLock.synchronized {
+      flushBuffer = takeBufferInternal()
+    }
+
+    if (metricsCollectCriticalEnabled) source.stopTimer(metricsName, 
fileAbsPath)
+  }
+
+  def addFlushTask(task: FlushTask): Unit
+
+  def takeBufferInternal(): CompositeByteBuf
+
+  def destroy(ioException: IOException): Unit = {
+    if (!closed) {
+      closed = true
+      if (!notifier.hasException) {
+        notifier.setException(ioException)
+      }
+      metaHandler.beforeDestroy()
+      returnBuffer(true)
+      closeResource()
+    }
+
+    if (!destroyed) {
+      destroyed = true
+      cleanLocalOrDfsFiles()
+    }
+  }
+
+  def returnBuffer(destroy: Boolean): Unit = {
+    flushLock.synchronized {
+      returnBufferInternal(destroy)
+    }
+  }
+
+  def closeResource(): Unit = {}
+
+  def cleanLocalOrDfsFiles(): Unit = {}
+
+  def returnBufferInternal(destroy: Boolean): Unit
+
+}
+
+class MemoryTierWriter(
+    conf: CelebornConf,
+    metaHandler: PartitionMetaHandler,
+    numPendingWriters: AtomicInteger,
+    notifier: FlushNotifier,
+    source: AbstractSource,
+    fileInfo: MemoryFileInfo,
+    storageType: StorageInfo.Type,
+    partitionDataWriterContext: PartitionDataWriterContext,
+    storageManager: StorageManager)
+  extends TierWriterBase(
+    conf,
+    metaHandler,
+    numPendingWriters,
+    notifier,
+    fileInfo,
+    source,
+    storageType,
+    partitionDataWriterContext.getPartitionLocation.getFileName,
+    partitionDataWriterContext.getShuffleKey,
+    storageManager) {
+
+  val memoryFileStorageMaxFileSize: Long = 
conf.workerMemoryFileStorageMaxFileSize
+
+  override def needEvict(): Boolean = {
+    flushBuffer.readableBytes() > memoryFileStorageMaxFileSize && 
storageManager.localOrDfsStorageAvailable
+  }
+
+  override def evict(file: TierWriterBase): Unit = {
+    flushLock.synchronized {
+      // swap tier writer's flush buffer to memory tier writer's
+      // and handle its release
+      file.swapFlushBuffer(flushBuffer)
+      file.flush(false, true)
+      val numBytes = flushBuffer.readableBytes()
+      MemoryManager.instance.releaseMemoryFileStorage(numBytes)
+      MemoryManager.instance.incrementDiskBuffer(numBytes)
+      storageManager.unregisterMemoryPartitionWriterAndFileInfo(fileInfo, 
shuffleKey, filename)
+      storageManager.evictedFileCount.incrementAndGet
+    }
+  }
+
+  // Memory file won't produce flush task
+  override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): 
FlushTask = null
+
+  override def writerInternal(buf: ByteBuf): Unit = {
+    buf.retain()
+    try {
+      flushBuffer.addComponent(true, buf)
+    } catch {
+      case oom: OutOfMemoryError =>
+        MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes())
+        throw oom
+    }
+    // the memory tier writer never flushes;
+    // adding the bytes to the flush buffer counts as a completed flush
+    val numBytes = buf.readableBytes()
+    metaHandler.afterFlush(numBytes)
+    MemoryManager.instance().incrementMemoryFileStorage(numBytes)
+  }
+
+  override def closeStreams(): Unit = {
+    flushBuffer.consolidate()
+    fileInfo.setBuffer(flushBuffer)
+  }
+
+  override def takeBufferInternal(): CompositeByteBuf = {
+    storageManager.storageBufferAllocator.compositeBuffer(Integer.MAX_VALUE)
+  }
+
+  override def returnBufferInternal(destroy: Boolean): Unit = {
+    if (destroy && flushBuffer != null) {
+      flushBuffer.removeComponents(0, flushBuffer.numComponents)
+      flushBuffer.release
+    }
+  }
+
+  override def addFlushTask(task: FlushTask): Unit = {
+    // memory tier writer does not need flush tasks
+  }
+}
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
index 93bdc184d..ff62cb9d5 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
@@ -28,41 +28,10 @@ import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.meta.{DiskFileInfo, MapFileMeta, 
ReduceFileMeta}
 import org.apache.celeborn.common.protocol._
 import org.apache.celeborn.common.unsafe.Platform
+import 
org.apache.celeborn.service.deploy.worker.storage.WriterUtils.{generateFlinkFormatData,
 generateSparkFormatData}
 
 class PartitionMetaHandlerSuite extends CelebornFunSuite with MockitoHelper {
 
-  private def generateFlinkFormatData(
-      byteBufAllocator: UnpooledByteBufAllocator,
-      partitionId: Int) = {
-    val dataBuf = byteBufAllocator.buffer(1024)
-    // partitionId attemptId batchId size
-    dataBuf.writeInt(partitionId)
-    dataBuf.writeInt(0)
-    dataBuf.writeInt(0)
-    dataBuf.writeInt(1008)
-    for (i <- 1 to 1008) {
-      dataBuf.writeByte(1)
-    }
-    assert(1024 === dataBuf.readableBytes())
-    dataBuf
-  }
-
-  private def generateSparkFormatData(
-      byteBufAllocator: UnpooledByteBufAllocator,
-      attemptId: Int) = {
-    val dataBuf = byteBufAllocator.buffer(1024)
-    val arr = new Array[Byte](1024)
-    // mapId attemptId batchId size
-    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET, attemptId)
-    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 4, 0)
-    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 8, 0)
-    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 12, 1008)
-
-    dataBuf.writeBytes(arr)
-    assert(1024 === dataBuf.readableBytes())
-    dataBuf
-  }
-
   test("test map partition meta handler") {
     val byteBufAllocator = UnpooledByteBufAllocator.DEFAULT
 
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
new file mode 100644
index 000000000..5aa2a40e3
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.IOException
+import java.util.concurrent.atomic.AtomicInteger
+
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.mockito.Mockito
+import org.mockito.MockitoSugar.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.AlreadyClosedException
+import org.apache.celeborn.common.identity.UserIdentifier
+import org.apache.celeborn.common.meta.{MemoryFileInfo, ReduceFileMeta}
+import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
+import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, StorageInfo}
+import org.apache.celeborn.service.deploy.worker.WorkerSource
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
+
+class TierWriterSuite extends AnyFunSuite with BeforeAndAfterEach {
+
+  private def prepareMemoryWriter = {
+    val celebornConf = new CelebornConf()
+    celebornConf.set("celeborn.worker.memoryFileStorage.maxFileSize", "80k")
+    val reduceFileMeta = new ReduceFileMeta(celebornConf.shuffleChunkSize)
+    val userIdentifier = UserIdentifier("`aa`.`bb`")
+    val memoryFileInfo = new MemoryFileInfo(userIdentifier, false, 
reduceFileMeta)
+    val numPendingWriters = new AtomicInteger()
+    val flushNotifier = new FlushNotifier()
+
+    val SPLIT_THRESHOLD = 256 * 1024 * 1024L
+    val splitMode = PartitionSplitMode.HARD
+
+    val writerContext = new PartitionDataWriterContext(
+      SPLIT_THRESHOLD,
+      splitMode,
+      false,
+      new PartitionLocation(
+        1,
+        0,
+        "host",
+        1111,
+        1112,
+        1113,
+        1114,
+        PartitionLocation.Mode.PRIMARY,
+        null),
+      "app1-1",
+      1,
+      userIdentifier,
+      PartitionType.REDUCE,
+      false)
+
+    val source = new WorkerSource(celebornConf)
+
+    val storageManager: StorageManager = Mockito.mock(classOf[StorageManager])
+    val transConf = new TransportConf("shuffle", new CelebornConf)
+    val allocator = NettyUtils.getByteBufAllocator(transConf, source, false)
+    when(storageManager.storageBufferAllocator).thenReturn(allocator)
+    when(storageManager.localOrDfsStorageAvailable).thenReturn(true)
+
+    MemoryManager.initialize(celebornConf, storageManager, null)
+
+    val tierMemoryWriter = new MemoryTierWriter(
+      celebornConf,
+      new 
ReducePartitionMetaHandler(celebornConf.shuffleRangeReadFilterEnabled, 
memoryFileInfo),
+      numPendingWriters,
+      flushNotifier,
+      source,
+      memoryFileInfo,
+      StorageInfo.Type.MEMORY,
+      writerContext,
+      storageManager)
+    tierMemoryWriter
+  }
+
+  test("test memory tier writer case1") {
+
+    val tierMemoryWriter: MemoryTierWriter = prepareMemoryWriter
+
+    val buf1 = 
WriterUtils.generateSparkFormatData(UnpooledByteBufAllocator.DEFAULT, 0)
+    tierMemoryWriter.numPendingWrites.incrementAndGet()
+    tierMemoryWriter.write(buf1)
+    assert(tierMemoryWriter.fileInfo.getFileLength === 1024)
+
+    val needEvict = tierMemoryWriter.needEvict()
+    assert(needEvict === false)
+
+    for (i <- 2 to 80) {
+      tierMemoryWriter.numPendingWrites.incrementAndGet()
+      tierMemoryWriter.write(WriterUtils.generateSparkFormatData(
+        UnpooledByteBufAllocator.DEFAULT,
+        0))
+      assert(tierMemoryWriter.fileInfo.getFileLength === 1024 * i)
+    }
+
+    // 80 KB does not exceed the 80k evict threshold
+    assert(tierMemoryWriter.needEvict() === false)
+    tierMemoryWriter.numPendingWrites.incrementAndGet()
+    tierMemoryWriter.write(WriterUtils.generateSparkFormatData(
+      UnpooledByteBufAllocator.DEFAULT,
+      0))
+
+    assert(tierMemoryWriter.needEvict() === true)
+
+    val filelen = tierMemoryWriter.close()
+    assert(filelen === 81 * 1024)
+
+    assert(tierMemoryWriter.closed === true)
+
+    try {
+      tierMemoryWriter.write((WriterUtils.generateSparkFormatData(
+        UnpooledByteBufAllocator.DEFAULT,
+        0)))
+      // expect already closed exception here
+      assert(false)
+    } catch {
+      case e: AlreadyClosedException =>
+        assert(true)
+    }
+
+  }
+
+  test("test memory tier writer case2") {
+
+    val tierMemoryWriter
+        : 
_root_.org.apache.celeborn.service.deploy.worker.storage.MemoryTierWriter =
+      prepareMemoryWriter
+
+    val buf1 = 
WriterUtils.generateSparkFormatData(UnpooledByteBufAllocator.DEFAULT, 0)
+    tierMemoryWriter.numPendingWrites.incrementAndGet()
+    tierMemoryWriter.write(buf1)
+    assert(tierMemoryWriter.fileInfo.getFileLength === 1024)
+
+    val needEvict = tierMemoryWriter.needEvict()
+    assert(needEvict === false)
+
+    tierMemoryWriter.destroy(new IOException("test"))
+    assert(tierMemoryWriter.flushBuffer.refCnt() === 0)
+
+    try {
+      tierMemoryWriter.write((WriterUtils.generateSparkFormatData(
+        UnpooledByteBufAllocator.DEFAULT,
+        0)))
+      // expect already closed exception here
+      assert(false)
+    } catch {
+      case e: AlreadyClosedException =>
+        assert(true)
+    }
+
+  }
+}
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WriterUtils.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WriterUtils.scala
new file mode 100644
index 000000000..d6115debc
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WriterUtils.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.scalatest.Assertions.convertToEqualizer
+
+import org.apache.celeborn.common.unsafe.Platform
+
+object WriterUtils {
+
+  def generateFlinkFormatData(
+      byteBufAllocator: UnpooledByteBufAllocator,
+      partitionId: Int) = {
+    val dataBuf = byteBufAllocator.buffer(1024)
+    // partitionId attemptId batchId size
+    dataBuf.writeInt(partitionId)
+    dataBuf.writeInt(0)
+    dataBuf.writeInt(0)
+    dataBuf.writeInt(1008)
+    for (i <- 1 to 1008) {
+      dataBuf.writeByte(1)
+    }
+    assert(1024 === dataBuf.readableBytes())
+    dataBuf
+  }
+
+  def generateSparkFormatData(
+      byteBufAllocator: UnpooledByteBufAllocator,
+      attemptId: Int) = {
+    val dataBuf = byteBufAllocator.buffer(1024)
+    val arr = new Array[Byte](1024)
+    // mapId attemptId batchId size
+    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET, attemptId)
+    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 4, 0)
+    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 8, 0)
+    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 12, 1008)
+
+    dataBuf.writeBytes(arr)
+    assert(1024 === dataBuf.readableBytes())
+    dataBuf
+  }
+}

Reply via email to