This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new df2512994 [CELEBORN-1482][CIP-8] Add partition meta handler
df2512994 is described below
commit df2512994d549eb1d6d6270821ec81ddf66736b2
Author: mingji <[email protected]>
AuthorDate: Thu Jan 9 21:30:17 2025 +0800
[CELEBORN-1482][CIP-8] Add partition meta handler
### What changes were proposed in this pull request?
1. Add partition meta handler to eliminate reduce/map/mapSegment partition
writers.
### Why are the changes needed?
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-8+Refactor+Partition+Data+Writer
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes #3058 from FMX/b1482-1.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../worker/storage/PartitionMetaHandler.scala | 491 +++++++++++++++++++++
.../worker/storage/PartitionMetaHandlerSuite.scala | 251 +++++++++++
2 files changed, 742 insertions(+)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
new file mode 100644
index 000000000..b4dbb9d38
--- /dev/null
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.IOException
+import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.channels.FileChannel
+import java.util
+
+import com.google.common.annotations.VisibleForTesting
+import com.google.protobuf.GeneratedMessageV3
+import io.netty.buffer.ByteBuf
+import org.roaringbitmap.RoaringBitmap
+import org.slf4j.LoggerFactory
+
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta,
MemoryFileInfo}
+import org.apache.celeborn.common.protocol.{PbPushDataHandShake,
PbRegionFinish, PbRegionStart, PbSegmentStart}
+import org.apache.celeborn.common.unsafe.Platform
+import org.apache.celeborn.common.util.FileChannelUtils
+
+/**
+ * It's the specific logic for reduce partition writer, map partition writer
and map segment partition writer
+ */
/**
 * Lifecycle hooks for partition metadata maintenance. One implementation exists per
 * writer flavor: reduce partition, map partition, and map segment partition.
 */
trait PartitionMetaHandler {

  /**
   * Handles control-plane protobuf events.
   *
   * For the reduce partition meta handler, this method won't be invoked.
   * For the map partition meta handler, this method accepts messages 1, 2, 4.
   * For the map segment partition meta handler, this method accepts messages 1, 2, 3, 4.
   *
   * @param message This method accepts protobuf messages only.
   *                There are only four message types that can be accepted:
   *                1. PbPushDataHandShake
   *                2. PbRegionStart
   *                3. PbSegmentStart
   *                4. PbRegionFinish
   */
  def handleEvent(message: GeneratedMessageV3): Unit

  /**
   * Invoked before a data buffer is written.
   *
   * For the reduce partition meta handler, this method will add the map id into a
   * roaring bitmap if rangeReadFilter is on.
   * For the map partition meta handler, this method will get the partition id from
   * the bytebuf and update the length of this partition.
   * For the map segment partition meta handler, this method will check that the
   * segment contains this partition id.
   *
   * @param bytes The bytes that contain shuffle data
   */
  def beforeWrite(bytes: ByteBuf): Unit

  /**
   * Invoked after a data buffer has been written.
   *
   * For the reduce partition meta handler, this method will do nothing.
   * For the map partition meta handler, this method will ensure that this region
   * is not finished.
   * For the map segment partition meta handler, this method will update the segment index.
   *
   * @param size processed shuffle data size
   */
  def afterWrite(size: Int): Unit

  /**
   * Updates file meta after file written bytes changed.
   *
   * @param size the size of data that was put into the data flusher
   */
  def afterFlush(size: Int): Unit

  /**
   * Invoked before the writer is destroyed.
   *
   * For the reduce partition meta handler, this method will do nothing.
   * For the map partition meta handler, this method will clear its index buffer.
   */
  def beforeDestroy(): Unit

  /**
   * Invoked after the writer is closed.
   *
   * For the reduce partition meta handler, this method will update the file meta's
   * chunk offsets.
   * For the map partition meta handler, this method will flush the index buffer to disk.
   * For the map segment partition meta handler, this method will flush the index
   * buffer to disk and clear the segment index.
   */
  def afterClose(): Unit
}
+
/**
 * Maintains per-region subpartition metadata for map partition writers: it tracks
 * bytes written per subpartition inside each data region and persists an
 * (offset, length) index entry per subpartition when a region finishes.
 *
 * @param diskFileInfo file info of the data file being written
 * @param notifier     flush notifier used to surface asynchronous flush exceptions
 */
