This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 87506a3 [GOBBLIN-1143] Add a generic wrapper producer client to
communicate with Kafka
87506a3 is described below
commit 87506a370c2ee4b607d93b6c8d10b777885da7fc
Author: Zihan Li <[email protected]>
AuthorDate: Thu Jun 18 18:19:13 2020 -0700
[GOBBLIN-1143] Add a generic wrapper producer client to communicate with
Kafka
Closes #2980 from ZihanLi58/GOBBLIN-1143
---
.../gobblin/kafka/writer/Kafka08DataWriter.java | 31 +++++++++++++-------
.../gobblin/kafka/writer/Kafka09DataWriter.java | 20 +++++++++----
.../gobblin/kafka/writer/KafkaDataWriter.java | 34 ++++++++++++++++++++++
3 files changed, 69 insertions(+), 16 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
index fc82270..347dd53 100644
---
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
+++
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
@@ -49,9 +49,9 @@ import org.apache.gobblin.writer.WriteResponseMapper;
*
*/
@Slf4j
-public class Kafka08DataWriter<K,V> implements AsyncDataWriter<V> {
+public class Kafka08DataWriter<K,V> implements KafkaDataWriter<K, V> {
- private static final WriteResponseMapper<RecordMetadata>
WRITE_RESPONSE_WRAPPER =
+ public static final WriteResponseMapper<RecordMetadata>
WRITE_RESPONSE_WRAPPER =
new WriteResponseMapper<RecordMetadata>() {
@Override
@@ -118,20 +118,31 @@ public class Kafka08DataWriter<K,V> implements
AsyncDataWriter<V> {
public Future<WriteResponse> write(final V record, final WriteCallback
callback) {
try {
Pair<K, V> kvPair = KafkaWriterHelper.getKeyValuePair(record,
commonConfig);
- return new WriteResponseFuture<>(this.producer.send(new
ProducerRecord<>(topic, kvPair.getKey(), kvPair.getValue()),
- (metadata, exception) -> {
- if (exception != null) {
- callback.onFailure(exception);
- } else {
- callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
- }
- }), WRITE_RESPONSE_WRAPPER);
+ return write(kvPair, callback);
}
catch (Exception e) {
throw new RuntimeException("Failed to generate write request", e);
}
}
+ /**
+  * Asynchronously sends an already-extracted key/value pair to this writer's topic.
+  *
+  * @param keyValuePair the record key and value to publish
+  * @param callback notified from the producer's completion handler with either the failure or
+  *        the wrapped {@link RecordMetadata}; may be {@code null}, in which case no
+  *        notification is made
+  * @return a future for the {@link WriteResponse} produced by this send
+  * @throws RuntimeException if creating or submitting the send request throws
+  */
+ @Override
+ public Future<WriteResponse> write(Pair<K, V> keyValuePair, final WriteCallback callback) {
+   try {
+     return new WriteResponseFuture<>(this.producer
+         .send(new ProducerRecord<>(topic, keyValuePair.getKey(), keyValuePair.getValue()), new Callback() {
+           @Override
+           public void onCompletion(final RecordMetadata metadata, Exception exception) {
+             // KafkaDataWriter#write declares the callback @Nullable, so guard before dereferencing it.
+             if (callback == null) {
+               return;
+             }
+             if (exception != null) {
+               callback.onFailure(exception);
+             } else {
+               callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
+             }
+           }
+         }), WRITE_RESPONSE_WRAPPER);
+   } catch (Exception e) {
+     throw new RuntimeException("Failed to create a Kafka write request", e);
+   }
+ }
+
@Override
public void flush()
throws IOException {
diff --git
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 68d8f51..3bef426 100644
---
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -56,10 +56,10 @@ import org.apache.gobblin.writer.WriteResponseMapper;
*
*/
@Slf4j
-public class Kafka09DataWriter<K, V> implements AsyncDataWriter<V> {
+public class Kafka09DataWriter<K, V> implements KafkaDataWriter<K, V> {
-
- private static final WriteResponseMapper<RecordMetadata>
WRITE_RESPONSE_WRAPPER =
+
+ public static final WriteResponseMapper<RecordMetadata>
WRITE_RESPONSE_WRAPPER =
new WriteResponseMapper<RecordMetadata>() {
@Override
@@ -123,6 +123,14 @@ public class Kafka09DataWriter<K, V> implements
AsyncDataWriter<V> {
public Future<WriteResponse> write(final V record, final WriteCallback
callback) {
try {
Pair<K, V> keyValuePair = KafkaWriterHelper.getKeyValuePair(record,
this.commonConfig);
+ return write(keyValuePair, callback);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create a Kafka write request", e);
+ }
+ }
+
+ public Future<WriteResponse> write(Pair<K, V> keyValuePair, final
WriteCallback callback) {
+ try {
return new WriteResponseFuture<>(this.producer
.send(new ProducerRecord<>(topic, keyValuePair.getKey(),
keyValuePair.getValue()), new Callback() {
@Override
@@ -144,7 +152,7 @@ public class Kafka09DataWriter<K, V> implements
AsyncDataWriter<V> {
throws IOException {
this.producer.flush();
}
-
+
private void provisionTopic(String topicName,Config config) {
String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
if(!config.hasPath(zooKeeperPropKey)) {
@@ -163,11 +171,11 @@ public class Kafka09DataWriter<K, V> implements
AsyncDataWriter<V> {
ZkUtils zkUtils = new ZkUtils(zkClient, new
ZkConnection(zookeeperConnect), false);
int partitions = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.PARTITION_COUNT,
KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
int replication = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.REPLICATION_COUNT,
KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
- Properties topicConfig = new Properties();
+ Properties topicConfig = new Properties();
if(AdminUtils.topicExists(zkUtils, topicName)) {
log.debug("Topic"+topicName+" already Exists with replication:
"+replication+" and partitions :"+partitions);
return;
- }
+ }
try {
AdminUtils.createTopic(zkUtils, topicName, partitions, replication,
topicConfig);
} catch (RuntimeException e) {
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriter.java
new file mode 100644
index 0000000..c289d9b
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+
+
+public interface KafkaDataWriter<K, V> extends AsyncDataWriter<V> {
+ /**
+ * Asynchronously writes a key-value pair and executes the callback on success or failure.
+ *
+ * @param record the key and value to write, carried together as a {@link Pair}
+ * @param callback the callback to execute once the write succeeds or fails; it is annotated
+ * {@code @Nullable}, so implementations should expect that {@code null} may be passed
+ * @return a {@link Future} for the {@link WriteResponse} of this write
+ */
+ Future<WriteResponse> write(Pair<K, V> record, @Nullable WriteCallback
callback);
+
+}