This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b523264ab45 [FLINK-26425][yarn] Support rolling log aggregation
b523264ab45 is described below

commit b523264ab45d37cd9584a0e8c06f1ef6bd1aaed7
Author: Ferenc Csaky <[email protected]>
AuthorDate: Mon Apr 28 09:53:09 2025 +0200

    [FLINK-26425][yarn] Support rolling log aggregation
---
 .../generated/yarn_config_configuration.html       | 12 ++++
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 34 +++++++++-
 .../yarn/configuration/YarnConfigOptions.java      | 20 ++++++
 .../flink/yarn/YarnClusterDescriptorTest.java      | 77 ++++++++++++++++++++++
 4 files changed, 141 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
index cb0b0f935ba..b62e78778f0 100644
--- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
@@ -152,6 +152,18 @@
             <td>String</td>
             <td>The provided usrlib directory in remote. It should be 
pre-uploaded and world-readable. Flink will use it to exclude the local usrlib 
directory(i.e. usrlib/ under the parent directory of FLINK_LIB_DIR). Unlike 
yarn.provided.lib.dirs, YARN will not cache it on the nodes as it is for each 
application. An example could be 
hdfs://$namenode_address/path/of/flink/usrlib</td>
         </tr>
+        <tr>
+            <td><h5>yarn.rolled-logs.exclude-pattern</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Java regular expression to exclude certain log files from 
rolling log aggregation. Log files matching the defined exclude pattern will be 
ignored during aggregation. If a log file matches both the include and exclude 
patterns, the exclude pattern takes precedence and the file will be excluded 
from aggregation.</td>
+        </tr>
+        <tr>
+            <td><h5>yarn.rolled-logs.include-pattern</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Java regular expression to match log file names for inclusion 
in rolling log aggregation. This regex is used by YARN’s log aggregation 
mechanism to identify which log files to collect. To enable rolling aggregation 
in YARN, set the 
`yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds` property in 
`yarn-site.xml`. Ensure that Flink’s Log4J configuration uses FileAppender or a 
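compatible appender that can handle file deletions during runtime. The regex 
pattern (e.g., `jobmanager*`) must align with the log file names defined in the 
Log4J configuration (e.g., `jobmanager.log`) to ensure all relevant files will 
be aggregated.</td>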
+        </tr>
         <tr>
             <td><h5>yarn.security.appmaster.delegation.token.services</h5></td>
             <td style="word-wrap: break-word;">"hadoopfs"</td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 51bb617bece..3b76431c272 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -180,8 +181,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
     private final String yarnQueue;
 
-    private Path flinkJarPath;
-
     private final Configuration flinkConfiguration;
 
     private final String customName;
@@ -190,6 +189,12 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
     private final String applicationType;
 
+    private final String rolledLogIncludePattern;
+
+    private final String rolledLogExcludePattern;
+
+    private Path flinkJarPath;
+
     private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
     public YarnClusterDescriptor(
@@ -221,6 +226,10 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
         this.customName = 
flinkConfiguration.get(YarnConfigOptions.APPLICATION_NAME);
         this.applicationType = 
flinkConfiguration.get(YarnConfigOptions.APPLICATION_TYPE);
         this.nodeLabel = flinkConfiguration.get(YarnConfigOptions.NODE_LABEL);
+        this.rolledLogIncludePattern =
+                
flinkConfiguration.get(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN);
+        this.rolledLogExcludePattern =
+                
flinkConfiguration.get(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN);
     }
 
     /** Adapt flink env setting. */
@@ -1237,6 +1246,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
         setApplicationTags(appContext);
 
+        setRolledLogConfigs(appContext);
+
         // add a hook to clean up in case deployment fails
         Thread deploymentFailureHook =
                 new DeploymentFailureHook(yarnApplication, 
fileUploader.getApplicationDir());
@@ -1533,6 +1544,25 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
         }
     }
 
+    @VisibleForTesting
+    void setRolledLogConfigs(final ApplicationSubmissionContext appContext) {
+        LogAggregationContext ctx = null;
+
+        if (!StringUtils.isNullOrWhitespaceOnly(rolledLogIncludePattern)) {
+            ctx = Records.newRecord(LogAggregationContext.class);
+            ctx.setRolledLogsIncludePattern(rolledLogIncludePattern);
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(rolledLogExcludePattern)) {
+            ctx = ctx == null ? Records.newRecord(LogAggregationContext.class) 
: ctx;
+            ctx.setRolledLogsExcludePattern(rolledLogExcludePattern);
+        }
+
+        if (ctx != null) {
+            appContext.setLogAggregationContext(ctx);
+        }
+    }
+
     /**
      * Singleton object which uses reflection to determine whether the {@link
      * ApplicationSubmissionContext} supports various methods which, depending 
on the Hadoop
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index f5d36dd93d4..f9dabc95090 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -401,6 +401,26 @@ public class YarnConfigOptions {
                                     + " Unlike yarn.provided.lib.dirs, YARN 
will not cache it on the nodes as it is for each application. An example could 
be "
                                     + 
"hdfs://$namenode_address/path/of/flink/usrlib");
 
+    public static final ConfigOption<String> ROLLED_LOGS_INCLUDE_PATTERN =
+            key("yarn.rolled-logs.include-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Java regular expression to match log file names 
for inclusion in rolling log aggregation."
+                                    + " This regex is used by YARN’s log 
aggregation mechanism to identify which log files to collect."
+                                    + " To enable rolling aggregation in YARN, 
set the `yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds` 
property in `yarn-site.xml`."
+                                    + " Ensure that Flink’s Log4J 
configuration uses FileAppender or a compatible appender that can handle file 
deletions during runtime."
+                                    + " The regex pattern (e.g., 
`jobmanager*`) must align with the log file names defined in the Log4J 
configuration (e.g., `jobmanager.log`) to ensure all relevant files will be 
aggregated.");
+
+    public static final ConfigOption<String> ROLLED_LOGS_EXCLUDE_PATTERN =
+            key("yarn.rolled-logs.exclude-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Java regular expression to exclude certain log 
files from rolling log aggregation."
+                                    + " Log files matching the defined exclude 
pattern will be ignored during aggregation."
+                                    + " If a log file matches both the include 
and exclude patterns, the exclude pattern takes precedence and the file will be 
excluded from aggregation.");
+
     @SuppressWarnings("unused")
     public static final ConfigOption<String> HADOOP_CONFIG_KEY =
             key("flink.hadoop.<key>")
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 7ff1b814d54..59b2df4c673 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.Records;
@@ -1022,4 +1024,79 @@ class YarnClusterDescriptorTest {
             fail("Should not throw exception when setting tokens for AM 
container.");
         }
     }
+
+    @Test
+    void testSetRolledLogConfigs() {
+        final String includePattern = "(jobmanager|taskmanager).*";
+        final String excludePattern = "(jobmanager|taskmanager)\\.(out|err)";
+
+        // Both include and exclude patterns are given.
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN, 
includePattern);
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN, 
excludePattern);
+
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(flinkConfig)) {
+
+            final TestApplicationSubmissionContext testAppCtx =
+                    new TestApplicationSubmissionContext();
+            yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+            
assertThat(testAppCtx.logAggregationContext.getRolledLogsIncludePattern())
+                    .isEqualTo(includePattern);
+            
assertThat(testAppCtx.logAggregationContext.getRolledLogsExcludePattern())
+                    .isEqualTo(excludePattern);
+        }
+
+        // Only include pattern is given.
+        flinkConfig = new Configuration();
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN, 
includePattern);
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(flinkConfig)) {
+
+            final TestApplicationSubmissionContext testAppCtx =
+                    new TestApplicationSubmissionContext();
+            yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+            
assertThat(testAppCtx.logAggregationContext.getRolledLogsIncludePattern())
+                    .isEqualTo(includePattern);
+            
assertThat(testAppCtx.logAggregationContext.getRolledLogsExcludePattern()).isNull();
+        }
+
+        // Only exclude pattern is given.
+        flinkConfig = new Configuration();
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN, 
excludePattern);
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(flinkConfig)) {
+
+            final TestApplicationSubmissionContext testAppCtx =
+                    new TestApplicationSubmissionContext();
+            yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+            
assertThat(testAppCtx.logAggregationContext.getRolledLogsIncludePattern()).isNull();
+            
assertThat(testAppCtx.logAggregationContext.getRolledLogsExcludePattern())
+                    .isEqualTo(excludePattern);
+        }
+
+        // Blank values are ignored.
+        flinkConfig = new Configuration();
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN, "   ");
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN, "   ");
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(flinkConfig)) {
+
+            final TestApplicationSubmissionContext testAppCtx =
+                    new TestApplicationSubmissionContext();
+            yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+            assertThat(testAppCtx.logAggregationContext).isNull();
+        }
+    }
+
+    private static class TestApplicationSubmissionContext
+            extends ApplicationSubmissionContextPBImpl {
+
+        private LogAggregationContext logAggregationContext = null;
+
+        @Override
+        public void setLogAggregationContext(LogAggregationContext 
logAggregationContext) {
+            this.logAggregationContext = logAggregationContext;
+        }
+    }
 }

Reply via email to