class MapPartitionMetaHandler(
    diskFileInfo: DiskFileInfo,
    notifier: FlushNotifier) extends PartitionMetaHandler {
  // Only needed for DFS-backed index files; resolved lazily so purely local
  // deployments never touch the Hadoop filesystem.
  lazy val hadoopFs = StorageManager.hadoopFs.get()
  val logger = LoggerFactory.getLogger(classOf[MapPartitionMetaHandler])
  val fileMeta = diskFileInfo.getFileMeta.asInstanceOf[MapFileMeta]
  // Announced by PbPushDataHandShake.
  var numSubpartitions = 0
  var currentDataRegionIndex = 0
  var isBroadcastRegion = false
  // Bytes accumulated per subpartition within the current region; reset on regionFinish.
  var numSubpartitionBytes: Array[Long] = null
  // Buffered (offset, length) index entries; flushed when full and on close.
  var indexBuffer: ByteBuffer = null
  var currentSubpartition = 0
  var totalBytes = 0L
  var regionStartingOffset = 0L
  var indexChannel: FileChannel =
    FileChannelUtils.createWritableFileChannel(diskFileInfo.getIndexPath)
  @volatile var isRegionFinished = true

  override def handleEvent(message: GeneratedMessageV3): Unit = {
    // only accept protobuf messages
    message match {
      case pb: PbPushDataHandShake =>
        pushDataHandShake(pb.getNumPartitions, pb.getBufferSize)
      case pb: PbRegionStart =>
        regionStart(
          pb.getCurrentRegionIndex,
          pb.getIsBroadcast)
      case pb: PbRegionFinish =>
        regionFinish()
      case _ =>
      // do not handle
    }
  }

  /** Records handshake parameters and sizes the per-subpartition byte counters. */
  def pushDataHandShake(numSubpartitions: Int, bufferSize: Int): Unit = {
    logger.debug(
      s"FileWriter:${diskFileInfo.getFilePath} " +
        s"pushDataHandShake numReducePartitions:${numSubpartitions} " +
        s"bufferSize:${bufferSize}")
    this.numSubpartitions = numSubpartitions
    numSubpartitionBytes = new Array[Long](numSubpartitions)
    fileMeta.setBufferSize(bufferSize)
    fileMeta.setNumSubPartitions(numSubpartitions)
  }

  /** Resets subpartition progress for a new data region. */
  def regionStart(currentDataRegionIndex: Int, isBroadcastRegion: Boolean): Unit = {
    logger.debug(
      s"FileWriter:${diskFileInfo.getFilePath} " +
        s"regionStart currentDataRegionIndex:${currentDataRegionIndex} " +
        s"isBroadcastRegion:${isBroadcastRegion}")
    this.currentSubpartition = 0
    this.currentDataRegionIndex = currentDataRegionIndex
    this.isBroadcastRegion = isBroadcastRegion
  }

  /**
   * Appends one (offset, length) index entry per subpartition for the finished
   * region, flushing the index buffer when it becomes full.
   */
  @throws[IOException]
  def regionFinish(): Unit = {
    // TODO: When region is finished, flush the data to be ready for the reading, in scenarios that
    // the upstream task writes and the downstream task reads simultaneously, such as flink hybrid
    // shuffle
    logger.debug("FileWriter:{} regionFinish", diskFileInfo.getFilePath)
    // Nothing was written in this region: no index entries to emit.
    if (regionStartingOffset == totalBytes) return
    var fileOffset = regionStartingOffset
    if (indexBuffer == null) indexBuffer = allocateIndexBuffer(numSubpartitions)
    // write the index information of the current data region
    for (partitionIndex <- 0 until numSubpartitions) {
      indexBuffer.putLong(fileOffset)
      if (!isBroadcastRegion) {
        logger.debug(
          s"flush index filename:${diskFileInfo.getFilePath} " +
            s"region:${currentDataRegionIndex} " +
            s"partitionId:${partitionIndex} " +
            s"flush index fileOffset:${fileOffset}, " +
            s"size:${numSubpartitionBytes(partitionIndex)} ")
        indexBuffer.putLong(numSubpartitionBytes(partitionIndex))
        fileOffset += numSubpartitionBytes(partitionIndex)
      } else {
        // In a broadcast region every subpartition shares the same data, so all
        // entries point at the region start with subpartition 0's length.
        logger.debug(
          s"flush index broadcast filename:${diskFileInfo.getFilePath} " +
            s"region:${currentDataRegionIndex} " +
            s"partitionId:${partitionIndex} " +
            s"fileOffset:${fileOffset}, " +
            s"size:${numSubpartitionBytes(0)} ")
        indexBuffer.putLong(numSubpartitionBytes(0))
      }
    }
    if (!indexBuffer.hasRemaining) flushIndex()
    regionStartingOffset = totalBytes
    util.Arrays.fill(numSubpartitionBytes, 0)
    isRegionFinished = true
  }

  /**
   * Allocates a direct index buffer holding a whole number of index regions,
   * at least 4096 bytes to improve disk IO performance.
   */
  protected def allocateIndexBuffer(numSubpartitions: Int): ByteBuffer = {
    // the returned buffer size is no smaller than 4096 bytes to improve disk IO performance
    val minBufferSize = 4096
    // 8 bytes offset + 8 bytes length per subpartition
    val indexRegionSize = numSubpartitions * (8 + 8)
    if (indexRegionSize >= minBufferSize) {
      val buffer = ByteBuffer.allocateDirect(indexRegionSize)
      buffer.order(ByteOrder.BIG_ENDIAN)
      return buffer
    }
    var numRegions = minBufferSize / indexRegionSize
    if (minBufferSize % indexRegionSize != 0) numRegions += 1
    val buffer = ByteBuffer.allocateDirect(numRegions * indexRegionSize)
    buffer.order(ByteOrder.BIG_ENDIAN)
    buffer
  }

  /** Flushes buffered index entries to the index channel (or DFS) and clears the buffer. */
  @throws[IOException]
  protected def flushIndex(): Unit = {
    // TODO: force flush the index file channel in scenarios which the upstream task writes and
    // downstream task reads simultaneously, such as flink hybrid shuffle
    if (indexBuffer != null) {
      logger.debug("flushIndex start:{}", diskFileInfo.getIndexPath)
      val startTime = System.currentTimeMillis
      indexBuffer.flip
      notifier.checkException()
      try {
        if (indexBuffer.hasRemaining) {
          // mappartition synchronously writes file index
          if (indexChannel != null) {
            while (indexBuffer.hasRemaining) indexChannel.write(indexBuffer)
          } else if (diskFileInfo.isDFS) {
            val dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath)
            try {
              // indexBuffer is direct (see allocateIndexBuffer), so ByteBuffer.array
              // would throw UnsupportedOperationException; copy to a heap array first.
              val remaining = new Array[Byte](indexBuffer.remaining())
              indexBuffer.get(remaining)
              dfsStream.write(remaining)
            } finally {
              dfsStream.close()
            }
          }
        }
        indexBuffer.clear
      } finally logger.debug(
        s"flushIndex end:${diskFileInfo.getIndexPath}, " +
          s"cost:${System.currentTimeMillis - startTime}")
    }
  }

  /**
   * Polls until the current region is marked finished or the timeout elapses.
   *
   * @param timeout maximum wait in milliseconds
   * @return true if the region finished within the timeout
   */
  @throws[InterruptedException]
  def checkPartitionRegionFinished(timeout: Long): Boolean = {
    val delta = 100
    var times = 0
    while (delta * times < timeout) {
      if (this.isRegionFinished) return true
      Thread.sleep(delta)
      times += 1
    }
    false
  }

  def getCurrentSubpartition: Int = currentSubpartition

  def setCurrentSubpartition(currentSubpartition: Int): Unit = {
    this.currentSubpartition = currentSubpartition
  }

  def getNumSubpartitionBytes: Array[Long] = numSubpartitionBytes

  def getTotalBytes: Long = totalBytes

  def setTotalBytes(totalBytes: Long): Unit = {
    this.totalBytes = totalBytes
  }

  def setRegionFinished(regionFinished: Boolean): Unit = {
    isRegionFinished = regionFinished
  }

  override def afterFlush(size: Int): Unit = {
    diskFileInfo.updateBytesFlushed(size)
  }

  override def afterClose(): Unit = {
    // Previously this method duplicated the body of flushIndex() verbatim;
    // delegate instead so the flush logic lives in one place.
    flushIndex()
  }

  override def beforeWrite(bytes: ByteBuf): Unit = {
    // Peek at the 16-byte header (partitionId, attemptId, batchId, size) without
    // consuming it: the reader index is restored before the data is written.
    bytes.markReaderIndex()
    val partitionId = bytes.readInt
    val attemptId = bytes.readInt
    val batchId = bytes.readInt
    val size = bytes.readInt
    bytes.resetReaderIndex()
    logger.debug(
      s"map partition filename:${diskFileInfo.getFilePath} " +
        s"write partition:${partitionId} " +
        s"attemptId:${attemptId} " +
        s"batchId:${batchId} " +
        s"size:${size}")

    // Subpartitions must arrive in non-decreasing order within a region.
    if (partitionId < currentSubpartition) throw new IOException(
      s"Must writing data in reduce partition index order, " +
        s"but now partitionId is ${partitionId} " +
        s"and pre partitionId is ${currentSubpartition}")

    if (partitionId > currentSubpartition) currentSubpartition = partitionId
    val length = bytes.readableBytes
    totalBytes += length
    numSubpartitionBytes(partitionId) += length
  }

  override def afterWrite(size: Int): Unit = {
    isRegionFinished = false
  }

  override def beforeDestroy(): Unit = {
    try if (indexChannel != null) indexChannel.close()
    catch {
      case e: IOException =>
        // Single interpolated message (the original mixed an s-string with an
        // SLF4J "{}" placeholder; the rendered text is unchanged).
        logger.warn(
          s"Close channel failed for file ${diskFileInfo.getIndexPath} caused by ${e.getMessage}.")
    }
  }

}
+
/**
 * Metadata handler for reduce partition writers: optionally records seen map ids
 * in a roaring bitmap (for range-read filtering) and repairs the trailing chunk
 * offset when the writer closes.
 *
 * @param rangeReadFilter whether map-id range-read filtering is enabled
 * @param fileInfo        file info (disk- or memory-backed) of the file being written
 */
