[ 
https://issues.apache.org/jira/browse/GOBBLIN-820?focusedWorklogId=273711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-273711
 ]

ASF GitHub Bot logged work on GOBBLIN-820:
------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jul/19 05:49
            Start Date: 09/Jul/19 05:49
    Worklog Time Spent: 10m 
      Work Description: htran1 commented on pull request #2682: [GOBBLIN-820] 
Add keyed write capability to Kafka writer
URL: https://github.com/apache/incubator-gobblin/pull/2682#discussion_r301399447
 
 

 ##########
 File path: 
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
 ##########
 @@ -180,5 +179,94 @@ public void testAvroSerialization()
   }
 
 
+  @Test
+  public void testKeyedAvroSerialization()
+      throws IOException, InterruptedException, SchemaRegistryException {
+    String topic = "testAvroSerialization09";
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
"bootstrap.servers",
+        "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+    
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
"value.serializer",
+        LiAvroSerializer.class.getName());
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, 
"true");
+    String keyField = "field1";
+    
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG, 
keyField);
+
+
+    // set up mock schema registry
+
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+            + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+        ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+
+    Kafka09DataWriter<String, GenericRecord> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka09DataWriter.write(record, callback);
+    }
+    finally
+    {
+      kafka09DataWriter.close();
+    }
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+    MessageAndMetadata<byte[], byte[]> value = 
_kafkaTestHelper.getIteratorForTopic(topic).next();
+    byte[] key = value.key();
+    byte[] message = value.message();
+    ConfigDrivenMd5SchemaRegistry schemaReg = new 
ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
+    LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);
+    GenericRecord receivedRecord = deser.deserialize(topic, message);
+    Assert.assertEquals(record.toString(), receivedRecord.toString());
+    Assert.assertEquals(new String(key), record.get(keyField));
+  }
+
+  @Test
+  public void testValueSerialization()
+      throws IOException, InterruptedException, SchemaRegistryException {
+    String topic = "testAvroSerialization09";
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
"bootstrap.servers",
+        "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+    
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + 
"value.serializer",
+    "org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, 
"true");
+    String keyField = "field1";
+    
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG, 
keyField);
+    
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_VALUEFIELD_CONFIG, 
keyField);
+
+
+    // set up mock schema registry
+
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+            + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+        ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+
+    Kafka09DataWriter<String, GenericRecord> kafka09DataWriter = new 
Kafka09DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka09DataWriter.write(record, callback);
+    }
+    finally
 
 Review comment:
   Consider reformatting so that `finally` and the braces share a line with the adjacent code, i.e. `} finally {` instead of placing `finally` and each brace on separate lines?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 273711)
            Time Spent: 10m
    Remaining Estimate: 0h

> Support keyed writer for Kafka
> ------------------------------
>
>                 Key: GOBBLIN-820
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-820
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-kafka
>            Reporter: Shirshanka Das
>            Assignee: Shirshanka Das
>            Priority: Major
>              Labels: kafka
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current Kafka writer uses the non-keyed API to produce to Kafka. 
> This issue proposes to add support for keyed writes to Kafka. 
> Constraints:
>  * Minimal changes needed to existing pipeline configuration to add support 
> for keyed writes to Kafka
>  * Minimal changes to gobblin-core. Do not add general support for 
> keyed-writers as part of this issue. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to