This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 98f4596  [GOBBLIN-820] Add keyed write capability to Kafka writer
98f4596 is described below

commit 98f45965b86230265bd30578c887ad4a7dee5c78
Author: Shirshanka Das <[email protected]>
AuthorDate: Tue Jul 9 14:11:39 2019 -0700

    [GOBBLIN-820] Add keyed write capability to Kafka writer
    
    Closes #2682 from shirshanka/kill-pusher
---
 .../configuration/ConfigurationException.java      |  20 ++--
 .../java/org/apache/gobblin/writer/DataWriter.java |   7 +-
 .../gobblin/types/AvroGenericRecordTypeMapper.java |  30 +++---
 .../gobblin/types/FieldMappingException.java       |  19 +---
 .../java/org/apache/gobblin/types/TypeMapper.java  |  26 +++--
 .../types/AvroGenericRecordTypeMapperTest.java     |  26 ++---
 gobblin-docs/index.md                              |   2 +-
 gobblin-docs/sinks/Kafka.md                        |   6 +-
 .../gobblin/kafka/writer/Kafka08DataWriter.java    |  39 +++++---
 .../kafka/writer/KafkaDataWriterBuilder.java       |   6 +-
 .../kafka/writer/Kafka08DataWriterTest.java        |   6 +-
 .../kafka/writer/Kafka08DataWriterUnitTest.java    |   2 +-
 .../gobblin/kafka/writer/Kafka09DataWriter.java    |  42 +++++---
 .../writer/Kafka09JsonObjectWriterBuilder.java     |   4 +-
 .../kafka/writer/KafkaDataWriterBuilder.java       |   6 +-
 .../kafka/writer/Kafka09DataWriterTest.java        | 109 ++++++++++++++++++---
 .../kafka/writer/Kafka09TopicProvisionTest.java    |   4 +-
 .../writer/AbstractKafkaDataWriterBuilder.java     |   3 +-
 .../kafka/writer/KafkaWriterCommonConfig.java      |  61 ++++++++++++
 .../kafka/writer/KafkaWriterConfigurationKeys.java |  12 +++
 .../gobblin/kafka/writer/KafkaWriterHelper.java    |  18 ++++
 .../kafka/writer/KafkaWriterCommonConfigTest.java  |  99 +++++++++++++++++++
 .../gobblin/kafka/writer/TestTypeMapper.java}      |  17 ++--
 23 files changed, 430 insertions(+), 134 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationException.java
similarity index 61%
copy from 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to 
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationException.java
index 7adbba1..cb6e7bf 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationException.java
@@ -14,23 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.gobblin.configuration;
 
-package org.apache.gobblin.kafka.writer;
+import java.io.IOException;
 
