This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d36e52694984239f739bdad4362e85e644fa40e1
Author: Tim Brown <[email protected]>
AuthorDate: Fri Mar 1 15:50:46 2024 -0600

    [HUDI-7464] Fix minor bugs in kafka post-processing related code (#10772)
---
 .../utilities/schema/KafkaOffsetPostProcessor.java | 35 ++++++++----
 .../hudi/utilities/sources/JsonKafkaSource.java    |  4 +-
 .../schema/TestKafkaOffsetPostProcessor.java       | 65 ++++++++++++++++++++++
 .../utilities/sources/TestJsonKafkaSource.java     |  3 +
 4 files changed, 94 insertions(+), 13 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
index 500bb0c7f99..294838a435f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -18,18 +18,18 @@
 
 package org.apache.hudi.utilities.schema;
 
-import org.apache.avro.JsonProperties;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 
+import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
@@ -51,8 +51,6 @@ public class KafkaOffsetPostProcessor extends 
SchemaPostProcessor {
     }
   }
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetPostProcessor.class);
-
   public static final String KAFKA_SOURCE_OFFSET_COLUMN = 
"_hoodie_kafka_source_offset";
   public static final String KAFKA_SOURCE_PARTITION_COLUMN = 
"_hoodie_kafka_source_partition";
   public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN = 
