[ https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arnaud Linz updated FLINK-16509:
--------------------------------
Description:
The new code in FlinkKafkaConsumerBase#initializeState() logs the restored state with:
{code:java}
(...)
    LOG.info("Consumer subtask {} restored state: {}.",
        getRuntimeContext().getIndexOfThisSubtask(), restoredState);
} else {
    LOG.info("Consumer subtask {} has no restore state.",
        getRuntimeContext().getIndexOfThisSubtask());
}
{code}
whereas the old (1.8.0) class logged without calling getRuntimeContext():
{code:java}
(...)
    LOG.info("Setting restore state in the FlinkKafkaConsumer: {}",
        restoredState);
} else {
    LOG.info("No restore state for FlinkKafkaConsumer.");
}
{code}
This causes a regression in my Kafka source unit test, which now fails with the following exception:
{code:java}
java.lang.IllegalStateException: The runtime context has not been initialized.
    at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
{code}
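The failure mode in the trace above can be modelled without any Flink dependency. The sketch below is only an illustration: ModelRichFunction, ModelConsumer and RuntimeContextDemo are hypothetical stand-ins, not Flink classes, and the subtask index stands in for the whole runtime context. It assumes only what the trace shows, namely that the context getter throws an IllegalStateException when nothing has been set yet.

```java
// Illustrative stand-ins only; none of these are Flink classes.
class ModelRichFunction {
    private Integer subtaskIndex; // stays null until the "runtime" injects it

    void setSubtaskIndex(int index) {
        this.subtaskIndex = index;
    }

    // Throws when called too early, like the getter in the stack trace.
    int getSubtaskIndex() {
        if (subtaskIndex == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return subtaskIndex;
    }
}

class ModelConsumer extends ModelRichFunction {
    // Mirrors the 1.10.0-style messages: both branches need the subtask index.
    // Returns the message instead of logging it so the behaviour is easy to check.
    String initializeState(Object restoredState) {
        if (restoredState != null) {
            return "Consumer subtask " + getSubtaskIndex()
                    + " restored state: " + restoredState + ".";
        }
        return "Consumer subtask " + getSubtaskIndex() + " has no restore state.";
    }
}

class RuntimeContextDemo {
    public static void main(String[] args) {
        ModelConsumer consumer = new ModelConsumer();
        try {
            // No context injected yet: the logging path itself fails.
            consumer.initializeState(null);
        } catch (IllegalStateException e) {
            System.out.println("Failed before context was set: " + e.getMessage());
        }
        // Once the context is available, the same call succeeds.
        consumer.setSubtaskIndex(0);
        System.out.println(consumer.initializeState(null));
    }
}
```

In this model the exception comes purely from building the log message, which matches the report: the old 1.8.0-style messages never touched the context, so they could not fail this way.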
The context is not always available at that point (initializeState() being
called before open(), I guess).
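One possible way to keep the more detailed message without making it a hard requirement is to guard the context access. The sketch below is not the actual Flink fix and uses hypothetical stand-in classes (LateContextFunction, GuardedConsumer, GuardedLoggingDemo); it assumes only that the getter throws an IllegalStateException when the context is missing, and it falls back to a message without the subtask index in that case.

```java
// Illustrative stand-ins only; none of these are Flink classes.
class LateContextFunction {
    private Integer subtaskIndex; // stays null until the "runtime" injects it

    void setSubtaskIndex(int index) {
        this.subtaskIndex = index;
    }

    // Throws when called too early, like the getter in the stack trace.
    int getSubtaskIndex() {
        if (subtaskIndex == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return subtaskIndex;
    }
}

class GuardedConsumer extends LateContextFunction {
    // Builds the log message, degrading gracefully if the index is unavailable.
    String restoreMessage(Object restoredState) {
        String who;
        try {
            who = "Consumer subtask " + getSubtaskIndex();
        } catch (IllegalStateException e) {
            // Context not set yet: log without the index instead of failing.
            who = "Consumer (subtask index unavailable)";
        }
        return restoredState != null
                ? who + " restored state: " + restoredState + "."
                : who + " has no restore state.";
    }
}

class GuardedLoggingDemo {
    public static void main(String[] args) {
        GuardedConsumer consumer = new GuardedConsumer();
        System.out.println(consumer.restoreMessage(null));      // context missing
        consumer.setSubtaskIndex(0);
        System.out.println(consumer.restoreMessage("offsets")); // context present
    }
}
```

On the test side, an alternative that needs no change to the connector would be to call setRuntimeContext(...) on the consumer with a suitable context before invoking initializeState(); whether that fits depends on how the unit test drives the source.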
was:
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:
{{ LOG.info("Consumer subtask {} restored state: {}.",
getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{
LOG.info("Consumer subtask {} has no restore state.",
getRuntimeContext().getIndexOfThisSubtask()); }}}
where as old (1.8.0) class was logging without calling getRuntimeContext :
{{ LOG.info("Setting restore state in the FlinkKafkaConsumer: {}",
restoredState); } else \{ LOG.info("No restore state for FlinkKafkaConsumer.");
}}}
This causes a regression in my Kafka source unit test with exception:
{{ java.lang.IllegalStateException: The runtime context has not been
initialized.}}
{{ at
org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)}}
{{ at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)}}
{{ As the context is not always available at that point (initalizeState being
called before open I guess)}}
> FlinkKafkaConsumerBase tries to log a context that may not have been
> initialized and fails
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-16509
> URL: https://issues.apache.org/jira/browse/FLINK-16509
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.10.0
> Environment: Unit test on a local cluster, calling a local unit-test
> Kafka server.
> Reporter: Arnaud Linz
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)