This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 90ece9665 [CELEBORN-2002][MASTER] Audit shuffle lifecycle in separate log file
90ece9665 is described below

commit 90ece9665c70a30583aed8596e788e3edb4432bf
Author: Wang, Fei <[email protected]>
AuthorDate: Tue May 20 05:45:34 2025 -0700

    [CELEBORN-2002][MASTER] Audit shuffle lifecycle in separate log file
    
    ### What changes were proposed in this pull request?
    Audit shuffle lifecycle events in a separate log file. The audited operations are:
    - OFFER_SLOTS
    - EXPIRE
    - REVIVE
    - UNREGISTER
    - BATCH_UNREGISTER
    
    ### Why are the changes needed?
    Remove the redundant per-shuffle logs of expired shuffles in the
    master-worker heartbeat; see https://github.com/apache/celeborn/pull/3244
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    ```
    (base) ➜  celeborn git:(shuffle_audit) grep ShuffleAuditLogger 
tests/spark-it/target/unit-tests.log
    25/05/19 20:05:27,031 INFO [celeborn-dispatcher-41] ShuffleAuditLogger: 
shuffleKey=local-1747710326897-0        op=OFFER_SLOTS  numReducers=4   
workerNum=5     extraSlots=1
    25/05/19 20:05:27,719 INFO [celeborn-dispatcher-44] ShuffleAuditLogger: 
shuffleKey=local-1747710326897-1        op=OFFER_SLOTS  numReducers=2   
workerNum=5     extraSlots=3
    25/05/19 20:05:28,094 INFO [celeborn-dispatcher-47] ShuffleAuditLogger: 
shuffleKey=local-1747710326897-2        op=OFFER_SLOTS  numReducers=2   
workerNum=5     extraSlots=3
    25/05/19 20:05:28,467 INFO [celeborn-dispatcher-52] ShuffleAuditLogger: 
shuffleKey=local-1747710326897-3        op=OFFER_SLOTS  numReducers=8   
workerNum=5     extraSlots=0
    25/05/19 20:05:28,769 INFO [celeborn-dispatcher-53] ShuffleAuditLogger: 
shuffleKey=local-1747710326897-4        op=OFFER_SLOTS  numReducers=8   
workerNum=5     extraSlots=0
    25/05/19 20:05:29,720 INFO [celeborn-dispatcher-56] ShuffleAuditLogger: 
shuffleKey=local-1747710326897-5        op=OFFER_SLOTS  numReducers=200 
workerNum=5     extraSlots=0
    25/05/19 20:05:30,349 INFO [celeborn-dispatcher-59] ShuffleAuditLogger: 
shuffleKey=local-1747710326897-6        op=OFFER_SLOTS  numReducers=4   
workerNum=5     extraSlots=1
    25/05/19 20:05:40,534 INFO [celeborn-dispatcher-11] ShuffleAuditLogger: 
shuffleKey=local-1747710340484-0        op=OFFER_SLOTS  numReducers=4   
workerNum=5     extraSlots=1
    25/05/19 20:05:41,101 INFO [celeborn-dispatcher-14] ShuffleAuditLogger: 
shuffleKey=local-1747710340484-1        op=OFFER_SLOTS  numReducers=2   
workerNum=5     extraSlots=3
    25/05/19 20:05:41,480 INFO [celeborn-dispatcher-17] ShuffleAuditLogger: 
shuffleKey=local-1747710340484-2        op=OFFER_SLOTS  numReducers=2   
workerNum=5     extraSlots=3
    25/05/19 20:05:41,848 INFO [celeborn-dispatcher-26] ShuffleAuditLogger: 
shuffleKey=local-1747710340484-3        op=OFFER_SLOTS  numReducers=8   
workerNum=5     extraSlots=0
    25/05/19 20:05:42,136 INFO [celeborn-dispatcher-18] ShuffleAuditLogger: 
shuffleKey=local-1747710340484-4        op=OFFER_SLOTS  numReducers=8   
workerNum=5     extraSlots=0
    25/05/19 20:05:43,058 INFO [celeborn-dispatcher-21] ShuffleAuditLogger: 
shuffleKey=local-1747710340484-5        op=OFFER_SLOTS  numReducers=200 
workerNum=5     extraSlots=0
    25/05/19 20:05:43,542 INFO [celeborn-dispatcher-31] ShuffleAuditLogger: 
shuffleKey=local-1747710340484-6        op=OFFER_SLOTS  numReducers=4   
workerNum=5     extraSlots=1
    25/05/19 20:05:44,436 INFO [celeborn-dispatcher-29] ShuffleAuditLogger: 
shuffleKeys=local-1747710326897-0,local-1747710326897-1,local-1747710326897-2,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5
 op=EXPIRE       worker=127.0.0.1:59932:59934:59948:59941
    25/05/19 20:05:44,436 INFO [celeborn-dispatcher-27] ShuffleAuditLogger: 
shuffleKeys=local-1747710326897-0,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6
       op=EXPIRE       worker=127.0.0.1:59930:59938:59944:59940
    25/05/19 20:05:44,436 INFO [celeborn-dispatcher-32] ShuffleAuditLogger: 
shuffleKeys=local-1747710326897-1,local-1747710326897-2,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6
 op=EXPIRE       worker=127.0.0.1:59931:59936:59945:59939
    25/05/19 20:05:44,436 INFO [celeborn-dispatcher-33] ShuffleAuditLogger: 
shuffleKeys=local-1747710326897-0,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6
       op=EXPIRE       worker=127.0.0.1:59933:59935:59946:59943
    25/05/19 20:05:44,436 INFO [celeborn-dispatcher-28] ShuffleAuditLogger: 
shuffleKeys=local-1747710326897-0,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6
       op=EXPIRE       worker=127.0.0.1:59929:59937:59947:59942
    
    ```
    
    Closes #3265 from turboFei/shuffle_audit.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 conf/log4j2.xml.template                           | 20 ++++++++++
 .../celeborn/service/deploy/master/Master.scala    | 24 ++++++++++--
 .../deploy/master/audit/ShuffleAuditLogger.scala   | 44 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 4 deletions(-)

