[https://issues.apache.org/jira/browse/FLINK-39802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18084610#comment-18084610]
Nagy Attila Bálint commented on FLINK-39802:
--------------------------------------------
Proposal for the test-isolation-only fix (Part 1):
https://github.com/nattilabalint/flink/tree/fixProfilingServiceTest
Proposal for a safer ProfilingService singleton usage (Part 2):
https://github.com/nattilabalint/flink/tree/saferProfilingServiceSingleton
> Fix flaky ProfilingServiceTest.testRollingDeletion by isolating the test and
> applying Interface Segregation to ProfilingService
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39802
> URL: https://issues.apache.org/jira/browse/FLINK-39802
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination, Runtime / REST
> Affects Versions: 2.2.1
> Reporter: Nagy Attila Bálint
> Priority: Major
>
> h2. Summary & Problem Statement
> The {{ProfilingServiceTest.testRollingDeletion}} [test
> case|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L111]
> fails intermittently (or consistently, when test classes run in a specific
> order) because static state leaks between test classes sharing a single JVM.
> Currently, {{ProfilingService}} is
> [implemented|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L58]
> as a classic Singleton. The {{ProfilingService#getInstance(config)}}
> [method|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L81]
> only initializes the instance once.
> If it has already been invoked by a previous test class, it will return the
> existing static singleton instance and *completely ignore* the new
> {{Configuration}} parameter passed to it.
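To make the "initialize once, ignore later configuration" behavior concrete, here is a minimal, self-contained sketch. It is not Flink code: {{ConfigIgnoringSingleton}} is a hypothetical stand-in, a plain {{Map<String, String>}} replaces Flink's {{Configuration}}, and the key {{profiling.result-dir}} is made up for illustration.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for ProfilingService: only the
// "initialize once, ignore later configuration" behavior is modeled.
// A Map<String, String> replaces Flink's Configuration, and the key
// "profiling.result-dir" is made up for this sketch.
final class ConfigIgnoringSingleton {
    // volatile makes double-checked locking safe under the Java Memory Model.
    private static volatile ConfigIgnoringSingleton instance;

    private final String resultDir;

    private ConfigIgnoringSingleton(Map<String, String> config) {
        // Falls back to /tmp when no result directory is configured.
        this.resultDir = config.getOrDefault("profiling.result-dir", "/tmp");
    }

    static ConfigIgnoringSingleton getInstance(Map<String, String> config) {
        if (instance == null) {
            synchronized (ConfigIgnoringSingleton.class) {
                if (instance == null) {
                    instance = new ConfigIgnoringSingleton(config);
                }
            }
        }
        // Once initialized, the config argument is silently ignored.
        return instance;
    }

    String getResultDir() {
        return resultDir;
    }

    public static void main(String[] args) {
        // A preceding test class initializes the singleton with defaults.
        getInstance(Map.of());
        // A later test asks for an isolated directory...
        String dir = getInstance(Map.of("profiling.result-dir", "/tmp/junit123")).getResultDir();
        // ...but still receives the default one.
        System.out.println(dir); // prints /tmp
    }
}
```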
> Furthermore, {{ProfilingService}} implements the {{java.io.Closeable}}
> interface (which extends {{AutoCloseable}}), exposing a public {{close()}}
> method. This method is implemented as
> [follows|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L163]:
> {code:java}
> @Override
> public void close() throws IOException {
>     try {
>         if (profilingFuture != null && !profilingFuture.isDone()) {
>             profilingFuture.cancel();
>         }
>         if (!scheduledExecutor.isShutdown()) {
>             scheduledExecutor.shutdownNow();
>         }
>     } catch (Exception e) {
>         LOG.error("Exception thrown during stopping profiling service. ", e);
>     } finally {
>         instance = null; // <--- Anti-pattern
>     }
> }
> {code}
> Resetting a shared static field to {{null}} inside an instance lifecycle
> method is a *severe anti-pattern*.
> Combined with the unconditional {{shutdownNow()}} call, it allows any single
> consumer to unexpectedly
> [destroy|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L169]
> the {{ScheduledExecutorService}} underneath all other concurrent users of
> the singleton, who still hold a reference to the closed instance.
> Moreover, calling {{getInstance()}} after a {{close()}} call will silently
> spawn a brand new instance, potentially with completely different
> configuration values.
> Fortunately, {{ProfilingService::close()}} is *never called in production
> code* (neither explicitly nor implicitly via *try-with-resources*).
> In the test codebase, it is only ever utilized
> [within|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L46]
> {{ProfilingServiceTest}}.
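The following sketch, again a hypothetical simplified model rather than the actual {{ProfilingService}}, shows both consequences of the quoted {{close()}}: a consumer that still holds the old reference is left with a shut-down executor, and the next {{getInstance()}} call silently creates a different instance.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the quoted close(): it shuts down the
// shared executor and resets the static instance field to null.
final class ClosableSingleton implements AutoCloseable {
    private static ClosableSingleton instance;

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    static synchronized ClosableSingleton getInstance() {
        if (instance == null) {
            instance = new ClosableSingleton();
        }
        return instance;
    }

    /** Returns true if the executor still accepts work, false if it was shut down. */
    boolean trySchedule() {
        try {
            executor.schedule(() -> { }, 1, TimeUnit.MILLISECONDS);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    @Override
    public void close() {
        try {
            executor.shutdownNow();
        } finally {
            synchronized (ClosableSingleton.class) {
                instance = null; // the anti-pattern under discussion
            }
        }
    }

    public static void main(String[] args) {
        ClosableSingleton consumerA = getInstance();
        ClosableSingleton consumerB = getInstance(); // same object as consumerA

        consumerA.close(); // consumer A "cleans up" after itself

        System.out.println(consumerB.trySchedule()); // prints false: B's executor is gone
        System.out.println(getInstance() == consumerB); // prints false: a new instance was created
    }
}
```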
> h2. Root Cause Analysis of the Test Failure
> The failure of {{ProfilingServiceTest#testRollingDeletion}} highlights the
> danger of this shared JVM state.
> When multiple tests run sequentially or concurrently within the same fork,
> preceding test classes initialize the {{ProfilingService}} singleton with a
> default configuration, so {{PROFILING_RESULT_DIR}} falls back to
> [its default value|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L73],
> {{/tmp}}.
> When {{ProfilingServiceTest#setUp}}
> [runs|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L57],
> its configuration initialization (intended to point the profiler at a clean,
> isolated {{@TempDir}}) has *zero effect*, because the old singleton instance
> pointing to {{/tmp}} is returned.
> h3. Consequences for the File System
> Instead of writing into an isolated {{/tmp/junitXXXXX}} temporary
> directory, the test writes its {{.html}} output files, prefixed
> [with|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L49]
> {{"TestJobManager"}}, directly into the global {{/tmp}} directory.
> When {{ProfilingServiceTest#verifyRollingDeletionWorks()}}
> [executes|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L176]
> {{new File(profilingService.getProfilingResultDir()).listFiles()}}, it reads
> the root {{/tmp}} directory and mistakenly picks up stray {{.html}} files
> left behind by entirely different or previous test runs.
> This leads to a file count mismatch:
> {noformat}
> ProfilingServiceTest.testRollingDeletion:XXX->verifyRollingDeletionWorks:XXX
> expected: <3> but was: <XX>
> {noformat}
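A hypothetical sketch of why the count goes wrong. It does not model Flink's rolling deletion at all; it only shows that counting every entry returned by {{File#listFiles()}} also counts leftovers when the directory is shared instead of isolated. The class name and file names are made up.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration of the count mismatch. It does NOT model Flink's
// rolling deletion; it only shows that counting every entry returned by
// File#listFiles() picks up leftovers when the directory is shared.
final class ResultDirCount {
    static Path newTempDir(String prefix) {
        try {
            return Files.createTempDirectory(prefix);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static void touch(Path file) {
        try {
            Files.createFile(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes n fake profiler outputs into dir and returns what listFiles() sees. */
    static int writeAndCount(Path dir, int n) {
        for (int i = 0; i < n; i++) {
            touch(dir.resolve("TestJobManager_" + i + ".html"));
        }
        File[] files = dir.toFile().listFiles();
        return files == null ? 0 : files.length;
    }

    public static void main(String[] args) {
        // Isolated directory (what @TempDir provides): only this test's files.
        System.out.println(writeAndCount(newTempDir("isolated"), 3)); // prints 3

        // Shared directory that already holds files from other runs.
        Path shared = newTempDir("shared");
        touch(shared.resolve("TestJobManager_previous_run.html"));
        touch(shared.resolve("TestJobManager_other_test.html"));
        System.out.println(writeAndCount(shared, 3)); // prints 5
    }
}
```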
> h2. How to Reproduce
> The issue can be reliably reproduced even on a single thread (sequential
> execution) by ensuring that a test class which initializes the default
> profiling singleton runs before {{ProfilingServiceTest}}:
> {noformat}
> ./mvnw clean test -pl flink-runtime \
> -Dfast -Djdk17 -Pjava17-target -fae \
> -Dflink.forkCountUnitTest=1 \
> -Dtest=ProfilingServiceTest,ShuffleMasterTest
> {noformat}
> *Important:* {{ShuffleMasterTest}} must execute before
> {{ProfilingServiceTest}} to pollute the static instance and trigger the
> failure.
> h3. List of Shared-State Polluting Tests (Non-Exhaustive)
> The following test classes have been verified to call
> {{ProfilingService::getInstance}} and can pollute the environment with the
> default {{/tmp}} directory:
> {noformat}
> org.apache.flink.runtime.shuffle.ShuffleMasterTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventHandlingTest
> org.apache.flink.runtime.taskexecutor.TaskManagerRunnerStartupTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest
> org.apache.flink.runtime.taskexecutor.TaskManagerRunnerTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorSlotLifetimeTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorExecutionDeploymentReconciliationTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorRecoveryTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorTest
> org.apache.flink.runtime.io.network.partition.PartialConsumePipelinedResultTest
> org.apache.flink.runtime.leaderelection.LeaderChangeClusterComponentsTest
> org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest
> {noformat}
> h2. Proposed Changes
> h3. Part 1: Test-Level Isolation & Reset
> To fix the immediate test failure, we must isolate {{ProfilingServiceTest}}
> and force a clean state reset before each test execution:
> 1.) Add the JUnit 5 {{@Isolated}} annotation to the {{ProfilingServiceTest}}
> class, so that no other test class runs concurrently with it.
> 2.) In the {{@BeforeEach setUp()}} method, explicitly call
> {{ProfilingService.getInstance(configs).close();}} to discard any polluted
> singleton instance inherited from preceding tests. Because {{close()}} resets
> the static field to {{null}}, the next {{getInstance(configs)}} call creates a
> fresh instance, and the custom {{@TempDir}} config finally takes effect.
> *Note:* Without the {{@Isolated}} annotation, calling {{close()}} inside
> {{setUp()}} would have disastrous, unpredictable consequences on other
> concurrently running tests that depend on the shared singleton.
> The exact same risk applies to implicit {{close()}} calls if a developer
> mistakenly uses the service inside a {{try-with-resources}} block.
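The reset sequence from step 2 can be sketched with a simplified, hypothetical model that combines both behaviors described above (config ignored after the first initialization, static field reset by {{close()}}). It assumes that {{setUp()}} obtains the instance again via {{getInstance(configs)}} after closing it. {{@Isolated}} and {{@BeforeEach}} are omitted so the example runs without JUnit; in the real test, {{@Isolated}} is what makes calling {{close()}} safe.

```java
import java.util.Map;

// Hypothetical model, not Flink code: getInstance() ignores the config once
// initialized, and close() resets the static field to null.
final class ResettableSingleton implements AutoCloseable {
    private static ResettableSingleton instance;

    private final String resultDir;

    private ResettableSingleton(Map<String, String> config) {
        this.resultDir = config.getOrDefault("profiling.result-dir", "/tmp");
    }

    static synchronized ResettableSingleton getInstance(Map<String, String> config) {
        if (instance == null) {
            instance = new ResettableSingleton(config);
        }
        return instance;
    }

    String getResultDir() {
        return resultDir;
    }

    @Override
    public void close() {
        synchronized (ResettableSingleton.class) {
            instance = null; // the next getInstance() call builds a new instance
        }
    }

    /** Models the proposed setUp(): close first, then fetch with the test's config. */
    static ResettableSingleton setUp(Map<String, String> configs) {
        // Discard whatever instance a preceding test class left behind.
        getInstance(configs).close();
        // The field is now null, so this call honors the test's configuration.
        return getInstance(configs);
    }

    public static void main(String[] args) {
        getInstance(Map.of()); // a preceding test class initialized the defaults
        Map<String, String> configs = Map.of("profiling.result-dir", "/tmp/junit123");
        System.out.println(getInstance(configs).getResultDir()); // prints /tmp
        System.out.println(setUp(configs).getResultDir()); // prints /tmp/junit123
    }
}
```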
> h3. Part 2: Long-Term Architectural Fix via Interface Segregation (*proposal*)
> To prevent developers from accidentally invoking {{close()}} on the singleton
> instance in production or through unmanaged {{try-with-resources}} blocks, we
> should implement *Interface Segregation*.
> 1.) Convert {{ProfilingService}} into an interface containing all original
> public functional methods, excluding {{close()}}:
> {code:java}
> import java.util.Collection;
> import java.util.concurrent.CompletableFuture;
> import org.apache.flink.runtime.rest.messages.ProfilingInfo;
>
> public interface ProfilingService {
>     CompletableFuture<ProfilingInfo> requestProfiling(
>             String resourceID, long duration, ProfilingInfo.ProfilingMode mode);
>
>     CompletableFuture<Collection<ProfilingInfo>> getProfilingList(String resourceID);
>
>     String getProfilingResultDir();
> }
> {code}
> 2.) Move the actual implementation details to a new concrete class named e.g.
> {{ProfilingServiceSingleton}} which implements both our interface and
> {{Closeable}}:
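> {code:java}
> import java.io.Closeable;
> import org.apache.flink.configuration.Configuration;
>
> public class ProfilingServiceSingleton implements ProfilingService, Closeable {
>     // volatile is required for safe double-checked locking
>     private static volatile ProfilingServiceSingleton instance;
>
>     // Original implementation stays here...
>
>     public static ProfilingService getInstance(Configuration configs) {
>         if (instance == null) {
>             synchronized (ProfilingServiceSingleton.class) {
>                 if (instance == null) {
>                     instance = new ProfilingServiceSingleton(configs);
>                 }
>             }
>         }
>         return instance;
>     }
> }
> {code}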
> By returning the {{ProfilingService}} interface type from {{getInstance()}},
> the public {{close()}} method is hidden from production consumers; it remains
> reachable only through an explicit cast to the concrete type (or to {{Closeable}}).
> This requires changing the class name at exactly *3 production call sites*
> where {{.getInstance(...)}} is invoked:
> {noformat}
> org.apache.flink.runtime.rest.handler.cluster.JobManagerProfilingHandler
> org.apache.flink.runtime.rest.handler.cluster.JobManagerProfilingListHandler
> org.apache.flink.runtime.taskexecutor.TaskExecutor
> {noformat}
> They keep compiling against the interface type; only the class name at the
> call site changes:
> {code:java}
> // Keeps production code safe from accidental lifecycle termination
> // Old: this.profilingService = ProfilingService.getInstance(configuration);
> this.profilingService = ProfilingServiceSingleton.getInstance(configuration); // new class name
> {code}
> Since {{ProfilingService::close}} is currently not called anywhere in
> production, this change is backwards-compatible for production code,
> eliminates a dangerous anti-pattern, and, together with Part 1, removes the
> flakiness of {{ProfilingServiceTest}}.
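A trimmed-down, self-contained sketch of the proposed split, assuming only {{getProfilingResultDir()}} is kept and a {{Map<String, String>}} stands in for {{Configuration}} (the key name is hypothetical). It shows that code holding the interface type cannot call {{close()}} or use the service in {{try-with-resources}}, while a test can still reset the singleton through a deliberate cast.

```java
import java.io.Closeable;
import java.util.Map;

// Hypothetical, trimmed-down version of the proposed interface: only
// getProfilingResultDir() is modeled. Note: it does NOT extend Closeable.
interface ProfilingService {
    String getProfilingResultDir();
}

// Hypothetical concrete class; Map<String, String> stands in for Configuration.
final class ProfilingServiceSingleton implements ProfilingService, Closeable {
    private static volatile ProfilingServiceSingleton instance;

    private final String resultDir;

    private ProfilingServiceSingleton(Map<String, String> configs) {
        this.resultDir = configs.getOrDefault("profiling.result-dir", "/tmp");
    }

    // Returns the narrow interface type, so callers see no close() method.
    static ProfilingService getInstance(Map<String, String> configs) {
        if (instance == null) {
            synchronized (ProfilingServiceSingleton.class) {
                if (instance == null) {
                    instance = new ProfilingServiceSingleton(configs);
                }
            }
        }
        return instance;
    }

    @Override
    public String getProfilingResultDir() {
        return resultDir;
    }

    // Still public (Closeable requires it), but only reachable via the concrete type.
    @Override
    public void close() {
        synchronized (ProfilingServiceSingleton.class) {
            instance = null;
        }
    }

    public static void main(String[] args) {
        ProfilingService service = getInstance(Map.of());
        // service.close();                               // would not compile
        // try (ProfilingService s = getInstance(Map.of())) {}  // would not compile either
        System.out.println(service.getProfilingResultDir()); // prints /tmp
        // Only code that deliberately casts to the concrete type can reset it.
        ((ProfilingServiceSingleton) service).close();
    }
}
```

The design trade-off is that {{close()}} stays public on the concrete class, so this guards against accidental closing, not against a deliberate cast.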
> h2. Testing & Verification
> To verify the proposed fix and explore the exact conditions under which this
> shared-state flakiness manifests, I executed a series of stress tests using a
> *single Maven fork* ({{-Dflink.forkCountUnitTest=1}}).
> This forces all targeted test classes into a single, shared JVM environment
> where the static singleton instance issue can be actively reproduced and
> monitored.
> h3. Preparation Step (POM Modification)
> Before running the commands, I commented out the hardcoded
> {{<junit.jupiter.execution.parallel.*>}} system property variables under the
> maven-surefire-plugin configuration block in the root {{pom.xml}}.
> This was necessary to cleanly pass customized parallel execution flags
> externally using the {{-Dsurefire.module.config}} property.
> h3. Test Scenario 1: Single JVM, Concurrent-Class Sequential Execution (1 Thread)
> In this scenario, I configured JUnit 5 to enable parallel class execution but
> throttled the underlying fixed thread pool down to exactly 1 thread
> ({{max-pool-size=1}}, {{parallelism=1}}).
> This forces the classes to run sequentially on a single thread while using
> the concurrent class scheduling mechanism.
> *Execution Command:*
> {noformat}
> ./mvnw test -pl flink-runtime \
> -Dfast -Djdk17 -Pjava17-target -fae \
> -Dgit.commit.id.skip=true -Dmaven.gitcommitid.skip=true \
> -Dflink.forkCountUnitTest=1 \
> -Dsurefire.module.config="--add-opens=java.base/java.util=ALL-UNNAMED \
> --add-opens=java.base/java.lang=ALL-UNNAMED \
> --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
> --add-opens=java.base/java.io=ALL-UNNAMED \
> -Djunit.jupiter.execution.parallel.enabled=true \
> -Djunit.jupiter.execution.parallel.mode.classes.default=concurrent \
> -Djunit.jupiter.execution.parallel.mode.default=same_thread \
> -Djunit.jupiter.execution.parallel.config.strategy=fixed \
> -Djunit.jupiter.execution.parallel.config.fixed.max-pool-size=1 \
> -Djunit.jupiter.execution.parallel.config.fixed.saturate=true \
> -Djunit.jupiter.execution.parallel.config.fixed.parallelism=1" \
> -Dtest=ProfilingServiceTest,ShuffleMasterTest,TaskExecutorOperatorEventHandlingTest,TaskManagerRunnerStartupTest,TaskExecutorPartitionLifecycleTest,TaskManagerRunnerTest,TaskExecutorSlotLifetimeTest,TaskExecutorExecutionDeploymentReconciliationTest,TaskExecutorSubmissionTest,TaskExecutorRecoveryTest,TaskExecutorTest,PartialConsumePipelinedResultTest,LeaderChangeClusterComponentsTest,SlotCountExceedingParallelismTest
> {noformat}
> *Observation (Before Fix):*
> As expected, the tests executing ahead of {{ProfilingServiceTest}} pollute
> the JVM-wide static singleton.
> When {{ProfilingServiceTest}} eventually executes, its setup configuration is
> ignored, and the test fails on the rolling deletion assertion as the path
> defaults back to {{/tmp}}.
> h3. Test Scenario 2: Single JVM, High Parallelism Stress Test (200 Threads)
> Next, I ramped up the configuration to test heavy concurrent execution within
> the same JVM by increasing both {{fixed.max-pool-size}} and
> {{fixed.parallelism}} to *200*.
> *Observation (Before Fix - Severe Flakiness Masking):*
> Under heavy multi-threaded contention, I observed a highly dangerous side
> effect of the {{close()}} anti-pattern.
> Out of the *5* test cases defined inside {{ProfilingServiceTest}}, *only 1
> test case actually executed*, while the remaining 4 were silently dropped.
> To make matters worse, Maven marked the entire run as a success (green
> build):
> {noformat}
> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 27.55 s -- in org.apache.flink.runtime.util.profiler.ProfilingServiceTest
> {noformat}
> This shows that without strict isolation and proper interface boundaries,
> the current {{ProfilingService::close}} logic masks test results under
> concurrent execution, reporting successful builds despite dropped test
> coverage.
> h2. Verification Results After Patch
> After applying the proposed changes (*Interface Segregation* via
> {{ProfilingServiceSingleton}} + {{@Isolated}} on {{ProfilingServiceTest}} +
> *Explicit* {{close()}} state reset in {{setUp}}):
> *Scenario 1 (Single Thread / Shared JVM):* Pass. Every preceding test runs
> successfully, and {{ProfilingServiceTest}} cleanly resets the state to work
> inside its assigned {{@TempDir}}.
> *Scenario 2 (200 Threads / High Contention):* Pass. All 5 test cases inside
> {{ProfilingServiceTest}} are safely quarantined, execute successfully in their
> entirety, and no longer swallow test executions or pollute the surrounding
> environment.
> h2. Previous JIRAs around this issue:
> FLINK-38442: ProfilingServiceTest.testRollingDeletion is unstable (Resolved)
> FLINK-35571: ProfilingServiceTest.testRollingDeletion intermittently fails
> due to improper test isolation (Open)
> FLINK-34013: ProfilingServiceTest.testRollingDeletion is unstable on AZP
> (Resolved)