[ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=140687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140687
 ]

ASF GitHub Bot logged work on BEAM-4038:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Sep/18 18:13
            Start Date: 03/Sep/18 18:13
    Worklog Time Spent: 10m 
      Work Description: stale[bot] closed pull request #5287: [BEAM-4038] 
Support writing ProducerRecords to Kafka
URL: https://github.com/apache/beam/pull/5287
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 0859233aad1..5ed9a04074e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1013,6 +1013,10 @@ private KafkaIO() {}
       );
     }
 
+    public PTransform<PCollection<ProducerRecord<K, V>>, PDone> writeRecords() 
{
+      return new ProducerRecordWrite<>(this);
+    }
+
     @Override
     public PDone expand(PCollection<KV<K, V>> input) {
       checkArgument(
@@ -1130,6 +1134,20 @@ public T decode(InputStream inStream) {
     }
   }
 
+  private static class ProducerRecordWrite<K, V> extends
+      PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
+
+    private final Write<K, V> kvWriteTransform;
+
+    ProducerRecordWrite(Write<K, V> kvWriteTransform){
+      this.kvWriteTransform = kvWriteTransform;
+    }
+
+    @Override public PDone expand(PCollection<ProducerRecord<K, V>> input) {
+      input.apply(ParDo.of(new ProducerRecordWriter<>(kvWriteTransform)));
+      return PDone.in(input.getPipeline());
+    }
+  }
 
   /**
    * Attempt to infer a {@link Coder} by extracting the type of the 
deserialized-class from the
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java
new file mode 100644
index 00000000000..5424e4278ca
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java
@@ -0,0 +1,122 @@
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+/** */
+public class ProducerRecordCoder<K, V> extends 
StructuredCoder<ProducerRecord<K, V>> {
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final VarLongCoder longCoder = VarLongCoder.of();
+  private static final VarIntCoder intCoder = VarIntCoder.of();
+  private static final IterableCoder headerCoder =
+      IterableCoder.of(KvCoder.of(stringCoder, ByteArrayCoder.of()));
+
+  private final KvCoder<K, V> kvCoder;
+
+  public static <K, V> ProducerRecordCoder<K, V> of(Coder<K> keyCoder, 
Coder<V> valueCoder) {
+    return new ProducerRecordCoder<>(keyCoder, valueCoder);
+  }
+
+  public ProducerRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+    this.kvCoder = KvCoder.of(keyCoder, valueCoder);
+  }
+
+  @Override
+  public void encode(ProducerRecord<K, V> value, OutputStream outStream) 
throws IOException {
+    stringCoder.encode(value.topic(), outStream);
+    intCoder.encode(value.partition(), outStream);
+    longCoder.encode(value.timestamp(), outStream);
+    headerCoder.encode(toIterable(value), outStream);
+    kvCoder.encode(KV.of(value.key(), value.value()), outStream);
+  }
+
+  @Override
+  public ProducerRecord<K, V> decode(InputStream inStream) throws IOException {
+    String topic = stringCoder.decode(inStream);
+    Integer partition = intCoder.decode(inStream);
+    Long timestamp = longCoder.decode(inStream);
+    Headers headers = (Headers) toHeaders(headerCoder.decode(inStream));
+    KV<K, V> kv = kvCoder.decode(inStream);
+    if (ConsumerSpEL.hasHeaders) {
+      return new ProducerRecord<>(topic, partition, timestamp, kv.getKey(), 
kv.getValue(), headers);
+    }
+    return new ProducerRecord<>(topic, partition, timestamp, kv.getKey(), 
kv.getValue());
+  }
+
+  private Object toHeaders(Iterable<KV<String, byte[]>> records) {
+    if (!ConsumerSpEL.hasHeaders) {
+      return null;
+    }
+
+    // ConsumerRecord is used to simply create a list of headers
+    ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("", 
0, 0L, "", "");
+    records.forEach(kv -> consumerRecord.headers().add(kv.getKey(), 
kv.getValue()));
+    return consumerRecord.headers();
+  }
+
+  private Iterable<KV<String, byte[]>> toIterable(ProducerRecord record) {
+    if (!ConsumerSpEL.hasHeaders){
+      return Collections.emptyList();
+    }
+
+    List<KV<String, byte[]>> vals = new ArrayList<>();
+    for (Header header : record.headers()) {
+      vals.add(KV.of(header.key(), header.value()));
+    }
+    return vals;
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return kvCoder.getCoderArguments();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    kvCoder.verifyDeterministic();
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(ProducerRecord<K, V> value) {
+    return kvCoder.isRegisterByteSizeObserverCheap(KV.of(value.key(), 
value.value()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object structuralValue(ProducerRecord<K, V> value) {
+    if (consistentWithEquals()) {
+      return value;
+    } else {
+      return new ProducerRecord<>(
+          value.topic(),
+          value.partition(),
+          value.timestamp(),
+          value.key(),
+          value.value(),
+          value.headers());
+    }
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return kvCoder.consistentWithEquals();
+  }
+}
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordWriter.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordWriter.java
new file mode 100644
index 00000000000..180b642bb87
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordWriter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.io.kafka.KafkaIO.Write;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.SinkMetrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A DoFn to write to Kafka, used in KafkaIO Write transform.
+ * See {@link KafkaIO} for user visible documentation and example usage.
+ */
+class ProducerRecordWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
+
+  @Setup
+  public void setup() {
+    if (spec.getProducerFactoryFn() != null) {
+      producer = spec.getProducerFactoryFn().apply(producerConfig);
+    } else {
+      producer = new KafkaProducer<>(producerConfig);
+    }
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext ctx) throws Exception {
+    checkForFailures();
+
+    ProducerRecord<K, V> record = ctx.element();
+
+    producer.send(record, new SendCallback());
+
+    elementsWritten.inc();
+  }
+
+  @FinishBundle
+  public void finishBundle() throws IOException {
+    producer.flush();
+    checkForFailures();
+  }
+
+  @Teardown
+  public void teardown() {
+    producer.close();
+  }
+
+  
///////////////////////////////////////////////////////////////////////////////////
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ProducerRecordWriter.class);
+
+  private final Write<K, V> spec;
+  private final Map<String, Object> producerConfig;
+
+  private transient Producer<K, V> producer = null;
+  // first exception and number of failures since last invocation of 
checkForFailures():
+  private transient Exception sendException = null;
+  private transient long numSendFailures = 0;
+
+  private final Counter elementsWritten = SinkMetrics.elementsWritten();
+
+  ProducerRecordWriter(Write<K, V> spec) {
+    this.spec = spec;
+
+    this.producerConfig = new HashMap<>(spec.getProducerConfig());
+
+    this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                            spec.getKeySerializer());
+    this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                            spec.getValueSerializer());
+  }
+
+  private synchronized void checkForFailures() throws IOException {
+    if (numSendFailures == 0) {
+      return;
+    }
+
+    String msg = String.format(
+        "KafkaWriter : failed to send %d records (since last report)", 
numSendFailures);
+
+    Exception e = sendException;
+    sendException = null;
+    numSendFailures = 0;
+
+    LOG.warn(msg);
+    throw new IOException(msg, e);
+  }
+
+  private class SendCallback implements Callback {
+    @Override
+    public void onCompletion(RecordMetadata metadata, Exception exception) {
+      if (exception == null) {
+        return;
+      }
+
+      synchronized (ProducerRecordWriter.this) {
+        if (sendException == null) {
+          sendException = exception;
+        }
+        numSendFailures++;
+      }
+      // don't log exception stacktrace here, exception will be propagated up.
+      LOG.warn("send failed : '{}'", exception.getMessage());
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 140687)
    Time Spent: 10h 10m  (was: 10h)

> Support Kafka Headers in KafkaIO
> --------------------------------
>
>                 Key: BEAM-4038
>                 URL: https://issues.apache.org/jira/browse/BEAM-4038
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Geet Kumar
>            Assignee: Geet Kumar
>            Priority: Major
>          Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to