[
https://issues.apache.org/jira/browse/FLINK-12303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841258#comment-16841258
]
Aljoscha Krettek commented on FLINK-12303:
------------------------------------------
Hi [~snilard], yes you're right, this wasn't the real issue. I'm sorry about
that.
The real problem is that Scala 2.12 changes how Scala lambdas are implemented
(they now use the same underlying mechanism as Java lambdas, which is available
from Java 8 onwards). Kryo, which is used as the serializer in your test cases,
used to be able to serialize Scala lambdas. It can't do that anymore for
Scala 2.12 lambdas.
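As a rough illustration of that change, here is a minimal sketch, assuming Scala 2.12 or later with default compiler settings. The names {{LambdaClassDemo}}, {{LambdaEvent}} and {{isJvmLambda}} are made up for this example and are not taken from the demo repo. It only inspects the runtime class of a lambda stored in a case class field; it does not run Kryo or Flink.

```scala
object LambdaClassDemo {
  // Hypothetical event class with a function-typed field,
  // loosely modelled on the "lambda" field named in the serialization trace.
  final case class LambdaEvent(id: Int, lambda: Int => Int)

  // Heuristic (an assumption of this sketch): classes spun up at runtime for
  // Java-8-style lambdas carry "$$Lambda" in their name, while classes that
  // the compiler emits for explicit anonymous classes do not.
  def isJvmLambda(f: AnyRef): Boolean =
    f.getClass.getName.contains("$$Lambda")

  def main(args: Array[String]): Unit = {
    // A lambda literal: on Scala 2.12+ this goes through the Java 8 lambda mechanism.
    val viaLambda = LambdaEvent(1, x => x + 1)

    // An explicit anonymous class: compiled to an ordinary class file.
    val viaAnonClass = LambdaEvent(2, new Function1[Int, Int] {
      def apply(x: Int): Int = x + 1
    })

    println(viaLambda.lambda.getClass.getName)
    println(isJvmLambda(viaLambda.lambda))
    println(viaAnonClass.lambda.getClass.getName)
    println(isJvmLambda(viaAnonClass.lambda))
  }
}
```

Compiled with Scala 2.11, the lambda literal would instead show up as a regular compiler-generated class, which is the kind of class Kryo was able to handle before.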
You can see the exception if you enable logging, for example by adding these
dependencies to your {{build.sbt}}:
{code}
"org.slf4j" % "slf4j-log4j12" % "1.7.7" % "runtime",
"log4j" % "log4j" % "1.2.17" % "runtime"
{code}
and by putting a {{log4j.properties}} in {{src/main/resources}}:
{code}
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
{code}
This is the exception you will then find in the logs:
{code}
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
lambda (demo.LambdaCaseClass)
lambda (demo.TsEventClass)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224)
at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:33)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPunctuatedWatermark(AbstractFetcher.java:459)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:404)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
... 26 more
{code}
> Scala 2.12 lambdas does not work in event classes inside streams.
> -----------------------------------------------------------------
>
> Key: FLINK-12303
> URL: https://issues.apache.org/jira/browse/FLINK-12303
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, API / Scala
> Affects Versions: 1.7.2
> Environment: Scala 2.11/2.12, Oracle Java 1.8.0_172
> Reporter: Matěj Novotný
> Priority: Major
>
> When you use lambdas inside event classes used in streams, it works in
> Scala 2.11. It stopped working in Scala 2.12. It compiles but does not
> process any data and does not throw any exception. I would expect that it
> would not compile if I have used an unsupported field in an event class,
> or that it would at least throw an exception.
>
> For more detail check my demonstration repo, please:
> [https://github.com/matej-novotny/flink-lambda-bug]