[ 
https://issues.apache.org/jira/browse/FLINK-12410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17055755#comment-17055755
 ] 

Robert Metzger commented on FLINK-12410:
----------------------------------------

Okay, I now understand the problem. This simple Flink program reproduces the
issue:
{code:java}
        public static void main(String[] args) throws Exception {
                final StreamExecutionEnvironment env =
                                StreamExecutionEnvironment.getExecutionEnvironment();
                env.getCheckpointConfig().setCheckpointInterval(5000L);
                env.setParallelism(1);
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "localhost:9092");
                properties.setProperty("group.id", "test");
                DataStream<String> stream = env
                                .addSource(new FlinkKafkaConsumer<>(
                                                "topic", new SimpleStringSchema(), properties));
                stream.print();
                env.execute("Test Kafka");
        }
{code}

and adding
{code:java}
env.getConfig().disableGenericTypes();
{code}

causes

{code}
10:46:34,808 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (ed05d49097c9daa100041f45302523b7) switched from RUNNING to FAILED.
java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is treated as a generic type.
        at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
        at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
        at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
        at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
        at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:304)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:289)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:219)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:860)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
{code}

The expected behavior is that the KafkaSource works independently of the
serialization settings, and that internal Flink code does not produce such a
log message.
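
To illustrate why a class like KafkaTopicPartition ends up treated as a generic type, here is a minimal, self-contained sketch of a simplified version of Flink's POJO rules (public class, public no-argument constructor, every non-static, non-transient field either public and non-final or accessible through a getter and a setter). This is not Flink's TypeExtractor; the names PojoRuleSketch, ImmutablePartition, MutablePartition, and pojoViolations are hypothetical, and ImmutablePartition is only an assumed stand-in for a class whose field topic has no setter, as the quoted log reports.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleSketch {

    /** Hypothetical stand-in: final fields, getters only, no no-arg constructor. */
    public static class ImmutablePartition {
        private final String topic;
        private final int partition;

        public ImmutablePartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        public String getTopic() { return topic; }
        public int getPartition() { return partition; }
    }

    /** Hypothetical variant that satisfies the simplified rules checked below. */
    public static class MutablePartition {
        private String topic;
        private int partition;

        public MutablePartition() {}

        public String getTopic() { return topic; }
        public void setTopic(String topic) { this.topic = topic; }
        public int getPartition() { return partition; }
        public void setPartition(int partition) { this.partition = partition; }
    }

    /** Returns the reasons a class fails the simplified rules; an empty list means it passes. */
    public static List<String> pojoViolations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        try {
            clazz.getConstructor(); // only finds a public no-argument constructor
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // not part of the serialized state
            }
            if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                continue; // directly accessible field
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!hasMethod(clazz, "get" + suffix, 0)) {
                problems.add("no getter for field " + name);
            }
            if (!hasMethod(clazz, "set" + suffix, 1)) {
                problems.add("no setter for field " + name);
            }
        }
        return problems;
    }

    /** True if the class exposes a public method with the given name and parameter count. */
    private static boolean hasMethod(Class<?> clazz, String name, int paramCount) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("ImmutablePartition: " + pojoViolations(ImmutablePartition.class));
        System.out.println("MutablePartition:   " + pojoViolations(MutablePartition.class));
    }
}
```

A class that fails checks like these falls back to generic (Kryo-based) serialization, which is exactly what disableGenericTypes() forbids. This would explain why the exception surfaces when the connector's internal state serializer is created, even though the user program never uses that type itself.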

> KafkaTopicPartition cannot be used as a POJO type because not all fields are 
> valid POJO fields
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12410
>                 URL: https://issues.apache.org/jira/browse/FLINK-12410
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.0
>            Reporter: Chethan UK
>            Priority: Minor
>
> While Pushing data to Kafka Topic using Pojo, there is an error: "INFO 
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> cannot be used as a POJO type because not all fields are valid POJO fields, 
> and must be processed as GenericType"
> {code:java}
> 09:18:27,633 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> does not contain a setter for field topic
> 09:18:27,634 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class 
> class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> cannot be used as a POJO type because not all fields are valid POJO fields, 
> and must be processed as GenericType. Please read the Flink documentation on 
> "Data Types & Serialization" for details of the effect on performance.
> 09:18:27,633 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> does not contain a setter for field topic
> 09:18:27,635 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class 
> class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> cannot be used as a POJO type because not all fields are valid POJO fields, 
> and must be processed as GenericType. Please read the Flink documentation on 
> "Data Types & Serialization" for details of the effect on performance.
> 09:18:27,635 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> does not contain a setter for field topic
> 09:18:27,636 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class 
> class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> cannot be used as a POJO type because not all fields are valid POJO fields, 
> and must be processed as GenericType. Please read the Flink documentation on 
> "Data Types & Serialization" for details of the effect on performance.
> 09:18:27,639 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> does not contain a setter for field topic
> 09:18:27,642 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class 
> class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> cannot be used as a POJO type because not all fields are valid POJO fields, 
> and must be processed as GenericType. Please read the Flink documentation on 
> "Data Types & Serialization" for details of the effect on performance.
> 09:18:27,654 INFO 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No 
> restore state for FlinkKafkaConsumer.
> 09:18:27,655 INFO 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No 
> restore state for FlinkKafkaConsumer.
> 09:18:27,655 INFO 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No 
> restore state for FlinkKafkaConsumer.
> 09:18:27,654 INFO 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No 
> restore state for FlinkKafkaConsumer.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
