This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d36e52694984239f739bdad4362e85e644fa40e1 Author: Tim Brown <[email protected]> AuthorDate: Fri Mar 1 15:50:46 2024 -0600 [HUDI-7464] Fix minor bugs in kafka post-processing related code (#10772) --- .../utilities/schema/KafkaOffsetPostProcessor.java | 35 ++++++++---- .../hudi/utilities/sources/JsonKafkaSource.java | 4 +- .../schema/TestKafkaOffsetPostProcessor.java | 65 ++++++++++++++++++++++ .../utilities/sources/TestJsonKafkaSource.java | 3 + 4 files changed, 94 insertions(+), 13 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java index 500bb0c7f99..294838a435f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java @@ -18,18 +18,18 @@ package org.apache.hudi.utilities.schema; -import org.apache.avro.JsonProperties; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.utilities.config.HoodieStreamerConfig; +import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; @@ -51,8 +51,6 @@ public class KafkaOffsetPostProcessor extends SchemaPostProcessor { } } - private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetPostProcessor.class); - public static final String KAFKA_SOURCE_OFFSET_COLUMN = "_hoodie_kafka_source_offset"; public static final String KAFKA_SOURCE_PARTITION_COLUMN = "_hoodie_kafka_source_partition"; public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN = "_hoodie_kafka_source_timestamp"; @@ -65,16 +63,29 @@ public class KafkaOffsetPostProcessor extends SchemaPostProcessor { @Override public Schema processSchema(Schema schema) { // this method adds kafka offset fields namely source offset, partition, timestamp and kafka message key to the schema of the batch. + List<Schema.Field> fieldList = schema.getFields(); + Set<String> fieldNames = fieldList.stream().map(Schema.Field::name).collect(Collectors.toSet()); + // if the source schema already contains the kafka offset fields, then return the schema as is. + if (fieldNames.containsAll(Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN))) { + return schema; + } try { - List<Schema.Field> fieldList = schema.getFields(); List<Schema.Field> newFieldList = fieldList.stream() .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())).collect(Collectors.toList()); - newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0)); - newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0)); - newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0)); - newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, createNullableSchema(Schema.Type.STRING), "kafka key column", JsonProperties.NULL_VALUE)); - Schema newSchema = Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList); - return newSchema; + // handle case where source schema provider may have already set 1 or more of these fields + if (!fieldNames.contains(KAFKA_SOURCE_OFFSET_COLUMN)) { + newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0)); + } + if (!fieldNames.contains(KAFKA_SOURCE_PARTITION_COLUMN)) { + newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0)); + } + if (!fieldNames.contains(KAFKA_SOURCE_TIMESTAMP_COLUMN)) { + newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0)); + } + if (!fieldNames.contains(KAFKA_SOURCE_KEY_COLUMN)) { + newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, createNullableSchema(Schema.Type.STRING), "kafka key column", JsonProperties.NULL_VALUE)); + } + return Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList); } catch (Exception e) { throw new HoodieSchemaException("Kafka offset post processor failed with schema: " + schema, e); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index 6e95a315260..c8c3b3421c6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -93,7 +93,9 @@ public class JsonKafkaSource extends KafkaSource<String> { jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset()); jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition()); jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp()); - jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey); + if (recordKey != null) { + jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey); + } stringList.add(om.writeValueAsString(jsonNode)); } catch (Throwable e) { stringList.add(recordValue); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestKafkaOffsetPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestKafkaOffsetPostProcessor.java new file mode 100644 index 00000000000..aac441609ca --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestKafkaOffsetPostProcessor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import org.apache.avro.Schema; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class TestKafkaOffsetPostProcessor { + private static final List<String> + EXPECTED_FIELD_NAMES = Arrays.asList("existing_field", "_hoodie_kafka_source_offset", "_hoodie_kafka_source_partition", "_hoodie_kafka_source_timestamp", "_hoodie_kafka_source_key"); + + @ParameterizedTest + @MethodSource("cases") + void testProcessSchema(Schema inputSchema) { + KafkaOffsetPostProcessor kafkaOffsetPostProcessor = new KafkaOffsetPostProcessor(null, null); + Schema actual = kafkaOffsetPostProcessor.processSchema(inputSchema); + List<String> actualFieldNames = actual.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()); + assertEquals(EXPECTED_FIELD_NAMES, actualFieldNames); + } + + private static Stream<Arguments> cases() { + String offsetField = "{\"name\": \"_hoodie_kafka_source_offset\", \"type\": \"long\", \"doc\": \"offset column\", \"default\": 0}"; + String partitionField = "{\"name\": \"_hoodie_kafka_source_partition\", \"type\": \"int\", \"doc\": \"partition column\", \"default\": 0}"; + String timestampField = "{\"name\": \"_hoodie_kafka_source_timestamp\", \"type\": \"long\", \"doc\": \"timestamp column\", \"default\": 0}"; + String keyField = "{\"name\": \"_hoodie_kafka_source_key\", \"type\": [\"null\", \"string\"], \"doc\": \"kafka key column\", \"default\": null}"; + return Stream.of( + Arguments.of(new Schema.Parser().parse("{\"type\": \"record\", \"name\": \"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": \"string\"}]}")), + Arguments.of(new Schema.Parser().parse("{\"type\": \"record\", \"name\": \"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": \"string\"}, " + + offsetField + "]}")), + Arguments.of(new Schema.Parser().parse("{\"type\": \"record\", \"name\": \"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": \"string\"}, " + + offsetField + ", " + partitionField + "]}")), + Arguments.of( + new Schema.Parser().parse("{\"type\": \"record\", \"name\": \"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": \"string\"}, " + + offsetField + ", " + partitionField + ", " + timestampField + "]}")), + Arguments.of( + new Schema.Parser().parse("{\"type\": \"record\", \"name\": \"example\", \"fields\": [{\"name\": \"existing_field\", \"type\": \"string\"}, " + + offsetField + ", " + partitionField + ", " + timestampField + ", " + keyField + "]}")) + ); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java index 166d419001d..398c509d8e0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java @@ -352,7 +352,10 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource { jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics); kafkaSource = new SourceFormatAdapter(jsonSource); Dataset<Row> dfWithOffsetInfoAndNullKafkaKey = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache(); + // total of 2 * numMessages are in the topic at this point, half with a key and half with a null key. All should have the source offset. assertEquals(numMessages, dfWithOffsetInfoAndNullKafkaKey.toDF().filter("_hoodie_kafka_source_key is null").count()); + assertEquals(numMessages, dfWithOffsetInfoAndNullKafkaKey.toDF().filter("_hoodie_kafka_source_key is not null").count()); + assertEquals(numMessages * 2, dfWithOffsetInfoAndNullKafkaKey.toDF().filter("_hoodie_kafka_source_offset is not null").count()); dfNoOffsetInfo.unpersist(); dfWithOffsetInfo.unpersist();
