liwei li created FLINK-29555:
--------------------------------
Summary: GlobalStreamingCommitterHandler not call
notifyCheckpointCompleted after endOfInput
Key: FLINK-29555
URL: https://issues.apache.org/jira/browse/FLINK-29555
Project: Flink
Issue Type: Bug
Reporter: liwei li
env:
Flink 1.14.3
{code:java}
EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
{code}
code:
{code:java}
// Register the rows into a temporary table.
getTableEnv().createTemporaryView("sourceTable",
getTableEnv().fromValues(SimpleDataUtil.FLINK_SCHEMA.toRowDataType(),
Expressions.row(1, "hello"),
Expressions.row(2, "world"),
Expressions.row(3, (String) null),
Expressions.row(null, "bar")
)
);
// Redirect the records from source table to destination table.
sql("INSERT INTO %s SELECT id,data from sourceTable", TABLE_NAME);
{code}
Actual:
The data is not inserted into the table.
Here I use a custom global committer implement from `GlobalCommitter`. When I
debug this method, I notice that the close method is called immediately after
the endinput method is called. The committable is not sent to my global
committer.
The notifyCheckpointCompleted method in the GlobalStreamingCommitterHandler
class is not called after endOfInput. This doesn't seem to be the intended
effect.
It seemed to be a bug so I raised this issuse.
https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandler.java#L80-L100
--
This message was sent by Atlassian Jira
(v8.20.10#820010)