This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b523264ab45 [FLINK-26425][yarn] Support rolling log aggregation
b523264ab45 is described below
commit b523264ab45d37cd9584a0e8c06f1ef6bd1aaed7
Author: Ferenc Csaky <[email protected]>
AuthorDate: Mon Apr 28 09:53:09 2025 +0200
[FLINK-26425][yarn] Support rolling log aggregation
---
.../generated/yarn_config_configuration.html | 12 ++++
.../apache/flink/yarn/YarnClusterDescriptor.java | 34 +++++++++-
.../yarn/configuration/YarnConfigOptions.java | 20 ++++++
.../flink/yarn/YarnClusterDescriptorTest.java | 77 ++++++++++++++++++++++
4 files changed, 141 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
index cb0b0f935ba..b62e78778f0 100644
--- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
@@ -152,6 +152,18 @@
<td>String</td>
<td>The provided usrlib directory in remote. It should be
pre-uploaded and world-readable. Flink will use it to exclude the local usrlib
directory(i.e. usrlib/ under the parent directory of FLINK_LIB_DIR). Unlike
yarn.provided.lib.dirs, YARN will not cache it on the nodes as it is for each
application. An example could be
hdfs://$namenode_address/path/of/flink/usrlib</td>
</tr>
+ <tr>
+ <td><h5>yarn.rolled-logs.exclude-pattern</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Java regular expression to exclude certain log files from
rolling log aggregation. Log files matching the defined exclude pattern will be
ignored during aggregation. If a log file matches both the include and exclude
patterns, the exclude pattern takes precedence and the file will be excluded
from aggregation.</td>
+ </tr>
+ <tr>
+ <td><h5>yarn.rolled-logs.include-pattern</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Java regular expression to match log file names for inclusion
in rolling log aggregation. This regex is used by YARN’s log aggregation
mechanism to identify which log files to collect. To enable rolling aggregation
in YARN, set the
`yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds` property in
`yarn-site.xml`. Ensure that Flink’s Log4J configuration uses FileAppender or a
compatible appender that can handle file deletions during runtime. The regex
pattern (e.g., `jobmanager*`) must align with the log file names defined in the Log4J configuration (e.g., `jobmanager.log`) to ensure all relevant files will be aggregated.</td>
+ </tr>
<tr>
<td><h5>yarn.security.appmaster.delegation.token.services</h5></td>
<td style="word-wrap: break-word;">"hadoopfs"</td>
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 51bb617bece..3b76431c272 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -87,6 +87,7 @@ import
org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -180,8 +181,6 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
private final String yarnQueue;
- private Path flinkJarPath;
-
private final Configuration flinkConfiguration;
private final String customName;
@@ -190,6 +189,12 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
private final String applicationType;
+ private final String rolledLogIncludePattern;
+
+ private final String rolledLogExcludePattern;
+
+ private Path flinkJarPath;
+
private YarnConfigOptions.UserJarInclusion userJarInclusion;
public YarnClusterDescriptor(
@@ -221,6 +226,10 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
this.customName =
flinkConfiguration.get(YarnConfigOptions.APPLICATION_NAME);
this.applicationType =
flinkConfiguration.get(YarnConfigOptions.APPLICATION_TYPE);
this.nodeLabel = flinkConfiguration.get(YarnConfigOptions.NODE_LABEL);
+ this.rolledLogIncludePattern =
+
flinkConfiguration.get(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN);
+ this.rolledLogExcludePattern =
+
flinkConfiguration.get(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN);
}
/** Adapt flink env setting. */
@@ -1237,6 +1246,8 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
setApplicationTags(appContext);
+ setRolledLogConfigs(appContext);
+
// add a hook to clean up in case deployment fails
Thread deploymentFailureHook =
new DeploymentFailureHook(yarnApplication,
fileUploader.getApplicationDir());
@@ -1533,6 +1544,25 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
}
}
+ @VisibleForTesting
+ void setRolledLogConfigs(final ApplicationSubmissionContext appContext) {
+ LogAggregationContext ctx = null;
+
+ if (!StringUtils.isNullOrWhitespaceOnly(rolledLogIncludePattern)) {
+ ctx = Records.newRecord(LogAggregationContext.class);
+ ctx.setRolledLogsIncludePattern(rolledLogIncludePattern);
+ }
+
+ if (!StringUtils.isNullOrWhitespaceOnly(rolledLogExcludePattern)) {
+ ctx = ctx == null ? Records.newRecord(LogAggregationContext.class)
: ctx;
+ ctx.setRolledLogsExcludePattern(rolledLogExcludePattern);
+ }
+
+ if (ctx != null) {
+ appContext.setLogAggregationContext(ctx);
+ }
+ }
+
/**
* Singleton object which uses reflection to determine whether the {@link
* ApplicationSubmissionContext} supports various methods which, depending
on the Hadoop
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index f5d36dd93d4..f9dabc95090 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -401,6 +401,26 @@ public class YarnConfigOptions {
+ " Unlike yarn.provided.lib.dirs, YARN
will not cache it on the nodes as it is for each application. An example could
be "
+
"hdfs://$namenode_address/path/of/flink/usrlib");
+ public static final ConfigOption<String> ROLLED_LOGS_INCLUDE_PATTERN =
+ key("yarn.rolled-logs.include-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Java regular expression to match log file names
for inclusion in rolling log aggregation."
+ " This regex is used by YARN’s log
aggregation mechanism to identify which log files to collect."
+ + " To enable rolling aggregation in YARN,
set the `yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds`
property in `yarn-site.xml`."
+ " Ensure that Flink’s Log4J
configuration uses FileAppender or a compatible appender that can handle file
deletions during runtime."
+ + " The regex pattern (e.g.,
`jobmanager*`) must align with the log file names defined in the Log4J
configuration (e.g., `jobmanager.log`) to ensure all relevant files will be
aggregated.");
+
+ public static final ConfigOption<String> ROLLED_LOGS_EXCLUDE_PATTERN =
+ key("yarn.rolled-logs.exclude-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Java regular expression to exclude certain log
files from rolling log aggregation."
+ + " Log files matching the defined exclude
pattern will be ignored during aggregation."
+ + " If a log file matches both the include
and exclude patterns, the exclude pattern takes precedence and the file will be
excluded from aggregation.");
+
@SuppressWarnings("unused")
public static final ConfigOption<String> HADOOP_CONFIG_KEY =
key("flink.hadoop.<key>")
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 7ff1b814d54..59b2df4c673 100644
---
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
@@ -1022,4 +1024,79 @@ class YarnClusterDescriptorTest {
fail("Should not throw exception when setting tokens for AM
container.");
}
}
+
+ @Test
+ void testSetRolledLogConfigs() {
+ final String includePattern = "(jobmanager|taskmanager).*";
+ final String excludePattern = "(jobmanager|taskmanager)\\.(out|err)";
+
+ // Both include and exclude patterns are given.
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN,
includePattern);
+ flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN,
excludePattern);
+
+ try (final YarnClusterDescriptor yarnClusterDescriptor =
+ createYarnClusterDescriptor(flinkConfig)) {
+
+ final TestApplicationSubmissionContext testAppCtx =
+ new TestApplicationSubmissionContext();
+ yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+
assertThat(testAppCtx.logAggregationContext.getRolledLogsIncludePattern())
+ .isEqualTo(includePattern);
+
assertThat(testAppCtx.logAggregationContext.getRolledLogsExcludePattern())
+ .isEqualTo(excludePattern);
+ }
+
+ // Only include pattern is given.
+ flinkConfig = new Configuration();
+ flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN,
includePattern);
+ try (final YarnClusterDescriptor yarnClusterDescriptor =
+ createYarnClusterDescriptor(flinkConfig)) {
+
+ final TestApplicationSubmissionContext testAppCtx =
+ new TestApplicationSubmissionContext();
+ yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+
assertThat(testAppCtx.logAggregationContext.getRolledLogsIncludePattern())
+ .isEqualTo(includePattern);
+
assertThat(testAppCtx.logAggregationContext.getRolledLogsExcludePattern()).isNull();
+ }
+
+ // Only exclude pattern is given.
+ flinkConfig = new Configuration();
+ flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN,
excludePattern);
+ try (final YarnClusterDescriptor yarnClusterDescriptor =
+ createYarnClusterDescriptor(flinkConfig)) {
+
+ final TestApplicationSubmissionContext testAppCtx =
+ new TestApplicationSubmissionContext();
+ yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+
assertThat(testAppCtx.logAggregationContext.getRolledLogsIncludePattern()).isNull();
+
assertThat(testAppCtx.logAggregationContext.getRolledLogsExcludePattern())
+ .isEqualTo(excludePattern);
+ }
+
+ // Blank values are ignored.
+ flinkConfig = new Configuration();
+ flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN, " ");
+ flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN, " ");
+ try (final YarnClusterDescriptor yarnClusterDescriptor =
+ createYarnClusterDescriptor(flinkConfig)) {
+
+ final TestApplicationSubmissionContext testAppCtx =
+ new TestApplicationSubmissionContext();
+ yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+ assertThat(testAppCtx.logAggregationContext).isNull();
+ }
+ }
+
+ private static class TestApplicationSubmissionContext
+ extends ApplicationSubmissionContextPBImpl {
+
+ private LogAggregationContext logAggregationContext = null;
+
+ @Override
+ public void setLogAggregationContext(LogAggregationContext
logAggregationContext) {
+ this.logAggregationContext = logAggregationContext;
+ }
+ }
}