class ReducePartitionMetaHandler(val rangeReadFilter: Boolean, val fileInfo: FileInfo)
  extends PartitionMetaHandler {
  // Fix: the logger was created for classOf[MapPartitionMetaHandler], which
  // mislabels every log line emitted by this class.
  val logger = LoggerFactory.getLogger(classOf[ReducePartitionMetaHandler])
  lazy val mapIdBitMap: Option[RoaringBitmap] =
    if (rangeReadFilter) Some(new RoaringBitmap()) else None

  override def afterFlush(size: Int): Unit = {
    fileInfo.updateBytesFlushed(size)
  }

  override def afterClose(): Unit = {
    // update offset if it is not matched
    if (!isChunkOffsetValid()) {
      fileInfo.getReduceFileMeta.updateChunkOffset(fileInfo.getFileLength, true)
    }
  }

  /**
   * Returns whether the last recorded chunk offset equals the flushed file length.
   *
   * Consider a scenario where some bytes have been flushed but the chunk offset
   * boundary has not yet been updated: the last record is a giant record that has
   * been flushed, but its size is smaller than the nextBoundary, so the chunk
   * offset is not set after flushing. We must then set it during FileWriter close.
   *
   * Fix: the original computed the comparison inside isInstanceOf branches but
   * discarded both results and always fell through to `false`; the comparisons
   * are now actually returned via pattern matching.
   */
  private def isChunkOffsetValid(): Boolean = {
    fileInfo match {
      case info: DiskFileInfo =>
        info.getReduceFileMeta.getLastChunkOffset == info.getFileLength
      case info: MemoryFileInfo =>
        info.getReduceFileMeta.getLastChunkOffset == info.getFileLength
      case _ =>
        // this should not happen
        false
    }
  }

  override def beforeWrite(bytes: ByteBuf): Unit = {
    if (rangeReadFilter) {
      val mapId = getMapIdFromBuf(bytes)
      mapIdBitMap.get.add(mapId)
    }
  }

  override def afterWrite(size: Int): Unit = {}

  /** Reads the map id from the first 4 bytes of the batch header (0 when filtering is off). */
  def getMapIdFromBuf(buf: ByteBuf): Int = {
    if (rangeReadFilter) {
      val header = new Array[Byte](4)
      buf.markReaderIndex
      buf.readBytes(header)
      buf.resetReaderIndex
      Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET)
    } else {
      0
    }
  }

  def getMapIdBitmap(): Option[RoaringBitmap] = {
    mapIdBitMap
  }

  override def beforeDestroy(): Unit = {}

  override def handleEvent(message: GeneratedMessageV3): Unit = {
    // reduce partition have no message to handle
  }
}
+
/**
 * Map partition metadata handler with segment-granularity tracking: records which
 * subpartitions have started a segment and maps each segment id to the first
 * buffer index written for it.
 *
 * @param diskFileInfo file info of the data file being written
 * @param notifier     flush notifier used to surface asynchronous flush exceptions
 */
