[ https://issues.apache.org/jira/browse/BEAM-6063?focusedWorklogId=168750&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168750 ]

ASF GitHub Bot logged work on BEAM-6063:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Nov/18 10:41
            Start Date: 22/Nov/18 10:41
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #7052: [BEAM-6063] KafkaIO: 
add writing support with ProducerRecord
URL: https://github.com/apache/beam/pull/7052
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index d58d2a46e687..ad16243c23a1 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -47,7 +47,7 @@
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.io.kafka.KafkaIO.Write;
+import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.SinkMetrics;
@@ -88,7 +88,8 @@
  * Exactly-once sink transform for Kafka. See {@link KafkaIO} for user visible 
documentation and
  * example usage.
  */
-class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, 
PCollection<Void>> {
+class KafkaExactlyOnceSink<K, V>
+    extends PTransform<PCollection<ProducerRecord<K, V>>, PCollection<Void>> {
 
   // Dataflow ensures at-least once processing for side effects like sinks. In 
order to provide
   // exactly-once semantics, a sink needs to be idempotent or it should avoid 
writing records
@@ -132,7 +133,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaExactlyOnceSink.class);
   private static final String METRIC_NAMESPACE = "KafkaExactlyOnceSink";
 
-  private final Write<K, V> spec;
+  private final WriteRecords<K, V> spec;
 
   static void ensureEOSSupport() {
     checkArgument(
@@ -142,12 +143,12 @@ static void ensureEOSSupport() {
         "exactly-once semantics. Please use Kafka client version 0.11 or 
newer.");
   }
 
-  KafkaExactlyOnceSink(Write<K, V> spec) {
+  KafkaExactlyOnceSink(WriteRecords<K, V> spec) {
     this.spec = spec;
   }
 
   @Override
-  public PCollection<Void> expand(PCollection<KV<K, V>> input) {
+  public PCollection<Void> expand(PCollection<ProducerRecord<K, V>> input) {
 
     int numShards = spec.getNumShards();
     if (numShards <= 0) {
@@ -164,7 +165,7 @@ static void ensureEOSSupport() {
 
     return input
         .apply(
-            Window.<KV<K, V>>into(new GlobalWindows()) // Everything into 
global window.
+            Window.<ProducerRecord<K, V>>into(new GlobalWindows()) // 
Everything into global window.
                 
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                 .discardingFiredPanes())
         .apply(
@@ -180,7 +181,7 @@ static void ensureEOSSupport() {
 
   /** Shuffle messages assigning each randomly to a shard. */
   private static class Reshard<K, V>
-      extends DoFn<KV<K, V>, KV<Integer, TimestampedValue<KV<K, V>>>> {
+      extends DoFn<ProducerRecord<K, V>, KV<Integer, 
TimestampedValue<ProducerRecord<K, V>>>> {
 
     private final int numShards;
     private transient int shardId;
@@ -203,8 +204,8 @@ public void processElement(ProcessContext ctx) {
 
   private static class Sequencer<K, V>
       extends DoFn<
-          KV<Integer, Iterable<TimestampedValue<KV<K, V>>>>,
-          KV<Integer, KV<Long, TimestampedValue<KV<K, V>>>>> {
+          KV<Integer, Iterable<TimestampedValue<ProducerRecord<K, V>>>>,
+          KV<Integer, KV<Long, TimestampedValue<ProducerRecord<K, V>>>>> {
 
     private static final String NEXT_ID = "nextId";
 
@@ -215,7 +216,7 @@ public void processElement(ProcessContext ctx) {
     public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, 
ProcessContext ctx) {
       long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
       int shard = ctx.element().getKey();
-      for (TimestampedValue<KV<K, V>> value : ctx.element().getValue()) {
+      for (TimestampedValue<ProducerRecord<K, V>> value : 
ctx.element().getValue()) {
         ctx.output(KV.of(shard, KV.of(nextId, value)));
         nextId++;
       }
@@ -224,7 +225,7 @@ public void processElement(@StateId(NEXT_ID) 
ValueState<Long> nextIdState, Proce
   }
 
   private static class ExactlyOnceWriter<K, V>
-      extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<KV<K, 
V>>>>>, Void> {
+      extends DoFn<KV<Integer, Iterable<KV<Long, 
TimestampedValue<ProducerRecord<K, V>>>>>, Void> {
 
     private static final String NEXT_ID = "nextId";
     private static final String MIN_BUFFERED_ID = "minBufferedId";
@@ -242,7 +243,8 @@ public void processElement(@StateId(NEXT_ID) 
ValueState<Long> nextIdState, Proce
     private final StateSpec<ValueState<Long>> minBufferedIdSpec = 
StateSpecs.value();
 
     @StateId(OUT_OF_ORDER_BUFFER)
-    private final StateSpec<BagState<KV<Long, TimestampedValue<KV<K, V>>>>> 
outOfOrderBufferSpec;
+    private final StateSpec<BagState<KV<Long, 
TimestampedValue<ProducerRecord<K, V>>>>>
+        outOfOrderBufferSpec;
     // A random id assigned to each shard. Helps with detecting when multiple 
jobs are mistakenly
     // started with same groupId used for storing state on Kafka side, 
including the case where
     // a job is restarted with same groupId, but the metadata from previous 
run was not cleared.
@@ -250,7 +252,7 @@ public void processElement(@StateId(NEXT_ID) 
ValueState<Long> nextIdState, Proce
     @StateId(WRITER_ID)
     private final StateSpec<ValueState<String>> writerIdSpec = 
StateSpecs.value();
 
-    private final Write<K, V> spec;
+    private final WriteRecords<K, V> spec;
 
     // Metrics
     private final Counter elementsWritten = SinkMetrics.elementsWritten();
@@ -258,7 +260,7 @@ public void processElement(@StateId(NEXT_ID) 
ValueState<Long> nextIdState, Proce
     private final Counter elementsBuffered = Metrics.counter(METRIC_NAMESPACE, 
"elementsBuffered");
     private final Counter numTransactions = Metrics.counter(METRIC_NAMESPACE, 
"numTransactions");
 
-    ExactlyOnceWriter(Write<K, V> spec, Coder<KV<K, V>> elemCoder) {
+    ExactlyOnceWriter(WriteRecords<K, V> spec, Coder<ProducerRecord<K, V>> 
elemCoder) {
       this.spec = spec;
       this.outOfOrderBufferSpec =
           StateSpecs.bag(KvCoder.of(BigEndianLongCoder.of(), 
TimestampedValueCoder.of(elemCoder)));
@@ -276,7 +278,8 @@ public void setup() {
     public void processElement(
         @StateId(NEXT_ID) ValueState<Long> nextIdState,
         @StateId(MIN_BUFFERED_ID) ValueState<Long> minBufferedIdState,
-        @StateId(OUT_OF_ORDER_BUFFER) BagState<KV<Long, TimestampedValue<KV<K, 
V>>>> oooBufferState,
+        @StateId(OUT_OF_ORDER_BUFFER)
+            BagState<KV<Long, TimestampedValue<ProducerRecord<K, V>>>> 
oooBufferState,
         @StateId(WRITER_ID) ValueState<String> writerIdState,
         ProcessContext ctx)
         throws IOException {
@@ -316,10 +319,11 @@ public void processElement(
         // There might be out of order messages buffered in earlier 
iterations. These
         // will get merged if and when minBufferedId matches nextId.
 
-        Iterator<KV<Long, TimestampedValue<KV<K, V>>>> iter = 
ctx.element().getValue().iterator();
+        Iterator<KV<Long, TimestampedValue<ProducerRecord<K, V>>>> iter =
+            ctx.element().getValue().iterator();
 
         while (iter.hasNext()) {
-          KV<Long, TimestampedValue<KV<K, V>>> kv = iter.next();
+          KV<Long, TimestampedValue<ProducerRecord<K, V>>> kv = iter.next();
           long recordId = kv.getKey();
 
           if (recordId < nextId) {
@@ -364,7 +368,7 @@ public void processElement(
             // Read all of them in to memory and sort them. Reading into memory
             // might be problematic in extreme cases. Might need to improve it 
in future.
 
-            List<KV<Long, TimestampedValue<KV<K, V>>>> buffered =
+            List<KV<Long, TimestampedValue<ProducerRecord<K, V>>>> buffered =
                 Lists.newArrayList(oooBufferState.read());
             buffered.sort(new KV.OrderByKey<>());
 
@@ -440,7 +444,7 @@ private ShardMetadata() { // for json deserializer
       private final String writerId;
       private final Producer<K, V> producer;
       private final String producerName;
-      private final Write<K, V> spec;
+      private final WriteRecords<K, V> spec;
       private long committedId;
 
       ShardWriter(
@@ -448,7 +452,7 @@ private ShardMetadata() { // for json deserializer
           String writerId,
           Producer<K, V> producer,
           String producerName,
-          Write<K, V> spec,
+          WriteRecords<K, V> spec,
           long committedId) {
         this.shard = shard;
         this.writerId = writerId;
@@ -462,7 +466,8 @@ void beginTxn() {
         ProducerSpEL.beginTransaction(producer);
       }
 
-      Future<RecordMetadata> sendRecord(TimestampedValue<KV<K, V>> record, 
Counter sendCounter) {
+      Future<RecordMetadata> sendRecord(
+          TimestampedValue<ProducerRecord<K, V>> record, Counter sendCounter) {
         try {
           Long timestampMillis =
               spec.getPublishTimestampFunction() != null
@@ -477,8 +482,8 @@ void beginTxn() {
                       spec.getTopic(),
                       null,
                       timestampMillis,
-                      record.getValue().getKey(),
-                      record.getValue().getValue()));
+                      record.getValue().key(),
+                      record.getValue().value()));
           sendCounter.inc();
           return result;
         } catch (KafkaException e) {
@@ -678,7 +683,7 @@ void insert(int shard, ShardWriter<K, V> writer) {
    * Opens a generic consumer that is mainly meant for metadata operations 
like fetching number of
    * partitions for a topic rather than for fetching messages.
    */
-  private static Consumer<?, ?> openConsumer(Write<?, ?> spec) {
+  private static Consumer<?, ?> openConsumer(WriteRecords<?, ?> spec) {
     return spec.getConsumerFactoryFn()
         .apply(
             ImmutableMap.of(
@@ -693,7 +698,7 @@ void insert(int shard, ShardWriter<K, V> writer) {
   }
 
   private static <K, V> Producer<K, V> initializeExactlyOnceProducer(
-      Write<K, V> spec, String producerName) {
+      WriteRecords<K, V> spec, String producerName) {
 
     Map<String, Object> producerConfig = new 
HashMap<>(spec.getProducerConfig());
     producerConfig.putAll(
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 31ba72c54ba2..ad7fc8fa1220 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -160,8 +160,10 @@
  * <h3>Writing to Kafka</h3>
  *
  * <p>KafkaIO sink supports writing key-value pairs to a Kafka topic. Users 
can also write just the
- * values. To configure a Kafka sink, you must specify at the minimum Kafka
- * <tt>bootstrapServers</tt>, the topic to write to, and key and value 
serializers. For example:
+ * values or native Kafka producer records using {@link
+ * org.apache.kafka.clients.producer.ProducerRecord}. To configure a Kafka 
sink, you must specify at
+ * the minimum Kafka <tt>bootstrapServers</tt>, the topic to write to, and key 
and value
+ * serializers. For example:
  *
  * <pre>{@code
  * PCollection<KV<Long, String>> kvColl = ...;
@@ -199,6 +201,19 @@
  *   );
  * }</pre>
  *
+ * <p>Also, if you want to write Kafka {@link ProducerRecord} then you should 
use {@link
+ * KafkaIO#writeRecords()}:
+ *
+ * <pre>{@code
+ * PCollection<ProducerRecord<Long, String>> records = ...;
+ * records.apply(KafkaIO.<Long, String>writeRecords()
+ *     .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *     .withTopic("results")
+ *     .withKeySerializer(LongSerializer.class)
+ *     .withValueSerializer(StringSerializer.class)
+ *   );
+ * }</pre>
+ *
  * <h3>Advanced Kafka Configuration</h3>
  *
  * KafkaIO allows setting most of the properties in {@link ConsumerConfig} for 
source or in {@link
@@ -261,7 +276,24 @@
    */
   public static <K, V> Write<K, V> write() {
     return new AutoValue_KafkaIO_Write.Builder<K, V>()
-        .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
+        .setWriteRecordsTransform(
+            new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
+                .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
+                .setEOS(false)
+                .setNumShards(0)
+                .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
+                .build())
+        .build();
+  }
+
+  /**
+   * Creates an uninitialized {@link WriteRecords} {@link PTransform}. Before 
use, Kafka
+   * configuration should be set with {@link 
WriteRecords#withBootstrapServers(String)} and {@link
+   * WriteRecords#withTopic} along with {@link Deserializer}s for (optional) 
key and values.
+   */
+  public static <K, V> WriteRecords<K, V> writeRecords() {
+    return new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
+        .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
         .setEOS(false)
         .setNumShards(0)
         .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
@@ -854,11 +886,17 @@ private KafkaIO() {}
   //////////////////////// Sink Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
 
   /**
-   * A {@link PTransform} to write to a Kafka topic. See {@link KafkaIO} for 
more information on
-   * usage and configuration.
+   * A {@link PTransform} to write to a Kafka topic with ProducerRecord's. See 
{@link KafkaIO} for
+   * more information on usage and configuration.
    */
   @AutoValue
-  public abstract static class Write<K, V> extends 
PTransform<PCollection<KV<K, V>>, PDone> {
+  public abstract static class WriteRecords<K, V>
+      extends PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
+    // TODO (Version 3.0): Create the only one generic {@code Write<T>} 
transform which will be
+    // parameterized depending on type of input collection (KV, 
ProducerRecords, etc). In such case,
+    // we shouldn't have to duplicate the same API for similar transforms like 
{@link Write} and
+    // {@link WriteRecords}. See example at {@link PubsubIO.Write}.
+
     @Nullable
     abstract String getTopic();
 
@@ -874,7 +912,7 @@ private KafkaIO() {}
     abstract Class<? extends Serializer<V>> getValueSerializer();
 
     @Nullable
-    abstract KafkaPublishTimestampFunction<KV<K, V>> 
getPublishTimestampFunction();
+    abstract KafkaPublishTimestampFunction<ProducerRecord<K, V>> 
getPublishTimestampFunction();
 
     // Configuration for EOS sink
     abstract boolean isEOS();
@@ -904,7 +942,7 @@ private KafkaIO() {}
       abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> 
serializer);
 
       abstract Builder<K, V> setPublishTimestampFunction(
-          KafkaPublishTimestampFunction<KV<K, V>> timestampFunction);
+          KafkaPublishTimestampFunction<ProducerRecord<K, V>> 
timestampFunction);
 
       abstract Builder<K, V> setEOS(boolean eosEnabled);
 
@@ -915,20 +953,20 @@ private KafkaIO() {}
       abstract Builder<K, V> setConsumerFactoryFn(
           SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> 
fn);
 
-      abstract Write<K, V> build();
+      abstract WriteRecords<K, V> build();
     }
 
     /**
      * Returns a new {@link Write} transform with Kafka producer pointing to 
{@code
      * bootstrapServers}.
      */
-    public Write<K, V> withBootstrapServers(String bootstrapServers) {
+    public WriteRecords<K, V> withBootstrapServers(String bootstrapServers) {
       return updateProducerProperties(
           ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers));
     }
 
     /** Sets the Kafka topic to write to. */
-    public Write<K, V> withTopic(String topic) {
+    public WriteRecords<K, V> withTopic(String topic) {
       return toBuilder().setTopic(topic).build();
     }
 
@@ -938,19 +976,19 @@ private KafkaIO() {}
      * <p>A key is optional while writing to Kafka. Note when a key is set, 
its hash is used to
      * determine partition in Kafka (see {@link ProducerRecord} for more 
details).
      */
-    public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> 
keySerializer) {
+    public WriteRecords<K, V> withKeySerializer(Class<? extends Serializer<K>> 
keySerializer) {
       return toBuilder().setKeySerializer(keySerializer).build();
     }
 
     /** Sets a {@link Serializer} for serializing value to bytes. */
-    public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> 
valueSerializer) {
+    public WriteRecords<K, V> withValueSerializer(Class<? extends 
Serializer<V>> valueSerializer) {
       return toBuilder().setValueSerializer(valueSerializer).build();
     }
 
     /**
      * Adds the given producer properties, overriding old values of properties 
with the same key.
      */
-    public Write<K, V> updateProducerProperties(Map<String, Object> 
configUpdates) {
+    public WriteRecords<K, V> updateProducerProperties(Map<String, Object> 
configUpdates) {
       Map<String, Object> config =
           updateKafkaProperties(getProducerConfig(), 
IGNORED_PRODUCER_PROPERTIES, configUpdates);
       return toBuilder().setProducerConfig(config).build();
@@ -960,7 +998,7 @@ private KafkaIO() {}
      * Sets a custom function to create Kafka producer. Primarily used for 
tests. Default is {@link
      * KafkaProducer}
      */
-    public Write<K, V> withProducerFactoryFn(
+    public WriteRecords<K, V> withProducerFactoryFn(
         SerializableFunction<Map<String, Object>, Producer<K, V>> 
producerFactoryFn) {
       return toBuilder().setProducerFactoryFn(producerFactoryFn).build();
     }
@@ -972,7 +1010,7 @@ private KafkaIO() {}
      * processing messages from the past, they might be deleted immediately by 
Kafka after being
      * published if the timestamps are older than Kafka cluster's {@code 
log.retention.hours}.
      */
-    public Write<K, V> withInputTimestamp() {
+    public WriteRecords<K, V> withInputTimestamp() {
       return 
withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
     }
 
@@ -981,9 +1019,12 @@ private KafkaIO() {}
      * NOTE: Kafka's retention policies are based on message timestamps. If 
the pipeline is
      * processing messages from the past, they might be deleted immediately by 
Kafka after being
      * published if the timestamps are older than Kafka cluster's {@code 
log.retention.hours}.
+     *
+     * @deprecated use {@code ProducerRecords} to set publish timestamp.
      */
-    public Write<K, V> withPublishTimestampFunction(
-        KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
+    @Deprecated
+    public WriteRecords<K, V> withPublishTimestampFunction(
+        KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction) 
{
       return 
toBuilder().setPublishTimestampFunction(timestampFunction).build();
     }
 
@@ -1024,7 +1065,7 @@ private KafkaIO() {}
      *     common mistakes so that it does not end up using state that does 
not <i>seem</i> to be
      *     written by the same job.
      */
-    public Write<K, V> withEOS(int numShards, String sinkGroupId) {
+    public WriteRecords<K, V> withEOS(int numShards, String sinkGroupId) {
       KafkaExactlyOnceSink.ensureEOSSupport();
       checkArgument(numShards >= 1, "numShards should be >= 1");
       checkArgument(sinkGroupId != null, "sinkGroupId is required for 
exactly-once sink");
@@ -1037,27 +1078,17 @@ private KafkaIO() {}
      * Similar to {@link Read#withConsumerFactoryFn(SerializableFunction)}, a 
factory function can
      * be supplied if required in a specific case. The default is {@link 
KafkaConsumer}.
      */
-    public Write<K, V> withConsumerFactoryFn(
+    public WriteRecords<K, V> withConsumerFactoryFn(
         SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> 
consumerFactoryFn) {
       return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
     }
 
-    /**
-     * Writes just the values to Kafka. This is useful for writing collections 
of values rather
-     * thank {@link KV}s.
-     */
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    public PTransform<PCollection<V>, PDone> values() {
-      return new KafkaValueWrite<>(
-          toBuilder().setKeySerializer((Class) 
StringSerializer.class).build());
-    }
-
     @Override
-    public PDone expand(PCollection<KV<K, V>> input) {
+    public PDone expand(PCollection<ProducerRecord<K, V>> input) {
       checkArgument(
           getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != 
null,
           "withBootstrapServers() is required");
-      checkArgument(getTopic() != null, "withTopic() is required");
+
       checkArgument(getKeySerializer() != null, "withKeySerializer() is 
required");
       checkArgument(getValueSerializer() != null, "withValueSerializer() is 
required");
 
@@ -1122,6 +1153,181 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
     }
   }
 
+  /**
+   * A {@link PTransform} to write to a Kafka topic with KVs . See {@link 
KafkaIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class Write<K, V> extends 
PTransform<PCollection<KV<K, V>>, PDone> {
+    // TODO (Version 3.0): Create the only one generic {@code Write<T>} 
transform which will be
+    // parameterized depending on type of input collection (KV, 
ProducerRecords, etc). In such case,
+    // we shouldn't have to duplicate the same API for similar transforms like 
{@link Write} and
+    // {@link WriteRecords}. See example at {@link PubsubIO.Write}.
+
+    @Nullable
+    abstract String getTopic();
+
+    abstract WriteRecords<K, V> getWriteRecordsTransform();
+
+    abstract Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract Builder<K, V> setTopic(String topic);
+
+      abstract Builder<K, V> setWriteRecordsTransform(WriteRecords<K, V> 
transform);
+
+      abstract Write<K, V> build();
+    }
+
+    /** Used mostly to reduce using of boilerplate of wrapping {@link 
WriteRecords} methods. */
+    private Write<K, V> withWriteRecordsTransform(WriteRecords<K, V> 
transform) {
+      return toBuilder().setWriteRecordsTransform(transform).build();
+    }
+
+    /**
+     * Wrapper method over {@link WriteRecords#withBootstrapServers(String)}, 
used to keep the
+     * compatibility with old API based on KV type of element.
+     */
+    public Write<K, V> withBootstrapServers(String bootstrapServers) {
+      return withWriteRecordsTransform(
+          getWriteRecordsTransform().withBootstrapServers(bootstrapServers));
+    }
+
+    /**
+     * Wrapper method over {@link WriteRecords#withTopic(String)}, used to 
keep the compatibility
+     * with old API based on KV type of element.
+     */
+    public Write<K, V> withTopic(String topic) {
+      return toBuilder()
+          .setTopic(topic)
+          
.setWriteRecordsTransform(getWriteRecordsTransform().withTopic(topic))
+          .build();
+    }
+
+    /**
+     * Wrapper method over {@link WriteRecords#withKeySerializer(Class)}, used 
to keep the
+     * compatibility with old API based on KV type of element.
+     */
+    public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> 
keySerializer) {
+      return 
withWriteRecordsTransform(getWriteRecordsTransform().withKeySerializer(keySerializer));
+    }
+
+    /**
+     * Wrapper method over {@link WriteRecords#withValueSerializer(Class)}, 
used to keep the
+     * compatibility with old API based on KV type of element.
+     */
+    public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> 
valueSerializer) {
+      return withWriteRecordsTransform(
+          getWriteRecordsTransform().withValueSerializer(valueSerializer));
+    }
+
+    /**
+     * Wrapper method over {@link 
WriteRecords#withProducerFactoryFn(SerializableFunction)}, used to
+     * keep the compatibility with old API based on KV type of element.
+     */
+    public Write<K, V> withProducerFactoryFn(
+        SerializableFunction<Map<String, Object>, Producer<K, V>> 
producerFactoryFn) {
+      return withWriteRecordsTransform(
+          getWriteRecordsTransform().withProducerFactoryFn(producerFactoryFn));
+    }
+
+    /**
+     * Wrapper method over {@link WriteRecords#withInputTimestamp()}, used to 
keep the compatibility
+     * with old API based on KV type of element.
+     */
+    public Write<K, V> withInputTimestamp() {
+      return 
withWriteRecordsTransform(getWriteRecordsTransform().withInputTimestamp());
+    }
+
+    /**
+     * Wrapper method over {@link
+     * 
WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}, used 
to keep the
+     * compatibility with old API based on KV type of element.
+     *
+     * @deprecated use {@link WriteRecords} and {@code ProducerRecords} to set 
publish timestamp.
+     */
+    @Deprecated
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public Write<K, V> withPublishTimestampFunction(
+        KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
+      return withWriteRecordsTransform(
+          getWriteRecordsTransform()
+              .withPublishTimestampFunction(new 
PublishTimestampFunctionKV(timestampFunction)));
+    }
+
+    /**
+     * Wrapper method over {@link WriteRecords#withEOS(int, String)}, used to 
keep the compatibility
+     * with old API based on KV type of element.
+     */
+    public Write<K, V> withEOS(int numShards, String sinkGroupId) {
+      return 
withWriteRecordsTransform(getWriteRecordsTransform().withEOS(numShards, 
sinkGroupId));
+    }
+
+    /**
+     * Wrapper method over {@link 
WriteRecords#withConsumerFactoryFn(SerializableFunction)}, used to
+     * keep the compatibility with old API based on KV type of element.
+     */
+    public Write<K, V> withConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> 
consumerFactoryFn) {
+      return withWriteRecordsTransform(
+          getWriteRecordsTransform().withConsumerFactoryFn(consumerFactoryFn));
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<K, V>> input) {
+      checkArgument(getTopic() != null, "withTopic() is required");
+
+      KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
+      return input
+          .apply(
+              "Kafka ProducerRecord",
+              MapElements.via(
+                  new SimpleFunction<KV<K, V>, ProducerRecord<K, V>>() {
+                    @Override
+                    public ProducerRecord<K, V> apply(KV<K, V> element) {
+                      return new ProducerRecord<>(getTopic(), 
element.getKey(), element.getValue());
+                    }
+                  }))
+          .setCoder(ProducerRecordCoder.of(kvCoder.getKeyCoder(), 
kvCoder.getValueCoder()))
+          .apply(getWriteRecordsTransform());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      getWriteRecordsTransform().populateDisplayData(builder);
+    }
+
+    /**
+     * Writes just the values to Kafka. This is useful for writing collections 
of values rather
+     * thank {@link KV}s.
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public PTransform<PCollection<V>, PDone> values() {
+      return new KafkaValueWrite<K, V>(this.withKeySerializer((Class) 
StringSerializer.class));
+    }
+
+    /**
+     * Wrapper class which allows to use {@code 
KafkaPublishTimestampFunction<KV<K, V>} with {@link
+     * 
WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}.
+     */
+    private static class PublishTimestampFunctionKV<K, V>
+        implements KafkaPublishTimestampFunction<ProducerRecord<K, V>> {
+
+      private KafkaPublishTimestampFunction<KV<K, V>> fn;
+
+      public PublishTimestampFunctionKV(KafkaPublishTimestampFunction<KV<K, 
V>> fn) {
+        this.fn = fn;
+      }
+
+      @Override
+      public Instant getTimestamp(ProducerRecord<K, V> e, Instant ts) {
+        return fn.getTimestamp(KV.of(e.key(), e.value()), ts);
+      }
+    }
+  }
+
   /**
    * Same as {@code Write<K, V>} without a Key. Null is used for key as it is 
the convention is
    * Kafka when there is no key specified. Majority of Kafka writers don't 
specify a key.
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index beaa9a220530..fe3db765b02a 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -20,7 +20,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.sdk.io.kafka.KafkaIO.Write;
+import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -35,10 +35,10 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * A DoFn to write to Kafka, used in KafkaIO Write transform. See {@link 
KafkaIO} for user visible
- * documentation and example usage.
+ * A DoFn to write to Kafka, used in KafkaIO WriteRecords transform. See 
{@link KafkaIO} for user
+ * visible documentation and example usage.
  */
-class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> {
+class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
 
   @Setup
   public void setup() {
@@ -55,10 +55,12 @@ public void setup() {
   public void processElement(ProcessContext ctx) throws Exception {
     checkForFailures();
 
-    KV<K, V> kv = ctx.element();
+    ProducerRecord<K, V> record = ctx.element();
+    KV<K, V> kv = KV.of(record.key(), record.value());
+
     Long timestampMillis =
         spec.getPublishTimestampFunction() != null
-            ? spec.getPublishTimestampFunction().getTimestamp(kv, 
ctx.timestamp()).getMillis()
+            ? spec.getPublishTimestampFunction().getTimestamp(record, 
ctx.timestamp()).getMillis()
             : null;
 
     producer.send(
@@ -83,7 +85,7 @@ public void teardown() {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaWriter.class);
 
-  private final Write<K, V> spec;
+  private final WriteRecords<K, V> spec;
   private final Map<String, Object> producerConfig;
 
   private transient Producer<K, V> producer = null;
@@ -93,7 +95,7 @@ public void teardown() {
 
   private final Counter elementsWritten = SinkMetrics.elementsWritten();
 
-  KafkaWriter(Write<K, V> spec) {
+  KafkaWriter(WriteRecords<K, V> spec) {
     this.spec = spec;
 
     this.producerConfig = new HashMap<>(spec.getProducerConfig());
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java
new file mode 100644
index 000000000000..48a50276cd1f
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoder.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+/** {@link Coder} for {@link ProducerRecord}. */
+public class ProducerRecordCoder<K, V> extends 
StructuredCoder<ProducerRecord<K, V>> {
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final VarLongCoder longCoder = VarLongCoder.of();
+  private static final VarIntCoder intCoder = VarIntCoder.of();
+  private static final IterableCoder headerCoder =
+      IterableCoder.of(KvCoder.of(stringCoder, ByteArrayCoder.of()));
+
+  private final KvCoder<K, V> kvCoder;
+
+  public static <K, V> ProducerRecordCoder<K, V> of(Coder<K> keyCoder, 
Coder<V> valueCoder) {
+    return new ProducerRecordCoder<>(keyCoder, valueCoder);
+  }
+
+  public ProducerRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+    this.kvCoder = KvCoder.of(keyCoder, valueCoder);
+  }
+
+  @Override
+  public void encode(ProducerRecord<K, V> value, OutputStream outStream) 
throws IOException {
+    stringCoder.encode(value.topic(), outStream);
+    intCoder.encode(value.partition() != null ? value.partition() : -1, 
outStream);
+    longCoder.encode(value.timestamp() != null ? value.timestamp() : 
Long.MAX_VALUE, outStream);
+    headerCoder.encode(toIterable(value), outStream);
+    kvCoder.encode(KV.of(value.key(), value.value()), outStream);
+  }
+
+  @Override
+  public ProducerRecord<K, V> decode(InputStream inStream) throws IOException {
+    String topic = stringCoder.decode(inStream);
+    Integer partition = intCoder.decode(inStream);
+    if (partition == -1) {
+      partition = null;
+    }
+
+    Long timestamp = longCoder.decode(inStream);
+    if (timestamp == Long.MAX_VALUE) {
+      timestamp = null;
+    }
+
+    Headers headers = (Headers) toHeaders(headerCoder.decode(inStream));
+    KV<K, V> kv = kvCoder.decode(inStream);
+    if (ConsumerSpEL.hasHeaders) {
+      return new ProducerRecord<>(topic, partition, timestamp, kv.getKey(), 
kv.getValue(), headers);
+    }
+    return new ProducerRecord<>(topic, partition, timestamp, kv.getKey(), 
kv.getValue());
+  }
+
+  private Object toHeaders(Iterable<KV<String, byte[]>> records) {
+    if (!ConsumerSpEL.hasHeaders) {
+      return null;
+    }
+
+    // ConsumerRecord is used to simply create a list of headers
+    ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("", 
0, 0L, "", "");
+    records.forEach(kv -> consumerRecord.headers().add(kv.getKey(), 
kv.getValue()));
+    return consumerRecord.headers();
+  }
+
+  private Iterable<KV<String, byte[]>> toIterable(ProducerRecord record) {
+    if (!ConsumerSpEL.hasHeaders) {
+      return Collections.emptyList();
+    }
+    List<KV<String, byte[]>> vals = new ArrayList<>();
+    for (Header header : record.headers()) {
+      vals.add(KV.of(header.key(), header.value()));
+    }
+    return vals;
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return kvCoder.getCoderArguments();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    kvCoder.verifyDeterministic();
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(ProducerRecord<K, V> value) {
+    return kvCoder.isRegisterByteSizeObserverCheap(KV.of(value.key(), 
value.value()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object structuralValue(ProducerRecord<K, V> value) {
+    if (consistentWithEquals()) {
+      return value;
+    } else {
+      return new ProducerRecord<>(
+          value.topic(),
+          value.partition(),
+          value.timestamp(),
+          value.key(),
+          value.value(),
+          value.headers());
+    }
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return kvCoder.consistentWithEquals();
+  }
+}
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 7d0e766e67e6..1bc5d6705026 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -64,6 +64,7 @@
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -1065,6 +1066,55 @@ public void testValuesSink() throws Exception {
     }
   }
 
+  @Test
+  public void testRecordsSink() throws Exception {
+    // Simply read from kafka source and write to kafka sink using 
ProducerRecord transform. Then
+    // verify the records are correctly published to mock kafka producer.
+
+    int numElements = 1000;
+
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+
+      ProducerSendCompletionThread completionThread =
+          new 
ProducerSendCompletionThread(producerWrapper.mockProducer).start();
+
+      String topic = "test";
+
+      p.apply(mkKafkaReadTransform(numElements, new 
ValueAsTimestampFn()).withoutMetadata())
+          .apply(ParDo.of(new KV2ProducerRecord(topic)))
+          .setCoder(ProducerRecordCoder.of(VarIntCoder.of(), 
VarLongCoder.of()))
+          .apply(
+              KafkaIO.<Integer, Long>writeRecords()
+                  .withBootstrapServers("none")
+                  .withTopic(topic)
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(LongSerializer.class)
+                  .withInputTimestamp()
+                  .withProducerFactoryFn(new 
ProducerFactoryFn(producerWrapper.producerKey)));
+
+      p.run();
+
+      completionThread.shutdown();
+
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, 
false, true);
+    }
+  }
+
+  private static class KV2ProducerRecord
+      extends DoFn<KV<Integer, Long>, ProducerRecord<Integer, Long>> {
+    final String topic;
+
+    KV2ProducerRecord(String topic) {
+      this.topic = topic;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      KV<Integer, Long> kv = ctx.element();
+      ctx.output(new ProducerRecord<>(topic, kv.getKey(), kv.getValue()));
+    }
+  }
+
   @Test
   public void testExactlyOnceSink() {
     // testSink() with EOS enabled.
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoderTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoderTest.java
new file mode 100644
index 000000000000..760d0f92d0a0
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ProducerRecordCoderTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ProducerRecordCoder}. */
+@RunWith(JUnit4.class)
+public class ProducerRecordCoderTest {
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(
+        ProducerRecordCoder.of(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
+  public void testProducerRecordSerializableWithHeaders() throws IOException {
+    RecordHeaders headers = new RecordHeaders();
+    headers.add("headerKey", "headerVal".getBytes(StandardCharsets.UTF_8));
+    verifySerialization(headers, 0, System.currentTimeMillis());
+  }
+
+  @Test
+  public void testProducerRecordSerializableWithoutHeaders() throws 
IOException {
+    ConsumerRecord consumerRecord = new ConsumerRecord<>("", 0, 0L, "", "");
+    verifySerialization(consumerRecord.headers(), 0, 
System.currentTimeMillis());
+  }
+
+  @Test
+  public void testProducerRecordSerializableWithPartition() throws IOException 
{
+    ProducerRecord<String, String> decodedRecord =
+        verifySerialization(1, System.currentTimeMillis());
+    assertEquals(1, decodedRecord.partition().intValue());
+  }
+
+  @Test
+  public void testProducerRecordSerializableWithoutPartition() throws 
IOException {
+    ProducerRecord<String, String> decodedRecord =
+        verifySerialization(null, System.currentTimeMillis());
+    assertNull(decodedRecord.partition());
+  }
+
+  @Test
+  public void testProducerRecordSerializableWithTimestamp() throws IOException 
{
+    long timestamp = System.currentTimeMillis();
+    ProducerRecord<String, String> decodedRecord = verifySerialization(1, 
timestamp);
+    assertEquals(timestamp, decodedRecord.timestamp().longValue());
+  }
+
+  @Test
+  public void testProducerRecordSerializableWithoutTimestamp() throws 
IOException {
+    ProducerRecord<String, String> decodedRecord = verifySerialization(1, 
null);
+    assertNull(decodedRecord.timestamp());
+  }
+
+  private ProducerRecord<String, String> verifySerialization(Integer 
partition, Long timestamp)
+      throws IOException {
+    return verifySerialization(null, partition, timestamp);
+  }
+
+  private ProducerRecord<String, String> verifySerialization(
+      Headers headers, Integer partition, Long timestamp) throws IOException {
+    ProducerRecord<String, String> producerRecord =
+        new ProducerRecord<>("topic", partition, timestamp, "key", "value", 
headers);
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    ProducerRecordCoder producerRecordCoder =
+        ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
+
+    producerRecordCoder.encode(producerRecord, outputStream);
+    ProducerRecord<String, String> decodedRecord =
+        producerRecordCoder.decode(new 
ByteArrayInputStream(outputStream.toByteArray()));
+
+    assertEquals(producerRecord, decodedRecord);
+
+    return decodedRecord;
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 168750)
    Time Spent: 3h 20m  (was: 3h 10m)

> KafkaIO: add writing support with ProducerRecord
> ------------------------------------------------
>
>                 Key: BEAM-6063
>                 URL: https://issues.apache.org/jira/browse/BEAM-6063
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Alexey Romanenko
>            Assignee: Alexey Romanenko
>            Priority: Major
>             Fix For: 2.10.0
>
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Currently, the default input collection for {{KafkaIO.Write}} is 
> {{PCollection<KV<K,V>>}}. To support writing Kafka headers or writing to 
> different output Kafka topics, we need to change the type of the input 
> collection to {{PCollection<ProducerRecord<K,V>>}}. It also makes sense to 
> use {{ProducerRecord<K,V>}} instead of {{KV<K,V>}} internally in {{KafkaIO}} 
> so that all metadata is preserved.
> At the same time, we need to keep compatibility with the old interface based 
> on {{KV<K,V>}}, but deprecate it and move entirely to 
> {{ProducerRecord<K,V>}} later.
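
For illustration, here is a minimal sketch of the new {{writeRecords()}} API,
adapted from the Javadoc example added in this PR. The broker addresses, topic
name and the sample record are placeholders; only the KafkaIO and
ProducerRecordCoder calls come from the change itself.

{code:java}
Pipeline p = Pipeline.create();

// Placeholder input; in practice this collection comes from an upstream step.
PCollection<ProducerRecord<Long, String>> records =
    p.apply(
        Create.of(new ProducerRecord<>("results", 1L, "value"))
            .withCoder(
                ProducerRecordCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));

// Write native ProducerRecords instead of KVs.
records.apply(
    KafkaIO.<Long, String>writeRecords()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("results")
        .withKeySerializer(LongSerializer.class)
        .withValueSerializer(StringSerializer.class));

p.run();
{code}

As in the new {{testRecordsSink}} test, a collection of {{ProducerRecord}}s
produced by a user DoFn may need its coder set explicitly with
{{ProducerRecordCoder.of(keyCoder, valueCoder)}} before the sink is applied.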



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