-import java.util.Properties;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+public class ConfigurationException extends IOException {
 
-import org.apache.gobblin.writer.AsyncDataWriter;
-
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends 
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
-
-  @Override
-  protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties 
props) {
-    return new Kafka08DataWriter<>(props);
+  public ConfigurationException(String message, Exception e) {
+    super(message, e);
   }
+
 }
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java 
b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
index 400dcdf..bdfb5f0 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.stream.RecordEnvelope;
 public interface DataWriter<D> extends Closeable, Flushable {
 
   /**
-   * Write a source data record in Avro format using the given converter.
+   * Write a data record.
    *
    * @param record data record to write
    * @throws IOException if there is anything wrong writing the record
@@ -52,7 +52,7 @@ public interface DataWriter<D> extends Closeable, Flushable {
 
   /**
    * Commit the data written.
-   *
+   * This method is expected to be called at most once during the lifetime of 
a writer.
    * @throws IOException if there is anything wrong committing the output
    */
   public void commit()
@@ -99,6 +99,8 @@ public interface DataWriter<D> extends Closeable, Flushable {
 
   /**
    * Write the input {@link RecordEnvelope}. By default, just call {@link 
#write(Object)}.
+   * DataWriters that implement this method must acknowledge the 
recordEnvelope once the write has been acknowledged
+   * by the destination system.
    */
   default void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws 
IOException {
     write(recordEnvelope.getRecord());
@@ -115,6 +117,7 @@ public interface DataWriter<D> extends Closeable, Flushable 
{
 
   /**
    * Flush data written by the writer. By default, does nothing.
+   * This method is expected to be called multiple times during the lifetime 
of a writer.
    * @throws IOException
    */
   default void flush() throws IOException {
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/types/AvroGenericRecordTypeMapper.java
similarity index 57%
copy from 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to 
gobblin-core-base/src/main/java/org/apache/gobblin/types/AvroGenericRecordTypeMapper.java
index 7adbba1..b72b76a 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/types/AvroGenericRecordTypeMapper.java
@@ -15,22 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.kafka.writer;
+package org.apache.gobblin.types;
 
-import java.util.Properties;
-
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
-import org.apache.gobblin.writer.AsyncDataWriter;
+import lombok.extern.slf4j.Slf4j;
 
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends 
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+
+@Slf4j
+public class AvroGenericRecordTypeMapper implements TypeMapper<GenericRecord> {
 
   @Override
-  protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties 
props) {
-    return new Kafka08DataWriter<>(props);
+  public Object getField(GenericRecord record, String fieldPath) throws 
FieldMappingException {
+    if (fieldPath.equals(FIELD_PATH_ALL)) {
+      return record;
+    }
+
+    Object field = record;
+    try {
+      for (String part: fieldPath.split("\\.")) {
+        field = ((GenericRecord) field).get(part);
+      }
+    } catch (Exception e) {
+      throw new FieldMappingException("Failed to retrieve fieldPath " + 
fieldPath + " from record " + record.toString(), e);
+    }
+    return field;
   }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/types/FieldMappingException.java
similarity index 61%
copy from 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to 
gobblin-core-base/src/main/java/org/apache/gobblin/types/FieldMappingException.java
index 7adbba1..91b22a6 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/types/FieldMappingException.java
@@ -15,22 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.kafka.writer;
+package org.apache.gobblin.types;
 
-import java.util.Properties;
+public class FieldMappingException extends Exception {
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-
-import org.apache.gobblin.writer.AsyncDataWriter;
-
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends 
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
-
-  @Override
-  protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties 
props) {
-    return new Kafka08DataWriter<>(props);
+  public FieldMappingException(String message, Exception e) {
+    super(message, e);
   }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
 b/gobblin-core-base/src/main/java/org/apache/gobblin/types/TypeMapper.java
similarity index 63%
copy from 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to gobblin-core-base/src/main/java/org/apache/gobblin/types/TypeMapper.java
index 7adbba1..375781a 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/types/TypeMapper.java
@@ -15,22 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.kafka.writer;
+package org.apache.gobblin.types;
 
-import java.util.Properties;
+import java.io.Closeable;
+import java.io.IOException;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import com.typesafe.config.Config;
 
-import org.apache.gobblin.writer.AsyncDataWriter;
 
 /**
- * Builder that hands back a {@link Kafka08DataWriter}
+ * An interface that allows Gobblin constructs to introspect a type T
+ * @param <T>
  */
-public class KafkaDataWriterBuilder extends 
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+public interface TypeMapper<T> extends Closeable {
+  String FIELD_PATH_ALL = "*";
+
+  default void configure(Config config) {
+
+  }
+
+  Object getField(T record, String fieldPath)
+      throws FieldMappingException;
 
   @Override
-  protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties 
props) {
-    return new Kafka08DataWriter<>(props);
+  default void close()
+      throws IOException {
   }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
 
b/gobblin-core-base/src/test/java/org/apache/gobblin/types/AvroGenericRecordTypeMapperTest.java
similarity index 62%
copy from 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to 
gobblin-core-base/src/test/java/org/apache/gobblin/types/AvroGenericRecordTypeMapperTest.java
index 7adbba1..5b1703f 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ 
b/gobblin-core-base/src/test/java/org/apache/gobblin/types/AvroGenericRecordTypeMapperTest.java
@@ -14,23 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.gobblin.types;
 
-package org.apache.gobblin.kafka.writer;
+import org.apache.avro.generic.GenericRecord;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
-import java.util.Properties;
+import org.apache.gobblin.test.TestUtils;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
-import org.apache.gobblin.writer.AsyncDataWriter;
+public class AvroGenericRecordTypeMapperTest {
 
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends 
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+  @Test
+  public void test()
+      throws FieldMappingException {
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    AvroGenericRecordTypeMapper typeMapper = new AvroGenericRecordTypeMapper();
+    Object field = typeMapper.getField(record, "field1");
+    Assert.assertEquals(field.getClass(), String.class);
 
-  @Override
-  protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties 
props) {
-    return new Kafka08DataWriter<>(props);
   }
+
 }
diff --git a/gobblin-docs/index.md b/gobblin-docs/index.md
index f116188..79a71c5 100644
--- a/gobblin-docs/index.md
+++ b/gobblin-docs/index.md
@@ -3,7 +3,7 @@
 </p>
 
 Over the years, LinkedIn's data infrastructure team built custom solutions for 
ingesting diverse data entities into our Hadoop eco-system. At one point, we 
were running 15 types of ingestion pipelines which created significant data 
quality, metadata management, development, and operation challenges.
-
+ 
 Our experiences and challenges motivated us to build _Gobblin_. Gobblin is a 
universal data ingestion framework for extracting, transforming, and loading 
large volume of data from a variety of data sources, e.g., databases, rest 
APIs, FTP/SFTP servers, filers, etc., onto Hadoop. Gobblin handles the common 
routine tasks required for all data ingestion ETLs, including job/task 
scheduling, task partitioning, error handling, state management, data quality 
checking, data publishing, etc. Gobb [...]
 
 You can find a lot of useful resources in our wiki pages, including [how to 
get started with Gobblin](Getting-Started), an [architecture overview of 
Gobblin](Gobblin-Architecture), and
diff --git a/gobblin-docs/sinks/Kafka.md b/gobblin-docs/sinks/Kafka.md
index 10c243f..89936e5 100644
--- a/gobblin-docs/sinks/Kafka.md
+++ b/gobblin-docs/sinks/Kafka.md
@@ -57,7 +57,7 @@ INFO  [TaskExecutor-0] gobblin.kafka.writer.KafkaDataWriter  
211 - Successfully
 
 #Configuration Details
 
-The Kafka writer supports all the configuration parameters supported by the 
[0.8.2 Java Kafka 
Producer](http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html).
 All you have to do is prefix `writer.kafka.producerConfig.` to each 
configuration property that the producer supports. For example, if you want to 
set the `acks` parameter to `all` to ensure full acknowledgement of writes, you 
would set `writer.kafka.producerConfig.acks=all` in your pu [...]
+At this time, Gobblin supports integration with Kafka 0.8 and 0.9. The Kafka 
writer supports all the configuration parameters supported by the 
version-specific Kafka Producer (e.g. [Latest Producer 
Configs](https://kafka.apache.org/documentation/#producerconfigs)). All you 
have to do is prefix `writer.kafka.producerConfig.` to each configuration 
property that the producer supports. For example, if you want to set the `acks` 
parameter to `all` to ensure full acknowledgement of writes, you [...]
 
 There are a few key parameters at the Gobblin level that control the behavior 
of the data writer. 
 
@@ -66,6 +66,10 @@ There are a few key parameters at the Gobblin level that 
control the behavior of
 | `writer.kafka.topic` | The topic that the writer will be writing to. At this 
time, the writer can only write to a single topic per pipeline. | 
 | `writer.kafka.failureAllowancePercentage` | The percentage of failures that 
you are willing to tolerate while writing to Kafka. Gobblin will mark the 
workunit successful and move on if there are failures but not enough to trip 
the failure threshold. Only successfully acknowledged writes are counted as 
successful, all others are considered as failures. The default for the 
failureAllowancePercentage is set to 20.0. This means that as long as 80% of 
the data is acknowledged by Kafka, Gobb [...]
 | `writer.kafka.commitTimeoutMillis` | The amount of time that the Gobblin 
committer will wait before abandoning its wait for unacknowledged writes. This 
defaults to 1 minute. | 
+| `writer.kafka.keyed` | When set to true, enables key-based writes to Kafka. 
This defaults to false. If you set this to true, make sure to set the keyField 
configuration property. Serialization of the key is controlled by the Kafka 
Producer specific configuration property 
(`writer.kafka.producerConfig.key.serializer`) |
+| `writer.kafka.keyField` | The field of the record to use as the key for 
writing to Kafka. The field path follows a nested notation. So a top-level 
field "name" would be set to "name", a nested field "name" within a top-level 
struct "header" would be named "header.name" | 
+| `writer.kafka.typeMapperClass` | The class that the writer should use to 
extract keys and values from the input record. The default if not specified 
assumes that AvroGenericRecordTypeMapper is being used | 
+| `writer.kafka.valueField` | The field of the record to use as the value for 
writing to Kafka. Defaults to "*" which indicates that the entire record should 
be written. For nested records such as a pair of key, value, one would set the 
value of this configuration to the field-name for the value structure. | 
 
 #What Next?
 
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
index 8df03c3..fc82270 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
+++ 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -33,6 +34,7 @@ import com.typesafe.config.ConfigFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationException;
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;
 import org.apache.gobblin.writer.WriteResponse;
@@ -47,7 +49,7 @@ import org.apache.gobblin.writer.WriteResponseMapper;
  *
  */
 @Slf4j
-public class Kafka08DataWriter<D> implements AsyncDataWriter<D> {
+public class Kafka08DataWriter<K,V> implements AsyncDataWriter<V> {
 
   private static final WriteResponseMapper<RecordMetadata> 
WRITE_RESPONSE_WRAPPER =
       new WriteResponseMapper<RecordMetadata>() {
@@ -74,8 +76,9 @@ public class Kafka08DataWriter<D> implements 
AsyncDataWriter<D> {
         }
       };
 
-  private final Producer<String, D> producer;
+  private final Producer<K, V> producer;
   private final String topic;
+  private final KafkaWriterCommonConfig commonConfig;
 
   public static Producer getKafkaProducer(Properties props)
   {
@@ -90,14 +93,16 @@ public class Kafka08DataWriter<D> implements 
AsyncDataWriter<D> {
     }
   }
 
-  public Kafka08DataWriter(Properties props) {
+  public Kafka08DataWriter(Properties props)
+      throws ConfigurationException {
     this(getKafkaProducer(props), ConfigFactory.parseProperties(props));
   }
 
   public Kafka08DataWriter(Producer producer, Config config)
-  {
+      throws ConfigurationException {
     this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
     this.producer = producer;
+    this.commonConfig = new KafkaWriterCommonConfig(config);
   }
 
   @Override
@@ -110,17 +115,21 @@ public class Kafka08DataWriter<D> implements 
AsyncDataWriter<D> {
 
 
   @Override
-  public Future<WriteResponse> write(final D record, final WriteCallback 
callback) {
-    return new WriteResponseFuture<>(this.producer.send(new 
ProducerRecord<String, D>(topic, record), new Callback() {
-      @Override
-      public void onCompletion(final RecordMetadata metadata, Exception 
exception) {
-        if (exception != null) {
-          callback.onFailure(exception);
-        } else {
-          callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
-        }
-      }
-    }), WRITE_RESPONSE_WRAPPER);
+  public Future<WriteResponse> write(final V record, final WriteCallback 
callback) {
+    try {
+      Pair<K, V> kvPair = KafkaWriterHelper.getKeyValuePair(record, 
commonConfig);
+      return new WriteResponseFuture<>(this.producer.send(new 
ProducerRecord<>(topic, kvPair.getKey(), kvPair.getValue()),
+          (metadata, exception) -> {
+            if (exception != null) {
+              callback.onFailure(exception);
+            } else {
+              callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
+            }
+          }), WRITE_RESPONSE_WRAPPER);
+    }
+    catch (Exception e) {
+      throw new RuntimeException("Failed to generate write request", e);
+    }
   }
 
   @Override
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
index 7adbba1..589925a 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
@@ -22,15 +22,17 @@ import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
+import org.apache.gobblin.configuration.ConfigurationException;
 import org.apache.gobblin.writer.AsyncDataWriter;
 
 /**
  * Builder that hands back a {@link Kafka08DataWriter}
  */
-public class KafkaDataWriterBuilder extends 
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+public class KafkaDataWriterBuilder<S,D> extends 
AbstractKafkaDataWriterBuilder<S, D> {
 
   @Override
-  protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties 
props) {
+  protected AsyncDataWriter<D> getAsyncDataWriter(Properties props)
+      throws ConfigurationException {
     return new Kafka08DataWriter<>(props);
   }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterTest.java
 
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterTest.java
index c7ac96f..3d3387c 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterTest.java
+++ 
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterTest.java
@@ -80,7 +80,7 @@ public class Kafka08DataWriterTest {
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
 "localhost:" + _kafkaTestHelper.getKafkaServerPort());
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
 "org.apache.kafka.common.serialization.StringSerializer");
-    Kafka08DataWriter<String> kafka08DataWriter = new 
Kafka08DataWriter<String>(props);
+    Kafka08DataWriter kafka08DataWriter = new Kafka08DataWriter(props);
     String messageString = "foobar";
     WriteCallback callback = mock(WriteCallback.class);
 
@@ -109,7 +109,7 @@ public class Kafka08DataWriterTest {
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
 "localhost:" + _kafkaTestHelper.getKafkaServerPort());
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
 "org.apache.kafka.common.serialization.ByteArraySerializer");
-    Kafka08DataWriter<byte[]> kafka08DataWriter = new 
Kafka08DataWriter<byte[]>(props);
+    Kafka08DataWriter kafka08DataWriter = new Kafka08DataWriter(props);
     WriteCallback callback = mock(WriteCallback.class);
 
     byte[] messageBytes = TestUtils.generateRandomBytes();
@@ -145,7 +145,7 @@ public class Kafka08DataWriterTest {
         + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
         ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
 
-    Kafka08DataWriter<GenericRecord> kafka08DataWriter = new 
Kafka08DataWriter<>(props);
+    Kafka08DataWriter kafka08DataWriter = new Kafka08DataWriter<>(props);
     WriteCallback callback = mock(WriteCallback.class);
 
     GenericRecord record = TestUtils.generateRandomAvroRecord();
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterUnitTest.java
 
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterUnitTest.java
index c31f178..901a3e9 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterUnitTest.java
+++ 
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterUnitTest.java
@@ -41,7 +41,7 @@ public class Kafka08DataWriterUnitTest {
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
"bootstrap.servers", "localhost:9092");
 
     try {
-      Kafka08DataWriter<GenericRecord> kafkaWriter = new 
Kafka08DataWriter<>(props);
+      Kafka08DataWriter kafkaWriter = new Kafka08DataWriter<>(props);
     }
     catch (Exception e)
     {
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 52e5df2..68d8f51 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -39,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 
+import org.apache.gobblin.configuration.ConfigurationException;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;
@@ -54,7 +56,7 @@ import org.apache.gobblin.writer.WriteResponseMapper;
  *
  */
 @Slf4j
-public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
+public class Kafka09DataWriter<K, V> implements AsyncDataWriter<V> {
 
   
   private static final WriteResponseMapper<RecordMetadata> 
WRITE_RESPONSE_WRAPPER =
@@ -81,8 +83,9 @@ public class Kafka09DataWriter<D> implements 
AsyncDataWriter<D> {
         }
       };
 
-  private final Producer<String, D> producer;
+  private final Producer<K, V> producer;
   private final String topic;
+  private final KafkaWriterCommonConfig commonConfig;
 
   public static Producer getKafkaProducer(Properties props) {
     Object producerObject = KafkaWriterHelper.getKafkaProducer(props);
@@ -96,14 +99,17 @@ public class Kafka09DataWriter<D> implements 
AsyncDataWriter<D> {
     }
   }
 
-  public Kafka09DataWriter(Properties props) {
+  public Kafka09DataWriter(Properties props)
+      throws ConfigurationException {
     this(getKafkaProducer(props), ConfigFactory.parseProperties(props));
   }
 
-  public Kafka09DataWriter(Producer producer, Config config) {
+  public Kafka09DataWriter(Producer producer, Config config)
+      throws ConfigurationException {
     this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
     provisionTopic(topic,config);
     this.producer = producer;
+    this.commonConfig = new KafkaWriterCommonConfig(config);
   }
 
   @Override
@@ -114,17 +120,23 @@ public class Kafka09DataWriter<D> implements 
AsyncDataWriter<D> {
   }
 
   @Override
-  public Future<WriteResponse> write(final D record, final WriteCallback 
callback) {
-    return new WriteResponseFuture<>(this.producer.send(new 
ProducerRecord<String, D>(topic, record), new Callback() {
-      @Override
-      public void onCompletion(final RecordMetadata metadata, Exception 
exception) {
-        if (exception != null) {
-          callback.onFailure(exception);
-        } else {
-          callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
-        }
-      }
-    }), WRITE_RESPONSE_WRAPPER);
+  public Future<WriteResponse> write(final V record, final WriteCallback 
callback) {
+    try {
+      Pair<K, V> keyValuePair = KafkaWriterHelper.getKeyValuePair(record, 
this.commonConfig);
+      return new WriteResponseFuture<>(this.producer
+          .send(new ProducerRecord<>(topic, keyValuePair.getKey(), 
keyValuePair.getValue()), new Callback() {
+            @Override
+            public void onCompletion(final RecordMetadata metadata, Exception 
exception) {
+              if (exception != null) {
+                callback.onFailure(exception);
+              } else {
+                callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
+              }
+            }
+          }), WRITE_RESPONSE_WRAPPER);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create a Kafka write request", e);
+    }
   }
 
   @Override
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
index c97c486..4857256 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.kafka.writer;
 
 import java.util.Properties;
 
+import org.apache.gobblin.configuration.ConfigurationException;
 import org.apache.gobblin.kafka.serialize.GsonSerializerBase;
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.kafka.common.serialization.Serializer;
@@ -36,7 +37,8 @@ public class Kafka09JsonObjectWriterBuilder extends 
AbstractKafkaDataWriterBuild
       KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG;
 
   @Override
-  protected AsyncDataWriter<JsonObject> getAsyncDataWriter(Properties props) {
+  protected AsyncDataWriter<JsonObject> getAsyncDataWriter(Properties props)
+      throws ConfigurationException {
     props.setProperty(VALUE_SERIALIZER_KEY, 
KafkaGsonObjectSerializer.class.getName());
     return new Kafka09DataWriter<>(props);
   }
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
index 8869c43..33a6933 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
@@ -22,15 +22,17 @@ import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
+import org.apache.gobblin.configuration.ConfigurationException;
 import org.apache.gobblin.writer.AsyncDataWriter;
 
 
 /**
  * Builder that hands back a {@link Kafka09DataWriter}
  */
-public class KafkaDataWriterBuilder extends 
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+public class KafkaDataWriterBuilder<S, D> extends 
AbstractKafkaDataWriterBuilder<S, D> {
   @Override
-  protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties 
props) {
+  protected AsyncDataWriter<D> getAsyncDataWriter(Properties props)
+      throws ConfigurationException {
     return new Kafka09DataWriter<>(props);
   }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
index 45b865d..58cfcb9 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
@@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.avro.generic.GenericRecord;
 import org.testng.Assert;
@@ -30,7 +29,7 @@ import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
-import kafka.security.auth.Write;
+import kafka.message.MessageAndMetadata;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.kafka.KafkaTestBase;
@@ -73,8 +72,7 @@ public class Kafka09DataWriterTest {
       throws IOException {
     try {
       _kafkaTestHelper.stopClients();
-    }
-    finally {
+    } finally {
       _kafkaTestHelper.stopServers();
     }
   }
@@ -88,7 +86,7 @@ public class Kafka09DataWriterTest {
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
 "localhost:" + _kafkaTestHelper.getKafkaServerPort());
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
 "org.apache.kafka.common.serialization.StringSerializer");
-    Kafka09DataWriter<String> kafka09DataWriter = new 
Kafka09DataWriter<String>(props);
+    Kafka09DataWriter<String, String> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
     String messageString = "foobar";
     WriteCallback callback = mock(WriteCallback.class);
     Future<WriteResponse> future;
@@ -103,9 +101,7 @@ public class Kafka09DataWriterTest {
       byte[] message = 
_kafkaTestHelper.getIteratorForTopic(topic).next().message();
       String messageReceived = new String(message);
       Assert.assertEquals(messageReceived, messageString);
-    }
-    finally
-    {
+    } finally {
       kafka09DataWriter.close();
     }
 
@@ -121,15 +117,13 @@ public class Kafka09DataWriterTest {
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
 "localhost:" + _kafkaTestHelper.getKafkaServerPort());
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
 "org.apache.kafka.common.serialization.ByteArraySerializer");
-    Kafka09DataWriter<byte[]> kafka09DataWriter = new 
Kafka09DataWriter<byte[]>(props);
+    Kafka09DataWriter<String, byte[]> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
     WriteCallback callback = mock(WriteCallback.class);
     byte[] messageBytes = TestUtils.generateRandomBytes();
 
     try {
       kafka09DataWriter.write(messageBytes, callback);
-    }
-    finally
-    {
+    } finally {
       kafka09DataWriter.close();
     }
 
@@ -157,15 +151,13 @@ public class Kafka09DataWriterTest {
         + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
         ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
 
-    Kafka09DataWriter<GenericRecord> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
+    Kafka09DataWriter<String, GenericRecord> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
     WriteCallback callback = mock(WriteCallback.class);
 
     GenericRecord record = TestUtils.generateRandomAvroRecord();
     try {
       kafka09DataWriter.write(record, callback);
-    }
-    finally
-    {
+    } finally {
       kafka09DataWriter.close();
     }
 
@@ -180,5 +172,90 @@ public class Kafka09DataWriterTest {
   }
 
 
+  @Test
+  public void testKeyedAvroSerialization()
+      throws IOException, InterruptedException, SchemaRegistryException {
+    String topic = "testKeyedAvroSerialization09";
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
"bootstrap.servers",
+        "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+    
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
"value.serializer",
+        LiAvroSerializer.class.getName());
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, 
"true");
+    String keyField = "field1";
+    
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG, 
keyField);
+
+
+    // set up mock schema registry
+
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+            + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+        ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+
+    Kafka09DataWriter<String, GenericRecord> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka09DataWriter.write(record, callback);
+    } finally {
+      kafka09DataWriter.close();
+    }
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+    MessageAndMetadata<byte[], byte[]> value = 
_kafkaTestHelper.getIteratorForTopic(topic).next();
+    byte[] key = value.key();
+    byte[] message = value.message();
+    ConfigDrivenMd5SchemaRegistry schemaReg = new 
ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
+    LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);
+    GenericRecord receivedRecord = deser.deserialize(topic, message);
+    Assert.assertEquals(receivedRecord.toString(), record.toString());
+    Assert.assertEquals(new String(key), record.get(keyField));
+  }
+
+  @Test
+  public void testValueSerialization()
+      throws IOException, InterruptedException, SchemaRegistryException {
+    String topic = "testValueSerialization09";
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
"bootstrap.servers",
+        "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+    
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
"value.serializer",
+    "org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, 
"true");
+    String keyField = "field1";
+    
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG, 
keyField);
+    
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_VALUEFIELD_CONFIG, 
keyField);
+
+
+    // set up mock schema registry
+
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+            + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+        ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+
+    Kafka09DataWriter<String, GenericRecord> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka09DataWriter.write(record, callback);
+    } finally {
+      kafka09DataWriter.close();
+    }
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+    MessageAndMetadata<byte[], byte[]> value = 
_kafkaTestHelper.getIteratorForTopic(topic).next();
+    byte[] key = value.key();
+    byte[] message = value.message();
+    Assert.assertEquals(new String(message), record.get(keyField));
+    Assert.assertEquals(new String(key), record.get(keyField));
+  }
 
 }
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
index 4765d1b..7b441e8 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
@@ -107,7 +107,7 @@ public class Kafka09TopicProvisionTest {
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
 _kafkaTestHelper.getBootServersList());    
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
 "org.apache.kafka.common.serialization.StringSerializer");
     
-    Kafka09DataWriter<String> kafka09DataWriter = new 
Kafka09DataWriter<String>(props);
+    Kafka09DataWriter<String, String> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
     String zookeeperConnect = "localhost:"+_kafkaTestHelper.getZookeeperPort();
     int sessionTimeoutMs = 10 * 1000;
     int connectionTimeoutMs = 8 * 1000;
@@ -160,7 +160,7 @@ public class Kafka09TopicProvisionTest {
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
 liveBroker);    
     
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
 "org.apache.kafka.common.serialization.StringSerializer");
     
-    Kafka09DataWriter<String> kafka09DataWriter = new 
Kafka09DataWriter<String>(props);
+    Kafka09DataWriter<String, String> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
     int sessionTimeoutMs = 10 * 1000;
     int connectionTimeoutMs = 8 * 1000;
     // Note: You must initialize the ZkClient with ZKStringSerializer.  If you 
don't, then
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
index a3db916..372dc91 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
@@ -22,6 +22,7 @@ import java.util.Properties;
 
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.ConfigurationException;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
@@ -35,7 +36,7 @@ import org.apache.gobblin.writer.DataWriterBuilder;
  */
 public abstract class AbstractKafkaDataWriterBuilder<S, D> extends 
DataWriterBuilder<S, D> {
 
-  protected abstract AsyncDataWriter<D> getAsyncDataWriter(Properties props);
+  protected abstract AsyncDataWriter<D> getAsyncDataWriter(Properties props) 
throws ConfigurationException;
 
   /**
    * Build a {@link DataWriter}.
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfig.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfig.java
new file mode 100644
index 0000000..feb4eae
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfig.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.writer;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.types.TypeMapper;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.*;
+
+
+/**
+ * Version-independent configuration for Kafka Writers
+ */
+public class KafkaWriterCommonConfig {
+
+  @Getter
+  private final boolean keyed;
+  @Getter
+  private final String keyField;
+  @Getter
+  private final TypeMapper typeMapper;
+  @Getter
+  private final String valueField;
+
+  public KafkaWriterCommonConfig(Config config)
+      throws ConfigurationException {
+    try {
+      this.keyed = ConfigUtils.getBoolean(config, WRITER_KAFKA_KEYED_CONFIG, 
WRITER_KAFKA_KEYED_DEFAULT);
+      this.keyField = ConfigUtils.getString(config, 
WRITER_KAFKA_KEYFIELD_CONFIG, WRITER_KAFKA_KEYFIELD_DEFAULT);
+      String typeMapperClass = ConfigUtils.getString(config, 
WRITER_KAFKA_TYPEMAPPERCLASS_CONFIG,
+          WRITER_KAFKA_TYPEMAPPERCLASS_DEFAULT);
+      this.typeMapper = (TypeMapper) 
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(typeMapperClass));
+      this.valueField = ConfigUtils.getString(config, 
WRITER_KAFKA_VALUEFIELD_CONFIG, WRITER_KAFKA_VALUEFIELD_DEFAULT);
+      Preconditions.checkArgument(!this.keyed || (this.keyField != null),
+          "With keyed writes to Kafka, you must provide a key fieldname to be 
used");
+    } catch (Exception e) {
+      throw new ConfigurationException("Failed to configure 
KafkaWriterCommonConfig", e);
+    }
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index d1522ab..1255e52 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -17,6 +17,10 @@
 
 package org.apache.gobblin.kafka.writer;
 
+import org.apache.gobblin.types.AvroGenericRecordTypeMapper;
+import org.apache.gobblin.types.TypeMapper;
+
+
 /**
  * Configuration keys for a KafkaWriter.
  */
@@ -33,6 +37,14 @@ public class KafkaWriterConfigurationKeys {
   static final String FAILURE_ALLOWANCE_PCT_CONFIG = 
"writer.kafka.failureAllowancePercentage";
   static final double FAILURE_ALLOWANCE_PCT_DEFAULT = 20.0;
 
+  public static final String WRITER_KAFKA_KEYED_CONFIG = "writer.kafka.keyed";
+  public static final boolean WRITER_KAFKA_KEYED_DEFAULT = false;
+  public static final String WRITER_KAFKA_KEYFIELD_CONFIG = 
"writer.kafka.keyField";
+  public static final String WRITER_KAFKA_KEYFIELD_DEFAULT = null;
+  public static final String WRITER_KAFKA_TYPEMAPPERCLASS_CONFIG = 
"writer.kafka.typeMapperClass";
+  public static final String WRITER_KAFKA_TYPEMAPPERCLASS_DEFAULT = 
AvroGenericRecordTypeMapper.class.getName();
+  public static final String WRITER_KAFKA_VALUEFIELD_CONFIG = 
"writer.kafka.valueField";
+  public static final String WRITER_KAFKA_VALUEFIELD_DEFAULT = 
TypeMapper.FIELD_PATH_ALL;
 
   /**
    * Kafka producer configurations will be passed through as is as long as 
they are prefixed
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
index 1999480..a6837ae 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
@@ -21,6 +21,8 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.Properties;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 
 import com.google.common.base.Throwables;
 import com.typesafe.config.Config;
@@ -29,6 +31,8 @@ import com.typesafe.config.ConfigFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.types.FieldMappingException;
+import org.apache.gobblin.types.TypeMapper;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.*;
@@ -90,4 +94,18 @@ public class KafkaWriterHelper {
       throw Throwables.propagate(e);
     }
   }
+
+  public static <K, V> Pair<K,V> getKeyValuePair(V record, 
KafkaWriterCommonConfig commonConfig)
+      throws FieldMappingException {
+    K key = null;
+    TypeMapper typeMapper = commonConfig.getTypeMapper();
+    if (commonConfig.isKeyed()) {
+      key = (K) typeMapper.getField(record, commonConfig.getKeyField());
+    }
+    V value = record;
+    if (!commonConfig.getValueField().equals(TypeMapper.FIELD_PATH_ALL)) {
+      value = (V) typeMapper.getField(record, commonConfig.getValueField());
+    }
+    return new ImmutablePair<>(key, value);
+  }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfigTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfigTest.java
new file mode 100644
index 0000000..bcee5b7
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfigTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.types.AvroGenericRecordTypeMapper;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class KafkaWriterCommonConfigTest {
+
+  @Test
+  public void testEmptyConstructor()
+      throws ConfigurationException {
+    Properties properties = new Properties();
+    Config config = ConfigUtils.propertiesToConfig(properties);
+    KafkaWriterCommonConfig kafkaWriterCommonConfig = new 
KafkaWriterCommonConfig(config);
+    Assert.assertEquals(kafkaWriterCommonConfig.isKeyed(), false);
+    Assert.assertEquals(kafkaWriterCommonConfig.getKeyField(), null);
+    
Assert.assertEquals(kafkaWriterCommonConfig.getTypeMapper().getClass().getCanonicalName(),
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    Assert.assertEquals(kafkaWriterCommonConfig.getValueField(), "*");
+  }
+
+  @Test
+  public void testKeyedConstructor() {
+    Properties properties = new Properties();
+    
properties.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, 
"true");
+    try {
+      Config config = ConfigUtils.propertiesToConfig(properties);
+      KafkaWriterCommonConfig kafkaWriterCommonConfig = new 
KafkaWriterCommonConfig(config);
+      Assert.fail("Should fail to construct with keyed writes set to true, 
without setting key field");
+    } catch (ConfigurationException ce) {
+      // Expected
+    }
+    
properties.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG,
 "key");
+    try {
+      Config config = ConfigUtils.propertiesToConfig(properties);
+      KafkaWriterCommonConfig kafkaWriterCommonConfig = new 
KafkaWriterCommonConfig(config);
+      Assert.assertEquals(kafkaWriterCommonConfig.isKeyed(), true);
+      Assert.assertEquals(kafkaWriterCommonConfig.getKeyField(), "key");
+      // Check default type mapper is AvroGenericRecord based
+      
Assert.assertEquals(kafkaWriterCommonConfig.getTypeMapper().getClass().getCanonicalName(),
+          AvroGenericRecordTypeMapper.class.getCanonicalName());
+      Assert.assertEquals(kafkaWriterCommonConfig.getValueField(), "*");
+    } catch (ConfigurationException ce) {
+      Assert.fail("Should successfully construct with keyed writes set to 
true, and with setting key field", ce);
+    }
+    
properties.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_TYPEMAPPERCLASS_CONFIG,
 TestTypeMapper.class.getCanonicalName());
+    try {
+      Config config = ConfigUtils.propertiesToConfig(properties);
+      KafkaWriterCommonConfig kafkaWriterCommonConfig = new 
KafkaWriterCommonConfig(config);
+      Assert.assertEquals(kafkaWriterCommonConfig.isKeyed(), true);
+      Assert.assertEquals(kafkaWriterCommonConfig.getKeyField(), "key");
+      
Assert.assertEquals(kafkaWriterCommonConfig.getTypeMapper().getClass().getCanonicalName(),
+          TestTypeMapper.class.getCanonicalName());
+      Assert.assertEquals(kafkaWriterCommonConfig.getValueField(), "*");
+    } catch (ConfigurationException ce) {
+      Assert.fail("Should successfully construct", ce);
+    }
+    
properties.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_VALUEFIELD_CONFIG,
 "foobar");
+    try {
+      Config config = ConfigUtils.propertiesToConfig(properties);
+      KafkaWriterCommonConfig kafkaWriterCommonConfig = new 
KafkaWriterCommonConfig(config);
+      Assert.assertEquals(kafkaWriterCommonConfig.isKeyed(), true);
+      Assert.assertEquals(kafkaWriterCommonConfig.getKeyField(), "key");
+      
Assert.assertEquals(kafkaWriterCommonConfig.getTypeMapper().getClass().getCanonicalName(),
+          TestTypeMapper.class.getCanonicalName());
+      Assert.assertEquals(kafkaWriterCommonConfig.getValueField(), "foobar");
+    }
+    catch (ConfigurationException ce) {
+      Assert.fail("Should successfully construct", ce);
+    }
+  }
+
+
+}
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/TestTypeMapper.java
similarity index 66%
copy from 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to 
gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/TestTypeMapper.java
index 7adbba1..0b7dfa7 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/TestTypeMapper.java
@@ -17,20 +17,15 @@
 
 package org.apache.gobblin.kafka.writer;
 
-import java.util.Properties;
+import org.apache.gobblin.types.FieldMappingException;
+import org.apache.gobblin.types.TypeMapper;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
-import org.apache.gobblin.writer.AsyncDataWriter;
-
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends 
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+public class TestTypeMapper implements TypeMapper {
 
   @Override
-  protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties 
props) {
-    return new Kafka08DataWriter<>(props);
+  public Object getField(Object record, String fieldPath)
+      throws FieldMappingException {
+    return null;
   }
 }

Reply via email to