This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a77a64b89 [CELEBORN-1835][CIP-8] Add tier writer base and memory tier
writer
a77a64b89 is described below
commit a77a64b89a4e47d72a5482a8f924193f2f4ea654
Author: mingji <[email protected]>
AuthorDate: Thu Jan 23 09:48:14 2025 +0800
[CELEBORN-1835][CIP-8] Add tier writer base and memory tier writer
### What changes were proposed in this pull request?
1. Add tier writer base.
2. Add memory tier writer.
3. Add corresponding UTs.
Code coverage report:
<img width="739" alt="截屏2025-01-15 17 53 23"
src="https://github.com/user-attachments/assets/722ea74e-d89d-4007-a105-8ffb0036cad2"
/>
NOTE:
Evict test needs file tier writer, so the branch coverage is not 100% in
the memory tier.
### Why are the changes needed?
Refactor partition data writer for CIP-8.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes #3065 from FMX/b1835.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../deploy/worker/storage/StorageManager.scala | 2 +-
.../service/deploy/worker/storage/TierWriter.scala | 332 +++++++++++++++++++++
.../worker/storage/PartitionMetaHandlerSuite.scala | 33 +-
.../deploy/worker/storage/TierWriterSuite.scala | 171 +++++++++++
.../deploy/worker/storage/WriterUtils.scala | 58 ++++
5 files changed, 563 insertions(+), 33 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index af32daa0b..5a5ab2506 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -208,7 +208,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val activeTypes = conf.availableStorageTypes
- def localOrDfsStorageAvailable(): Boolean = {
+ lazy val localOrDfsStorageAvailable: Boolean = {
StorageInfo.OSSAvailable(activeTypes) || StorageInfo.HDFSAvailable(
activeTypes) || StorageInfo.localDiskAvailable(
activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
new file mode 100644
index 000000000..6723d0061
--- /dev/null
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.IOException
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+
+import io.netty.buffer.{ByteBuf, CompositeByteBuf}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.AlreadyClosedException
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.{FileInfo, MemoryFileInfo}
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.StorageInfo
+import org.apache.celeborn.common.unsafe.Platform
+import org.apache.celeborn.service.deploy.worker.WorkerSource
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
+
+abstract class TierWriterBase(
+ val conf: CelebornConf,
+ val metaHandler: PartitionMetaHandler,
+ val numPendingWrites: AtomicInteger,
+ val notifier: FlushNotifier,
+ val fileInfo: FileInfo,
+ val source: AbstractSource,
+ val storageType: StorageInfo.Type,
+ val filename: String,
+ val shuffleKey: String,
+ val storageManager: StorageManager) extends Logging {
+ val metricsCollectCriticalEnabled: Boolean =
conf.metricsCollectCriticalEnabled
+ val flushLock = new AnyRef
+ val WAIT_INTERVAL_MS = 5
+
+ var flushBuffer: CompositeByteBuf = _
+ var writerCloseTimeoutMs: Long = conf.workerWriterCloseTimeoutMs
+ var flusherBufferSize = 0L
+ var chunkSize: Long = conf.shuffleChunkSize
+
+ @volatile var closed = false
+ @volatile var destroyed = false
+
+ takeBuffer()
+
+ def write(buf: ByteBuf): Unit = {
+ ensureNotClosed()
+ if (notifier.hasException) return
+
+ flushLock.synchronized {
+ metaHandler.beforeWrite(buf)
+ ensureNotClosed()
+ writerInternal(buf)
+ }
+
+ metaHandler.afterWrite(buf.readableBytes())
+ numPendingWrites.decrementAndGet()
+ }
+
+ protected def writerInternal(buf: ByteBuf): Unit
+
+ def needEvict(): Boolean
+
+ def evict(file: TierWriterBase): Unit
+
+ def swapFlushBuffer(inputBuffer: CompositeByteBuf): Unit = {
+ if (flushBuffer != null) {
+ returnBuffer(false)
+ }
+ flushBuffer = inputBuffer
+ }
+
+ def close(evict: Boolean = false): Long = {
+ try {
+ ensureNotClosed()
+ waitOnNoPending(numPendingWrites, false)
+ closed = true
+ finalFlush()
+
+ waitOnNoPending(notifier.numPendingFlushes, true)
+ metaHandler.afterClose()
+ } finally {
+ returnBuffer(false)
+ try {
+ closeStreams()
+ } catch {
+ case e: IOException =>
+ logWarning(s"close file writer ${this} failed", e)
+ }
+ }
+ if (!evict) {
+ notifyFileCommitted()
+ }
+
+ fileInfo.getFileLength
+ }
+
+ def ensureNotClosed(): Unit = {
+ if (closed) {
+ val msg = getFileAlreadyClosedMsg
+ logWarning(msg)
+ throw new AlreadyClosedException(msg)
+ }
+ }
+
+ // Builds the message attached to AlreadyClosedException when a write or
+ // close races with an already-closed writer; includes the file name so the
+ // offending partition can be identified in worker logs.
+ def getFileAlreadyClosedMsg: String = {
+ s"PartitionDataWriter has already closed! File name:$filename"
+ }
+
+ // this method is not used in memory tier writer
+ def notifyFileCommitted(): Unit = {}
+
+ // this method is not used in memory tier writer
+ def finalFlush(): Unit = {}
+
+ @throws[IOException]
+ protected def waitOnNoPending(counter: AtomicInteger, failWhenTimeout:
Boolean): Unit = {
+ var waitTime = writerCloseTimeoutMs
+ while (counter.get > 0 && waitTime > 0) {
+ try {
+ notifier.checkException()
+ TimeUnit.MILLISECONDS.sleep(WAIT_INTERVAL_MS)
+ } catch {
+ case e: InterruptedException =>
+ val ioe = new IOException(e)
+ notifier.setException(ioe)
+ throw ioe
+ }
+ waitTime -= WAIT_INTERVAL_MS
+ }
+ if (counter.get > 0 && failWhenTimeout) {
+ val ioe = new IOException("Wait pending actions timeout.")
+ notifier.setException(ioe)
+ throw ioe
+ }
+ notifier.checkException()
+ }
+
+ def closeStreams(): Unit
+
+ def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): FlushTask
+
+ def flush(finalFlush: Boolean, rebuildChunkOffsets: Boolean = false): Unit =
{
+ if (flushBuffer != null) {
+ val numBytes = flushBuffer.readableBytes()
+ var flushTask: FlushTask = null
+ if (numBytes != 0) {
+ if (rebuildChunkOffsets) {
+ val dupBuf = flushBuffer.retainedDuplicate()
+ // this flusher buffer is from memory tier writer, so that we can
not keep the buffer
+ flushTask = genFlushTask(finalFlush, false)
+ if (numBytes > chunkSize) {
+ val headerBuf = ByteBuffer.allocate(16)
+ while (dupBuf.isReadable) {
+ headerBuf.rewind
+ dupBuf.readBytes(headerBuf)
+ val batchHeader = headerBuf.array
+ val compressedSize = Platform.getInt(batchHeader,
Platform.BYTE_ARRAY_OFFSET + 12)
+ dupBuf.skipBytes(compressedSize)
+ }
+ dupBuf.release
+ } else metaHandler.afterFlush(numBytes)
+ } else {
+ notifier.checkException()
+ // if a flush buffer is larger than the chunk size, it might contain
data of multiple chunks
+ flushTask = genFlushTask(finalFlush, true)
+ }
+ }
+ // task won't be null in real workloads unless it is a memory file
+ // task will be null in UT to check chunk size and offset
+ if (flushTask != null) {
+ addFlushTask(flushTask)
+ flushBuffer = null
+ if (!rebuildChunkOffsets) {
+ metaHandler.afterFlush(numBytes)
+ }
+ if (!finalFlush) {
+ takeBuffer()
+ }
+ }
+ }
+
+ }
+
+ def takeBuffer() = {
+ var metricsName: String = null
+ var fileAbsPath: String = null
+ if (metricsCollectCriticalEnabled) {
+ metricsName = WorkerSource.TAKE_BUFFER_TIME
+ fileAbsPath = fileInfo.getFilePath
+ source.startTimer(metricsName, fileAbsPath)
+ }
+
+ flushLock.synchronized {
+ flushBuffer = takeBufferInternal()
+ }
+
+ if (metricsCollectCriticalEnabled) source.stopTimer(metricsName,
fileAbsPath)
+ }
+
+ def addFlushTask(task: FlushTask): Unit
+
+ def takeBufferInternal(): CompositeByteBuf
+
+ def destroy(ioException: IOException): Unit = {
+ if (!closed) {
+ closed = true
+ if (!notifier.hasException) {
+ notifier.setException(ioException)
+ }
+ metaHandler.beforeDestroy()
+ returnBuffer(true)
+ closeResource()
+ }
+
+ if (!destroyed) {
+ destroyed = true
+ cleanLocalOrDfsFiles()
+ }
+ }
+
+ def returnBuffer(destroy: Boolean): Unit = {
+ flushLock.synchronized {
+ returnBufferInternal(destroy)
+ }
+ }
+
+ def closeResource(): Unit = {}
+
+ def cleanLocalOrDfsFiles(): Unit = {}
+
+ def returnBufferInternal(destroy: Boolean): Unit
+
+}
+
+class MemoryTierWriter(
+ conf: CelebornConf,
+ metaHandler: PartitionMetaHandler,
+ numPendingWriters: AtomicInteger,
+ notifier: FlushNotifier,
+ source: AbstractSource,
+ fileInfo: MemoryFileInfo,
+ storageType: StorageInfo.Type,
+ partitionDataWriterContext: PartitionDataWriterContext,
+ storageManager: StorageManager)
+ extends TierWriterBase(
+ conf,
+ metaHandler,
+ numPendingWriters,
+ notifier,
+ fileInfo,
+ source,
+ storageType,
+ partitionDataWriterContext.getPartitionLocation.getFileName,
+ partitionDataWriterContext.getShuffleKey,
+ storageManager) {
+
+ val memoryFileStorageMaxFileSize: Long =
conf.workerMemoryFileStorageMaxFileSize
+
+ override def needEvict(): Boolean = {
+ flushBuffer.readableBytes() > memoryFileStorageMaxFileSize &&
storageManager.localOrDfsStorageAvailable
+ }
+
+ // Evicts this in-memory file to another tier: hands the current flush
+ // buffer to the target tier writer, flushes it there with chunk offsets
+ // rebuilt, then moves the byte accounting from memory-file storage to the
+ // disk buffer and unregisters the memory file.
+ override def evict(file: TierWriterBase): Unit = {
+ flushLock.synchronized {
+ // swap tier writer's flush buffer to memory tier writer's
+ // and handle its release
+ file.swapFlushBuffer(flushBuffer)
+ file.flush(false, true)
+ // NOTE(review): numBytes is read from flushBuffer AFTER file.flush(...)
+ // has consumed/scheduled it — confirm the reader index is untouched at
+ // this point so the released/incremented sizes are correct.
+ val numBytes = flushBuffer.readableBytes()
+ MemoryManager.instance.releaseMemoryFileStorage(numBytes)
+ MemoryManager.instance.incrementDiskBuffer(numBytes)
+ storageManager.unregisterMemoryPartitionWriterAndFileInfo(fileInfo,
shuffleKey, filename)
+ storageManager.evictedFileCount.incrementAndGet
+ }
+ }
+
+ // Memory file won't produce flush task
+ override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean):
FlushTask = null
+
+ // Appends the incoming buffer as a component of the in-memory composite
+ // flush buffer. The buffer is retained because ownership is shared with
+ // the composite; on OOM the memory-storage accounting is rolled back.
+ override def writerInternal(buf: ByteBuf): Unit = {
+ buf.retain()
+ try {
+ flushBuffer.addComponent(true, buf)
+ } catch {
+ case oom: OutOfMemoryError =>
+ // undo the pending accounting for bytes that were never stored
+ MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes())
+ throw oom
+ }
+ // The memory tier writer never flushes to a flusher: once the bytes are
+ // in the flush buffer the "flush" is considered complete, so update the
+ // meta handler and memory-file-storage accounting immediately.
+ val numBytes = buf.readableBytes()
+ metaHandler.afterFlush(numBytes)
+ MemoryManager.instance().incrementMemoryFileStorage(numBytes)
+ }
+
+ override def closeStreams(): Unit = {
+ flushBuffer.consolidate()
+ fileInfo.setBuffer(flushBuffer)
+ }
+
+ override def takeBufferInternal(): CompositeByteBuf = {
+ storageManager.storageBufferAllocator.compositeBuffer(Integer.MAX_VALUE)
+ }
+
+ override def returnBufferInternal(destroy: Boolean): Unit = {
+ if (destroy && flushBuffer != null) {
+ flushBuffer.removeComponents(0, flushBuffer.numComponents)
+ flushBuffer.release
+ }
+ }
+
+ override def addFlushTask(task: FlushTask): Unit = {
+ // memory tier write does not need flush tasks
+ }
+}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
index 93bdc184d..ff62cb9d5 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
@@ -28,41 +28,10 @@ import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{DiskFileInfo, MapFileMeta,
ReduceFileMeta}
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.unsafe.Platform
+import
org.apache.celeborn.service.deploy.worker.storage.WriterUtils.{generateFlinkFormatData,
generateSparkFormatData}
class PartitionMetaHandlerSuite extends CelebornFunSuite with MockitoHelper {
- private def generateFlinkFormatData(
- byteBufAllocator: UnpooledByteBufAllocator,
- partitionId: Int) = {
- val dataBuf = byteBufAllocator.buffer(1024)
- // partitionId attemptId batchId size
- dataBuf.writeInt(partitionId)
- dataBuf.writeInt(0)
- dataBuf.writeInt(0)
- dataBuf.writeInt(1008)
- for (i <- 1 to 1008) {
- dataBuf.writeByte(1)
- }
- assert(1024 === dataBuf.readableBytes())
- dataBuf
- }
-
- private def generateSparkFormatData(
- byteBufAllocator: UnpooledByteBufAllocator,
- attemptId: Int) = {
- val dataBuf = byteBufAllocator.buffer(1024)
- val arr = new Array[Byte](1024)
- // mapId attemptId batchId size
- Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET, attemptId)
- Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 4, 0)
- Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 8, 0)
- Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 12, 1008)
-
- dataBuf.writeBytes(arr)
- assert(1024 === dataBuf.readableBytes())
- dataBuf
- }
-
test("test map partition meta handler") {
val byteBufAllocator = UnpooledByteBufAllocator.DEFAULT
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
new file mode 100644
index 000000000..5aa2a40e3
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.IOException
+import java.util.concurrent.atomic.AtomicInteger
+
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.mockito.Mockito
+import org.mockito.MockitoSugar.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.AlreadyClosedException
+import org.apache.celeborn.common.identity.UserIdentifier
+import org.apache.celeborn.common.meta.{MemoryFileInfo, ReduceFileMeta}
+import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
+import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType, StorageInfo}
+import org.apache.celeborn.service.deploy.worker.WorkerSource
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
+
+class TierWriterSuite extends AnyFunSuite with BeforeAndAfterEach {
+
+ private def prepareMemoryWriter = {
+ val celebornConf = new CelebornConf()
+ celebornConf.set("celeborn.worker.memoryFileStorage.maxFileSize", "80k")
+ val reduceFileMeta = new ReduceFileMeta(celebornConf.shuffleChunkSize)
+ val userIdentifier = UserIdentifier("`aa`.`bb`")
+ val memoryFileInfo = new MemoryFileInfo(userIdentifier, false,
reduceFileMeta)
+ val numPendingWriters = new AtomicInteger()
+ val flushNotifier = new FlushNotifier()
+
+ val SPLIT_THRESHOLD = 256 * 1024 * 1024L
+ val splitMode = PartitionSplitMode.HARD
+
+ val writerContext = new PartitionDataWriterContext(
+ SPLIT_THRESHOLD,
+ splitMode,
+ false,
+ new PartitionLocation(
+ 1,
+ 0,
+ "host",
+ 1111,
+ 1112,
+ 1113,
+ 1114,
+ PartitionLocation.Mode.PRIMARY,
+ null),
+ "app1-1",
+ 1,
+ userIdentifier,
+ PartitionType.REDUCE,
+ false)
+
+ val source = new WorkerSource(celebornConf)
+
+ val storageManager: StorageManager = Mockito.mock(classOf[StorageManager])
+ val transConf = new TransportConf("shuffle", new CelebornConf)
+ val allocator = NettyUtils.getByteBufAllocator(transConf, source, false)
+ when(storageManager.storageBufferAllocator).thenReturn(allocator)
+ when(storageManager.localOrDfsStorageAvailable).thenReturn(true)
+
+ MemoryManager.initialize(celebornConf, storageManager, null)
+
+ val tierMemoryWriter = new MemoryTierWriter(
+ celebornConf,
+ new
ReducePartitionMetaHandler(celebornConf.shuffleRangeReadFilterEnabled,
memoryFileInfo),
+ numPendingWriters,
+ flushNotifier,
+ source,
+ memoryFileInfo,
+ StorageInfo.Type.MEMORY,
+ writerContext,
+ storageManager)
+ tierMemoryWriter
+ }
+
+ // Writes 81 x 1 KB batches against an 80 KB memoryFileStorage.maxFileSize:
+ // eviction must stay false at exactly 80 KB, flip true at 81 KB, and the
+ // writer must reject writes after close().
+ test("test memory tier writer case1") {
+
+ val tierMemoryWriter: MemoryTierWriter = prepareMemoryWriter
+
+ val buf1 =
WriterUtils.generateSparkFormatData(UnpooledByteBufAllocator.DEFAULT, 0)
+ tierMemoryWriter.numPendingWrites.incrementAndGet()
+ tierMemoryWriter.write(buf1)
+ assert(tierMemoryWriter.fileInfo.getFileLength === 1024)
+
+ val needEvict = tierMemoryWriter.needEvict()
+ assert(needEvict === false)
+
+ for (i <- 2 to 80) {
+ tierMemoryWriter.numPendingWrites.incrementAndGet()
+ tierMemoryWriter.write(WriterUtils.generateSparkFormatData(
+ UnpooledByteBufAllocator.DEFAULT,
+ 0))
+ assert(tierMemoryWriter.fileInfo.getFileLength === 1024 * i)
+ }
+
+ // 80 KB written so far, which does not exceed the 80 KB evict threshold
+ assert(tierMemoryWriter.needEvict() === false)
+ tierMemoryWriter.numPendingWrites.incrementAndGet()
+ tierMemoryWriter.write(WriterUtils.generateSparkFormatData(
+ UnpooledByteBufAllocator.DEFAULT,
+ 0))
+
+ // 81 KB now exceeds the threshold, so eviction is requested
+ assert(tierMemoryWriter.needEvict() === true)
+
+ val filelen = tierMemoryWriter.close()
+ assert(filelen === 81 * 1024)
+
+ assert(tierMemoryWriter.closed === true)
+
+ try {
+ tierMemoryWriter.write((WriterUtils.generateSparkFormatData(
+ UnpooledByteBufAllocator.DEFAULT,
+ 0)))
+ // expect already closed exception here
+ assert(false)
+ } catch {
+ case e: AlreadyClosedException =>
+ assert(true)
+ }
+
+ }
+
+ test("test memory tier writer case2") {
+
+ val tierMemoryWriter
+ :
_root_.org.apache.celeborn.service.deploy.worker.storage.MemoryTierWriter =
+ prepareMemoryWriter
+
+ val buf1 =
WriterUtils.generateSparkFormatData(UnpooledByteBufAllocator.DEFAULT, 0)
+ tierMemoryWriter.numPendingWrites.incrementAndGet()
+ tierMemoryWriter.write(buf1)
+ assert(tierMemoryWriter.fileInfo.getFileLength === 1024)
+
+ val needEvict = tierMemoryWriter.needEvict()
+ assert(needEvict === false)
+
+ tierMemoryWriter.destroy(new IOException("test"))
+ assert(tierMemoryWriter.flushBuffer.refCnt() === 0)
+
+ try {
+ tierMemoryWriter.write((WriterUtils.generateSparkFormatData(
+ UnpooledByteBufAllocator.DEFAULT,
+ 0)))
+ // expect already closed exception here
+ assert(false)
+ } catch {
+ case e: AlreadyClosedException =>
+ assert(true)
+ }
+
+ }
+}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WriterUtils.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WriterUtils.scala
new file mode 100644
index 000000000..d6115debc
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WriterUtils.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.scalatest.Assertions.convertToEqualizer
+
+import org.apache.celeborn.common.unsafe.Platform
+
+/**
+ * Test-only helpers that build 1 KB shuffle data batches in the on-wire
+ * formats the worker expects, shared by the writer/meta-handler suites.
+ */
+object WriterUtils {
+
+ // Builds a Flink-format batch: a 16-byte header (partitionId, attemptId,
+ // batchId, size) written via ByteBuf ints, followed by 1008 payload bytes.
+ def generateFlinkFormatData(
+ byteBufAllocator: UnpooledByteBufAllocator,
+ partitionId: Int) = {
+ val dataBuf = byteBufAllocator.buffer(1024)
+ // partitionId attemptId batchId size
+ dataBuf.writeInt(partitionId)
+ dataBuf.writeInt(0)
+ dataBuf.writeInt(0)
+ dataBuf.writeInt(1008)
+ for (i <- 1 to 1008) {
+ dataBuf.writeByte(1)
+ }
+ assert(1024 === dataBuf.readableBytes())
+ dataBuf
+ }
+
+ // Builds a Spark-format batch: a 16-byte header (mapId, attemptId,
+ // batchId, size) encoded with Platform.putInt into a 1024-byte array;
+ // the remaining bytes of the array are left zeroed as payload.
+ def generateSparkFormatData(
+ byteBufAllocator: UnpooledByteBufAllocator,
+ attemptId: Int) = {
+ val dataBuf = byteBufAllocator.buffer(1024)
+ val arr = new Array[Byte](1024)
+ // mapId attemptId batchId size
+ Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET, attemptId)
+ Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 4, 0)
+ Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 8, 0)
+ Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 12, 1008)
+
+ dataBuf.writeBytes(arr)
+ assert(1024 === dataBuf.readableBytes())
+ dataBuf
+ }
+}