Github user rmetzger commented on the pull request:
https://github.com/apache/flink/pull/474#issuecomment-81792049
There are no tests for the Kafka connectors.
The code is not working.
I've fixed an issue with the classloading already:
https://github.com/rmetzger/flink/commit/347d667cc01ec0fd8322eab078c2b087297cce6e
Currently, I'm facing the following problem
```
03/16/2015 17:27:26 Job execution switched to status FAILING.
03/16/2015 17:27:26 Job execution switched to status FAILED.
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.
at org.apache.flink.client.program.Client.run(Client.java:344)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
at com.dataartisans.KafkaDataGenerator.main(KafkaDataGenerator.java:137)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
at org.apache.flink.client.program.Client.run(Client.java:248)
at
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:563)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:271)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:826)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:868)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:295)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:89)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:145)
at
org.apache.flink.streaming.api.invokable.SourceInvokable.invoke(SourceInvokable.java:37)
at
org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:169)
at
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:205)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:145)
at
org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable.collect(FlatMapInvokable.java:49)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:41)
at
org.apache.flink.streaming.api.function.source.GenSequenceFunction.run(GenSequenceFunction.java:42)
at
org.apache.flink.streaming.api.invokable.SourceInvokable.callUserFunction(SourceInvokable.java:42)
at
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
... 4 more
Caused by: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:145)
at
org.apache.flink.streaming.api.invokable.SinkInvokable.collect(SinkInvokable.java:47)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:41)
at
com.dataartisans.KafkaDataGenerator$1.flatMap(KafkaDataGenerator.java:123)
at
com.dataartisans.KafkaDataGenerator$1.flatMap(KafkaDataGenerator.java:86)
at
org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable.callUserFunction(FlatMapInvokable.java:42)
at
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
... 9 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at kafka.utils.Utils$.createObject(Utils.scala:440)
at kafka.producer.Producer.<init>(Producer.scala:60)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at
org.apache.flink.streaming.connectors.kafka.api.KafkaSink.initialize(KafkaSink.java:111)
at
org.apache.flink.streaming.connectors.kafka.api.KafkaSink.invoke(KafkaSink.java:127)
at
org.apache.flink.streaming.api.invokable.SinkInvokable.callUserFunction(SinkInvokable.java:41)
at
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
... 15 more
Caused by: org.apache.commons.lang3.SerializationException:
java.io.EOFException
at
org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
at
org.apache.flink.streaming.connectors.kafka.config.StringSerializer.deserialize(StringSerializer.java:50)
at
org.apache.flink.streaming.connectors.kafka.config.KafkaConfigWrapper.read(KafkaConfigWrapper.java:55)
at
org.apache.flink.streaming.connectors.kafka.config.KafkaConfigWrapper.<init>(KafkaConfigWrapper.java:49)
at
org.apache.flink.streaming.connectors.kafka.config.PartitionerWrapper.<init>(PartitionerWrapper.java:38)
... 26 more
Caused by: java.io.EOFException
at
java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2325)
at
java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
at
java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at
org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
... 30 more
```
You don't need a cluster to reproduce this. Starting it with
`./bin/start-local.sh` to reproduce the issue is sufficient.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---