[
https://issues.apache.org/jira/browse/FLINK-23433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387039#comment-17387039
]
Yao Zhang commented on FLINK-23433:
-----------------------------------
Hi [~liufangliang],
I debugged the DeserializationSchema used by FlinkKafkaConsumer, but I could
not reproduce this issue.
I reviewed the code in
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
{code:java}
@Override
public void open(Configuration configuration) throws Exception {
    if (schema instanceof KeyedSerializationSchemaWrapper) {
        ((KeyedSerializationSchemaWrapper<IN>) schema)
                .getSerializationSchema()
                .open(
                        RuntimeContextInitializationContextAdapters.serializationAdapter(
                                getRuntimeContext(),
                                metricGroup -> metricGroup.addGroup("user")));
    }
    // ...
}
{code}
In this code path, the open method of the wrapped SerializationSchema is invoked
and a MetricGroup named "user" is added during the initialization of
FlinkKafkaProducer.
Could you please comment below how you can reproduce this issue?
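To make the lifecycle in question concrete, here is a minimal, self-contained
sketch. It does not use any Flink classes: SketchMetricGroup,
SketchInitializationContext, SketchSchema and OpenLifecycleSketch are
hypothetical stand-ins that only mimic the shape of the code above. It assumes
(this is a guess, not a confirmed cause) that the reported NPE could occur if
the schema is used on a code path where open() is never called, so the
transient metrics field is still null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a metric group; NOT Flink's MetricGroup.
final class SketchMetricGroup {
    private final String name;
    private final Map<String, SketchMetricGroup> children = new HashMap<>();

    SketchMetricGroup(String name) {
        this.name = name;
    }

    // Returns the named child group, creating it on first use.
    SketchMetricGroup addGroup(String child) {
        return children.computeIfAbsent(child, c -> new SketchMetricGroup(name + "." + c));
    }

    String name() {
        return name;
    }
}

// Hypothetical stand-in for the context handed to open().
interface SketchInitializationContext {
    SketchMetricGroup getMetricGroup();
}

// Hypothetical schema shaped like the one in the report.
final class SketchSchema {
    // Stays null until open() runs, just like the field in the report.
    private transient SketchMetricGroup metrics;

    void open(SketchInitializationContext context) {
        metrics = context.getMetricGroup();
    }

    String deserialize(byte[] message) {
        // Fail with a descriptive error instead of a bare NPE.
        if (metrics == null) {
            throw new IllegalStateException("open() was not called before deserialize()");
        }
        return metrics.name() + ":" + message.length;
    }
}

final class OpenLifecycleSketch {
    // Mimics the adapter call above: the mapper derives the user-facing group.
    static SketchInitializationContext adapter(
            SketchMetricGroup operatorGroup,
            Function<SketchMetricGroup, SketchMetricGroup> mapper) {
        return () -> mapper.apply(operatorGroup);
    }

    // Runs the schema with or without the open() call.
    static String run(boolean callOpen) {
        SketchSchema schema = new SketchSchema();
        if (callOpen) {
            schema.open(adapter(new SketchMetricGroup("operator"), g -> g.addGroup("user")));
        }
        return schema.deserialize(new byte[] {1, 2, 3});
    }
}
```

Calling OpenLifecycleSketch.run(true) goes through open() first and succeeds,
while run(false) fails with the explicit IllegalStateException. A similar guard
in the real format might help show which code path skips open().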
> Metrics cannot be initialized in format
> ---------------------------------------
>
> Key: FLINK-23433
> URL: https://issues.apache.org/jira/browse/FLINK-23433
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.11.3
> Reporter: Fangliang Liu
> Priority: Major
>
>
> I want to use metrics in a custom format. I wrote it like this:
> {code:java}
> public class ProtobufRowDeserializationSchema implements DeserializationSchema<Row> {
>     private transient MetricGroup metrics;
>
>     @Override
>     public void open(InitializationContext context) throws Exception {
>         metrics = context.getMetricGroup();
>     }
>     // deserialize(...), isEndOfStream(...) and getProducedType() omitted
> }
> {code}
> However, I got an NPE on `metrics`. It stands to reason that the metrics
> should already have been initialized.
>
> [~jark], [~lzljs3620320], looking forward to your reply.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)