This is an automated email from the ASF dual-hosted git repository.
zhongqiangchen pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new b6a7d45f2 [CELEBORN-627][FLINK][FOLLOWUP] Support split partitions
b6a7d45f2 is described below
commit b6a7d45f2b45d1402cd92f07f0f57e8f80c1cefa
Author: zhongqiang.czq <[email protected]>
AuthorDate: Wed Sep 6 22:33:56 2023 +0800
[CELEBORN-627][FLINK][FOLLOWUP] Support split partitions
### What changes were proposed in this pull request?
Fix duplicated sending of commitFiles for MapPartition, and fix the missing
BufferStreamEnd when opening a MapPartition split.
### Why are the changes needed?
After enabling partition split for MapPartition, there are 2 errors.
- ERROR1: The worker doesn't send a stream end to the client because of a
concurrent thread synchronization problem. After the idle timeout, the client
will close the channel and throw the
Exception **" xx is lost, notify related stream xx"**
```java
2023-09-06T04:40:47.7549935Z 23/09/06 04:40:47,753 WARN [Keyed Aggregation
-> Map -> Sink: Unnamed (5/8)#0] Task: Keyed Aggregation -> Map -> Sink:
Unnamed (5/8)#0
(c1cade728ddb3a32e0bf72acb1d87588_c27dcf7b54ef6bfd6cff02ca8870b681_4_0)
switched from RUNNING to FAILED with failure cause:
2023-09-06T04:40:47.7550644Z java.io.IOException: Client
localhost/127.0.0.1:38485 is lost, notify related stream 256654410004
2023-09-06T04:40:47.7551219Z at
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:142)
2023-09-06T04:40:47.7551886Z at
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
2023-09-06T04:40:47.7552576Z at
org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:57)
2023-09-06T04:40:47.7553250Z at
org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:119)
2023-09-06T04:40:47.7553806Z at
java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597)
2023-09-06T04:40:47.7554564Z at
org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:110)
2023-09-06T04:40:47.7555270Z at
org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:71)
2023-09-06T04:40:47.7556005Z at
org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:136)
2023-09-06T04:40:47.7556710Z at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7557370Z at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7558172Z at
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7558803Z at
io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7559368Z at
io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
2023-09-06T04:40:47.7559954Z at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
2023-09-06T04:40:47.7560589Z at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7561222Z at
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7561829Z at
io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7562620Z at
org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:206)
2023-09-06T04:40:47.7563506Z at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7564207Z at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7564829Z at
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7565417Z at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
2023-09-06T04:40:47.7566014Z at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
2023-09-06T04:40:47.7566654Z at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7567317Z at
io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
2023-09-06T04:40:47.7567813Z at
io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
2023-09-06T04:40:47.7568297Z at
io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
2023-09-06T04:40:47.7568830Z at
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
2023-09-06T04:40:47.7569402Z at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
2023-09-06T04:40:47.7569894Z at
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
2023-09-06T04:40:47.7570356Z at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
2023-09-06T04:40:47.7570841Z at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2023-09-06T04:40:47.7571319Z at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2023-09-06T04:40:47.7571721Z at java.lang.Thread.run(Thread.java:750)
```
- ERROR2: The client sends duplicated commitFiles to the worker. Because of
an inconsistent view of unHandledPartitions, both batchCommit and finalCommit
send commitFiles.
``` java
2023-09-06T04:36:48.3146773Z 23/09/06 04:36:48,314 WARN
[Worker-CommitFiles-1] Controller: Get Partition Location for
1693975002919-61094c8156f918062a5fae12d551bc90-0 0-1 but didn't exist.
```
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
ci
Closes #1881 from zhongqiangczq/fix-split-test.
Authored-by: zhongqiang.czq <[email protected]>
Signed-off-by: zhongqiang.czq <[email protected]>
(cherry picked from commit b1e3d661e6e7db255867159b639c3b345566b2e4)
Signed-off-by: zhongqiang.czq <[email protected]>
---
.../client/commit/MapPartitionCommitHandler.scala | 3 +--
tests/flink-it/src/test/resources/log4j2-test.xml | 4 ++--
.../worker/storage/MapDataPartitionReader.java | 9 ++++----
.../service/deploy/worker/PushDataHandler.scala | 24 ++++++++++++++++++----
4 files changed, 28 insertions(+), 12 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
index e75e77bf0..54d05671f 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
@@ -74,7 +74,7 @@ class MapPartitionCommitHandler(
shuffleId: Int,
shuffleCommittedInfo: ShuffleCommittedInfo):
mutable.Set[PartitionLocation] = {
shuffleCommittedInfo.unhandledPartitionLocations.asScala.filterNot {
partitionLocation =>
-
shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) &&
+
shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) ||
isPartitionInProcess(shuffleId, partitionLocation.getId)
}
}
@@ -199,7 +199,6 @@ class MapPartitionCommitHandler(
recordWorkerFailure(commitFailedWorkers)
}
- inProcessingPartitionIds.remove(partitionId)
if (dataCommitSuccess) {
val resultPartitions =
shuffleSucceedPartitionIds.computeIfAbsent(
diff --git a/tests/flink-it/src/test/resources/log4j2-test.xml
b/tests/flink-it/src/test/resources/log4j2-test.xml
index 5fb0cb8ba..607cbb578 100644
--- a/tests/flink-it/src/test/resources/log4j2-test.xml
+++ b/tests/flink-it/src/test/resources/log4j2-test.xml
@@ -16,7 +16,7 @@
~ limitations under the License.
-->
-<Configuration status="INFO">
+<Configuration status="DEBUG">
<Appenders>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}:
%m%n%ex"/>
@@ -29,7 +29,7 @@
</File>
</Appenders>
<Loggers>
- <Root level="INFO">
+ <Root level="DEBUG">
<AppenderRef ref="stdout"/>
<AppenderRef ref="file"/>
</Root>
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
index 06e50c01b..b8f996fe7 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
@@ -71,7 +71,7 @@ public class MapDataPartitionReader implements
Comparable<MapDataPartitionReader
/** Whether all the data has been successfully read or not. */
@GuardedBy("lock")
- private boolean readFinished;
+ private volatile boolean readFinished;
/** Whether this partition reader has been released or not. */
@GuardedBy("lock")
@@ -162,13 +162,14 @@ public class MapDataPartitionReader implements
Comparable<MapDataPartitionReader
addBuffer(buffer, bufferRecycler);
++numDataBuffers;
}
- if (numDataBuffers > 0) {
- notifyBacklog(numDataBuffers);
- }
if (!hasRemaining) {
closeReader();
}
+
+ if (numDataBuffers > 0) {
+ notifyBacklog(numDataBuffers);
+ }
}
private void addBuffer(ByteBuf buffer, BufferRecycler bufferRecycler) {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 963736996..36167ad47 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -940,13 +940,15 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
val isPartitionSplitEnabled = fileWriter.asInstanceOf[
MapPartitionFileWriter].getFileInfo.isPartitionSplitEnabled
- if (shutdown.get() && (messageType == Type.REGION_START || messageType ==
Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
+ if (shutdown.get() && (messageType == Type.REGION_START || messageType ==
+ Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
logInfo(s"$messageType return HARD_SPLIT for shuffle $shuffleKey since
worker shutdown.")
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
return
}
- if (checkSplit && (messageType == Type.REGION_START || messageType ==
Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled && checkDiskFullAndSplit(
+ if (checkSplit && (messageType == Type.REGION_START || messageType ==
+ Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled &&
checkDiskFullAndSplit(
fileWriter,
isPrimary,
null,
@@ -1116,7 +1118,14 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
callback: RpcResponseCallback): Boolean = {
val diskFull = checkDiskFull(fileWriter)
logDebug(
- s"CheckDiskFullAndSplit in diskfull: $diskFull,
partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold:
${fileWriter.getSplitThreshold()}, filelength:
${fileWriter.getFileInfo.getFileLength},
filename:${fileWriter.getFileInfo.getFilePath}")
+ s"""
+ |CheckDiskFullAndSplit in
+ |diskFull:$diskFull,
+ |partitionSplitMinimumSize: $partitionSplitMinimumSize,
+ |splitThreshold:${fileWriter.getSplitThreshold()},
+ |fileLength:${fileWriter.getFileInfo.getFileLength}
+ |fileName:${fileWriter.getFileInfo.getFilePath}
+ |""".stripMargin)
if (workerPartitionSplitEnabled && ((diskFull &&
fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
(isPrimary && fileWriter.getFileInfo.getFileLength >
fileWriter.getSplitThreshold()))) {
if (softSplit != null && fileWriter.getSplitMode ==
PartitionSplitMode.SOFT &&
@@ -1125,7 +1134,14 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
} else {
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
logDebug(
- s"CheckDiskFullAndSplit hardsplit diskfull: $diskFull,
partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold:
${fileWriter.getSplitThreshold()}, filelength:
${fileWriter.getFileInfo.getFileLength},
filename:${fileWriter.getFileInfo.getFilePath}")
+ s"""
+ |CheckDiskFullAndSplit hardSplit
+ |diskFull:$diskFull,
+ |partitionSplitMinimumSize:$partitionSplitMinimumSize,
+ |splitThreshold:${fileWriter.getSplitThreshold()},
+ |fileLength:${fileWriter.getFileInfo.getFileLength},
+ |fileName:${fileWriter.getFileInfo.getFilePath}
+ |""".stripMargin)
return true
}
}