class SegmentMapPartitionMetaHandler(diskFileInfo: DiskFileInfo, notifier: FlushNotifier)
  extends MapPartitionMetaHandler(diskFileInfo, notifier) {

  // subPartitionId -> "a segment has started but its first buffer index is not
  // yet recorded"; flipped to false once afterWrite records the index.
  @VisibleForTesting
  val subPartitionHasStartSegment: util.Map[Integer, Boolean] =
    new util.HashMap[Integer, Boolean]
  // Per-subpartition count of buffers written so far.
  @VisibleForTesting
  var subPartitionBufferIndex: Array[Int] = null
  // Header (subPartitionId, attemptId, batchId, size) captured by beforeWrite
  // and consumed by afterWrite for the same buffer.
  private var dataHeaders: List[Int] = _

  override def handleEvent(message: GeneratedMessageV3): Unit = {
    super.handleEvent(message)
    message match {
      case pb: PbSegmentStart =>
        segmentStart(pb.getSubPartitionId, pb.getSegmentId)
      case _ =>
      // do not handle
    }
  }

  /** Marks segmentId as the writing segment of subPartitionId. */
  def segmentStart(subPartitionId: Int, segmentId: Int): Unit = {
    fileMeta.addPartitionSegmentId(subPartitionId, segmentId)
    subPartitionHasStartSegment.put(subPartitionId, true)
  }

  override def pushDataHandShake(numSubpartitions: Int, bufferSize: Int): Unit = {
    super.pushDataHandShake(numSubpartitions, bufferSize)
    subPartitionBufferIndex = new Array[Int](numSubpartitions)
    util.Arrays.fill(subPartitionBufferIndex, 0)
    fileMeta.setIsWriterClosed(false)
    fileMeta.setSegmentGranularityVisible(true)
  }

  override def afterFlush(size: Int): Unit = {
    // Same behavior as the parent; delegate instead of duplicating the call.
    super.afterFlush(size)
  }

  override def afterClose(): Unit = {
    subPartitionHasStartSegment.clear()
    super.afterClose()
    logger.debug(s"Close ${this} for file ${diskFileInfo.getFile}")
    fileMeta.setIsWriterClosed(true)
  }

  override def beforeWrite(bytes: ByteBuf): Unit = {
    // Peek at the 16-byte header without consuming it.
    bytes.markReaderIndex
    val subPartitionId = bytes.readInt
    val attemptId = bytes.readInt
    val batchId = bytes.readInt
    val size = bytes.readInt
    dataHeaders = List(subPartitionId, attemptId, batchId, size)

    // Writing to a subpartition that never announced a segment is a protocol error.
    // (Note: the original wrapped these interpolated strings in a no-op String.format.)
    if (!subPartitionHasStartSegment.containsKey(subPartitionId))
      throw new IllegalStateException(
        s"This partition may not start a segment: subPartitionId:${subPartitionId} " +
          s"attemptId:${attemptId} batchId:${batchId} size:${size}")
    val currentSubpartition = getCurrentSubpartition
    // the subPartitionId must be ordered in a region
    if (subPartitionId < currentSubpartition) throw new IOException(
      s"Must writing data in reduce partition index order, but now supPartitionId is " +
        s"${subPartitionId} and the previous supPartitionId is ${currentSubpartition}, " +
        s"attemptId is ${attemptId}, batchId is ${batchId}, size is ${size}")
    bytes.resetReaderIndex
    logger.debug(
      s"mappartition filename:${diskFileInfo.getFilePath} write " +
        s"partition:${subPartitionId} currentSubPartition:${currentSubpartition} " +
        s"attemptId:${attemptId} batchId:${batchId} size:${size}")
    if (subPartitionId > currentSubpartition) setCurrentSubpartition(subPartitionId)
    val length = bytes.readableBytes
    setTotalBytes(getTotalBytes + length)
    getNumSubpartitionBytes(subPartitionId) += length
  }

  override def afterWrite(size: Int): Unit = {
    super.afterWrite(size)
    val subPartitionId = dataHeaders(0)
    val attemptId = dataHeaders(1)
    if (subPartitionHasStartSegment.get(subPartitionId)) {
      fileMeta.addSegmentIdAndFirstBufferIndex(
        subPartitionId,
        subPartitionBufferIndex(subPartitionId),
        fileMeta.getPartitionWritingSegmentId(subPartitionId))
      logger.debug(
        s"Add a segment id, partitionId:${subPartitionId}, " +
          s"bufferIndex:${subPartitionBufferIndex(subPartitionId)}, " +
          s"segmentId:${fileMeta.getPartitionWritingSegmentId(subPartitionId)}, " +
          s"filename:${diskFileInfo.getFilePath}, " +
          s"attemptId:${attemptId}.")
      // After the first buffer index of the segment is added, the following buffers in the segment
      // should not be added anymore, so the subPartitionHasStartSegment is updated to false.
      subPartitionHasStartSegment.put(subPartitionId, false)
    }
    subPartitionBufferIndex(subPartitionId) += 1
  }
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
new file mode 100644
index 000000000..93bdc184d
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandlerSuite.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.{File, FileInputStream}
+import java.nio.ByteBuffer
+import java.nio.file.Files
+
+import io.netty.buffer.{ByteBuf, UnpooledByteBufAllocator}
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.identity.UserIdentifier
+import org.apache.celeborn.common.meta.{DiskFileInfo, MapFileMeta,
ReduceFileMeta}
+import org.apache.celeborn.common.protocol._
+import org.apache.celeborn.common.unsafe.Platform
+
/**
 * Unit tests for the three [[PartitionMetaHandler]] implementations: map partition,
 * reduce partition, and segment map partition.
 */
class PartitionMetaHandlerSuite extends CelebornFunSuite with MockitoHelper {

  // Builds a 1024-byte buffer in the flink push-data layout: a 16-byte header
  // (partitionId, attemptId, batchId, size) followed by a 1008-byte payload.
  private def generateFlinkFormatData(
      byteBufAllocator: UnpooledByteBufAllocator,
      partitionId: Int) = {
    val dataBuf = byteBufAllocator.buffer(1024)
    // partitionId attemptId batchId size
    dataBuf.writeInt(partitionId)
    dataBuf.writeInt(0)
    dataBuf.writeInt(0)
    dataBuf.writeInt(1008)
    for (i <- 1 to 1008) {
      dataBuf.writeByte(1)
    }
    assert(1024 === dataBuf.readableBytes())
    dataBuf
  }

  // Builds a 1024-byte buffer in the spark push-data layout.
  // Fix: the first header int is the map id, but the parameter was misleadingly
  // named attemptId; renamed to match the header comment below.
  private def generateSparkFormatData(
      byteBufAllocator: UnpooledByteBufAllocator,
      mapId: Int) = {
    val dataBuf = byteBufAllocator.buffer(1024)
    val arr = new Array[Byte](1024)
    // mapId attemptId batchId size
    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET, mapId)
    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 4, 0)
    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 8, 0)
    Platform.putInt(arr, Platform.BYTE_ARRAY_OFFSET + 12, 1008)

    dataBuf.writeBytes(arr)
    assert(1024 === dataBuf.readableBytes())
    dataBuf
  }

  test("test map partition meta handler") {
    val byteBufAllocator = UnpooledByteBufAllocator.DEFAULT

    val tmpFilePath = Files.createTempFile("abc", "def")
    val fileMeta = new MapFileMeta()
    val notifier = new FlushNotifier()
    val diskFileInfo = new DiskFileInfo(
      UserIdentifier.apply("`a`.`b`"),
      true,
      fileMeta,
      tmpFilePath.toString,
      StorageInfo.Type.HDD)

    val mapMetaHandler = new MapPartitionMetaHandler(diskFileInfo, notifier)
    val pbPushDataHandShake =
      PbPushDataHandShake.newBuilder().setNumPartitions(10).setBufferSize(1024).build()
    mapMetaHandler.handleEvent(pbPushDataHandShake)

    assert(10 === mapMetaHandler.numSubpartitions)
    assert(10 === mapMetaHandler.numSubpartitionBytes.length)

    val pbRegionStart = PbRegionStart.newBuilder()
      .setCurrentRegionIndex(0)
      .setIsBroadcast(true)
      .build()

    mapMetaHandler.handleEvent(pbRegionStart)

    assert(mapMetaHandler.currentSubpartition === 0)
    assert(mapMetaHandler.isBroadcastRegion === true)
    assert(mapMetaHandler.currentDataRegionIndex === 0)

    // Write one 1024-byte buffer to each of the 10 subpartitions in order.
    for (i <- 0 until 10) {
      val dataBuf: ByteBuf = generateFlinkFormatData(byteBufAllocator, i)
      mapMetaHandler.beforeWrite(dataBuf)
    }

    val pbRegionFinish = PbRegionFinish.newBuilder()
      .build()

    mapMetaHandler.handleEvent(pbRegionFinish)

    // 10 subpartitions * (8-byte offset + 8-byte length) = 160 bytes of index.
    assert(mapMetaHandler.indexBuffer.position() === 160)
    val start = mapMetaHandler.indexBuffer.getLong(0)
    val len = mapMetaHandler.indexBuffer.getLong(8)
    assert(0 === start)
    assert(1024 === len)

    assert(mapMetaHandler.indexChannel !== null)
    mapMetaHandler.afterClose()

    // Re-read the flushed index file and verify the first entry.
    // (Files.readAllBytes replaces an unclosed FileInputStream whose read()
    // return value was ignored.)
    val file = new File(diskFileInfo.getIndexPath)
    assert(file.length() == 160)
    val array = Files.readAllBytes(file.toPath)
    assert(array.length === 160)

    val readIndex = ByteBuffer.wrap(array)
    val start2 = readIndex.getLong()
    val len2 = readIndex.getLong()

    assert(0 === start2)
    assert(1024 === len2)

  }

  test("test reduce partition meta handler") {
    val byteBufAllocator = UnpooledByteBufAllocator.DEFAULT

    val tmpFilePath = Files.createTempFile("abc", "def")
    val fileMeta = new ReduceFileMeta(8192)
    val diskFileInfo = new DiskFileInfo(
      UserIdentifier.apply("`a`.`b`"),
      true,
      fileMeta,
      tmpFilePath.toString,
      StorageInfo.Type.HDD)

    // rangeReadFilter enabled: map ids must be collected into the bitmap.
    val handler1 = new ReducePartitionMetaHandler(true, diskFileInfo)
    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 0))
    handler1.afterFlush(1024)
    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 1))
    handler1.afterFlush(1024)
    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 2))
    handler1.afterFlush(1024)
    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 3))
    handler1.afterFlush(1024)
    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 4))
    handler1.afterFlush(1024)

    assert(handler1.mapIdBitMap.get.getCardinality === 5)

    // 5 KiB flushed < 8 KiB chunk size: no chunk boundary yet.
    assert(diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks === 0)

    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 5))
    handler1.afterFlush(1024)
    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 6))
    handler1.afterFlush(1024)
    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 7))
    handler1.afterFlush(1024)

    // Crossing the 8 KiB boundary creates the first chunk.
    assert(diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks === 1)

    handler1.beforeWrite(generateSparkFormatData(byteBufAllocator, 8))
    handler1.afterFlush(1024)
    handler1.afterClose()

    // afterClose must seal the trailing partial chunk.
    assert(diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getNumChunks == 2)
  }

  test("test map segment partition meta handler") {
    val byteBufAllocator = UnpooledByteBufAllocator.DEFAULT

    val tmpFilePath = Files.createTempFile("abc", "def")
    val fileMeta = new MapFileMeta()
    val notifier = new FlushNotifier()
    val diskFileInfo = new DiskFileInfo(
      UserIdentifier.apply("`a`.`b`"),
      true,
      fileMeta,
      tmpFilePath.toString,
      StorageInfo.Type.HDD)

    val mapMetaHandler = new SegmentMapPartitionMetaHandler(diskFileInfo, notifier)
    val pbPushDataHandShake =
      PbPushDataHandShake.newBuilder().setNumPartitions(10).setBufferSize(1024).build()
    mapMetaHandler.handleEvent(pbPushDataHandShake)

    assert(10 === mapMetaHandler.numSubpartitions)
    assert(10 === mapMetaHandler.numSubpartitionBytes.length)

    val pbSegmentStart = PbSegmentStart.newBuilder()
      .setSubPartitionId(0)
      .setSegmentId(0)
      .build()

    mapMetaHandler.handleEvent(pbSegmentStart)

    val pbRegionStart = PbRegionStart.newBuilder()
      .setCurrentRegionIndex(0)
      .setIsBroadcast(true)
      .build()

    mapMetaHandler.handleEvent(pbRegionStart)

    assert(mapMetaHandler.currentSubpartition === 0)
    assert(mapMetaHandler.isBroadcastRegion === true)
    assert(mapMetaHandler.currentDataRegionIndex === 0)

    // All 10 buffers target subpartition 0 (the one with a started segment).
    for (i <- 0 until 10) {
      val dataBuf: ByteBuf = generateFlinkFormatData(byteBufAllocator, 0)
      mapMetaHandler.beforeWrite(dataBuf)
      mapMetaHandler.afterWrite(1024)
    }

    // The first buffer of segment 0 in subpartition 0 is buffer index 0.
    assert(fileMeta.getSegmentIdByFirstBufferIndex(0, 0) === 0)

    val pbRegionFinish = PbRegionFinish.newBuilder()
      .build()

    mapMetaHandler.handleEvent(pbRegionFinish)

    assert(mapMetaHandler.indexBuffer.position() === 160)
    val start = mapMetaHandler.indexBuffer.getLong(0)
    val len = mapMetaHandler.indexBuffer.getLong(8)
    assert(0 === start)
    assert(10240 === len)

    assert(mapMetaHandler.indexChannel !== null)
    mapMetaHandler.afterClose()

    val file = new File(diskFileInfo.getIndexPath)
    assert(file.length() == 160)
    val array = Files.readAllBytes(file.toPath)
    assert(array.length === 160)

    val readIndex = ByteBuffer.wrap(array)
    val start2 = readIndex.getLong()
    val len2 = readIndex.getLong()

    assert(0 === start2)
    assert(10240 === len2)

  }
}