godfrey he created FLINK-23094:
----------------------------------
Summary: encounter thread-safe problem when using StreamExecutionEnvironment#initializeContextEnvironment in multiple-threads environment
Key: FLINK-23094
URL: https://issues.apache.org/jira/browse/FLINK-23094
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.13.0, 1.12.0, 1.11.0, 1.14.0
Reporter: godfrey he
A thread-safety problem occurs when {{StreamExecutionEnvironment#initializeContextEnvironment}} (or related code, such as {{PackagedProgramUtils#getPipelineFromProgram}}) is used in a multi-threaded environment.
The cause is that the {{initializeContextEnvironment}} method is not thread-safe:
{code:java}
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
    contextEnvironmentFactory = ctx;
    // re-reads the shared static field, which another thread may have overwritten by now
    threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}
{code}
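{{contextEnvironmentFactory}} is a static variable shared by all threads. When {{initializeContextEnvironment}} runs concurrently in several threads, another thread may overwrite {{contextEnvironmentFactory}} after the current thread has assigned it but before the current thread executes {{threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);}}. The current thread's thread-local then holds the other thread's factory instead of its own {{ctx}}.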
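To make the interleaving concrete, here is a minimal, self-contained sketch that forces it deterministically. It is not Flink code: {{InterleavingDemo}}, {{initializeBuggy}} and the {{betweenSteps}} hook are hypothetical names, and a {{String}} stands in for {{StreamExecutionEnvironmentFactory}}. The hook pauses thread A between its write and its read of the static field so that thread B can run in exactly that window.

{code:java}
import java.util.concurrent.CountDownLatch;

final class InterleavingDemo {
    // Hypothetical stand-ins for the two fields of StreamExecutionEnvironment;
    // a String plays the role of StreamExecutionEnvironmentFactory.
    private static volatile String contextEnvironmentFactory;
    private static final ThreadLocal<String> threadLocalContextEnvironmentFactory =
            new ThreadLocal<>();

    // Same two statements as the original method, plus a hook between them so
    // the harmful interleaving can be forced instead of waited for.
    static void initializeBuggy(String ctx, Runnable betweenSteps) {
        contextEnvironmentFactory = ctx;
        betweenSteps.run();
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
    }

    // Forces the order "A writes, B runs fully, A reads" and returns what A's thread-local holds.
    static String runInterleaving() {
        CountDownLatch aAssigned = new CountDownLatch(1);
        CountDownLatch bDone = new CountDownLatch(1);
        String[] seenByA = new String[1];

        Thread a = new Thread(() -> {
            initializeBuggy("factoryA", () -> {
                aAssigned.countDown(); // A has written the static field...
                try {
                    bDone.await(); // ...and waits here until B has overwritten it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            seenByA[0] = threadLocalContextEnvironmentFactory.get();
        });
        Thread b = new Thread(() -> {
            try {
                aAssigned.await(); // start only after A's write
                initializeBuggy("factoryB", () -> { });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                bDone.countDown(); // let A continue to its read
            }
        });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenByA[0];
    }

    public static void main(String[] args) {
        System.out.println("thread A's thread-local holds: " + runInterleaving());
    }
}
{code}

Running {{main}} prints {{thread A's thread-local holds: factoryB}}: thread A ends up with the factory that thread B passed in.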
The solution is to pass the parameter {{ctx}} to the thread-local instead of re-reading the static variable {{contextEnvironmentFactory}}:
{code:java}
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
    contextEnvironmentFactory = ctx;
    // use the argument, which no other thread can change
    threadLocalContextEnvironmentFactory.set(ctx);
}
{code}
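A rough way to compare both versions under real contention is a stress test like the sketch below, again with hypothetical names ({{FixComparison}}, {{initializeBuggy}}, {{initializeFixed}}, {{countMismatches}}) and a {{String}} standing in for the factory type. Each worker repeatedly initializes with its own factory and then checks whether its thread-local holds that same value.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class FixComparison {
    // Hypothetical stand-ins for the fields of StreamExecutionEnvironment.
    private static volatile String contextEnvironmentFactory;
    private static final ThreadLocal<String> threadLocalContextEnvironmentFactory =
            new ThreadLocal<>();

    static void initializeBuggy(String ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory); // re-reads shared state
    }

    static void initializeFixed(String ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(ctx); // uses this thread's own argument
    }

    // Counts calls after which a thread's thread-local does not hold its own factory.
    static int countMismatches(Consumer<String> init, int threads, int iterations) {
        AtomicInteger mismatches = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            String ctx = "factory-" + t;
            Thread worker = new Thread(() -> {
                try {
                    start.await(); // release all workers at once to maximize overlap
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < iterations; i++) {
                    init.accept(ctx);
                    if (!ctx.equals(threadLocalContextEnvironmentFactory.get())) {
                        mismatches.incrementAndGet();
                    }
                }
            });
            workers.add(worker);
            worker.start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return mismatches.get();
    }

    public static void main(String[] args) {
        // Depends on scheduling; may be 0 on some runs or machines.
        System.out.println("buggy: " + countMismatches(FixComparison::initializeBuggy, 8, 100_000));
        // Always 0: a thread's thread-local is only ever set from its own argument.
        System.out.println("fixed: " + countMismatches(FixComparison::initializeFixed, 8, 100_000));
    }
}
{code}

The buggy count is not reproducible, which is why the race is easy to miss in tests; the fixed count is 0 on every run, because no other thread can write to the calling thread's thread-local.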
Another thing I am unsure about is whether {{contextEnvironmentFactory}} is needed at all. Currently it is never set or reset on its own; it is always changed together with {{threadLocalContextEnvironmentFactory}}.
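One property worth checking before removing the static field is that a plain {{ThreadLocal}} value is visible only on the thread that set it, so a lookup made from any other thread gets {{null}} unless there is a static fallback. The sketch below illustrates that with hypothetical names ({{ThreadLocalOnlyHolder}}, {{resolveThreadLocalOnly}}, {{resolveWithFallback}}); it does not claim that this is how Flink resolves the factory.

{code:java}
import java.util.function.Supplier;

final class ThreadLocalOnlyHolder {
    // Hypothetical: the per-thread slot, as in threadLocalContextEnvironmentFactory.
    private static final ThreadLocal<String> threadLocalFactory = new ThreadLocal<>();
    // Hypothetical: the shared slot, as in contextEnvironmentFactory.
    private static volatile String staticFactory;

    static void initialize(String ctx) {
        staticFactory = ctx;
        threadLocalFactory.set(ctx);
    }

    static void reset() {
        staticFactory = null;
        threadLocalFactory.remove();
    }

    // Lookup without the static field: only the initializing thread sees a value.
    static String resolveThreadLocalOnly() {
        return threadLocalFactory.get();
    }

    // Lookup that falls back to the static field when this thread has no value.
    static String resolveWithFallback() {
        String local = threadLocalFactory.get();
        return local != null ? local : staticFactory;
    }

    // Runs the lookup on a freshly started thread and returns its result.
    static String onNewThread(Supplier<String> lookup) {
        String[] result = new String[1];
        Thread t = new Thread(() -> result[0] = lookup.get());
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        initialize("factoryA");
        System.out.println(resolveThreadLocalOnly());                                   // factoryA
        System.out.println(onNewThread(ThreadLocalOnlyHolder::resolveThreadLocalOnly)); // null
        System.out.println(onNewThread(ThreadLocalOnlyHolder::resolveWithFallback));    // factoryA
        reset();
    }
}
{code}

If every caller that needs the factory runs on the thread that initialized it, the static field adds nothing; if some lookups happen on other threads, dropping it would change their result from the factory to {{null}}.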
--
This message was sent by Atlassian Jira
(v8.3.4#803005)