[ 
https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-16509:
--------------------------------
    Description: 
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:

{{ LOG.info("Consumer subtask {} restored state: {}.", 
getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ 
LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask()); }}}

 

where as old (1.8.0) class was logging without calling getRuntimeContext :

 

{{ LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState); } else \{ LOG.info("No restore state for FlinkKafkaConsumer."); 
}}}

 

This causes a regression in my Kafka source unit test with exception:


{{ java.lang.IllegalStateException: The runtime context has not been 
initialized.}}
{{     at 
org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)}}
{{     at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)}}

{{ As the context is not always available at that point (initalizeState being 
called before open I guess)}}
{{  }}
{{  }}

  was:
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:

```

 LOG.info("Consumer subtask {} restored state: {}.", 
getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ 
LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask()); }

```

where as old (1.8.0) class was logging without calling getRuntimeContext :

```

 LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState); } else \{ LOG.info("No restore state for FlinkKafkaConsumer."); 
}

``` 

This causes a regression in my Kafka source unit test with exception:

``` 
java.lang.IllegalStateException: The runtime context has not been initialized.
    at 
org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
    at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
``` 
As the context is not always available at that point (initalizeState being 
called before open I guess)
 
 


> FlinkKafkaConsumerBase tries to log a context that may not have been 
> initialized and fails
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16509
>                 URL: https://issues.apache.org/jira/browse/FLINK-16509
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.10.0
>         Environment: Unit test on local cluster, calling a unit test local 
> kafka server.
>            Reporter: Arnaud Linz
>            Priority: Major
>
> New code of FlinkKafkaConsumerBase#initializeState(), logs restored state 
> with:
> {{ LOG.info("Consumer subtask {} restored state: {}.", 
> getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ 
> LOG.info("Consumer subtask {} has no restore state.", 
> getRuntimeContext().getIndexOfThisSubtask()); }}}
>  
> where as old (1.8.0) class was logging without calling getRuntimeContext :
>  
> {{ LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
> restoredState); } else \{ LOG.info("No restore state for 
> FlinkKafkaConsumer."); }}}
>  
> This causes a regression in my Kafka source unit test with exception:
> {{ java.lang.IllegalStateException: The runtime context has not been 
> initialized.}}
> {{     at 
> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)}}
> {{     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)}}
> {{ As the context is not always available at that point (initalizeState being 
> called before open I guess)}}
> {{  }}
> {{  }}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to