diff --git a/conf/log4j2.xml.template b/conf/log4j2.xml.template
index efd1511e4..22048d582 100644
--- a/conf/log4j2.xml.template
+++ b/conf/log4j2.xml.template
@@ -66,6 +66,23 @@
                 </Delete>
             </DefaultRolloverStrategy>
         </RollingRandomAccessFile>
+        <RollingRandomAccessFile name="shuffleAuditFile" 
fileName="${env:CELEBORN_LOG_DIR}/audit/shuffle-audit.log"
+                                 
filePattern="${env:CELEBORN_LOG_DIR}/audit/shuffle-audit.log.%d-%i">
+            <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: 
%m%n%ex"/>
+            <Policies>
+                <SizeBasedTriggeringPolicy size="200 MB"/>
+            </Policies>
+            <DefaultRolloverStrategy max="7">
+                <Delete basePath="${env:CELEBORN_LOG_DIR}/audit" maxDepth="1">
+                    <IfFileName glob="shuffle-audit.log*">
+                        <IfAny>
+                            <IfAccumulatedFileSize exceeds="1 GB"/>
+                            <IfAccumulatedFileCount exceeds="10"/>
+                        </IfAny>
+                    </IfFileName>
+                </Delete>
+            </DefaultRolloverStrategy>
+        </RollingRandomAccessFile>
     </Appenders>
 
     <Loggers>
@@ -87,5 +104,8 @@
         <Logger name="org.apache.celeborn.server.common.http.RestAuditLogger" 
level="INFO" additivity="false">
             <Appender-ref ref="restAuditFile" level="INFO"/>
         </Logger>
+        <Logger 
name="org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger" 
level="INFO" additivity="false">
+            <Appender-ref ref="shuffleAuditFile" level="INFO"/>
+        </Logger>
     </Loggers>
 </Configuration>
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index dfe4e03a1..8316f1706 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -51,6 +51,7 @@ import org.apache.celeborn.common.rpc._
 import org.apache.celeborn.common.rpc.{RpcSecurityContextBuilder, 
ServerSaslContextBuilder}
 import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils, 
PbSerDeUtils, SignalUtils, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.{HttpService, Service}
+import org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger
 import 
org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
 import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, 
HAMasterMetaManager, MetaHandler}
 import org.apache.celeborn.service.deploy.master.quota.QuotaManager
