Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/474#issuecomment-81792049
  
    There are no tests for the Kafka connectors.
    
    The code is not working.
    I've fixed an issue with the classloading already: 
https://github.com/rmetzger/flink/commit/347d667cc01ec0fd8322eab078c2b087297cce6e
    Currently, I'm facing the following problem
    ```
    03/16/2015 17:27:26 Job execution switched to status FAILING.
    03/16/2015 17:27:26 Job execution switched to status FAILED.
    org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.run(Client.java:344)
        at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
        at com.dataartisans.KafkaDataGenerator.main(KafkaDataGenerator.java:137)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
        at org.apache.flink.client.program.Client.run(Client.java:248)
        at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:563)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:271)
        at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:826)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:868)
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
execution failed.
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:295)
        at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
        at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:89)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
        at 
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:145)
        at 
org.apache.flink.streaming.api.invokable.SourceInvokable.invoke(SourceInvokable.java:37)
        at 
org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:169)
        at 
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:205)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
        at 
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:145)
        at 
org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable.collect(FlatMapInvokable.java:49)
        at 
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:41)
        at 
org.apache.flink.streaming.api.function.source.GenSequenceFunction.run(GenSequenceFunction.java:42)
        at 
org.apache.flink.streaming.api.invokable.SourceInvokable.callUserFunction(SourceInvokable.java:42)
        at 
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        ... 4 more
    Caused by: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
        at 
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:145)
        at 
org.apache.flink.streaming.api.invokable.SinkInvokable.collect(SinkInvokable.java:47)
        at 
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:41)
        at 
com.dataartisans.KafkaDataGenerator$1.flatMap(KafkaDataGenerator.java:123)
        at 
com.dataartisans.KafkaDataGenerator$1.flatMap(KafkaDataGenerator.java:86)
        at 
org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable.callUserFunction(FlatMapInvokable.java:42)
        at 
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        ... 9 more
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at kafka.utils.Utils$.createObject(Utils.scala:440)
        at kafka.producer.Producer.<init>(Producer.scala:60)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at 
org.apache.flink.streaming.connectors.kafka.api.KafkaSink.initialize(KafkaSink.java:111)
        at 
org.apache.flink.streaming.connectors.kafka.api.KafkaSink.invoke(KafkaSink.java:127)
        at 
org.apache.flink.streaming.api.invokable.SinkInvokable.callUserFunction(SinkInvokable.java:41)
        at 
org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        ... 15 more
    Caused by: org.apache.commons.lang3.SerializationException: 
java.io.EOFException
        at 
org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
        at 
org.apache.flink.streaming.connectors.kafka.config.StringSerializer.deserialize(StringSerializer.java:50)
        at 
org.apache.flink.streaming.connectors.kafka.config.KafkaConfigWrapper.read(KafkaConfigWrapper.java:55)
        at 
org.apache.flink.streaming.connectors.kafka.config.KafkaConfigWrapper.<init>(KafkaConfigWrapper.java:49)
        at 
org.apache.flink.streaming.connectors.kafka.config.PartitionerWrapper.<init>(PartitionerWrapper.java:38)
        ... 26 more
    Caused by: java.io.EOFException
        at 
java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2325)
        at 
java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
        at 
java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
        at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
        at 
org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:222)
        ... 30 more
    ```
    
    You don't need a cluster to reproduce this. Starting it with 
`./bin/start-local.sh` to reproduce the issue is sufficient.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to