[
https://issues.apache.org/jira/browse/FLINK-12410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17055755#comment-17055755
]
Robert Metzger commented on FLINK-12410:
----------------------------------------
Okay, I now understand the problem. This simple Flink program reproduces the
issue:
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TestKafka {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointInterval(5000L);
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
        stream.print();

        env.execute("Test Kafka");
    }
}
{code}
and adding
{code:java}
env.getConfig().disableGenericTypes();
{code}
causes
{code}
10:46:34,808 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (ed05d49097c9daa100041f45302523b7) switched from RUNNING to FAILED.
java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is treated as a generic type.
	at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
	at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
	at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
	at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
	at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:304)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:289)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:219)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:860)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
{code}
The expected behavior would be that the Kafka source works independently of
the serialization settings, and that no log messages come from internal Flink
code.
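For context, a rough, self-contained sketch of the POJO rule that KafkaTopicPartition fails (this is a simplified illustration, not Flink's actual TypeExtractor; {{PartitionLike}} is a hypothetical stand-in for KafkaTopicPartition): every non-static, non-transient field must be public, or have a public getter and setter. KafkaTopicPartition keeps its fields final with getters only, so the check fails and the type falls back to a generic (Kryo-serialized) type, which {{disableGenericTypes()}} then rejects.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Simplified sketch of Flink's POJO field rule (NOT the real TypeExtractor).
class PojoRuleSketch {

    // Hypothetical stand-in mirroring KafkaTopicPartition's shape:
    // private final fields, getters, but no setters.
    static class PartitionLike {
        private final String topic;
        private final int partition;

        PartitionLike(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        public String getTopic() { return topic; }
        public int getPartition() { return partition; }
        // no setTopic()/setPartition() -> not a valid POJO field
    }

    static boolean isValidPojo(Class<?> clazz) {
        for (Field f : clazz.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
                continue; // such fields are ignored by the check
            }
            if (Modifier.isPublic(mod)) {
                continue; // public fields are acceptable as-is
            }
            String suffix = Character.toUpperCase(f.getName().charAt(0))
                    + f.getName().substring(1);
            if (!hasPublicMethod(clazz, "get" + suffix)
                    || !hasPublicMethod(clazz, "set" + suffix)) {
                // mirrors "does not contain a setter for field ..."
                return false;
            }
        }
        return true;
    }

    static boolean hasPublicMethod(Class<?> clazz, String name) {
        for (Method m : clazz.getMethods()) { // getMethods() returns public methods
            if (m.getName().equals(name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // PartitionLike fails because 'topic' has a getter but no setter.
        System.out.println(isValidPojo(PartitionLike.class)); // prints "false"
    }
}
```

Under this rule the consumer's internal state type can never be a POJO, which is why the fallback to a generic type (and the resulting log noise) is unavoidable from user code.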
> KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-12410
> URL: https://issues.apache.org/jira/browse/FLINK-12410
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.8.0
> Reporter: Chethan UK
> Priority: Minor
>
> While Pushing data to Kafka Topic using Pojo, there is an error: "INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
> cannot be used as a POJO type because not all fields are valid POJO fields,
> and must be processed as GenericType"
> {code}
> 09:18:27,633 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
> 09:18:27,634 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 09:18:27,633 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
> 09:18:27,635 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 09:18:27,635 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
> 09:18:27,636 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 09:18:27,639 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
> 09:18:27,642 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 09:18:27,654 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> 09:18:27,655 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> 09:18:27,655 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> 09:18:27,654 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)