@@ -706,13 +707,13 @@ private[celeborn] class Master(
       val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
       val shuffleIds = statusSystem.registeredAppAndShuffles.get(appId)
       if (shuffleIds == null || !shuffleIds.contains(shuffleId)) {
-        logWarning(
-          s"Shuffle $shuffleKey expired on 
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
         expiredShuffleKeys.add(shuffleKey)
       }
     }
-    logDebug(
-      s"Shuffle ${expiredShuffleKeys.asScala.mkString("[", " ,", "]")} expired 
on ${targetWorker.toUniqueId}.")
+    ShuffleAuditLogger.batchAudit(
+      expiredShuffleKeys.asScala.mkString(","),
+      "EXPIRE",
+      Seq(s"worker=${targetWorker.toUniqueId}"))
 
     val workerEventInfo = statusSystem.workerEventInfos.get(targetWorker)
     if (workerEventInfo == null) {
@@ -735,6 +736,11 @@ private[celeborn] class Master(
     try {
       logInfo(s"Handle lost shuffles for ${appId} ${lostShuffles} ")
       statusSystem.handleReviseLostShuffles(appId, lostShuffles, requestId);
+      ShuffleAuditLogger.batchAudit(
+        lostShuffles.asScala.map { shuffleId =>
+          Utils.makeShuffleKey(appId, shuffleId)
+        }.mkString(","),
+        "REVIVE")
       if (context != null) {
         context.reply(ReviseLostShufflesResponse(true, ""))
       }
@@ -984,6 +990,14 @@ private[celeborn] class Master(
       logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
     }
 
+    ShuffleAuditLogger.audit(
+      shuffleKey,
+      "OFFER_SLOTS",
+      Seq(
+        s"numReducers=$numReducers",
+        s"workerNum=${slots.size()}",
+        s"extraSlots=$offerSlotsExtraSize"))
+
     if (authEnabled) {
       pushApplicationMetaToWorkers(requestSlots, slots)
     }
@@ -1035,6 +1049,7 @@ private[celeborn] class Master(
     val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
     statusSystem.handleUnRegisterShuffle(shuffleKey, requestId)
     logInfo(s"Unregister shuffle $shuffleKey")
+    ShuffleAuditLogger.audit(shuffleKey, "UNREGISTER")
     context.reply(UnregisterShuffleResponse(StatusCode.SUCCESS))
   }
 
@@ -1047,6 +1062,7 @@ private[celeborn] class Master(
       shuffleIds.map(shuffleId => Utils.makeShuffleKey(applicationId, 
shuffleId)).asJava
     statusSystem.handleBatchUnRegisterShuffles(shuffleKeys, requestId)
     logInfo(s"BatchUnregister shuffle $shuffleKeys")
+    ShuffleAuditLogger.batchAudit(shuffleKeys.asScala.mkString(","), 
"BATCH_UNREGISTER")
     context.reply(BatchUnregisterShuffleResponse(StatusCode.SUCCESS, 
shuffleIds.asJava))
   }
 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/audit/ShuffleAuditLogger.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/audit/ShuffleAuditLogger.scala
new file mode 100644
index 000000000..ba5b0261a
--- /dev/null
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/audit/ShuffleAuditLogger.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master.audit
+
+import org.apache.celeborn.common.internal.Logging
+
+object ShuffleAuditLogger extends Logging {
+  final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
+    override protected def initialValue: StringBuilder = new StringBuilder()
+  }
+
+  def audit(shuffleKey: String, op: String, labels: Seq[String] = Seq.empty): 
Unit = {
+    val sb = AUDIT_BUFFER.get()
+    sb.setLength(0)
+    sb.append(s"shuffleKey=$shuffleKey").append("\t")
+    sb.append(s"op=$op")
+    if (labels.nonEmpty) sb.append(labels.mkString("\t", "\t", ""))
+    logInfo(sb.toString())
+  }
+
+  def batchAudit(shuffleKeys: String, op: String, labels: Seq[String] = 
Seq.empty): Unit = {
+    val sb = AUDIT_BUFFER.get()
+    sb.setLength(0)
+    sb.append(s"shuffleKeys=$shuffleKeys").append("\t")
+    sb.append(s"op=$op")
+    if (labels.nonEmpty) sb.append(labels.mkString("\t", "\t", ""))
+    logInfo(sb.toString())
+  }
+}

Reply via email to