This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 77bf4357ed781a028a75010819b0808910268054
Author: Hussein Awala <[email protected]>
AuthorDate: Wed Aug 16 04:22:45 2023 +0200

    [HUDI-6683][FOLLOW-UP] Rename kafka record value variable in JsonKafkaSource and replace casting to String by calling toString (#9451)
---
 .../java/org/apache/hudi/utilities/sources/JsonKafkaSource.java   | 8 ++++----
 .../org/apache/hudi/utilities/sources/helpers/AvroConvertor.java  | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index de67dc171a9..f31c9b7e542 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -80,17 +80,17 @@ public class JsonKafkaSource extends KafkaSource<String> {
         List<String> stringList = new LinkedList<>();
         ObjectMapper om = new ObjectMapper();
         partitionIterator.forEachRemaining(consumerRecord -> {
-          String record = consumerRecord.value().toString();
-          String recordKey = (String) consumerRecord.key();
+          String recordValue = consumerRecord.value().toString();
+          String recordKey = consumerRecord.key().toString();
           try {
-            ObjectNode jsonNode = (ObjectNode) om.readTree(record);
+            ObjectNode jsonNode = (ObjectNode) om.readTree(recordValue);
             jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
            jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
            jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
             jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
             stringList.add(om.writeValueAsString(jsonNode));
           } catch (Throwable e) {
-            stringList.add(record);
+            stringList.add(recordValue);
           }
         });
         return stringList.iterator();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 1a7daaa7bca..89191cb465c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -180,7 +180,7 @@ public class AvroConvertor implements Serializable {
     recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
    recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
    recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
-    recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, String.valueOf(consumerRecord.key()));
+    recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, consumerRecord.key().toString());
     return recordBuilder.build();
   }
 

Reply via email to