This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 87506a3  [GOBBLIN-1143] Add a generic wrapper producer client to 
communicate with Kafka
87506a3 is described below

commit 87506a370c2ee4b607d93b6c8d10b777885da7fc
Author: Zihan Li <[email protected]>
AuthorDate: Thu Jun 18 18:19:13 2020 -0700

    [GOBBLIN-1143] Add a generic wrapper producer client to communicate with 
Kafka
    
    Closes #2980 from ZihanLi58/GOBBLIN-1143
---
 .../gobblin/kafka/writer/Kafka08DataWriter.java    | 31 +++++++++++++-------
 .../gobblin/kafka/writer/Kafka09DataWriter.java    | 20 +++++++++----
 .../gobblin/kafka/writer/KafkaDataWriter.java      | 34 ++++++++++++++++++++++
 3 files changed, 69 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
index fc82270..347dd53 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
+++ 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java
@@ -49,9 +49,9 @@ import org.apache.gobblin.writer.WriteResponseMapper;
  *
  */
 @Slf4j
-public class Kafka08DataWriter<K,V> implements AsyncDataWriter<V> {
+public class Kafka08DataWriter<K,V> implements KafkaDataWriter<K, V> {
 
-  private static final WriteResponseMapper<RecordMetadata> 
WRITE_RESPONSE_WRAPPER =
+  public static final WriteResponseMapper<RecordMetadata> 
WRITE_RESPONSE_WRAPPER =
       new WriteResponseMapper<RecordMetadata>() {
 
         @Override
@@ -118,20 +118,31 @@ public class Kafka08DataWriter<K,V> implements 
AsyncDataWriter<V> {
   public Future<WriteResponse> write(final V record, final WriteCallback 
callback) {
     try {
       Pair<K, V> kvPair = KafkaWriterHelper.getKeyValuePair(record, 
commonConfig);
-      return new WriteResponseFuture<>(this.producer.send(new 
ProducerRecord<>(topic, kvPair.getKey(), kvPair.getValue()),
-          (metadata, exception) -> {
-            if (exception != null) {
-              callback.onFailure(exception);
-            } else {
-              callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
-            }
-          }), WRITE_RESPONSE_WRAPPER);
+      return write(kvPair, callback);
     }
     catch (Exception e) {
       throw new RuntimeException("Failed to generate write request", e);
     }
   }
 
+  public Future<WriteResponse> write(Pair<K, V> keyValuePair, final 
WriteCallback callback) {
+    try {
+      return new WriteResponseFuture<>(this.producer
+          .send(new ProducerRecord<>(topic, keyValuePair.getKey(), 
keyValuePair.getValue()), new Callback() {
+            @Override
+            public void onCompletion(final RecordMetadata metadata, Exception 
exception) {
+              if (exception != null) {
+                callback.onFailure(exception);
+              } else {
+                callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
+              }
+            }
+          }), WRITE_RESPONSE_WRAPPER);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create a Kafka write request", e);
+    }
+  }
+
   @Override
   public void flush()
       throws IOException {
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 68d8f51..3bef426 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -56,10 +56,10 @@ import org.apache.gobblin.writer.WriteResponseMapper;
  *
  */
 @Slf4j
-public class Kafka09DataWriter<K, V> implements AsyncDataWriter<V> {
+public class Kafka09DataWriter<K, V> implements KafkaDataWriter<K, V> {
 
-  
-  private static final WriteResponseMapper<RecordMetadata> 
WRITE_RESPONSE_WRAPPER =
+
+  public static final WriteResponseMapper<RecordMetadata> 
WRITE_RESPONSE_WRAPPER =
       new WriteResponseMapper<RecordMetadata>() {
 
         @Override
@@ -123,6 +123,14 @@ public class Kafka09DataWriter<K, V> implements 
AsyncDataWriter<V> {
   public Future<WriteResponse> write(final V record, final WriteCallback 
callback) {
     try {
       Pair<K, V> keyValuePair = KafkaWriterHelper.getKeyValuePair(record, 
this.commonConfig);
+      return write(keyValuePair, callback);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create a Kafka write request", e);
+    }
+  }
+
+  public Future<WriteResponse> write(Pair<K, V> keyValuePair, final 
WriteCallback callback) {
+    try {
       return new WriteResponseFuture<>(this.producer
           .send(new ProducerRecord<>(topic, keyValuePair.getKey(), 
keyValuePair.getValue()), new Callback() {
             @Override
@@ -144,7 +152,7 @@ public class Kafka09DataWriter<K, V> implements 
AsyncDataWriter<V> {
       throws IOException {
          this.producer.flush();
   }
-  
+
   private void provisionTopic(String topicName,Config config) {
     String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
     if(!config.hasPath(zooKeeperPropKey)) {
@@ -163,11 +171,11 @@ public class Kafka09DataWriter<K, V> implements 
AsyncDataWriter<V> {
     ZkUtils zkUtils = new ZkUtils(zkClient, new 
ZkConnection(zookeeperConnect), false);
     int partitions = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.PARTITION_COUNT, 
KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
     int replication = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.REPLICATION_COUNT, 
KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
-    Properties topicConfig = new Properties(); 
+    Properties topicConfig = new Properties();
     if(AdminUtils.topicExists(zkUtils, topicName)) {
           log.debug("Topic"+topicName+" already Exists with replication: 
"+replication+" and partitions :"+partitions);
        return;
-    } 
+    }
     try {
        AdminUtils.createTopic(zkUtils, topicName, partitions, replication, 
topicConfig);
     } catch (RuntimeException e) {
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriter.java
new file mode 100644
index 0000000..c289d9b
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+
+
+public interface KafkaDataWriter<K, V> extends AsyncDataWriter<V> {
+  /**
+   * Asynchronously write a Key-Value pair, execute the callback on 
success/failure
+   */
+  Future<WriteResponse> write(Pair<K, V> record, @Nullable WriteCallback 
callback);
+
+}

Reply via email to