This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new df2512994 [CELEBORN-1482][CIP-8] Add partition meta handler
df2512994 is described below

commit df2512994d549eb1d6d6270821ec81ddf66736b2
Author: mingji <[email protected]>
AuthorDate: Thu Jan 9 21:30:17 2025 +0800

    [CELEBORN-1482][CIP-8] Add partition meta handler
    
    ### What changes were proposed in this pull request?
    1. Add partition meta handler to eliminate reduce/map/mapSegment partition 
writers.
    
    ### Why are the changes needed?
    
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-8+Refactor+Partition+Data+Writer
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA.
    
    Closes #3058 from FMX/b1482-1.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../worker/storage/PartitionMetaHandler.scala      | 491 +++++++++++++++++++++
 .../worker/storage/PartitionMetaHandlerSuite.scala | 251 +++++++++++
 2 files changed, 742 insertions(+)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
new file mode 100644
index 000000000..b4dbb9d38
--- /dev/null
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.IOException
+import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.channels.FileChannel
+import java.util
+
+import com.google.common.annotations.VisibleForTesting
+import com.google.protobuf.GeneratedMessageV3
+import io.netty.buffer.ByteBuf
+import org.roaringbitmap.RoaringBitmap
+import org.slf4j.LoggerFactory
+
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, 
MemoryFileInfo}
+import org.apache.celeborn.common.protocol.{PbPushDataHandShake, 
PbRegionFinish, PbRegionStart, PbSegmentStart}
+import org.apache.celeborn.common.unsafe.Platform
+import org.apache.celeborn.common.util.FileChannelUtils
+
+/**
+ * It's the specific logic for reduce partition writer, map partition writer 
and map segment partition writer
+ */
+trait PartitionMetaHandler {
+
+  /**
+   * For reduce partition meta handler, this method won't be invoked.
+   * For map partition meta handler, this method accept 1,2,4 messages.
+   * For map segment partition meta handler, this method accept 1,2,3,4 
messages.
+   * @param message This method accept protobuf messages only
+   * There are only four message types can be accepted.
+   * 1. PbPushDataHandShake
+   * 2. PbRegionStart
+   * 3. PbSegmentStart
+   * 4. PbRegionFinish
+   */
+  def handleEvent(message: GeneratedMessageV3): Unit
+
+  /**
+   * For reduce partition meta handler, this method will add map id into 
roaringbitmap if rangeReadFilter is on
+   * For map partition meta handler, this method will get partition id from 
bytebuf and update the length of this partition
+   * For map segment partition meta handler, this method will check the 
segment contains this partition id
+   * @param bytes The bytes that contains shuffle data
+   */
+  def beforeWrite(bytes: ByteBuf): Unit
+
+  /**
+   * For reduce partition meta handler, this method will do nothing
+   * For map partition meta handler, this method will ensure that this region 
is not finished
+   * For map segmenta partition meta handler, this method will update segment 
index
+   * @param size processed shuffle data size
+   */
+  def afterWrite(size: Int): Unit
+
+  /**
+   * Update file meta about file written bytes changed
+   * @param size the size of data that put into the data flusher
+   */
+  def afterFlush(size: Int): Unit
+
+  /**
+   * For reduce partition meta handler, this method will do nothing
+   * For map partition meta handler, this method will clear its index buffer
+   */
+  def beforeDestroy(): Unit
+
+  /**
+   * For reduce partition meta handler, this method will update file meta's 
chunk offsets
+   * For map partition meta handler, this method will flush index buffer to 
disk
+   * For map segment partition meta handler, this method will flush index 
buffer to disk and clear
+   * segment index
+   */
+  def afterClose(): Unit
+}
+
+class MapPartitionMetaHandler(
+    diskFileInfo: DiskFileInfo,
+    notifier: FlushNotifier) extends PartitionMetaHandler {
+  lazy val hadoopFs = StorageManager.hadoopFs.get()
+  val logger = LoggerFactory.getLogger(classOf[MapPartitionMetaHandler])
+  val fileMeta = diskFileInfo.getFileMeta.asInstanceOf[MapFileMeta]
+  var numSubpartitions = 0
+  var currentDataRegionIndex = 0
+  var isBroadcastRegion = false
+  var numSubpartitionBytes: Array[Long] = null
+  var indexBuffer: ByteBuffer = null
+  var currentSubpartition = 0
+  var totalBytes = 0L
+  var regionStartingOffset = 0L
+  var indexChannel: FileChannel =
+    FileChannelUtils.createWritableFileChannel(diskFileInfo.getIndexPath)
+  @volatile var isRegionFinished = true
+
+  override def handleEvent(message: GeneratedMessageV3): Unit = {
+    // only accept protobuf messages
+    message match {
+      case pb: PbPushDataHandShake =>
+        pushDataHandShake(pb.getNumPartitions, pb.getBufferSize)
+      case pb: PbRegionStart =>
+        regionStart(
+          pb.getCurrentRegionIndex,
+          pb.getIsBroadcast)
+      case pb: PbRegionFinish =>
+        regionFinish()
+      case _ =>
+      // do not handle
+    }
+
+  }
+
+  def pushDataHandShake(numSubpartitions: Int, bufferSize: Int): Unit = {
+    logger.debug(
+      s"FileWriter:${diskFileInfo.getFilePath} " +
+        s"pushDataHandShake numReducePartitions:${numSubpartitions} " +
+        s"bufferSize:${bufferSize}")
+    this.numSubpartitions = numSubpartitions
+    numSubpartitionBytes = new Array[Long](numSubpartitions)
+    fileMeta.setBufferSize(bufferSize)
+    fileMeta.setNumSubPartitions(numSubpartitions)
+  }
+
+  def regionStart(currentDataRegionIndex: Int, isBroadcastRegion: Boolean): 
Unit = {
+    logger.debug(
+      s"FileWriter:${diskFileInfo.getFilePath} " +
+        s"regionStart currentDataRegionIndex:${currentDataRegionIndex} " +
+        s"isBroadcastRegion:${isBroadcastRegion}")
+    this.currentSubpartition = 0
+    this.currentDataRegionIndex = currentDataRegionIndex
+    this.isBroadcastRegion = isBroadcastRegion
+  }
+
+  @throws[IOException]
+  def regionFinish(): Unit = {
+    // TODO: When region is finished, flush the data to be ready for the 
reading, in scenarios that
+    // the upstream task writes and the downstream task reads simultaneously, 
such as flink hybrid
+    // shuffle
+    logger.debug("FileWriter:{} regionFinish", diskFileInfo.getFilePath)
+    if (regionStartingOffset == totalBytes) return
+    var fileOffset = regionStartingOffset
+    if (indexBuffer == null) indexBuffer = 
allocateIndexBuffer(numSubpartitions)
+    // write the index information of the current data region
+    for (partitionIndex <- 0 until numSubpartitions) {
+      indexBuffer.putLong(fileOffset)
+      if (!isBroadcastRegion) {
+        logger.debug(
+          s"flush index filename:${diskFileInfo.getFilePath} " +
+            s"region:${currentDataRegionIndex} " +
+            s"partitionId:${partitionIndex} " +
+            s"flush index fileOffset:${fileOffset}, " +
+            s"size:${numSubpartitionBytes(partitionIndex)} ")
+        indexBuffer.putLong(numSubpartitionBytes(partitionIndex))
+        fileOffset += numSubpartitionBytes(partitionIndex)
+      } else {
+        logger.debug(
+          s"flush index broadcast filename:${diskFileInfo.getFilePath} " +
+            s"region:${currentDataRegionIndex} " +
+            s"partitionId:${partitionIndex} " +
+            s"fileOffset:${fileOffset}, " +
+            s"size:${numSubpartitionBytes(0)} ")
+        indexBuffer.putLong(numSubpartitionBytes(0))
+      }
+    }
+    if (!indexBuffer.hasRemaining) flushIndex()
+    regionStartingOffset = totalBytes
+    util.Arrays.fill(numSubpartitionBytes, 0)
+    isRegionFinished = true
+  }
+
+  protected def allocateIndexBuffer(numSubpartitions: Int): ByteBuffer = {
+    // the returned buffer size is no smaller than 4096 bytes to improve disk 
IO performance
+    val minBufferSize = 4096
+    val indexRegionSize = numSubpartitions * (8 + 8)
+    if (indexRegionSize >= minBufferSize) {
+      val buffer = ByteBuffer.allocateDirect(indexRegionSize)
+      buffer.order(ByteOrder.BIG_ENDIAN)
+      return buffer
+    }
+    var numRegions = minBufferSize / indexRegionSize
+    if (minBufferSize % indexRegionSize != 0) numRegions += 1
+    val buffer = ByteBuffer.allocateDirect(numRegions * indexRegionSize)
+    buffer.order(ByteOrder.BIG_ENDIAN)
+    buffer
+  }
+
+  @SuppressWarnings(Array("ByteBufferBackingArray"))
+  @throws[IOException]
+  protected def flushIndex(): Unit = {
+    // TODO: force flush the index file channel in scenarios which the 
upstream task writes and
+    // downstream task reads simultaneously, such as flink hybrid shuffle
+    if (indexBuffer != null) {
+      logger.debug("flushIndex start:{}", diskFileInfo.getIndexPath)
+      val startTime = System.currentTimeMillis
+      indexBuffer.flip
+      notifier.checkException()
+      try {
+        if (indexBuffer.hasRemaining) {
+          // mappartition synchronously writes file index
+          if (indexChannel != null) while (indexBuffer.hasRemaining) 
indexChannel.write(indexBuffer)
+          else if (diskFileInfo.isDFS) {
+            val dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath)
+            dfsStream.write(indexBuffer.array)
+            dfsStream.close()
+          }
+        }
+        indexBuffer.clear
+      } finally logger.debug(
+        s"flushIndex end:${diskFileInfo.getIndexPath}, " +
+          s"cost:${System.currentTimeMillis - startTime}")
+    }
+  }
+
+  @throws[InterruptedException]
+  def checkPartitionRegionFinished(timeout: Long): Boolean = {
+    val delta = 100
+    var times = 0
+    while (delta * times < timeout) {
+      if (this.isRegionFinished) return true
+      Thread.sleep(delta)
+      times += 1
+    }
+    false
+  }
+
+  def getCurrentSubpartition: Int = currentSubpartition
+
+  def setCurrentSubpartition(currentSubpartition: Int): Unit = {
+    this.currentSubpartition = currentSubpartition
+  }
+
+  def getNumSubpartitionBytes: Array[Long] = numSubpartitionBytes
+
+  def getTotalBytes: Long = totalBytes
+
+  def setTotalBytes(totalBytes: Long): Unit = {
+    this.totalBytes = totalBytes
+  }
+
+  def setRegionFinished(regionFinished: Boolean): Unit = {
+    isRegionFinished = regionFinished
+  }
+
+  override def afterFlush(size: Int): Unit = {
+    diskFileInfo.updateBytesFlushed(size)
+  }
+
+  override def afterClose(): Unit = {
+    // TODO: force flush the index file channel in scenarios which the 
upstream task writes and
+    // downstream task reads simultaneously, such as flink hybrid shuffle
+    if (indexBuffer != null) {
+      logger.debug(s"flushIndex start:${diskFileInfo.getIndexPath}")
+      val startTime = System.currentTimeMillis
+      indexBuffer.flip
+      notifier.checkException()
+      try {
+        if (indexBuffer.hasRemaining) {
+          // mappartition synchronously writes file index
+          if (indexChannel != null) while (indexBuffer.hasRemaining) 
indexChannel.write(indexBuffer)
+          else if (diskFileInfo.isDFS) {
+            val dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath)
+            dfsStream.write(indexBuffer.array)
+            dfsStream.close()
+          }
+        }
+        indexBuffer.clear
+      } finally logger.debug(
+        s"flushIndex end:${diskFileInfo.getIndexPath}, " +
+          s"cost:${System.currentTimeMillis - startTime}")
+    }
+  }
+
+  override def beforeWrite(bytes: ByteBuf): Unit = {
+    bytes.markReaderIndex()
+    val partitionId = bytes.readInt
+    val attemptId = bytes.readInt
+    val batchId = bytes.readInt
+    val size = bytes.readInt
+    bytes.resetReaderIndex()
+    logger.debug(
+      s"map partition filename:${diskFileInfo.getFilePath} " +
+        s"write partition:${partitionId} " +
+        s"attemptId:${attemptId} " +
+        s"batchId:${batchId} " +
+        s"size:${size}")
+
+    if (partitionId < currentSubpartition) throw new IOException(
+      s"Must writing data in reduce partition index order, " +
+        s"but now partitionId is ${partitionId} " +
+        s"and pre partitionId is ${currentSubpartition}")
+
+    if (partitionId > currentSubpartition) currentSubpartition = partitionId
+    val length = bytes.readableBytes
+    totalBytes += length
+    numSubpartitionBytes(partitionId) += length
+  }
+
+  override def afterWrite(size: Int): Unit = {
+    isRegionFinished = false
+  }
+
+  override def beforeDestroy(): Unit = {
+    try if (indexChannel != null) indexChannel.close()
+    catch {
+      case e: IOException =>
+        logger.warn(
+          s"Close channel failed for file ${diskFileInfo.getIndexPath} caused 
by {}.",
+          e.getMessage)
+    }
+  }
+
+}
+
+class ReducePartitionMetaHandler(val rangeReadFilter: Boolean, val fileInfo: 
FileInfo)
+  extends PartitionMetaHandler {
+  val logger = LoggerFactory.getLogger(classOf[MapPartitionMetaHandler])
+  lazy val mapIdBitMap: Option[RoaringBitmap] =
+    if (rangeReadFilter) Some(new RoaringBitmap()) else None
+
+  override def afterFlush(size: Int): Unit = {
+    fileInfo.updateBytesFlushed(size)
+  }
+
+  override def afterClose(): Unit = {
+    // update offset if it is not matched
+    if (!isChunkOffsetValid()) {
+      fileInfo.getReduceFileMeta.updateChunkOffset(fileInfo.getFileLength, 
true)
+    }
+  }
+
+  private def isChunkOffsetValid(): Boolean = {
+    // Consider a scenario where some bytes have been flushed
+    // but the chunk offset boundary has not yet been updated.
+    // we should check if the chunk offset boundary equals
+    // bytesFlush or not. For example:
+    // The last record is a giant record and it has been flushed
+    // but its size is smaller than the nextBoundary, then the
+    // chunk offset will not be set after flushing. we should
+    // set it during FileWriter close.
+    if (fileInfo.isInstanceOf[DiskFileInfo]) {
+      val diskFileInfo = fileInfo.asInstanceOf[DiskFileInfo]
+      diskFileInfo.getReduceFileMeta.getLastChunkOffset == 
diskFileInfo.getFileLength
+    }
+    if (fileInfo.isInstanceOf[MemoryFileInfo]) {
+      val memoryFileInfo = fileInfo.asInstanceOf[MemoryFileInfo]
+      memoryFileInfo.getReduceFileMeta.getLastChunkOffset == 
memoryFileInfo.getFileLength
+    }
+    // this should not happen
+    false
+  }
+
+  override def beforeWrite(bytes: ByteBuf): Unit = {
+    if (rangeReadFilter) {
+      val mapId = getMapIdFromBuf(bytes)
+      mapIdBitMap.get.add(mapId)
+    }
+  }
+
+  override def afterWrite(size: Int): Unit = {}
+
+  def getMapIdFromBuf(buf: ByteBuf): Int = {
+    if (rangeReadFilter) {
+      val header = new Array[Byte](4)
+      buf.markReaderIndex
+      buf.readBytes(header)
+      buf.resetReaderIndex
+      Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET)
+    } else {
+      0
+    }
+  }
+
+  def getMapIdBitmap(): Option[RoaringBitmap] = {
+    mapIdBitMap
+  }
+
+  override def beforeDestroy(): Unit = {}
+
+  override def handleEvent(message: GeneratedMessageV3): Unit = {
+    // reduce partition have no message to handle
+  }
+}
+
+class SegmentMapPartitionMetaHandler(diskFileInfo: DiskFileInfo, notifier: 
FlushNotifier)
+  extends MapPartitionMetaHandler(diskFileInfo, notifier) {
+
+  @VisibleForTesting
+  val subPartitionHasStartSegment: util.Map[Integer, Boolean] =
+    new util.HashMap[Integer, Boolean];
+  @VisibleForTesting
+  var subPartitionBufferIndex: Array[Int] = null
+  private var dataHeaders: List[Int] = _
+
+  override def handleEvent(message: GeneratedMessageV3): Unit = {
+    super.handleEvent(message)
+    message match {
+      case pb: PbSegmentStart =>
+        segmentStart(pb.getSubPartitionId, pb.getSegmentId)
+      case _ =>
+      // do not handle
+    }
+  }
+
+  def segmentStart(subPartitionId: Int, segmentId: Int): Unit = {
+    fileMeta.addPartitionSegmentId(
+      subPartitionId,
+      segmentId)
+    subPartitionHasStartSegment.put(subPartitionId, true)
+  }
+
+  override def pushDataHandShake(numSubpartitions: Int, bufferSize: Int): Unit 
= {
+    super.pushDataHandShake(numSubpartitions, bufferSize)
+    subPartitionBufferIndex = new Array[Int](numSubpartitions)
+    util.Arrays.fill(subPartitionBufferIndex, 0)
+    fileMeta.setIsWriterClosed(false)
+    fileMeta.setSegmentGranularityVisible(true)
+  }
+
+  override def afterFlush(size: Int): Unit = {
+    diskFileInfo.updateBytesFlushed(size)
+  }
+
+  override def afterClose(): Unit = {
+    subPartitionHasStartSegment.clear()
+    super.afterClose()
+    logger.debug(s"Close ${this} for file ${diskFileInfo.getFile}")
+    fileMeta.setIsWriterClosed(true)
+  }
+
+  override def beforeWrite(bytes: ByteBuf): Unit = {
+    bytes.markReaderIndex
+    val subPartitionId = bytes.readInt
+    val attemptId = bytes.readInt
+    val batchId = bytes.readInt
+    val size = bytes.readInt
+    dataHeaders = List(subPartitionId, attemptId, batchId, size)
+
+    if (!subPartitionHasStartSegment.containsKey(subPartitionId))
+      throw new IllegalStateException(String.format(
+        s"This partition may not start a segment: 
subPartitionId:${subPartitionId} attemptId:${attemptId} batchId:${batchId} 
size:${size}"))
+    val currentSubpartition = getCurrentSubpartition
+    // the subPartitionId must be ordered in a region// the subPartitionId 
must be ordered in a region
+    if (subPartitionId < currentSubpartition) throw new 
IOException(String.format(
+      s"Must writing data in reduce partition index order, but now 
supPartitionId is ${subPartitionId} and the previous supPartitionId is 
${currentSubpartition}, attemptId is ${attemptId}, batchId is ${batchId}, size 
is ${size}"))
+    bytes.resetReaderIndex
+    logger.debug(
+      s"mappartition filename:${diskFileInfo.getFilePath} write 
partition:${subPartitionId} currentSubPartition:${currentSubpartition} 
attemptId:${attemptId} batchId:${batchId} size:${size}")
+    if (subPartitionId > currentSubpartition) 
setCurrentSubpartition(subPartitionId)
+    val length = bytes.readableBytes
+    setTotalBytes(getTotalBytes + length)
+    getNumSubpartitionBytes(subPartitionId) += length
+
+  }
+
+  override def afterWrite(size: Int): Unit = {
+    super.afterWrite(size)
+    val subPartitionId = dataHeaders(0)
+    val attemptId = dataHeaders(1)
+    if (subPartitionHasStartSegment.get(subPartitionId)) {
+      fileMeta.addSegmentIdAndFirstBufferIndex(
+        subPartitionId,
+        subPartitionBufferIndex(subPartitionId),
+        fileMeta.getPartitionWritingSegmentId(subPartitionId))
+      logger.debug(
+        s"Add a segment id, partitionId:${subPartitionId}, " +
+          s"bufferIndex:${subPartitionBufferIndex(subPartitionId)}, " +
+          
s"segmentId:${fileMeta.getPartitionWritingSegmentId(subPartitionId)}, " +
+          s"filename:${diskFileInfo.getFilePath}, " +
+          s"attemptId:${attemptId}.")
+      // After the first buffer index of the segment is added, the following 
buffers in the segment
+      // should not be added anymore, so the subPartitionHasStartSegment is 
updated to false.
+      subPartitionHasStartSegment.put(subPartitionId, false)
+    }
+    subPartitionBufferIndex(subPartitionId) += 1
+  }
+
+  override def beforeDestroy(): Unit = super.beforeDestroy()
+}
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
new file mode 100644
index 000000000..93bdc184d
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.{File, FileInputStream}
+import java.nio.ByteBuffer
+import java.nio.file.Files
+
+import io.netty.buffer.{ByteBuf, UnpooledByteBufAllocator}
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.identity.UserIdentifier
+import org.apache.celeborn.common.meta.{DiskFileInfo, MapFileMeta, 
ReduceFileMeta}
+import org.apache.celeborn.common.protocol._
+import org.apache.celeborn.common.unsafe.Platform
+
+class PartitionMetaHandlerSuite extends CelebornFunSuite with MockitoHelper {
+
+  private def generateFlinkFormatData(
+      byteBufAllocator: UnpooledByteBufAllocator,
+      partitionId: Int) = {
+    val dataBuf = byteBufAllocator.buffer(1024)
+    // partitionId attemptId batchId size
+    dataBuf.writeInt(partitionId)
+    dataBuf.writeInt(0)
+    dataBuf.writeInt(0)
+    dataBuf.writeInt(1008)
+    for (i <- 1 to 1008) {
+      dataBuf.writeByte(1)
+    }
+    assert(1024 === dataBuf.readableBytes())
+    dataBuf
+  }
+
+  private def generateSparkFormatData(
+      byteBufAllocator: UnpooledByteBufAllocator,
+      attemptId: Int) = {
+    val dataBuf = byteBufAllocator.buffer(1024)
+    val arr = new Array[Byte](1024)
+    // mapId attemptId batchId size
+    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET, attemptId)
+    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 4, 0)
+    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 8, 0)
+    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 12, 1008)
+
+    dataBuf.writeBytes(arr)
+    assert(1024 === dataBuf.readableBytes())
+    dataBuf
+  }
+
+  test("test map partition meta handler") {
+    val byteBufAllocator = UnpooledByteBufAllocator.DEFAULT
+
+    val tmpFilePath = Files.createTempFile("abc", "def")
+    val fileMeta = new MapFileMeta()
+    val notifier = new FlushNotifier()
+    val diskFileInfo = new DiskFileInfo(
+      UserIdentifier.apply("`a`.`b`"),
+      true,
+      fileMeta,
+      tmpFilePath.toString,
+      StorageInfo.Type.HDD)
+
+    val mapMetaHandler = new MapPartitionMetaHandler(diskFileInfo, notifier)
+    val pbPushDataHandShake =
+      
PbPushDataHandShake.newBuilder().setNumPartitions(10).setBufferSize(1024).build()
+    mapMetaHandler.handleEvent(pbPushDataHandShake)
+
+    assert(10 === mapMetaHandler.numSubpartitions)
+    assert(10 === mapMetaHandler.numSubpartitionBytes.length)
+
+    val pbRegionStart = PbRegionStart.newBuilder()
+      .setCurrentRegionIndex(0)
+      .setIsBroadcast(true)
+      .build()
+
+    mapMetaHandler.handleEvent(pbRegionStart)
+
+    assert(mapMetaHandler.currentSubpartition === 0)
+    assert(mapMetaHandler.isBroadcastRegion === true)
+    assert(mapMetaHandler.currentDataRegionIndex === 0)
+
+    for (i <- 0 until 10) {
+      val dataBuf: ByteBuf = generateFlinkFormatData(byteBufAllocator, i)
+      mapMetaHandler.beforeWrite(dataBuf)
+    }
+
+    val pbRegionFinish = PbRegionFinish.newBuilder()
+      .build()
+
+    mapMetaHandler.handleEvent(pbRegionFinish)
+
+    assert(mapMetaHandler.indexBuffer.position() === 160)
+    val start = mapMetaHandler.indexBuffer.getLong(0)
+    val len = mapMetaHandler.indexBuffer.getLong(8)
+    assert(0 === start)
+    assert(1024 === len)
+
+    assert(mapMetaHandler.indexChannel !== null)
+    mapMetaHandler.afterClose()
+
+    val file = new File(diskFileInfo.getIndexPath)
+    assert(file.length() == 160)
+    val inputStream = new FileInputStream(file)
+    val array = new Array[Byte](160)
+    inputStream.read(array)
+
+    val readIndex = ByteBuffer.wrap(array)
+    val start2 = readIndex.getLong()
+    val len2 = readIndex.getLong()
+
+    assert(0 === start2)
+    assert(1024 === len2)
+
+  }
+
+  test("test reduce partition meta handler") {
+    val byteBufAllocator = UnpooledByteBufAllocator.DEFAULT
+
+    val tmpFilePath = Files.createTempFile("abc", "def")
+    val fileMeta = new ReduceFileMeta(8192)
+    val diskFileInfo = new DiskFileInfo(
+      UserIdentifier.apply("`a`.`b`"),
+      true,
+      fileMeta,
+      tmpFilePath.toString,
+      StorageInfo.Type.HDD)
+
+    val handler1 = new ReducePartitionMetaHandler(true, diskFileInfo)
+    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 0))
+    handler1.afterFlush(1024)
+    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 1))
+    handler1.afterFlush(1024)
+    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 2))
+    handler1.afterFlush(1024)
+    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 3))
+    handler1.afterFlush(1024)
+    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 4))
+    handler1.afterFlush(1024)
+
+    assert(handler1.mapIdBitMap.get.getCardinality === 5)
+
+    assert(diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks 
=== 0)
+
+    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 5))
+    handler1.afterFlush(1024)
+    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 6))
+    handler1.afterFlush(1024)
+    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 7))
+    handler1.afterFlush(1024)
+
+    assert(diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks 
=== 1)
+
+    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 8))
+    handler1.afterFlush(1024)
+    handler1.afterClose()
+
+    assert(diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks 
== 2)
+  }
+
+  test("test map segment partition meta handler") {
+    val byteBufAllocator = UnpooledByteBufAllocator.DEFAULT
+
+    val tmpFilePath = Files.createTempFile("abc", "def")
+    val fileMeta = new MapFileMeta()
+    val notifier = new FlushNotifier()
+    val diskFileInfo = new DiskFileInfo(
+      UserIdentifier.apply("`a`.`b`"),
+      true,
+      fileMeta,
+      tmpFilePath.toString,
+      StorageInfo.Type.HDD)
+
+    val mapMetaHandler = new SegmentMapPartitionMetaHandler(diskFileInfo, 
notifier)
+    val pbPushDataHandShake =
+      
PbPushDataHandShake.newBuilder().setNumPartitions(10).setBufferSize(1024).build()
+    mapMetaHandler.handleEvent(pbPushDataHandShake)
+
+    assert(10 === mapMetaHandler.numSubpartitions)
+    assert(10 === mapMetaHandler.numSubpartitionBytes.length)
+
+    val pbSegmentStart = PbSegmentStart.newBuilder()
+      .setSubPartitionId(0)
+      .setSegmentId(0)
+      .build()
+
+    mapMetaHandler.handleEvent(pbSegmentStart)
+
+    val pbRegionStart = PbRegionStart.newBuilder()
+      .setCurrentRegionIndex(0)
+      .setIsBroadcast(true)
+      .build()
+
+    mapMetaHandler.handleEvent(pbRegionStart)
+
+    assert(mapMetaHandler.currentSubpartition === 0)
+    assert(mapMetaHandler.isBroadcastRegion === true)
+    assert(mapMetaHandler.currentDataRegionIndex === 0)
+
+    for (i <- 0 until 10) {
+      val dataBuf: ByteBuf = generateFlinkFormatData(byteBufAllocator, 0)
+      mapMetaHandler.beforeWrite(dataBuf)
+      mapMetaHandler.afterWrite(1024)
+    }
+
+    assert(fileMeta.getSegmentIdByFirstBufferIndex(0, 0) === 0)
+
+    val pbRegionFinish = PbRegionFinish.newBuilder()
+      .build()
+
+    mapMetaHandler.handleEvent(pbRegionFinish)
+
+    assert(mapMetaHandler.indexBuffer.position() === 160)
+    val start = mapMetaHandler.indexBuffer.getLong(0)
+    val len = mapMetaHandler.indexBuffer.getLong(8)
+    assert(0 === start)
+    assert(10240 === len)
+
+    assert(mapMetaHandler.indexChannel !== null)
+    mapMetaHandler.afterClose()
+
+    val file = new File(diskFileInfo.getIndexPath)
+    assert(file.length() == 160)
+    val inputStream = new FileInputStream(file)
+    val array = new Array[Byte](160)
+    inputStream.read(array)
+
+    val readIndex = ByteBuffer.wrap(array)
+    val start2 = readIndex.getLong()
+    val len2 = readIndex.getLong()
+
+    assert(0 === start2)
+    assert(10240 === len2)
+
+  }
+}

Reply via email to