Hi Robert,

No, it was a compile-time issue. I also tried writing a plain String, but that did not work for me either. For clarity, my flink-connector-kafka version is 0.10.1.
I was able to fix the issue. SimpleStringSchema should be replaced with JavaDefaultStringSchema, as the latter converts the String to a byte array. Thanks for the help, though.

On Fri, Jan 22, 2016 at 1:34 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
>
> did you get any error message while executing the job? I don't think you
> can serialize the "Demo" type with the "SimpleStringSchema".
>
> On Fri, Jan 22, 2016 at 8:13 PM, Deepak Jha <dkjhan...@gmail.com> wrote:
>
> > Hi Devs,
> > I just started using Flink and would like to add Kafka as a sink. I went
> > through the documentation, but so far I have not succeeded in writing to
> > Kafka from Flink.
> >
> > I'm building the application in Scala. Here is my code snippet:
> >
> > case class Demo(city: String, country: String, zipcode: Int)
> >
> > The map stage returns an instance of the Demo type.
> >
> > val env = StreamExecutionEnvironment.getExecutionEnvironment
> >
> > val properties = new Properties()
> >
> > properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
> > properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
> > properties.setProperty("group.id", "test_topic")
> > val mapToDemo: String => Demo = {//Implementation}
> >
> > val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new
> > SimpleStringSchema, properties))
> >
> > stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092",
> > "test_topic", new SimpleStringSchema()))
> >
> > Can anyone explain what I am doing wrong in adding Kafka as a sink?
> >
> > --
> > Thanks,
> > Deepak Jha
>
--
Thanks,
Deepak Jha
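
For anyone who hits the same compile error: the fix above comes down to giving the Kafka sink a schema that turns each record into a byte array. Below is a minimal sketch of that conversion in plain Scala, with no Flink dependency. The `ToBytes` trait, the `DemoToBytes` and `StringToBytes` objects, the comma-separated layout, and the UTF-8 encoding are all hypothetical choices made for illustration. They are not part of the connector, and they do not reproduce what JavaDefaultStringSchema does internally.

```scala
import java.nio.charset.StandardCharsets

// Same case class as in the original question.
case class Demo(city: String, country: String, zipcode: Int)

// Hypothetical stand-in for the connector's serialization interface, so the
// sketch compiles without Flink on the classpath.
trait ToBytes[T] {
  def serialize(element: T): Array[Byte]
}

// Encodes a Demo as one comma-separated UTF-8 line.
// The field order and the separator are assumptions made for this sketch.
object DemoToBytes extends ToBytes[Demo] {
  override def serialize(element: Demo): Array[Byte] =
    s"${element.city},${element.country},${element.zipcode}"
      .getBytes(StandardCharsets.UTF_8)
}

// Encodes a plain String as UTF-8 bytes (the encoding is an assumption).
object StringToBytes extends ToBytes[String] {
  override def serialize(element: String): Array[Byte] =
    element.getBytes(StandardCharsets.UTF_8)
}
```

In the actual job, the same `serialize` logic would presumably live in a class that implements the connector's own serialization schema interface, with `Demo` as the input type and a byte array as the output type. The exact interface and its type parameters depend on the connector version, so it is worth checking them against the 0.10.1 sources before relying on this.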