This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 90ece9665 [CELEBORN-2002][MASTER] Audit shuffle lifecycle in separate log file
90ece9665 is described below
commit 90ece9665c70a30583aed8596e788e3edb4432bf
Author: Wang, Fei <[email protected]>
AuthorDate: Tue May 20 05:45:34 2025 -0700
[CELEBORN-2002][MASTER] Audit shuffle lifecycle in separate log file
### What changes were proposed in this pull request?
Audit the shuffle lifecycle in a separate log file. The following operations are recorded:
- OFFER_SLOTS
- EXPIRE
- REVIVE
- UNREGISTER
- BATCH_UNREGISTER
### Why are the changes needed?
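Shuffle lifecycle events are now written to a dedicated audit log, which lets us
remove the redundant per-shuffle "expired" logs emitted while handling
master-worker heartbeats; see
https://github.com/apache/celeborn/pull/3244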
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
```
(base) ➜ celeborn git:(shuffle_audit) grep ShuffleAuditLogger
tests/spark-it/target/unit-tests.log
25/05/19 20:05:27,031 INFO [celeborn-dispatcher-41] ShuffleAuditLogger:
shuffleKey=local-1747710326897-0 op=OFFER_SLOTS numReducers=4
workerNum=5 extraSlots=1
25/05/19 20:05:27,719 INFO [celeborn-dispatcher-44] ShuffleAuditLogger:
shuffleKey=local-1747710326897-1 op=OFFER_SLOTS numReducers=2
workerNum=5 extraSlots=3
25/05/19 20:05:28,094 INFO [celeborn-dispatcher-47] ShuffleAuditLogger:
shuffleKey=local-1747710326897-2 op=OFFER_SLOTS numReducers=2
workerNum=5 extraSlots=3
25/05/19 20:05:28,467 INFO [celeborn-dispatcher-52] ShuffleAuditLogger:
shuffleKey=local-1747710326897-3 op=OFFER_SLOTS numReducers=8
workerNum=5 extraSlots=0
25/05/19 20:05:28,769 INFO [celeborn-dispatcher-53] ShuffleAuditLogger:
shuffleKey=local-1747710326897-4 op=OFFER_SLOTS numReducers=8
workerNum=5 extraSlots=0
25/05/19 20:05:29,720 INFO [celeborn-dispatcher-56] ShuffleAuditLogger:
shuffleKey=local-1747710326897-5 op=OFFER_SLOTS numReducers=200
workerNum=5 extraSlots=0
25/05/19 20:05:30,349 INFO [celeborn-dispatcher-59] ShuffleAuditLogger:
shuffleKey=local-1747710326897-6 op=OFFER_SLOTS numReducers=4
workerNum=5 extraSlots=1
25/05/19 20:05:40,534 INFO [celeborn-dispatcher-11] ShuffleAuditLogger:
shuffleKey=local-1747710340484-0 op=OFFER_SLOTS numReducers=4
workerNum=5 extraSlots=1
25/05/19 20:05:41,101 INFO [celeborn-dispatcher-14] ShuffleAuditLogger:
shuffleKey=local-1747710340484-1 op=OFFER_SLOTS numReducers=2
workerNum=5 extraSlots=3
25/05/19 20:05:41,480 INFO [celeborn-dispatcher-17] ShuffleAuditLogger:
shuffleKey=local-1747710340484-2 op=OFFER_SLOTS numReducers=2
workerNum=5 extraSlots=3
25/05/19 20:05:41,848 INFO [celeborn-dispatcher-26] ShuffleAuditLogger:
shuffleKey=local-1747710340484-3 op=OFFER_SLOTS numReducers=8
workerNum=5 extraSlots=0
25/05/19 20:05:42,136 INFO [celeborn-dispatcher-18] ShuffleAuditLogger:
shuffleKey=local-1747710340484-4 op=OFFER_SLOTS numReducers=8
workerNum=5 extraSlots=0
25/05/19 20:05:43,058 INFO [celeborn-dispatcher-21] ShuffleAuditLogger:
shuffleKey=local-1747710340484-5 op=OFFER_SLOTS numReducers=200
workerNum=5 extraSlots=0
25/05/19 20:05:43,542 INFO [celeborn-dispatcher-31] ShuffleAuditLogger:
shuffleKey=local-1747710340484-6 op=OFFER_SLOTS numReducers=4
workerNum=5 extraSlots=1
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-29] ShuffleAuditLogger:
shuffleKeys=local-1747710326897-0,local-1747710326897-1,local-1747710326897-2,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5
op=EXPIRE worker=127.0.0.1:59932:59934:59948:59941
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-27] ShuffleAuditLogger:
shuffleKeys=local-1747710326897-0,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6
op=EXPIRE worker=127.0.0.1:59930:59938:59944:59940
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-32] ShuffleAuditLogger:
shuffleKeys=local-1747710326897-1,local-1747710326897-2,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6
op=EXPIRE worker=127.0.0.1:59931:59936:59945:59939
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-33] ShuffleAuditLogger:
shuffleKeys=local-1747710326897-0,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6
op=EXPIRE worker=127.0.0.1:59933:59935:59946:59943
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-28] ShuffleAuditLogger:
shuffleKeys=local-1747710326897-0,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6
op=EXPIRE worker=127.0.0.1:59929:59937:59947:59942
```
Closes #3265 from turboFei/shuffle_audit.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
conf/log4j2.xml.template | 20 ++++++++++
.../celeborn/service/deploy/master/Master.scala | 24 ++++++++++--
.../deploy/master/audit/ShuffleAuditLogger.scala | 44 ++++++++++++++++++++++
3 files changed, 84 insertions(+), 4 deletions(-)
diff --git a/conf/log4j2.xml.template b/conf/log4j2.xml.template
index efd1511e4..22048d582 100644
--- a/conf/log4j2.xml.template
+++ b/conf/log4j2.xml.template
@@ -66,6 +66,23 @@
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <RollingRandomAccessFile name="shuffleAuditFile"
fileName="${env:CELEBORN_LOG_DIR}/audit/shuffle-audit.log"
+
filePattern="${env:CELEBORN_LOG_DIR}/audit/shuffle-audit.log.%d-%i">
+ <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}:
%m%n%ex"/>
+ <Policies>
+ <SizeBasedTriggeringPolicy size="200 MB"/>
+ </Policies>
+ <DefaultRolloverStrategy max="7">
+ <Delete basePath="${env:CELEBORN_LOG_DIR}/audit" maxDepth="1">
+ <IfFileName glob="shuffle-audit.log*">
+ <IfAny>
+ <IfAccumulatedFileSize exceeds="1 GB"/>
+ <IfAccumulatedFileCount exceeds="10"/>
+ </IfAny>
+ </IfFileName>
+ </Delete>
+ </DefaultRolloverStrategy>
+ </RollingRandomAccessFile>
</Appenders>
<Loggers>
@@ -87,5 +104,8 @@
<Logger name="org.apache.celeborn.server.common.http.RestAuditLogger"
level="INFO" additivity="false">
<Appender-ref ref="restAuditFile" level="INFO"/>
</Logger>
+ <Logger
name="org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger"
level="INFO" additivity="false">
+ <Appender-ref ref="shuffleAuditFile" level="INFO"/>
+ </Logger>
</Loggers>
</Configuration>
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index dfe4e03a1..8316f1706 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -51,6 +51,7 @@ import org.apache.celeborn.common.rpc._
import org.apache.celeborn.common.rpc.{RpcSecurityContextBuilder,
ServerSaslContextBuilder}
import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils,
PbSerDeUtils, SignalUtils, ThreadUtils, Utils}
import org.apache.celeborn.server.common.{HttpService, Service}
+import org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger
import
org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper,
HAMasterMetaManager, MetaHandler}
import org.apache.celeborn.service.deploy.master.quota.QuotaManager
@@ -706,13 +707,13 @@ private[celeborn] class Master(
val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
val shuffleIds = statusSystem.registeredAppAndShuffles.get(appId)
if (shuffleIds == null || !shuffleIds.contains(shuffleId)) {
- logWarning(
- s"Shuffle $shuffleKey expired on
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
expiredShuffleKeys.add(shuffleKey)
}
}
- logDebug(
- s"Shuffle ${expiredShuffleKeys.asScala.mkString("[", " ,", "]")} expired
on ${targetWorker.toUniqueId}.")
+ ShuffleAuditLogger.batchAudit(
+ expiredShuffleKeys.asScala.mkString(","),
+ "EXPIRE",
+ Seq(s"worker=${targetWorker.toUniqueId}"))
val workerEventInfo = statusSystem.workerEventInfos.get(targetWorker)
if (workerEventInfo == null) {
@@ -735,6 +736,11 @@ private[celeborn] class Master(
try {
logInfo(s"Handle lost shuffles for ${appId} ${lostShuffles} ")
statusSystem.handleReviseLostShuffles(appId, lostShuffles, requestId);
+ ShuffleAuditLogger.batchAudit(
+ lostShuffles.asScala.map { shuffleId =>
+ Utils.makeShuffleKey(appId, shuffleId)
+ }.mkString(","),
+ "REVIVE")
if (context != null) {
context.reply(ReviseLostShufflesResponse(true, ""))
}
@@ -984,6 +990,14 @@ private[celeborn] class Master(
logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
}
+ ShuffleAuditLogger.audit(
+ shuffleKey,
+ "OFFER_SLOTS",
+ Seq(
+ s"numReducers=$numReducers",
+ s"workerNum=${slots.size()}",
+ s"extraSlots=$offerSlotsExtraSize"))
+
if (authEnabled) {
pushApplicationMetaToWorkers(requestSlots, slots)
}
@@ -1035,6 +1049,7 @@ private[celeborn] class Master(
val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
statusSystem.handleUnRegisterShuffle(shuffleKey, requestId)
logInfo(s"Unregister shuffle $shuffleKey")
+ ShuffleAuditLogger.audit(shuffleKey, "UNREGISTER")
context.reply(UnregisterShuffleResponse(StatusCode.SUCCESS))
}
@@ -1047,6 +1062,7 @@ private[celeborn] class Master(
shuffleIds.map(shuffleId => Utils.makeShuffleKey(applicationId,
shuffleId)).asJava
statusSystem.handleBatchUnRegisterShuffles(shuffleKeys, requestId)
logInfo(s"BatchUnregister shuffle $shuffleKeys")
+ ShuffleAuditLogger.batchAudit(shuffleKeys.asScala.mkString(","),
"BATCH_UNREGISTER")
context.reply(BatchUnregisterShuffleResponse(StatusCode.SUCCESS,
shuffleIds.asJava))
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/audit/ShuffleAuditLogger.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/audit/ShuffleAuditLogger.scala
new file mode 100644
index 000000000..ba5b0261a
--- /dev/null
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/audit/ShuffleAuditLogger.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master.audit
+
+import org.apache.celeborn.common.internal.Logging
+
+object ShuffleAuditLogger extends Logging {
+ final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
+ override protected def initialValue: StringBuilder = new StringBuilder()
+ }
+
+ def audit(shuffleKey: String, op: String, labels: Seq[String] = Seq.empty):
Unit = {
+ val sb = AUDIT_BUFFER.get()
+ sb.setLength(0)
+ sb.append(s"shuffleKey=$shuffleKey").append("\t")
+ sb.append(s"op=$op")
+ if (labels.nonEmpty) sb.append(labels.mkString("\t", "\t", ""))
+ logInfo(sb.toString())
+ }
+
+ def batchAudit(shuffleKeys: String, op: String, labels: Seq[String] =
Seq.empty): Unit = {
+ val sb = AUDIT_BUFFER.get()
+ sb.setLength(0)
+ sb.append(s"shuffleKeys=$shuffleKeys").append("\t")
+ sb.append(s"op=$op")
+ if (labels.nonEmpty) sb.append(labels.mkString("\t", "\t", ""))
+ logInfo(sb.toString())
+ }
+}