This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 98f4596 [GOBBLIN-820] Add keyed write capability to Kafka writer
98f4596 is described below
commit 98f45965b86230265bd30578c887ad4a7dee5c78
Author: Shirshanka Das <[email protected]>
AuthorDate: Tue Jul 9 14:11:39 2019 -0700
[GOBBLIN-820] Add keyed write capability to Kafka writer
Closes #2682 from shirshanka/kill-pusher
---
.../configuration/ConfigurationException.java | 20 ++--
.../java/org/apache/gobblin/writer/DataWriter.java | 7 +-
.../gobblin/types/AvroGenericRecordTypeMapper.java | 30 +++---
.../gobblin/types/FieldMappingException.java | 19 +---
.../java/org/apache/gobblin/types/TypeMapper.java | 26 +++--
.../types/AvroGenericRecordTypeMapperTest.java | 26 ++---
gobblin-docs/index.md | 2 +-
gobblin-docs/sinks/Kafka.md | 6 +-
.../gobblin/kafka/writer/Kafka08DataWriter.java | 39 +++++---
.../kafka/writer/KafkaDataWriterBuilder.java | 6 +-
.../kafka/writer/Kafka08DataWriterTest.java | 6 +-
.../kafka/writer/Kafka08DataWriterUnitTest.java | 2 +-
.../gobblin/kafka/writer/Kafka09DataWriter.java | 42 +++++---
.../writer/Kafka09JsonObjectWriterBuilder.java | 4 +-
.../kafka/writer/KafkaDataWriterBuilder.java | 6 +-
.../kafka/writer/Kafka09DataWriterTest.java | 109 ++++++++++++++++++---
.../kafka/writer/Kafka09TopicProvisionTest.java | 4 +-
.../writer/AbstractKafkaDataWriterBuilder.java | 3 +-
.../kafka/writer/KafkaWriterCommonConfig.java | 61 ++++++++++++
.../kafka/writer/KafkaWriterConfigurationKeys.java | 12 +++
.../gobblin/kafka/writer/KafkaWriterHelper.java | 18 ++++
.../kafka/writer/KafkaWriterCommonConfigTest.java | 99 +++++++++++++++++++
.../gobblin/kafka/writer/TestTypeMapper.java} | 17 ++--
23 files changed, 430 insertions(+), 134 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationException.java
similarity index 61%
copy from
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationException.java
index 7adbba1..cb6e7bf 100644
---
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationException.java
@@ -14,23 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.gobblin.configuration;
-package org.apache.gobblin.kafka.writer;
+import java.io.IOException;
-import java.util.Properties;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+public class ConfigurationException extends IOException {
-import org.apache.gobblin.writer.AsyncDataWriter;
-
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
-
- @Override
- protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties
props) {
- return new Kafka08DataWriter<>(props);
+ public ConfigurationException(String message, Exception e) {
+ super(message, e);
}
+
}
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
index 400dcdf..bdfb5f0 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.stream.RecordEnvelope;
public interface DataWriter<D> extends Closeable, Flushable {
/**
- * Write a source data record in Avro format using the given converter.
+ * Write a data record.
*
* @param record data record to write
* @throws IOException if there is anything wrong writing the record
@@ -52,7 +52,7 @@ public interface DataWriter<D> extends Closeable, Flushable {
/**
* Commit the data written.
- *
+ * This method is expected to be called at most once during the lifetime of
a writer.
* @throws IOException if there is anything wrong committing the output
*/
public void commit()
@@ -99,6 +99,8 @@ public interface DataWriter<D> extends Closeable, Flushable {
/**
* Write the input {@link RecordEnvelope}. By default, just call {@link
#write(Object)}.
+ * DataWriters that implement this method must acknowledge the
recordEnvelope once the write has been acknowledged
+ * by the destination system.
*/
default void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws
IOException {
write(recordEnvelope.getRecord());
@@ -115,6 +117,7 @@ public interface DataWriter<D> extends Closeable, Flushable
{
/**
* Flush data written by the writer. By default, does nothing.
+ * This method is expected to be called multiple times during the lifetime
of a writer.
* @throws IOException
*/
default void flush() throws IOException {
diff --git
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/types/AvroGenericRecordTypeMapper.java
similarity index 57%
copy from
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to
gobblin-core-base/src/main/java/org/apache/gobblin/types/AvroGenericRecordTypeMapper.java
index 7adbba1..b72b76a 100644
---
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/types/AvroGenericRecordTypeMapper.java
@@ -15,22 +15,30 @@
* limitations under the License.
*/
-package org.apache.gobblin.kafka.writer;
+package org.apache.gobblin.types;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.gobblin.writer.AsyncDataWriter;
+import lombok.extern.slf4j.Slf4j;
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+
+@Slf4j
+public class AvroGenericRecordTypeMapper implements TypeMapper<GenericRecord> {
@Override
- protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties
props) {
- return new Kafka08DataWriter<>(props);
+ public Object getField(GenericRecord record, String fieldPath) throws
FieldMappingException {
+ if (fieldPath.equals(FIELD_PATH_ALL)) {
+ return record;
+ }
+
+ Object field = record;
+ try {
+ for (String part: fieldPath.split("\\.")) {
+ field = ((GenericRecord) field).get(part);
+ }
+ } catch (Exception e) {
+ throw new FieldMappingException("Failed to retrieve fieldPath " +
fieldPath + " from record " + record.toString(), e);
+ }
+ return field;
}
}
diff --git
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/types/FieldMappingException.java
similarity index 61%
copy from
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to
gobblin-core-base/src/main/java/org/apache/gobblin/types/FieldMappingException.java
index 7adbba1..91b22a6 100644
---
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/types/FieldMappingException.java
@@ -15,22 +15,11 @@
* limitations under the License.
*/
-package org.apache.gobblin.kafka.writer;
+package org.apache.gobblin.types;
-import java.util.Properties;
+public class FieldMappingException extends Exception {
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-
-import org.apache.gobblin.writer.AsyncDataWriter;
-
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
-
- @Override
- protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties
props) {
- return new Kafka08DataWriter<>(props);
+ public FieldMappingException(String message, Exception e) {
+ super(message, e);
}
}
diff --git
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/types/TypeMapper.java
similarity index 63%
copy from
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to gobblin-core-base/src/main/java/org/apache/gobblin/types/TypeMapper.java
index 7adbba1..375781a 100644
---
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/types/TypeMapper.java
@@ -15,22 +15,30 @@
* limitations under the License.
*/
-package org.apache.gobblin.kafka.writer;
+package org.apache.gobblin.types;
-import java.util.Properties;
+import java.io.Closeable;
+import java.io.IOException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import com.typesafe.config.Config;
-import org.apache.gobblin.writer.AsyncDataWriter;
/**
- * Builder that hands back a {@link Kafka08DataWriter}
+ * An interface that allows Gobblin constructs to introspect a type T
+ * @param <T>
*/
-public class KafkaDataWriterBuilder extends
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+public interface TypeMapper<T> extends Closeable {
+ String FIELD_PATH_ALL = "*";
+
+ default void configure(Config config) {
+
+ }
+
+ Object getField(T record, String fieldPath)
+ throws FieldMappingException;
@Override
- protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties
props) {
- return new Kafka08DataWriter<>(props);
+ default void close()
+ throws IOException {
}
}
diff --git
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
b/gobblin-core-base/src/test/java/org/apache/gobblin/types/AvroGenericRecordTypeMapperTest.java
similarity index 62%
copy from
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to
gobblin-core-base/src/test/java/org/apache/gobblin/types/AvroGenericRecordTypeMapperTest.java
index 7adbba1..5b1703f 100644
---
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++
b/gobblin-core-base/src/test/java/org/apache/gobblin/types/AvroGenericRecordTypeMapperTest.java
@@ -14,23 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.gobblin.types;
-package org.apache.gobblin.kafka.writer;
+import org.apache.avro.generic.GenericRecord;
+import org.testng.Assert;
+import org.testng.annotations.Test;
-import java.util.Properties;
+import org.apache.gobblin.test.TestUtils;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.gobblin.writer.AsyncDataWriter;
+public class AvroGenericRecordTypeMapperTest {
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+ @Test
+ public void test()
+ throws FieldMappingException {
+ GenericRecord record = TestUtils.generateRandomAvroRecord();
+ AvroGenericRecordTypeMapper typeMapper = new AvroGenericRecordTypeMapper();
+ Object field = typeMapper.getField(record, "field1");
+ Assert.assertEquals(field.getClass(), String.class);
- @Override
- protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties
props) {
- return new Kafka08DataWriter<>(props);
}
+
}
diff --git a/gobblin-docs/index.md b/gobblin-docs/index.md
index f116188..79a71c5 100644
--- a/gobblin-docs/index.md
+++ b/gobblin-docs/index.md
@@ -3,7 +3,7 @@
</p>
Over the years, LinkedIn's data infrastructure team built custom solutions for
ingesting diverse data entities into our Hadoop eco-system. At one point, we
were running 15 types of ingestion pipelines which created significant data
quality, metadata management, development, and operation challenges.
-
+
Our experiences and challenges motivated us to build _Gobblin_. Gobblin is a
universal data ingestion framework for extracting, transforming, and loading
large volume of data from a variety of data sources, e.g., databases, rest
APIs, FTP/SFTP servers, filers, etc., onto Hadoop. Gobblin handles the common
routine tasks required for all data ingestion ETLs, including job/task
scheduling, task partitioning, error handling, state management, data quality
checking, data publishing, etc. Gobb [...]
You can find a lot of useful resources in our wiki pages, including [how to
get started with Gobblin](Getting-Started), an [architecture overview of
Gobblin](Gobblin-Architecture), and
diff --git a/gobblin-docs/sinks/Kafka.md b/gobblin-docs/sinks/Kafka.md
index 10c243f..89936e5 100644
--- a/gobblin-docs/sinks/Kafka.md
+++ b/gobblin-docs/sinks/Kafka.md
@@ -57,7 +57,7 @@ INFO [TaskExecutor-0] gobblin.kafka.writer.KafkaDataWriter
211 - Successfully
#Configuration Details
-The Kafka writer supports all the configuration parameters supported by the
[0.8.2 Java Kafka
Producer](http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html).
All you have to do is prefix `writer.kafka.producerConfig.` to each
configuration property that the producer supports. For example, if you want to
set the `acks` parameter to `all` to ensure full acknowledgement of writes, you
would set `writer.kafka.producerConfig.acks=all` in your pu [...]
+At this time, Gobblin supports integration with Kafka 0.8 and 0.9. The Kafka
writer supports all the configuration parameters supported by the
version-specific Kafka Producer (e.g. [Latest Producer
Configs](https://kafka.apache.org/documentation/#producerconfigs)). All you
have to do is prefix `writer.kafka.producerConfig.` to each configuration
property that the producer supports. For example, if you want to set the `acks`
parameter to `all` to ensure full acknowledgement of writes, you [...]
There are a few key parameters at the Gobblin level that control the behavior
of the data writer.
@@ -66,6 +66,10 @@ There are a few key parameters at the Gobblin level that
control the behavior of
| `writer.kafka.topic` | The topic that the writer will be writing to. At this
time, the writer can only write to a single topic per pipeline. |
| `writer.kafka.failureAllowancePercentage` | The percentage of failures that
you are willing to tolerate while writing to Kafka. Gobblin will mark the
workunit successful and move on if there are failures but not enough to trip
the failure threshold. Only successfully acknowledged writes are counted as
successful, all others are considered as failures. The default for the
failureAllowancePercentage is set to 20.0. This means that as long as 80% of
the data is acknowledged by Kafka, Gobb [...]
| `writer.kafka.commitTimeoutMillis` | The amount of time that the Gobblin
committer will wait before abandoning its wait for unacknowledged writes. This
defaults to 1 minute. |
+| `writer.kafka.keyed` | When set to true, enables key-based writes to Kafka.
This defaults to false. If you set this to true, make sure to set the keyField
configuration property. Serialization of the key is controlled by the Kafka
Producer specific configuration property
(`writer.kafka.producerConfig.key.serializer`) |
+| `writer.kafka.keyField` | The field of the record to use as the key for
writing to Kafka. The field path follows a nested notation. So a top-level
field "name" would be set to "name", a nested field "name" within a top-level
struct "header" would be named "header.name" |
+| `writer.kafka.typeMapperClass` | The class that the writer should use to
extract keys and values from the input record. The default if not specified
assumes that AvroGenericRecordTypeMapper is being used |
+| `writer.kafka.valueField` | The field of the record to use as the value for
writing to Kafka. Defaults to "*" which indicates that the entire record should
be written. For nested records such as a pair of key, value, one would set the
value of this configuration to the field-name for the value structure. |
#What Next?
diff --git
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
index 8df03c3..fc82270 100644
---
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
+++
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -33,6 +34,7 @@ import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationException;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
import org.apache.gobblin.writer.WriteResponse;
@@ -47,7 +49,7 @@ import org.apache.gobblin.writer.WriteResponseMapper;
*
*/
@Slf4j
-public class Kafka08DataWriter<D> implements AsyncDataWriter<D> {
+public class Kafka08DataWriter<K,V> implements AsyncDataWriter<V> {
private static final WriteResponseMapper<RecordMetadata>
WRITE_RESPONSE_WRAPPER =
new WriteResponseMapper<RecordMetadata>() {
@@ -74,8 +76,9 @@ public class Kafka08DataWriter<D> implements
AsyncDataWriter<D> {
}
};
- private final Producer<String, D> producer;
+ private final Producer<K, V> producer;
private final String topic;
+ private final KafkaWriterCommonConfig commonConfig;
public static Producer getKafkaProducer(Properties props)
{
@@ -90,14 +93,16 @@ public class Kafka08DataWriter<D> implements
AsyncDataWriter<D> {
}
}
- public Kafka08DataWriter(Properties props) {
+ public Kafka08DataWriter(Properties props)
+ throws ConfigurationException {
this(getKafkaProducer(props), ConfigFactory.parseProperties(props));
}
public Kafka08DataWriter(Producer producer, Config config)
- {
+ throws ConfigurationException {
this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
this.producer = producer;
+ this.commonConfig = new KafkaWriterCommonConfig(config);
}
@Override
@@ -110,17 +115,21 @@ public class Kafka08DataWriter<D> implements
AsyncDataWriter<D> {
@Override
- public Future<WriteResponse> write(final D record, final WriteCallback
callback) {
- return new WriteResponseFuture<>(this.producer.send(new
ProducerRecord<String, D>(topic, record), new Callback() {
- @Override
- public void onCompletion(final RecordMetadata metadata, Exception
exception) {
- if (exception != null) {
- callback.onFailure(exception);
- } else {
- callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
- }
- }
- }), WRITE_RESPONSE_WRAPPER);
+ public Future<WriteResponse> write(final V record, final WriteCallback
callback) {
+ try {
+ Pair<K, V> kvPair = KafkaWriterHelper.getKeyValuePair(record,
commonConfig);
+ return new WriteResponseFuture<>(this.producer.send(new
ProducerRecord<>(topic, kvPair.getKey(), kvPair.getValue()),
+ (metadata, exception) -> {
+ if (exception != null) {
+ callback.onFailure(exception);
+ } else {
+ callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
+ }
+ }), WRITE_RESPONSE_WRAPPER);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to generate write request", e);
+ }
}
@Override
diff --git
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
index 7adbba1..589925a 100644
---
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
@@ -22,15 +22,17 @@ import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationException;
import org.apache.gobblin.writer.AsyncDataWriter;
/**
* Builder that hands back a {@link Kafka08DataWriter}
*/
-public class KafkaDataWriterBuilder extends
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+public class KafkaDataWriterBuilder<S,D> extends
AbstractKafkaDataWriterBuilder<S, D> {
@Override
- protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties
props) {
+ protected AsyncDataWriter<D> getAsyncDataWriter(Properties props)
+ throws ConfigurationException {
return new Kafka08DataWriter<>(props);
}
}
diff --git
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterTest.java
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterTest.java
index c7ac96f..3d3387c 100644
---
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterTest.java
+++
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterTest.java
@@ -80,7 +80,7 @@ public class Kafka08DataWriterTest {
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
"localhost:" + _kafkaTestHelper.getKafkaServerPort());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- Kafka08DataWriter<String> kafka08DataWriter = new
Kafka08DataWriter<String>(props);
+ Kafka08DataWriter kafka08DataWriter = new Kafka08DataWriter(props);
String messageString = "foobar";
WriteCallback callback = mock(WriteCallback.class);
@@ -109,7 +109,7 @@ public class Kafka08DataWriterTest {
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
"localhost:" + _kafkaTestHelper.getKafkaServerPort());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
- Kafka08DataWriter<byte[]> kafka08DataWriter = new
Kafka08DataWriter<byte[]>(props);
+ Kafka08DataWriter kafka08DataWriter = new Kafka08DataWriter(props);
WriteCallback callback = mock(WriteCallback.class);
byte[] messageBytes = TestUtils.generateRandomBytes();
@@ -145,7 +145,7 @@ public class Kafka08DataWriterTest {
+ KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
- Kafka08DataWriter<GenericRecord> kafka08DataWriter = new
Kafka08DataWriter<>(props);
+ Kafka08DataWriter kafka08DataWriter = new Kafka08DataWriter<>(props);
WriteCallback callback = mock(WriteCallback.class);
GenericRecord record = TestUtils.generateRandomAvroRecord();
diff --git
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterUnitTest.java
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterUnitTest.java
index c31f178..901a3e9 100644
---
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterUnitTest.java
+++
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/kafka/writer/Kafka08DataWriterUnitTest.java
@@ -41,7 +41,7 @@ public class Kafka08DataWriterUnitTest {
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX +
"bootstrap.servers", "localhost:9092");
try {
- Kafka08DataWriter<GenericRecord> kafkaWriter = new
Kafka08DataWriter<>(props);
+ Kafka08DataWriter kafkaWriter = new Kafka08DataWriter<>(props);
}
catch (Exception e)
{
diff --git
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 52e5df2..68d8f51 100644
---
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -39,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
+import org.apache.gobblin.configuration.ConfigurationException;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
@@ -54,7 +56,7 @@ import org.apache.gobblin.writer.WriteResponseMapper;
*
*/
@Slf4j
-public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
+public class Kafka09DataWriter<K, V> implements AsyncDataWriter<V> {
private static final WriteResponseMapper<RecordMetadata>
WRITE_RESPONSE_WRAPPER =
@@ -81,8 +83,9 @@ public class Kafka09DataWriter<D> implements
AsyncDataWriter<D> {
}
};
- private final Producer<String, D> producer;
+ private final Producer<K, V> producer;
private final String topic;
+ private final KafkaWriterCommonConfig commonConfig;
public static Producer getKafkaProducer(Properties props) {
Object producerObject = KafkaWriterHelper.getKafkaProducer(props);
@@ -96,14 +99,17 @@ public class Kafka09DataWriter<D> implements
AsyncDataWriter<D> {
}
}
- public Kafka09DataWriter(Properties props) {
+ public Kafka09DataWriter(Properties props)
+ throws ConfigurationException {
this(getKafkaProducer(props), ConfigFactory.parseProperties(props));
}
- public Kafka09DataWriter(Producer producer, Config config) {
+ public Kafka09DataWriter(Producer producer, Config config)
+ throws ConfigurationException {
this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
provisionTopic(topic,config);
this.producer = producer;
+ this.commonConfig = new KafkaWriterCommonConfig(config);
}
@Override
@@ -114,17 +120,23 @@ public class Kafka09DataWriter<D> implements
AsyncDataWriter<D> {
}
@Override
- public Future<WriteResponse> write(final D record, final WriteCallback
callback) {
- return new WriteResponseFuture<>(this.producer.send(new
ProducerRecord<String, D>(topic, record), new Callback() {
- @Override
- public void onCompletion(final RecordMetadata metadata, Exception
exception) {
- if (exception != null) {
- callback.onFailure(exception);
- } else {
- callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
- }
- }
- }), WRITE_RESPONSE_WRAPPER);
+ public Future<WriteResponse> write(final V record, final WriteCallback
callback) {
+ try {
+ Pair<K, V> keyValuePair = KafkaWriterHelper.getKeyValuePair(record,
this.commonConfig);
+ return new WriteResponseFuture<>(this.producer
+ .send(new ProducerRecord<>(topic, keyValuePair.getKey(),
keyValuePair.getValue()), new Callback() {
+ @Override
+ public void onCompletion(final RecordMetadata metadata, Exception
exception) {
+ if (exception != null) {
+ callback.onFailure(exception);
+ } else {
+ callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
+ }
+ }
+ }), WRITE_RESPONSE_WRAPPER);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create a Kafka write request", e);
+ }
}
@Override
diff --git
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
index c97c486..4857256 100644
---
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
+++
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.kafka.writer;
import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationException;
import org.apache.gobblin.kafka.serialize.GsonSerializerBase;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.kafka.common.serialization.Serializer;
@@ -36,7 +37,8 @@ public class Kafka09JsonObjectWriterBuilder extends
AbstractKafkaDataWriterBuild
KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX +
KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG;
@Override
- protected AsyncDataWriter<JsonObject> getAsyncDataWriter(Properties props) {
+ protected AsyncDataWriter<JsonObject> getAsyncDataWriter(Properties props)
+ throws ConfigurationException {
props.setProperty(VALUE_SERIALIZER_KEY,
KafkaGsonObjectSerializer.class.getName());
return new Kafka09DataWriter<>(props);
}
diff --git
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
index 8869c43..33a6933 100644
---
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
@@ -22,15 +22,17 @@ import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationException;
import org.apache.gobblin.writer.AsyncDataWriter;
/**
* Builder that hands back a {@link Kafka09DataWriter}
*/
-public class KafkaDataWriterBuilder extends
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+public class KafkaDataWriterBuilder<S, D> extends
AbstractKafkaDataWriterBuilder<S, D> {
@Override
- protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties
props) {
+ protected AsyncDataWriter<D> getAsyncDataWriter(Properties props)
+ throws ConfigurationException {
return new Kafka09DataWriter<>(props);
}
}
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
index 45b865d..58cfcb9 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
@@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.testng.Assert;
@@ -30,7 +29,7 @@ import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
-import kafka.security.auth.Write;
+import kafka.message.MessageAndMetadata;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.kafka.KafkaTestBase;
@@ -73,8 +72,7 @@ public class Kafka09DataWriterTest {
throws IOException {
try {
_kafkaTestHelper.stopClients();
- }
- finally {
+ } finally {
_kafkaTestHelper.stopServers();
}
}
@@ -88,7 +86,7 @@ public class Kafka09DataWriterTest {
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
"localhost:" + _kafkaTestHelper.getKafkaServerPort());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- Kafka09DataWriter<String> kafka09DataWriter = new
Kafka09DataWriter<String>(props);
+ Kafka09DataWriter<String, String> kafka09DataWriter = new
Kafka09DataWriter<>(props);
String messageString = "foobar";
WriteCallback callback = mock(WriteCallback.class);
Future<WriteResponse> future;
@@ -103,9 +101,7 @@ public class Kafka09DataWriterTest {
byte[] message =
_kafkaTestHelper.getIteratorForTopic(topic).next().message();
String messageReceived = new String(message);
Assert.assertEquals(messageReceived, messageString);
- }
- finally
- {
+ } finally {
kafka09DataWriter.close();
}
@@ -121,15 +117,13 @@ public class Kafka09DataWriterTest {
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
"localhost:" + _kafkaTestHelper.getKafkaServerPort());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
- Kafka09DataWriter<byte[]> kafka09DataWriter = new
Kafka09DataWriter<byte[]>(props);
+ Kafka09DataWriter<String, byte[]> kafka09DataWriter = new
Kafka09DataWriter<>(props);
WriteCallback callback = mock(WriteCallback.class);
byte[] messageBytes = TestUtils.generateRandomBytes();
try {
kafka09DataWriter.write(messageBytes, callback);
- }
- finally
- {
+ } finally {
kafka09DataWriter.close();
}
@@ -157,15 +151,13 @@ public class Kafka09DataWriterTest {
+ KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
- Kafka09DataWriter<GenericRecord> kafka09DataWriter = new
Kafka09DataWriter<>(props);
+ Kafka09DataWriter<String, GenericRecord> kafka09DataWriter = new
Kafka09DataWriter<>(props);
WriteCallback callback = mock(WriteCallback.class);
GenericRecord record = TestUtils.generateRandomAvroRecord();
try {
kafka09DataWriter.write(record, callback);
- }
- finally
- {
+ } finally {
kafka09DataWriter.close();
}
@@ -180,5 +172,90 @@ public class Kafka09DataWriterTest {
}
+ @Test
+ public void testKeyedAvroSerialization()
+ throws IOException, InterruptedException, SchemaRegistryException {
+ String topic = "testKeyedAvroSerialization09";
+ _kafkaTestHelper.provisionTopic(topic);
+ Properties props = new Properties();
+ props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX +
"bootstrap.servers",
+ "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX +
"value.serializer",
+ LiAvroSerializer.class.getName());
+ props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG,
"true");
+ String keyField = "field1";
+
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG,
keyField);
+
+
+ // set up mock schema registry
+
+ props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+ + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+ ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+
+ Kafka09DataWriter<String, GenericRecord> kafka09DataWriter = new
Kafka09DataWriter<>(props);
+ WriteCallback callback = mock(WriteCallback.class);
+
+ GenericRecord record = TestUtils.generateRandomAvroRecord();
+ try {
+ kafka09DataWriter.write(record, callback);
+ } finally {
+ kafka09DataWriter.close();
+ }
+
+ verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+ verify(callback, never()).onFailure(isA(Exception.class));
+ MessageAndMetadata<byte[], byte[]> value =
_kafkaTestHelper.getIteratorForTopic(topic).next();
+ byte[] key = value.key();
+ byte[] message = value.message();
+ ConfigDrivenMd5SchemaRegistry schemaReg = new
ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
+ LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);
+ GenericRecord receivedRecord = deser.deserialize(topic, message);
+ Assert.assertEquals(receivedRecord.toString(), record.toString());
+ Assert.assertEquals(new String(key), record.get(keyField));
+ }
+
+ @Test
+ public void testValueSerialization()
+ throws IOException, InterruptedException, SchemaRegistryException {
+ String topic = "testValueSerialization09";
+ _kafkaTestHelper.provisionTopic(topic);
+ Properties props = new Properties();
+ props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX +
"bootstrap.servers",
+ "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX +
"value.serializer",
+ "org.apache.kafka.common.serialization.StringSerializer");
+ props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG,
"true");
+ String keyField = "field1";
+
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG,
keyField);
+
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_VALUEFIELD_CONFIG,
keyField);
+
+
+ // set up mock schema registry
+
+ props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+ + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+ ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+
+ Kafka09DataWriter<String, GenericRecord> kafka09DataWriter = new
Kafka09DataWriter<>(props);
+ WriteCallback callback = mock(WriteCallback.class);
+
+ GenericRecord record = TestUtils.generateRandomAvroRecord();
+ try {
+ kafka09DataWriter.write(record, callback);
+ } finally {
+ kafka09DataWriter.close();
+ }
+
+ verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+ verify(callback, never()).onFailure(isA(Exception.class));
+ MessageAndMetadata<byte[], byte[]> value =
_kafkaTestHelper.getIteratorForTopic(topic).next();
+ byte[] key = value.key();
+ byte[] message = value.message();
+ Assert.assertEquals(new String(message), record.get(keyField));
+ Assert.assertEquals(new String(key), record.get(keyField));
+ }
}
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
index 4765d1b..7b441e8 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
@@ -107,7 +107,7 @@ public class Kafka09TopicProvisionTest {
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
_kafkaTestHelper.getBootServersList());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- Kafka09DataWriter<String> kafka09DataWriter = new
Kafka09DataWriter<String>(props);
+ Kafka09DataWriter<String, String> kafka09DataWriter = new
Kafka09DataWriter<>(props);
String zookeeperConnect = "localhost:"+_kafkaTestHelper.getZookeeperPort();
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;
@@ -160,7 +160,7 @@ public class Kafka09TopicProvisionTest {
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
liveBroker);
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- Kafka09DataWriter<String> kafka09DataWriter = new
Kafka09DataWriter<String>(props);
+ Kafka09DataWriter<String, String> kafka09DataWriter = new
Kafka09DataWriter<>(props);
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;
// Note: You must initialize the ZkClient with ZKStringSerializer. If you
don't, then
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
index a3db916..372dc91 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
@@ -22,6 +22,7 @@ import java.util.Properties;
import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.ConfigurationException;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
@@ -35,7 +36,7 @@ import org.apache.gobblin.writer.DataWriterBuilder;
*/
public abstract class AbstractKafkaDataWriterBuilder<S, D> extends
DataWriterBuilder<S, D> {
- protected abstract AsyncDataWriter<D> getAsyncDataWriter(Properties props);
+ protected abstract AsyncDataWriter<D> getAsyncDataWriter(Properties props)
throws ConfigurationException;
/**
* Build a {@link DataWriter}.
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfig.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfig.java
new file mode 100644
index 0000000..feb4eae
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfig.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.writer;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.types.TypeMapper;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.*;
+
+
+/**
+ * Version-independent configuration for Kafka Writers
+ */
+public class KafkaWriterCommonConfig {
+
+ @Getter
+ private final boolean keyed;
+ @Getter
+ private final String keyField;
+ @Getter
+ private final TypeMapper typeMapper;
+ @Getter
+ private final String valueField;
+
+ public KafkaWriterCommonConfig(Config config)
+ throws ConfigurationException {
+ try {
+ this.keyed = ConfigUtils.getBoolean(config, WRITER_KAFKA_KEYED_CONFIG,
WRITER_KAFKA_KEYED_DEFAULT);
+ this.keyField = ConfigUtils.getString(config,
WRITER_KAFKA_KEYFIELD_CONFIG, WRITER_KAFKA_KEYFIELD_DEFAULT);
+ String typeMapperClass = ConfigUtils.getString(config,
WRITER_KAFKA_TYPEMAPPERCLASS_CONFIG,
+ WRITER_KAFKA_TYPEMAPPERCLASS_DEFAULT);
+ this.typeMapper = (TypeMapper)
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(typeMapperClass));
+ this.valueField = ConfigUtils.getString(config,
WRITER_KAFKA_VALUEFIELD_CONFIG, WRITER_KAFKA_VALUEFIELD_DEFAULT);
+ Preconditions.checkArgument(!this.keyed || (this.keyField != null),
+ "With keyed writes to Kafka, you must provide a key fieldname to be
used");
+ } catch (Exception e) {
+ throw new ConfigurationException("Failed to configure
KafkaWriterCommonConfig", e);
+ }
+ }
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index d1522ab..1255e52 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -17,6 +17,10 @@
package org.apache.gobblin.kafka.writer;
+import org.apache.gobblin.types.AvroGenericRecordTypeMapper;
+import org.apache.gobblin.types.TypeMapper;
+
+
/**
* Configuration keys for a KafkaWriter.
*/
@@ -33,6 +37,14 @@ public class KafkaWriterConfigurationKeys {
static final String FAILURE_ALLOWANCE_PCT_CONFIG =
"writer.kafka.failureAllowancePercentage";
static final double FAILURE_ALLOWANCE_PCT_DEFAULT = 20.0;
+ public static final String WRITER_KAFKA_KEYED_CONFIG = "writer.kafka.keyed";
+ public static final boolean WRITER_KAFKA_KEYED_DEFAULT = false;
+ public static final String WRITER_KAFKA_KEYFIELD_CONFIG =
"writer.kafka.keyField";
+ public static final String WRITER_KAFKA_KEYFIELD_DEFAULT = null;
+ public static final String WRITER_KAFKA_TYPEMAPPERCLASS_CONFIG =
"writer.kafka.typeMapperClass";
+ public static final String WRITER_KAFKA_TYPEMAPPERCLASS_DEFAULT =
AvroGenericRecordTypeMapper.class.getName();
+ public static final String WRITER_KAFKA_VALUEFIELD_CONFIG =
"writer.kafka.valueField";
+ public static final String WRITER_KAFKA_VALUEFIELD_DEFAULT =
TypeMapper.FIELD_PATH_ALL;
/**
* Kafka producer configurations will be passed through as is as long as
they are prefixed
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
index 1999480..a6837ae 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
@@ -21,6 +21,8 @@ import java.lang.reflect.InvocationTargetException;
import java.util.Properties;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import com.google.common.base.Throwables;
import com.typesafe.config.Config;
@@ -29,6 +31,8 @@ import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.types.FieldMappingException;
+import org.apache.gobblin.types.TypeMapper;
import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.*;
@@ -90,4 +94,18 @@ public class KafkaWriterHelper {
throw Throwables.propagate(e);
}
}
+
+ public static <K, V> Pair<K,V> getKeyValuePair(V record,
KafkaWriterCommonConfig commonConfig)
+ throws FieldMappingException {
+ K key = null;
+ TypeMapper typeMapper = commonConfig.getTypeMapper();
+ if (commonConfig.isKeyed()) {
+ key = (K) typeMapper.getField(record, commonConfig.getKeyField());
+ }
+ V value = record;
+ if (!commonConfig.getValueField().equals(TypeMapper.FIELD_PATH_ALL)) {
+ value = (V) typeMapper.getField(record, commonConfig.getValueField());
+ }
+ return new ImmutablePair<>(key, value);
+ }
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfigTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfigTest.java
new file mode 100644
index 0000000..bcee5b7
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterCommonConfigTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.types.AvroGenericRecordTypeMapper;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class KafkaWriterCommonConfigTest {
+
+ @Test
+ public void testEmptyConstructor()
+ throws ConfigurationException {
+ Properties properties = new Properties();
+ Config config = ConfigUtils.propertiesToConfig(properties);
+ KafkaWriterCommonConfig kafkaWriterCommonConfig = new
KafkaWriterCommonConfig(config);
+ Assert.assertEquals(kafkaWriterCommonConfig.isKeyed(), false);
+ Assert.assertEquals(kafkaWriterCommonConfig.getKeyField(), null);
+
Assert.assertEquals(kafkaWriterCommonConfig.getTypeMapper().getClass().getCanonicalName(),
+ AvroGenericRecordTypeMapper.class.getCanonicalName());
+ Assert.assertEquals(kafkaWriterCommonConfig.getValueField(), "*");
+ }
+
+ @Test
+ public void testKeyedConstructor() {
+ Properties properties = new Properties();
+
properties.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG,
"true");
+ try {
+ Config config = ConfigUtils.propertiesToConfig(properties);
+ KafkaWriterCommonConfig kafkaWriterCommonConfig = new
KafkaWriterCommonConfig(config);
+ Assert.fail("Should fail to construct with keyed writes set to true,
without setting key field");
+ } catch (ConfigurationException ce) {
+ // Expected
+ }
+
properties.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG,
"key");
+ try {
+ Config config = ConfigUtils.propertiesToConfig(properties);
+ KafkaWriterCommonConfig kafkaWriterCommonConfig = new
KafkaWriterCommonConfig(config);
+ Assert.assertEquals(kafkaWriterCommonConfig.isKeyed(), true);
+ Assert.assertEquals(kafkaWriterCommonConfig.getKeyField(), "key");
+ // Check default type mapper is AvroGenericRecord based
+
Assert.assertEquals(kafkaWriterCommonConfig.getTypeMapper().getClass().getCanonicalName(),
+ AvroGenericRecordTypeMapper.class.getCanonicalName());
+ Assert.assertEquals(kafkaWriterCommonConfig.getValueField(), "*");
+ } catch (ConfigurationException ce) {
+ Assert.fail("Should successfully construct with keyed writes set to
true, and with setting key field", ce);
+ }
+
properties.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_TYPEMAPPERCLASS_CONFIG,
TestTypeMapper.class.getCanonicalName());
+ try {
+ Config config = ConfigUtils.propertiesToConfig(properties);
+ KafkaWriterCommonConfig kafkaWriterCommonConfig = new
KafkaWriterCommonConfig(config);
+ Assert.assertEquals(kafkaWriterCommonConfig.isKeyed(), true);
+ Assert.assertEquals(kafkaWriterCommonConfig.getKeyField(), "key");
+
Assert.assertEquals(kafkaWriterCommonConfig.getTypeMapper().getClass().getCanonicalName(),
+ TestTypeMapper.class.getCanonicalName());
+ Assert.assertEquals(kafkaWriterCommonConfig.getValueField(), "*");
+ } catch (ConfigurationException ce) {
+ Assert.fail("Should successfully construct", ce);
+ }
+
properties.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_VALUEFIELD_CONFIG,
"foobar");
+ try {
+ Config config = ConfigUtils.propertiesToConfig(properties);
+ KafkaWriterCommonConfig kafkaWriterCommonConfig = new
KafkaWriterCommonConfig(config);
+ Assert.assertEquals(kafkaWriterCommonConfig.isKeyed(), true);
+ Assert.assertEquals(kafkaWriterCommonConfig.getKeyField(), "key");
+
Assert.assertEquals(kafkaWriterCommonConfig.getTypeMapper().getClass().getCanonicalName(),
+ TestTypeMapper.class.getCanonicalName());
+ Assert.assertEquals(kafkaWriterCommonConfig.getValueField(), "foobar");
+ }
+ catch (ConfigurationException ce) {
+ Assert.fail("Should successfully construct", ce);
+ }
+ }
+
+
+}
diff --git
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/TestTypeMapper.java
similarity index 66%
copy from
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
copy to
gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/TestTypeMapper.java
index 7adbba1..0b7dfa7 100644
---
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/TestTypeMapper.java
@@ -17,20 +17,15 @@
package org.apache.gobblin.kafka.writer;
-import java.util.Properties;
+import org.apache.gobblin.types.FieldMappingException;
+import org.apache.gobblin.types.TypeMapper;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.gobblin.writer.AsyncDataWriter;
-
-/**
- * Builder that hands back a {@link Kafka08DataWriter}
- */
-public class KafkaDataWriterBuilder extends
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
+public class TestTypeMapper implements TypeMapper {
@Override
- protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties
props) {
- return new Kafka08DataWriter<>(props);
+ public Object getField(Object record, String fieldPath)
+ throws FieldMappingException {
+ return null;
}
}