[ 
https://issues.apache.org/jira/browse/FLINK-39802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18084610#comment-18084610
 ] 

Nagy Attila Bálint commented on FLINK-39802:
--------------------------------------------

Proposal for the test-isolation-only fix (Part 1): 
https://github.com/nattilabalint/flink/tree/fixProfilingServiceTest

Proposal for a safer ProfilingService singleton usage (Part 2): 
https://github.com/nattilabalint/flink/tree/saferProfilingServiceSingleton



> Fix flaky ProfilingServiceTest.testRollingDeletion by isolating the test and 
> applying Interface Segregation to ProfilingService
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39802
>                 URL: https://issues.apache.org/jira/browse/FLINK-39802
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination, Runtime / REST
>    Affects Versions: 2.2.1
>            Reporter: Nagy Attila Bálint
>            Priority: Major
>
> h2. Summary & Problem Statement
> The {{ProfilingServiceTest.testRollingDeletion}} [test 
> case|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L111]
>  fails intermittently (or consistently when run in a specific order) because 
> static state leaks between test classes that share a single JVM.
> Currently, {{ProfilingService}} is 
> [implemented|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L58]
>  as a classic Singleton. The {{ProfilingService#getInstance(config)}} 
> [method|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L81]
>  only initializes the instance once.
> If it has already been invoked by a previous test class, it will return the 
> existing static singleton instance and *completely ignore* the new 
> {{Configuration}} parameter passed to it.
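To make the effect concrete, here is a minimal, self-contained Java sketch of the same lazy-initialization pattern. It is not Flink code: {{ConfiguredSingleton}} is a hypothetical stand-in for {{ProfilingService}}, and a plain {{String}} models the configured {{PROFILING_RESULT_DIR}} instead of a full {{Configuration}}.

```java
/**
 * Hypothetical stand-in for ProfilingService (not Flink code): a String models
 * the configured result directory instead of a full Configuration object.
 */
final class ConfiguredSingleton {
    private static volatile ConfiguredSingleton instance;

    private final String resultDir;

    private ConfiguredSingleton(String resultDir) {
        this.resultDir = resultDir;
    }

    static ConfiguredSingleton getInstance(String resultDir) {
        if (instance == null) {
            synchronized (ConfiguredSingleton.class) {
                if (instance == null) {
                    instance = new ConfiguredSingleton(resultDir);
                }
            }
        }
        // Once the instance exists, the argument is silently ignored.
        return instance;
    }

    String getResultDir() {
        return resultDir;
    }
}

class ConfiguredSingletonDemo {
    public static void main(String[] args) {
        // An earlier test class initializes the singleton with the default directory...
        ConfiguredSingleton first = ConfiguredSingleton.getInstance("/tmp");
        // ...so a later test's @TempDir-based value never takes effect.
        ConfiguredSingleton second = ConfiguredSingleton.getInstance("/tmp/junit123");
        System.out.println(first == second);       // true
        System.out.println(second.getResultDir()); // /tmp
    }
}
```

The second call returns the first instance, so its argument is dropped without any warning.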
> Furthermore, {{ProfilingService}} implements the {{java.io.Closeable}} 
> interface (which extends {{AutoCloseable}}), exposing a public {{close()}} 
> method. This method is implemented as 
> [follows|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L163]:
> {code:java}
> @Override
> public void close() throws IOException {
>     try {
>         if (profilingFuture != null && !profilingFuture.isDone()) {
>             profilingFuture.cancel();
>         }
>         if (!scheduledExecutor.isShutdown()) {
>             scheduledExecutor.shutdownNow();
>         }
>     } catch (Exception e) {
>         LOG.error("Exception thrown during stopping profiling service. ", e);
>     } finally {
>         instance = null; // <--- Anti-pattern
>     }
> }
> {code}
> Setting a shared static member to {{null}} inside an instance lifecycle 
> method is a *severe anti-pattern*.
> Because the same {{close()}} also calls {{shutdownNow()}} on the shared 
> executor, any single consumer can unexpectedly 
> [destroy|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L169]
>  the {{ScheduledExecutorService}} underneath all other users of the 
> singleton.
> Moreover, calling {{getInstance()}} after a {{close()}} call silently 
> creates a brand new instance, potentially with completely different 
> configuration values.
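The following hypothetical sketch (again not Flink code; {{ClosableSingleton}} and its fields are illustrative names) reproduces both effects: one consumer's {{close()}} shuts down the executor that another consumer still holds, and the next {{getInstance()}} call silently builds a fresh instance from whatever configuration it receives.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical model of a singleton whose close() resets the static field. */
final class ClosableSingleton implements AutoCloseable {
    private static ClosableSingleton instance; // guarded by ClosableSingleton.class

    final String resultDir;
    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    private ClosableSingleton(String resultDir) {
        this.resultDir = resultDir;
    }

    static synchronized ClosableSingleton getInstance(String resultDir) {
        if (instance == null) {
            instance = new ClosableSingleton(resultDir);
        }
        return instance;
    }

    @Override
    public void close() {
        try {
            executor.shutdownNow(); // affects every holder of this shared instance
        } finally {
            synchronized (ClosableSingleton.class) {
                instance = null; // same anti-pattern as the close() shown above
            }
        }
    }
}

class ClosableSingletonDemo {
    public static void main(String[] args) {
        ClosableSingleton consumerA = ClosableSingleton.getInstance("/tmp");
        ClosableSingleton consumerB = ClosableSingleton.getInstance("/tmp");
        consumerB.close(); // consumer B "cleans up" after itself...
        System.out.println(consumerA.executor.isShutdown()); // true: A lost its executor too

        // ...and the next caller silently gets a new instance with a new configuration.
        ClosableSingleton next = ClosableSingleton.getInstance("/data/profiling");
        System.out.println(next == consumerA); // false
        System.out.println(next.resultDir);    // /data/profiling
        next.close();
    }
}
```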
> Fortunately, {{ProfilingService::close()}} is *never called in production 
> code* (neither explicitly nor implicitly via *try-with-resources*).
> In the test codebase, it is only ever utilized 
> [within|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L46]
>  {{ProfilingServiceTest}}.
> h2. Root Cause Analysis of the Test Failure
> The failure of {{ProfilingServiceTest#testRollingDeletion}} highlights the 
> danger of this shared JVM state.
> When multiple tests run sequentially or concurrently within the same fork, 
> preceding test classes initialize the {{ProfilingService}} singleton with 
> the default configuration, in which {{PROFILING_RESULT_DIR}} 
> [defaults|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L73]
>  to {{/tmp}}.
> When {{ProfilingServiceTest#setUp}} 
> [runs|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L57],
>  its configuration initialization (intended to guide the profiler to a clean, 
> isolated {{@TempDir}}) has *zero effect*, because the old singleton instance 
> pointing to {{/tmp}} is returned.
> h3. Consequences on the File System:
> Instead of writing into its isolated {{/tmp/junitXXXXX}} temporary 
> directory, the test writes its {{.html}} output files, prefixed 
> [with|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L49]
>  {{"TestJobManager"}}, directly into the global {{/tmp}} directory.
> When {{ProfilingServiceTest#verifyRollingDeletionWorks()}} 
> [executes|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L176]
>  {{new File(profilingService.getProfilingResultDir()).listFiles()}}, it reads 
> the root {{/tmp}} directory and mistakenly picks up stray {{.html}} files 
> left behind by entirely different or previous test runs.
> This leads to a file count mismatch:
> {noformat}
> ProfilingServiceTest.testRollingDeletion:XXX->verifyRollingDeletionWorks:XXX 
> expected: <3> but was: <XX>
> {noformat}
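The count mismatch can be illustrated without Flink. The sketch below is a hypothetical, simplified model of a {{listFiles()}}-based check: it writes three {{TestJobManager_*.html}} files into both a shared directory (standing in for {{/tmp}}, which also holds two stray files from other runs) and an isolated subdirectory (standing in for {{@TempDir}}). File names and counts are illustrative assumptions; the temporary directories are left in place for inspection.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class SharedDirCountDemo {

    /** Counts ".html" files directly inside dir, similar to a listFiles()-based assertion. */
    static int countHtml(Path dir) {
        File[] files = dir.toFile().listFiles((d, name) -> name.endsWith(".html"));
        return files == null ? 0 : files.length;
    }

    /** Returns {countInSharedDir, countInIsolatedDir}. */
    static int[] run() {
        try {
            Path shared = Files.createTempDirectory("shared-tmp"); // stands in for /tmp
            // Leftovers from unrelated or earlier runs.
            Files.createFile(shared.resolve("OtherRun_1.html"));
            Files.createFile(shared.resolve("OtherRun_2.html"));

            Path isolated = Files.createTempDirectory(shared, "junit"); // stands in for @TempDir
            for (int i = 0; i < 3; i++) {
                // The test expects exactly 3 retained profiling results.
                Files.createFile(shared.resolve("TestJobManager_" + i + ".html"));
                Files.createFile(isolated.resolve("TestJobManager_" + i + ".html"));
            }
            return new int[] {countHtml(shared), countHtml(isolated)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("shared dir:   " + counts[0]); // 5, not the expected 3
        System.out.println("isolated dir: " + counts[1]); // 3
    }
}
```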
> h2. How to Reproduce
> The issue can be reliably reproduced even on a single thread (sequential 
> execution) by ensuring a test that initializes the default profiling 
> singleton runs right before the profiling service test:
> {noformat}
> ./mvnw clean test -pl flink-runtime \
>   -Dfast -Djdk17 -Pjava17-target -fae \
>   -Dflink.forkCountUnitTest=1 \
>   -Dtest=ProfilingServiceTest,ShuffleMasterTest
> {noformat}
> *Important:* {{ShuffleMasterTest}} must execute before 
> {{ProfilingServiceTest}} to pollute the static instance and trigger the 
> failure.
> h3. Shared-State Polluting Tests (non-exhaustive list)
> The following is a verified list of test classes that call 
> {{ProfilingService::getInstance}} and can pollute the environment with the 
> default /tmp directory:
> {noformat}
> org.apache.flink.runtime.shuffle.ShuffleMasterTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventHandlingTest
> org.apache.flink.runtime.taskexecutor.TaskManagerRunnerStartupTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest
> org.apache.flink.runtime.taskexecutor.TaskManagerRunnerTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorSlotLifetimeTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorExecutionDeploymentReconciliationTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorRecoveryTest
> org.apache.flink.runtime.taskexecutor.TaskExecutorTest
> org.apache.flink.runtime.io.network.partition.PartialConsumePipelinedResultTest
> org.apache.flink.runtime.leaderelection.LeaderChangeClusterComponentsTest
> org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest
> {noformat}
> h2. Proposed Changes
> h3. Part 1: Test-Level Isolation & Reset
> To fix the immediate test failure, we must isolate {{ProfilingServiceTest}} 
> and force a clean state reset before each test execution:
> 1.) Add the JUnit 5 {{@Isolated}} annotation to the {{ProfilingServiceTest}} 
> class to guarantee that no other test runs concurrently with it.
> 2.) In the {{@BeforeEach setUp()}} method, explicitly call 
> {{ProfilingService.getInstance(configs).close();}} before obtaining the 
> service again via {{ProfilingService.getInstance(configs)}}. This wipes out 
> any polluted singleton instance inherited from preceding tests, so the 
> custom {{@TempDir}} configuration finally takes effect.
> *Note:* Without the {{@Isolated}} annotation, calling {{close()}} inside 
> {{setUp()}} would have disastrous, unpredictable consequences on other 
> concurrently running tests that depend on the shared singleton.
> The exact same risk applies to implicit {{close()}} calls if a developer 
> mistakenly uses the service inside a {{try-with-resources}} block.
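A hypothetical, self-contained model of the proposed {{setUp()}} sequence follows. {{ResettableSingleton}} stands in for {{ProfilingService}}; in the real test class the JUnit 5 {{@Isolated}} annotation ({{org.junit.jupiter.api.parallel.Isolated}}) would ensure that no other class observes the instance while it is closed and rebuilt.

```java
/** Hypothetical stand-in for ProfilingService, reduced to its result directory. */
final class ResettableSingleton implements AutoCloseable {
    private static ResettableSingleton instance; // guarded by ResettableSingleton.class

    final String resultDir;

    private ResettableSingleton(String resultDir) {
        this.resultDir = resultDir;
    }

    static synchronized ResettableSingleton getInstance(String resultDir) {
        if (instance == null) {
            instance = new ResettableSingleton(resultDir);
        }
        return instance;
    }

    @Override
    public void close() {
        synchronized (ResettableSingleton.class) {
            instance = null;
        }
    }
}

class SetUpResetDemo {
    /** Mirrors setUp(): drop whatever instance exists, then rebuild it from this test's config. */
    static ResettableSingleton setUp(String tempDir) {
        // Closes a polluted instance (or, if none existed, the one this call just created).
        ResettableSingleton.getInstance(tempDir).close();
        return ResettableSingleton.getInstance(tempDir);
    }

    public static void main(String[] args) {
        ResettableSingleton.getInstance("/tmp"); // polluted by an earlier test class
        ResettableSingleton svc = setUp("/tmp/junit42");
        System.out.println(svc.resultDir); // /tmp/junit42
    }
}
```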
> h3. Part 2: Long-Term Architectural Fix via Interface Segregation (*proposal*)
> To prevent developers from accidentally invoking {{close()}} on the singleton 
> instance in production or through unmanaged {{try-with-resources}} blocks, we 
> should implement *Interface Segregation*.
> 1.) Convert {{ProfilingService}} into an interface containing all original 
> public functional methods, excluding {{close()}}:
> {code:java}
> public interface ProfilingService {
>     CompletableFuture<ProfilingInfo> requestProfiling(
>             String resourceID, long duration, ProfilingInfo.ProfilingMode mode);
>     CompletableFuture<Collection<ProfilingInfo>> getProfilingList(String resourceID);
>     String getProfilingResultDir();
> }
> {code}
> 2.) Move the actual implementation details to a new concrete class named e.g. 
> {{ProfilingServiceSingleton}} which implements both our interface and 
> {{Closeable}}:
> {code:java}
> public class ProfilingServiceSingleton implements ProfilingService, Closeable {
>     // volatile is required for the double-checked locking in getInstance() to be safe
>     private static volatile ProfilingServiceSingleton instance;
>
>     // Original implementation (fields, private constructor, close(), ...) stays here...
>
>     public static ProfilingService getInstance(Configuration configs) {
>         if (instance == null) {
>             synchronized (ProfilingServiceSingleton.class) {
>                 if (instance == null) {
>                     instance = new ProfilingServiceSingleton(configs);
>                 }
>             }
>         }
>         return instance;
>     }
> }
> {code}
> By returning the {{ProfilingService}} interface type from {{getInstance()}}, 
> the public {{close()}} method is cleanly hidden from production consumers.
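The type-level effect of this split can be sketched in isolation. The names {{ResultDirService}} and {{ResultDirServiceSingleton}} are hypothetical stand-ins for the proposed {{ProfilingService}} interface and {{ProfilingServiceSingleton}} class, reduced to a single method:

```java
import java.io.Closeable;

/** Hypothetical stand-in for the proposed ProfilingService interface: no close(). */
interface ResultDirService {
    String getResultDir();
}

/** Hypothetical stand-in for ProfilingServiceSingleton: the only type exposing close(). */
final class ResultDirServiceSingleton implements ResultDirService, Closeable {
    private static volatile ResultDirServiceSingleton instance;

    private final String resultDir;

    private ResultDirServiceSingleton(String resultDir) {
        this.resultDir = resultDir;
    }

    /** Returns the narrow interface type, so callers cannot call close() without a cast. */
    static ResultDirService getInstance(String resultDir) {
        ResultDirServiceSingleton local = instance;
        if (local == null) {
            synchronized (ResultDirServiceSingleton.class) {
                local = instance;
                if (local == null) {
                    instance = local = new ResultDirServiceSingleton(resultDir);
                }
            }
        }
        return local;
    }

    @Override
    public String getResultDir() {
        return resultDir;
    }

    @Override
    public void close() {
        synchronized (ResultDirServiceSingleton.class) {
            instance = null;
        }
    }
}

class InterfaceSegregationDemo {
    public static void main(String[] args) {
        ResultDirService svc = ResultDirServiceSingleton.getInstance("/tmp");
        // svc.close();                           // does not compile: no close() on the interface
        // try (ResultDirService s = svc) { }     // does not compile: not an AutoCloseable type
        System.out.println(svc.getResultDir());
        System.out.println(AutoCloseable.class.isAssignableFrom(ResultDirService.class)); // false
    }
}
```

Code that only sees the interface type cannot call {{close()}} or place the service in a {{try-with-resources}} block without an explicit cast, while a test that genuinely needs a reset can still cast to the concrete class.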
> This requires changing the class name at exactly *3 production call sites* 
> where {{.getInstance(...)}} is invoked:
> {noformat}
> org.apache.flink.runtime.rest.handler.cluster.JobManagerProfilingHandler
> org.apache.flink.runtime.rest.handler.cluster.JobManagerProfilingListHandler
> org.apache.flink.runtime.taskexecutor.TaskExecutor
> {noformat}
> They keep compiling against the {{ProfilingService}} interface type; only the 
> class name in the factory call changes:
> {code:java}
> // Keeps production code safe from accidental lifecycle termination
> // Old: this.profilingService = ProfilingService.getInstance(configuration);
> this.profilingService = ProfilingServiceSingleton.getInstance(configuration);
> {code}
> Since {{ProfilingService::close}} is currently not called anywhere in 
> production, this change does not alter runtime behavior: apart from the 
> three call sites above, no production code needs to change. It hides the 
> dangerous {{close()}} from production consumers and, combined with Part 1, 
> eliminates the flakiness of {{ProfilingServiceTest}}.
> h2. Testing & Verification
> To verify the proposed fix and explore the exact conditions under which this 
> shared-state flakiness manifests, I executed a series of stress tests using a 
> *single Maven fork* ({{-Dflink.forkCountUnitTest=1}}).
> This forces all targeted test classes into a single, shared JVM environment 
> where the static singleton instance issue can be actively reproduced and 
> monitored.
> h3. Preparation Step (POM Modification)
> Before running the commands, I commented out the hardcoded 
> {{<junit.jupiter.execution.parallel.*>}} system property variables under the 
> maven-surefire-plugin configuration block in the root {{pom.xml}}.
> This was necessary to cleanly pass customized parallel execution flags 
> externally using the {{-Dsurefire.module.config}} property.
> h3. Test Scenario 1: Single JVM, Concurrent-Class Sequential Execution (1 Thread)
> In this scenario, I configured JUnit 5 to enable parallel class execution but 
> throttled the underlying fixed thread pool down to exactly 1 thread 
> ({{max-pool-size=1}}, {{parallelism=1}}).
> This forces the classes to run sequentially on a single thread while using 
> the concurrent class scheduling mechanism.
> *Execution Command:*
> {noformat}
> ./mvnw test -pl flink-runtime \
>   -Dfast -Djdk17 -Pjava17-target -fae \
>   -Dgit.commit.id.skip=true -Dmaven.gitcommitid.skip=true \
>   -Dflink.forkCountUnitTest=1 \
>   -Dsurefire.module.config="--add-opens=java.base/java.util=ALL-UNNAMED \
>     --add-opens=java.base/java.lang=ALL-UNNAMED \
>     --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
>     --add-opens=java.base/java.io=ALL-UNNAMED \
>     -Djunit.jupiter.execution.parallel.enabled=true \
>     -Djunit.jupiter.execution.parallel.mode.classes.default=concurrent \
>     -Djunit.jupiter.execution.parallel.mode.default=same_thread \
>     -Djunit.jupiter.execution.parallel.config.strategy=fixed \
>     -Djunit.jupiter.execution.parallel.config.fixed.max-pool-size=1 \
>     -Djunit.jupiter.execution.parallel.config.fixed.saturate=true \
>     -Djunit.jupiter.execution.parallel.config.fixed.parallelism=1" \
>   -Dtest=ProfilingServiceTest,ShuffleMasterTest,TaskExecutorOperatorEventHandlingTest,TaskManagerRunnerStartupTest,TaskExecutorPartitionLifecycleTest,TaskManagerRunnerTest,TaskExecutorSlotLifetimeTest,TaskExecutorExecutionDeploymentReconciliationTest,TaskExecutorSubmissionTest,TaskExecutorRecoveryTest,TaskExecutorTest,PartialConsumePipelinedResultTest,LeaderChangeClusterComponentsTest,SlotCountExceedingParallelismTest
> {noformat}
> *Observation (Before Fix):*
> As expected, the tests executing ahead of {{ProfilingServiceTest}} pollute 
> the JVM-wide static singleton.
> When {{ProfilingServiceTest}} eventually executes, its setup configuration is 
> ignored, and the test fails on the rolling deletion assertion as the path 
> defaults back to {{/tmp}}.
> h3. Test Scenario 2: Single JVM, High Parallelism Stress Test (200 Threads)
> Next, I ramped up the configuration to test heavy concurrent execution within 
> the same JVM by increasing both {{fixed.max-pool-size}} and 
> {{fixed.parallelism}} to *200*.
> *Observation (Before Fix - Severe Flakiness Masking):*
> Under heavy multi-threaded contention, I observed a highly dangerous side 
> effect of the {{close()}} anti-pattern.
> Out of the *5* test cases defined inside {{ProfilingServiceTest}}, *only 1 
> test case actually executed*, while the remaining 4 were silently 
> dropped. To make matters worse, Maven reported the entire run as a 
> success (green build):
> {noformat}
> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 27.55 s -- in org.apache.flink.runtime.util.profiler.ProfilingServiceTest
> {noformat}
> This shows that, without strict isolation and proper interface boundaries, 
> the current {{ProfilingService::close}} logic can mask test results during 
> concurrent test execution, producing green builds despite dropped test 
> coverage.
> h2. Verification Results After Patch
> After applying the proposed changes (*Interface Segregation* via 
> {{ProfilingServiceSingleton}} + {{@Isolated}} on {{ProfilingServiceTest}} + 
> *Explicit* {{close()}} state reset in {{setUp}}):
> *Scenario 1 (Single Thread / Shared JVM):* Pass. Every preceding test runs 
> successfully, and {{ProfilingServiceTest}} cleanly resets the state to work 
> inside its assigned {{@TempDir}}.
> *Scenario 2 (200 Threads / High Contention):* Pass. All 5 test cases inside 
> {{ProfilingServiceTest}} run quarantined and execute successfully in their 
> entirety, without swallowing test executions or polluting the surrounding 
> environment.
> h2. Previous JIRAs related to this issue
> * FLINK-38442: ProfilingServiceTest.testRollingDeletion is unstable (Resolved)
> * FLINK-35571: ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation (Open)
> * FLINK-34013: ProfilingServiceTest.testRollingDeletion is unstable on AZP (Resolved)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
