This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 77bf4357ed781a028a75010819b0808910268054
Author: Hussein Awala <[email protected]>
AuthorDate: Wed Aug 16 04:22:45 2023 +0200

    [HUDI-6683][FOLLOW-UP] Rename kafka record value variable in JsonKafkaSource and replace casting to String by calling toString (#9451)
---
 .../java/org/apache/hudi/utilities/sources/JsonKafkaSource.java | 8 ++++----
 .../org/apache/hudi/utilities/sources/helpers/AvroConvertor.java | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index de67dc171a9..f31c9b7e542 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -80,17 +80,17 @@ public class JsonKafkaSource extends KafkaSource<String> {
     List<String> stringList = new LinkedList<>();
     ObjectMapper om = new ObjectMapper();
     partitionIterator.forEachRemaining(consumerRecord -> {
-      String record = consumerRecord.value().toString();
-      String recordKey = (String) consumerRecord.key();
+      String recordValue = consumerRecord.value().toString();
+      String recordKey = consumerRecord.key().toString();
       try {
-        ObjectNode jsonNode = (ObjectNode) om.readTree(record);
+        ObjectNode jsonNode = (ObjectNode) om.readTree(recordValue);
         jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
         jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
         jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
         jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
         stringList.add(om.writeValueAsString(jsonNode));
       } catch (Throwable e) {
-        stringList.add(record);
+        stringList.add(recordValue);
       }
     });
     return stringList.iterator();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 1a7daaa7bca..89191cb465c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -180,7 +180,7 @@ public class AvroConvertor implements Serializable {
     recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
     recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
     recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
-    recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, String.valueOf(consumerRecord.key()));
+    recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, consumerRecord.key().toString());
     return recordBuilder.build();
   }