"_hoodie_kafka_source_timestamp";
@@ -65,16 +63,29 @@ public class KafkaOffsetPostProcessor extends 
SchemaPostProcessor {
   @Override
   public Schema processSchema(Schema schema) {
     // this method adds kafka offset fields namely source offset, partition, 
timestamp and kafka message key to the schema of the batch.
+    List<Schema.Field> fieldList = schema.getFields();
+    Set<String> fieldNames = 
fieldList.stream().map(Schema.Field::name).collect(Collectors.toSet());
+    // if the source schema already contains the kafka offset fields, then 
return the schema as is.
+    if (fieldNames.containsAll(Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, 
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, 
KAFKA_SOURCE_KEY_COLUMN))) {
+      return schema;
+    }
     try {
-      List<Schema.Field> fieldList = schema.getFields();
       List<Schema.Field> newFieldList = fieldList.stream()
           .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), 
f.defaultVal())).collect(Collectors.toList());
-      newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, 
Schema.create(Schema.Type.LONG), "offset column", 0));
-      newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, 
Schema.create(Schema.Type.INT), "partition column", 0));
-      newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
Schema.create(Schema.Type.LONG), "timestamp column", 0));
-      newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, 
createNullableSchema(Schema.Type.STRING), "kafka key column", 
JsonProperties.NULL_VALUE));
-      Schema newSchema = Schema.createRecord(schema.getName() + "_processed", 
schema.getDoc(), schema.getNamespace(), false, newFieldList);
-      return newSchema;
+      // handle case where source schema provider may have already set 1 or 
more of these fields
+      if (!fieldNames.contains(KAFKA_SOURCE_OFFSET_COLUMN)) {
+        newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, 
Schema.create(Schema.Type.LONG), "offset column", 0));
+      }
+      if (!fieldNames.contains(KAFKA_SOURCE_PARTITION_COLUMN)) {
+        newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, 
Schema.create(Schema.Type.INT), "partition column", 0));
+      }
+      if (!fieldNames.contains(KAFKA_SOURCE_TIMESTAMP_COLUMN)) {
+        newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
Schema.create(Schema.Type.LONG), "timestamp column", 0));
+      }
+      if (!fieldNames.contains(KAFKA_SOURCE_KEY_COLUMN)) {
+        newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, 
createNullableSchema(Schema.Type.STRING), "kafka key column", 
JsonProperties.NULL_VALUE));
+      }
+      return Schema.createRecord(schema.getName() + "_processed", 
schema.getDoc(), schema.getNamespace(), false, newFieldList);
     } catch (Exception e) {
       throw new HoodieSchemaException("Kafka offset post processor failed with 
schema: " + schema, e);
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 6e95a315260..c8c3b3421c6 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -93,7 +93,9 @@ public class JsonKafkaSource extends KafkaSource<String> {
             jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
             jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, 
consumerRecord.partition());
             jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
consumerRecord.timestamp());
-            jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
+            if (recordKey != null) {
+              jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
+            }
             stringList.add(om.writeValueAsString(jsonNode));
           } catch (Throwable e) {
             stringList.add(recordValue);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestKafkaOffsetPostProcessor.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestKafkaOffsetPostProcessor.java
new file mode 100644
index 00000000000..aac441609ca
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestKafkaOffsetPostProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestKafkaOffsetPostProcessor {
+  private static final List<String>
+      EXPECTED_FIELD_NAMES = Arrays.asList("existing_field", 
"_hoodie_kafka_source_offset", "_hoodie_kafka_source_partition", 
"_hoodie_kafka_source_timestamp", "_hoodie_kafka_source_key");
+
+  @ParameterizedTest
+  @MethodSource("cases")
+  void testProcessSchema(Schema inputSchema) {
+    KafkaOffsetPostProcessor kafkaOffsetPostProcessor = new 
KafkaOffsetPostProcessor(null, null);
+    Schema actual = kafkaOffsetPostProcessor.processSchema(inputSchema);
+    List<String> actualFieldNames = 
actual.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+    assertEquals(EXPECTED_FIELD_NAMES, actualFieldNames);
+  }
+
+  private static Stream<Arguments> cases() {
+    String offsetField = "{\"name\": \"_hoodie_kafka_source_offset\", 
\"type\": \"long\", \"doc\": \"offset column\", \"default\": 0}";
+    String partitionField = "{\"name\": \"_hoodie_kafka_source_partition\", 
\"type\": \"int\", \"doc\": \"partition column\", \"default\": 0}";
+    String timestampField = "{\"name\": \"_hoodie_kafka_source_timestamp\", 
\"type\": \"long\", \"doc\": \"timestamp column\", \"default\": 0}";
+    String keyField = "{\"name\": \"_hoodie_kafka_source_key\", \"type\": 
[\"null\", \"string\"], \"doc\": \"kafka key column\", \"default\": null}";
+    return Stream.of(
+        Arguments.of(new Schema.Parser().parse("{\"type\": \"record\", 
\"name\": \"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": 
\"string\"}]}")),
+        Arguments.of(new Schema.Parser().parse("{\"type\": \"record\", 
\"name\": \"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": 
\"string\"}, "
+                + offsetField + "]}")),
+        Arguments.of(new Schema.Parser().parse("{\"type\": \"record\", 
\"name\": \"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": 
\"string\"}, "
+                + offsetField + ", " + partitionField + "]}")),
+        Arguments.of(
+            new Schema.Parser().parse("{\"type\": \"record\", \"name\": 
\"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": \"string\"}, 
"
+                + offsetField + ", " + partitionField + ", " + timestampField 
+ "]}")),
+        Arguments.of(
+            new Schema.Parser().parse("{\"type\": \"record\", \"name\": 
\"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": \"string\"}, 
"
+                + offsetField + ", " + partitionField + ", " + timestampField 
+ ", " + keyField + "]}"))
+    );
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 166d419001d..398c509d8e0 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -352,7 +352,10 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, 
metrics);
     kafkaSource = new SourceFormatAdapter(jsonSource);
     Dataset<Row> dfWithOffsetInfoAndNullKafkaKey = 
kafkaSource.fetchNewDataInRowFormat(Option.empty(), 
Long.MAX_VALUE).getBatch().get().cache();
+    // total of 2 * numMessages are in the topic at this point, half with a 
key and half with a null key. All should have the source offset.
     assertEquals(numMessages, 
dfWithOffsetInfoAndNullKafkaKey.toDF().filter("_hoodie_kafka_source_key is 
null").count());
+    assertEquals(numMessages, 
dfWithOffsetInfoAndNullKafkaKey.toDF().filter("_hoodie_kafka_source_key is not 
null").count());
+    assertEquals(numMessages * 2, 
dfWithOffsetInfoAndNullKafkaKey.toDF().filter("_hoodie_kafka_source_offset is 
not null").count());
 
     dfNoOffsetInfo.unpersist();
     dfWithOffsetInfo.unpersist();

Reply via email to the mailing list.