[
https://issues.apache.org/jira/browse/FLINK-14327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16955057#comment-16955057
]
Zhu Zhu edited comment on FLINK-14327 at 10/19/19 3:07 AM:
-----------------------------------------------------------
Hi [~ASK5], the NPE happens in flinkBroadcast1.scala which seems to be user
code.
Could you check flinkBroadcast1.scala line 41?
If it's the code you attached, I guess it's these two lines in case that the
property is null.
{code:java}
val loc = v.get("locationID").asInstanceOf[String]
val temperature = v.get("temp").asDouble()
{code}
was (Author: zhuzh):
Hi [~ASK5], the NPE to happen in flinkBroadcast1.scala which seems to be user
code.
Could you check flinkBroadcast1.scala line 41?
If it's the code you attached, I guess it's these two lines in case that the
property is null.
{code:java}
val loc = v.get("locationID").asInstanceOf[String]
val temperature = v.get("temp").asDouble()
{code}
> Getting "Could not forward element to next operator" error
> ----------------------------------------------------------
>
> Key: FLINK-14327
> URL: https://issues.apache.org/jira/browse/FLINK-14327
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.9.0
> Reporter: ASK5
> Priority: Major
> Fix For: 1.9.2
>
> Attachments: so2.png
>
>
> val TEMPERATURE_THRESHOLD: Double = 50.00
> val see: StreamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment
> val properties = new Properties()
> properties.setProperty("zookeeper.connect", "localhost:2181")
> properties.setProperty("bootstrap.servers", "localhost:9092")
> val src = see.addSource(new FlinkKafkaConsumer010[ObjectNode]("broadcast",
> new JSONKeyValueDeserializationSchema(false),
> properties)).name("kafkaSource")
> case class Event(locationID: String, temp: Double)
> var data = src.map { v => {
> val loc = v.get("locationID").asInstanceOf[String]
> val temperature = v.get("temp").asDouble()
> (loc, temperature)
> }}
> data = data
> .keyBy(
> v => v._1
> )
> data.print()
> see.execute()
> ---*********----
> And I'm getting the following error while consuming json file from Kafka:-
>
> {{Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.... at flinkBroadcast1$.main(flinkBroadcast1.scala:59) at
> flinkBroadcast1.main(flinkBroadcast1.scala)Caused by: java.lang.Exception:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator...Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator...Caused by:
> java.lang.NullPointerException}}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)