[ 
https://issues.apache.org/jira/browse/FLINK-23433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387039#comment-17387039
 ] 

Yao Zhang commented on FLINK-23433:
-----------------------------------

Hi [~liufangliang],
I debugged the DeserializationSchema used by FlinkKafkaConsumer, but I could not 
reproduce this issue.
I also reviewed the code in 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java

{code:java}
    @Override
    public void open(Configuration configuration) throws Exception {
        if (schema instanceof KeyedSerializationSchemaWrapper) {
            ((KeyedSerializationSchemaWrapper<IN>) schema)
                    .getSerializationSchema()
                    .open(
                            RuntimeContextInitializationContextAdapters.serializationAdapter(
                                    getRuntimeContext(),
                                    metricGroup -> metricGroup.addGroup("user")));
        }
        // ...
    }
{code}
In this code, the open method of the wrapped SerializationSchema is invoked and a 
MetricGroup called "user" is added during the initialization of FlinkKafkaProducer.
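
To make the expected lifecycle concrete, here is a minimal, self-contained sketch. It does not use Flink: StubMetricGroup, StubInitializationContext and StubSchema are hypothetical stand-ins that I made up for illustration. It only shows that a field assigned in open() stays null when open() is never called. It does not claim that any Flink code path actually skips open().

```java
import java.util.ArrayList;
import java.util.List;

public class OpenLifecycleSketch {

    /** Hypothetical stand-in for a metric group; records the group names added to it. */
    static class StubMetricGroup {
        final List<String> path = new ArrayList<>();

        StubMetricGroup addGroup(String name) {
            StubMetricGroup child = new StubMetricGroup();
            child.path.addAll(path);
            child.path.add(name);
            return child;
        }
    }

    /** Hypothetical stand-in for the context object passed to open(). */
    interface StubInitializationContext {
        StubMetricGroup getMetricGroup();
    }

    /** Mirrors the reporter's schema: the metrics field is only assigned inside open(). */
    static class StubSchema {
        private transient StubMetricGroup metrics;

        void open(StubInitializationContext context) {
            metrics = context.getMetricGroup();
        }

        boolean metricsInitialized() {
            return metrics != null;
        }

        /** Throws a NullPointerException if open() was never called. */
        List<String> metricPath() {
            return metrics.path;
        }
    }

    /** A caller that opens the schema first and hands it a "user" sub-group. */
    static StubSchema runWithOpen() {
        StubSchema schema = new StubSchema();
        StubMetricGroup operatorGroup = new StubMetricGroup();
        schema.open(() -> operatorGroup.addGroup("user"));
        return schema;
    }

    /** A caller that uses the schema without ever calling open(). */
    static StubSchema runWithoutOpen() {
        return new StubSchema();
    }

    public static void main(String[] args) {
        System.out.println("with open():    " + runWithOpen().metricsInitialized());
        System.out.println("without open(): " + runWithoutOpen().metricsInitialized());
    }
}
```

If the NPE in this issue comes from a null metrics field, the reproduction steps would need to show which caller uses the schema without opening it first.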

Could you please describe below how to reproduce this issue?


> Metrics cannot be initialized in format
> ---------------------------------------
>
>                 Key: FLINK-23433
>                 URL: https://issues.apache.org/jira/browse/FLINK-23433
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.11.3
>            Reporter: Fangliang Liu
>            Priority: Major
>
>  
> I want to use metrics in a custom format, so I wrote it like this:
> {code:java}
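> public class ProtobufRowDeserializationSchema implements DeserializationSchema<Row> {
>     private transient MetricGroup metrics;
>
>     @Override
>     public void open(InitializationContext context) throws Exception {
>         // Store the metric group supplied by the runtime for later use.
>         metrics = context.getMetricGroup();
>     }
>     // deserialize, isEndOfStream and getProducedType omitted
> }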
> {code}
> But I received an NPE on `metrics`. It stands to reason that the metrics should 
> already have been initialized. 
>  
> [~jark], [~lzljs3620320] , Looking forward to your reply.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
