[ https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=140687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140687 ]
ASF GitHub Bot logged work on BEAM-4038: ---------------------------------------- Author: ASF GitHub Bot Created on: 03/Sep/18 18:13 Start Date: 03/Sep/18 18:13 Worklog Time Spent: 10m Work Description: stale[bot] closed pull request #5287: [BEAM-4038] Support writing ProducerRecords to Kafka URL: https://github.com/apache/beam/pull/5287 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 0859233aad1..5ed9a04074e 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1013,6 +1013,10 @@ private KafkaIO() {} ); } + public PTransform<PCollection<ProducerRecord<K, V>>, PDone> writeRecords() { + return new ProducerRecordWrite<>(this); + } + @Override public PDone expand(PCollection<KV<K, V>> input) { checkArgument( @@ -1130,6 +1134,20 @@ public T decode(InputStream inStream) { } } + private static class ProducerRecordWrite<K, V> extends + PTransform<PCollection<ProducerRecord<K, V>>, PDone> { + + private final Write<K, V> kvWriteTransform; + + ProducerRecordWrite(Write<K, V> kvWriteTransform){ + this.kvWriteTransform = kvWriteTransform; + } + + @Override public PDone expand(PCollection<ProducerRecord<K, V>> input) { + input.apply(ParDo.of(new ProducerRecordWriter<>(kvWriteTransform))); + return PDone.in(input.getPipeline()); + } + } /** * Attempt to infer a {@link Coder} by extracting the type of the deserialized-class from the diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java new file mode 100644 index 00000000000..5424e4278ca --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java @@ -0,0 +1,122 @@ +package org.apache.beam.sdk.io.kafka; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +/** */ +public class ProducerRecordCoder<K, V> extends StructuredCoder<ProducerRecord<K, V>> { + + private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); + private static final VarLongCoder longCoder = VarLongCoder.of(); + private static final VarIntCoder intCoder = VarIntCoder.of(); + private static final IterableCoder headerCoder = + IterableCoder.of(KvCoder.of(stringCoder, ByteArrayCoder.of())); + + private final KvCoder<K, V> kvCoder; + + public static <K, V> ProducerRecordCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) { + return new ProducerRecordCoder<>(keyCoder, valueCoder); + } + + public ProducerRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) { + this.kvCoder = KvCoder.of(keyCoder, valueCoder); + } + + @Override + public void encode(ProducerRecord<K, V> value, OutputStream outStream) throws IOException { + stringCoder.encode(value.topic(), outStream); + intCoder.encode(value.partition(), outStream); + longCoder.encode(value.timestamp(), outStream); + headerCoder.encode(toIterable(value), outStream); + kvCoder.encode(KV.of(value.key(), value.value()), outStream); + } + + @Override + public ProducerRecord<K, V> decode(InputStream inStream) throws IOException { + String topic = stringCoder.decode(inStream); + Integer partition = intCoder.decode(inStream); + Long timestamp = longCoder.decode(inStream); + Headers headers = (Headers) toHeaders(headerCoder.decode(inStream)); + KV<K, V> kv = kvCoder.decode(inStream); + if (ConsumerSpEL.hasHeaders) { + return new ProducerRecord<>(topic, partition, timestamp, kv.getKey(), kv.getValue(), headers); + } + return new ProducerRecord<>(topic, partition, timestamp, kv.getKey(), kv.getValue()); + } + + private Object toHeaders(Iterable<KV<String, byte[]>> records) { + if (!ConsumerSpEL.hasHeaders) { + return null; + } + + // ConsumerRecord is used to simply create a list of headers + ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("", 0, 0L, "", ""); + records.forEach(kv -> consumerRecord.headers().add(kv.getKey(), kv.getValue())); + return consumerRecord.headers(); + } + + private Iterable<KV<String, byte[]>> toIterable(ProducerRecord record) { + if (!ConsumerSpEL.hasHeaders){ + return Collections.emptyList(); + } + + List<KV<String, byte[]>> vals = new ArrayList<>(); + for (Header header : record.headers()) { + vals.add(KV.of(header.key(), header.value())); + } + return vals; + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return kvCoder.getCoderArguments(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + kvCoder.verifyDeterministic(); + } + + @Override + public boolean isRegisterByteSizeObserverCheap(ProducerRecord<K, V> value) { + return kvCoder.isRegisterByteSizeObserverCheap(KV.of(value.key(), value.value())); + } + + @SuppressWarnings("unchecked") + @Override + public Object structuralValue(ProducerRecord<K, V> value) { + if (consistentWithEquals()) { + return value; + } else { + return new ProducerRecord<>( + value.topic(), + value.partition(), + value.timestamp(), + value.key(), + value.value(), + value.headers()); + } + } + + @Override + public boolean consistentWithEquals() { + return kvCoder.consistentWithEquals(); + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordWriter.java new file mode 100644 index 00000000000..180b642bb87 --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordWriter.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.io.kafka.KafkaIO.Write; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.SinkMetrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A DoFn to write to Kafka, used in KafkaIO Write transform. + * See {@link KafkaIO} for user visible documentation and example usage. + */ +class ProducerRecordWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> { + + @Setup + public void setup() { + if (spec.getProducerFactoryFn() != null) { + producer = spec.getProducerFactoryFn().apply(producerConfig); + } else { + producer = new KafkaProducer<>(producerConfig); + } + } + + @ProcessElement + public void processElement(ProcessContext ctx) throws Exception { + checkForFailures(); + + ProducerRecord<K, V> record = ctx.element(); + + producer.send(record, new SendCallback()); + + elementsWritten.inc(); + } + + @FinishBundle + public void finishBundle() throws IOException { + producer.flush(); + checkForFailures(); + } + + @Teardown + public void teardown() { + producer.close(); + } + + /////////////////////////////////////////////////////////////////////////////////// + + private static final Logger LOG = LoggerFactory.getLogger(ProducerRecordWriter.class); + + private final Write<K, V> spec; + private final Map<String, Object> producerConfig; + + private transient Producer<K, V> producer = null; + // first exception and number of failures since last invocation of checkForFailures(): + private transient Exception sendException = null; + private transient long numSendFailures = 0; + + private final Counter elementsWritten = SinkMetrics.elementsWritten(); + + ProducerRecordWriter(Write<K, V> spec) { + this.spec = spec; + + this.producerConfig = new HashMap<>(spec.getProducerConfig()); + + this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + spec.getKeySerializer()); + this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + spec.getValueSerializer()); + } + + private synchronized void checkForFailures() throws IOException { + if (numSendFailures == 0) { + return; + } + + String msg = String.format( + "KafkaWriter : failed to send %d records (since last report)", numSendFailures); + + Exception e = sendException; + sendException = null; + numSendFailures = 0; + + LOG.warn(msg); + throw new IOException(msg, e); + } + + private class SendCallback implements Callback { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception == null) { + return; + } + + synchronized (ProducerRecordWriter.this) { + if (sendException == null) { + sendException = exception; + } + numSendFailures++; + } + // don't log exception stacktrace here, exception will be propagated up. + LOG.warn("send failed : '{}'", exception.getMessage()); + } + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 140687) Time Spent: 10h 10m (was: 10h) > Support Kafka Headers in KafkaIO > -------------------------------- > > Key: BEAM-4038 > URL: https://issues.apache.org/jira/browse/BEAM-4038 > Project: Beam > Issue Type: New Feature > Components: io-java-kafka > Reporter: Geet Kumar > Assignee: Geet Kumar > Priority: Major > Time Spent: 10h 10m > Remaining Estimate: 0h > > Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The > purpose of this JIRA is to support this feature in KafkaIO. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)