[ https://issues.apache.org/jira/browse/BEAM-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-3703:
----------------------------------
    Component/s:     (was: beam-model)
                 io-java-kafka

> java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-3703
>                 URL: https://issues.apache.org/jira/browse/BEAM-3703
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.2.0
>            Reporter: jagdeep sihota
>            Assignee: Kenneth Knowles
>            Priority: Major
>             Fix For: Not applicable
>
>
> I am trying to read from a file and write to Kafka running on Google Cloud,
> and I am getting the following error:
>
> org.apache.beam.sdk.util.UserCodeException: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
>   at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
>   at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>   at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>   at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
>   at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
>   at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.checkForFailures(KafkaIO.java:1639)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.processElement(KafkaIO.java:1581)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
>
>
> .apply(KafkaIO.<String, String>write()
>     .withBootstrapServers("ip1:9092,ip2:9092")
>     .withTopic("feed")
>     .withValueSerializer(StringSerializer.class)
>     .withKeySerializer(StringSerializer.class)
>     //.updateProducerProperties(ImmutableMap.of("security.protocol","PLAINTEXT"))
>     //.updateProducerProperties(ImmutableMap.of("sasl.mechanism","PLAIN"))
>     .values() // writes values to Kafka with default key
> );
>
> Kafka is running on Google Cloud (Bitnami), and I am using the Flink runner.
> How do I pass security information to KafkaIO?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
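
On the question of passing security information: a minimal sketch, not a confirmed fix for this issue. It assumes the broker is configured for SASL/PLAIN over an unencrypted listener and that the Kafka client on the classpath supports the `sasl.jaas.config` property (0.10.2 or later). The class name `KafkaSecurityProps`, the method `saslPlain`, and the credentials are hypothetical. The resulting map would be handed to the `updateProducerProperties(...)` call that appears commented out in the reporter's snippet. The metadata timeout in the trace may also come from brokers that are unreachable at their advertised addresses, which these settings would not address.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds Kafka producer settings for SASL/PLAIN authentication (illustrative only). */
public class KafkaSecurityProps {

    /**
     * Returns producer properties for SASL/PLAIN over an unencrypted connection.
     * Assumption: the broker accepts SASL_PLAINTEXT; use "SASL_SSL" if it requires TLS.
     */
    public static Map<String, Object> saslPlain(String username, String password) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Inline JAAS configuration; avoids a separate JAAS file on each worker.
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials, not taken from the issue.
        Map<String, Object> props = saslPlain("demo-user", "demo-secret");

        // Print only the keys so the password is not written to the console.
        props.keySet().forEach(System.out::println);

        // With Beam and the Kafka client on the classpath, the map would be
        // passed to the write transform from the report, for example:
        //   KafkaIO.<String, String>write()
        //       .withBootstrapServers("ip1:9092,ip2:9092")
        //       .withTopic("feed")
        //       .updateProducerProperties(props)
    }
}
```

Collecting all settings in one map keeps the security configuration in a single place and avoids repeating the `updateProducerProperties` call once per key.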