This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new aa5bb0dda34 [HUDI-4732] Add support for confluent schema registry with proto (#11070)
aa5bb0dda34 is described below

commit aa5bb0dda34bf643d61e96f51a456cf876c0a0eb
Author: Tim Brown <[email protected]>
AuthorDate: Sun May 12 19:59:45 2024 -0400

    [HUDI-4732] Add support for confluent schema registry with proto (#11070)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 hudi-utilities/pom.xml                             |  7 +--
 .../hudi/utilities/config/KafkaSourceConfig.java   |  8 +++
 .../deser/KafkaAvroSchemaDeserializer.java         |  4 +-
 .../schema/ProtoClassBasedSchemaProvider.java      | 10 +---
 .../ProtoSchemaToAvroSchemaConverter.java          | 43 +++++++++++++++
 .../hudi/utilities/sources/ProtoKafkaSource.java   | 40 ++++++++++----
 .../sources/helpers/ProtoConversionUtil.java       | 56 +++++++++++++++++--
 .../deser/TestKafkaAvroSchemaDeserializer.java     |  8 +--
 .../TestProtoSchemaToAvroSchemaConverter.java      | 50 +++++++++++++++++
 .../utilities/sources/TestProtoKafkaSource.java    | 63 ++++++++++++++++++++--
 packaging/hudi-utilities-bundle/pom.xml            |  1 +
 packaging/hudi-utilities-slim-bundle/pom.xml       |  1 +
 pom.xml                                            | 34 +++++++++++-
 13 files changed, 288 insertions(+), 37 deletions(-)
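
For readers trying the new path end to end, a minimal sketch of wiring ProtoKafkaSource to Confluent-framed protobuf records, mirroring the new test added below (the registry URL and the jsc/spark/metrics handles are placeholders):

    TypedProperties props = new TypedProperties();
    // Use the registry-backed deserializer instead of the default ByteArrayDeserializer + generated class.
    props.put(KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS.key(),
        KafkaProtobufDeserializer.class.getName());
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("hoodie.streamer.schemaprovider.registry.url", "http://localhost:8081");
    // The schema is fetched from the registry and converted to Avro, so no proto class name is needed.
    SchemaProvider schemaProvider = new SchemaRegistryProvider(props, jsc);
    ProtoKafkaSource source = new ProtoKafkaSource(props, jsc, spark, schemaProvider, metrics);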

diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 3a7a9d6a712..47c172b7791 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -361,12 +361,10 @@
     <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-avro-serializer</artifactId>
-      <version>${confluent.version}</version>
     </dependency>
     <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>common-config</artifactId>
-      <version>${confluent.version}</version>
     </dependency>
     <dependency>
       <groupId>io.confluent</groupId>
@@ -376,7 +374,10 @@
     <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-schema-registry-client</artifactId>
-      <version>${confluent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-protobuf-serializer</artifactId>
     </dependency>
 
     <!-- Httpcomponents -->
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index 024712f8cdd..6215e99d665 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -24,6 +24,8 @@ import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
 import javax.annotation.concurrent.Immutable;
 
 import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
@@ -120,6 +122,12 @@ public class KafkaSourceConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Kafka consumer strategy for reading data.");
 
+  public static final ConfigProperty<String> KAFKA_PROTO_VALUE_DESERIALIZER_CLASS = ConfigProperty
+      .key(PREFIX + "proto.value.deserializer.class")
+      .defaultValue(ByteArrayDeserializer.class.getName())
+      .sinceVersion("0.15.0")
+      .withDocumentation("Kafka Proto Payload Deserializer Class");
+
   /**
    * Kafka reset offset strategies.
    */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
index 246be5f8ec6..4673eceed15 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
@@ -60,7 +60,6 @@ public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
   /**
    * We need to inject sourceSchema instead of reader schema during deserialization or later stages of the pipeline.
    *
-   * @param includeSchemaAndVersion
    * @param topic
    * @param isKey
    * @param payload
@@ -70,13 +69,12 @@ public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
    */
   @Override
   protected Object deserialize(
-      boolean includeSchemaAndVersion,
       String topic,
       Boolean isKey,
       byte[] payload,
       Schema readerSchema)
       throws SerializationException {
-    return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema);
+    return super.deserialize(topic, isKey, payload, sourceSchema);
   }
 
   protected TypedProperties getConvertToTypedProperties(Map<String, ?> configs) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
index 7d6981efb40..a4b485e1634 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
@@ -32,13 +32,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 import java.util.Collections;
 
 import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
-import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
-import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME;
-import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH;
-import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS;
-import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS;
 
 /**
  * A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
@@ -75,10 +70,7 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
     super(props, jssc);
     checkRequiredConfigProperties(props, Collections.singletonList(PROTO_SCHEMA_CLASS_NAME));
     String className = getStringWithAltKeys(config, PROTO_SCHEMA_CLASS_NAME);
-    boolean wrappedPrimitivesAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS);
-    int maxRecursionDepth = getIntWithAltKeys(props, PROTO_SCHEMA_MAX_RECURSION_DEPTH);
-    boolean timestampsAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS);
-    ProtoConversionUtil.SchemaConfig schemaConfig = new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords);
+    ProtoConversionUtil.SchemaConfig schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(props);
     try {
       schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), schemaConfig).toString();
     } catch (Exception e) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
new file mode 100644
index 00000000000..78ef25e9a04
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema.converter;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
+import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;
+
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+
+import java.io.IOException;
+
+/**
+ * Converts a protobuf schema from the schema registry to an Avro schema.
+ */
+public class ProtoSchemaToAvroSchemaConverter implements SchemaRegistryProvider.SchemaConverter {
+  private final ProtoConversionUtil.SchemaConfig schemaConfig;
+
+  public ProtoSchemaToAvroSchemaConverter(TypedProperties config) {
+    this.schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(config);
+  }
+
+  @Override
+  public String convert(String schema) throws IOException {
+    ProtobufSchema protobufSchema = new ProtobufSchema(schema);
+    return ProtoConversionUtil.getAvroSchemaForMessageDescriptor(protobufSchema.toDescriptor(), schemaConfig).toString();
+  }
+}
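
A quick sketch of using the new converter on its own, assuming a .proto schema string fetched from the registry (the message definition below is made up for illustration; SchemaConfig defaults apply when no properties are set):

    String protoSchema = "syntax = \"proto3\";\n"
        + "message Sample {\n"
        + "  string id = 1;\n"
        + "  int64 amount = 2;\n"
        + "}\n";
    // convert() parses the proto definition and returns the equivalent Avro schema as JSON.
    String avroJson = new ProtoSchemaToAvroSchemaConverter(new TypedProperties()).convert(protoSchema);
    Schema avroSchema = new Schema.Parser().parse(avroJson);
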
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 1dc731b5f95..a56c991bebd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -19,9 +19,12 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
@@ -31,6 +34,8 @@ import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
 import com.google.protobuf.Message;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.api.java.JavaRDD;
@@ -52,8 +57,8 @@ import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
  * Reads protobuf serialized Kafka data, based on a provided class name.
  */
 public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {
-
-  private final String className;
+  private final Option<String> className;
+  private final String deserializerName;
 
   public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                           SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
@@ -63,11 +68,18 @@ public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {
   public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
     super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics,
         new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), properties, sparkContext), streamContext.getSourceProfileSupplier()));
-    checkRequiredConfigProperties(props, Collections.singletonList(
-        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
-    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
-    props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class);
-    className = getStringWithAltKeys(props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME);
+    this.deserializerName = ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS, true);
+    if (!deserializerName.equals(ByteArrayDeserializer.class.getName()) && !deserializerName.equals(KafkaProtobufDeserializer.class.getName())) {
+      throw new HoodieReadFromSourceException("Only ByteArrayDeserializer and KafkaProtobufDeserializer are supported for ProtoKafkaSource");
+    }
+    if (deserializerName.equals(ByteArrayDeserializer.class.getName())) {
+      checkRequiredConfigProperties(props, Collections.singletonList(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
+      className = Option.of(getStringWithAltKeys(props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
+    } else {
+      className = Option.empty();
+    }
+    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
+    props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, deserializerName);
     this.offsetGen = new KafkaOffsetGen(props);
     if (this.shouldAddOffsets) {
       throw new HoodieReadFromSourceException("Appending kafka offsets to ProtoKafkaSource is not supported");
@@ -76,9 +88,17 @@ public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {
 
   @Override
   protected JavaRDD<Message> toBatch(OffsetRange[] offsetRanges) {
-    ProtoDeserializer deserializer = new ProtoDeserializer(className);
-    return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-        LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value()));
+    if (deserializerName.equals(ByteArrayDeserializer.class.getName())) {
+      ValidationUtils.checkArgument(
+          className.isPresent(),
+          ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key() + " config must be present.");
+      ProtoDeserializer deserializer = new ProtoDeserializer(className.get());
+      return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value()));
+    } else {
+      return KafkaUtils.<String, Message>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent()).map(ConsumerRecord::value);
+    }
   }
 
   private static class ProtoDeserializer implements Serializable {
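
For comparison, the pre-existing class-based path is unchanged and remains the default; a sketch under the same placeholder handles as above, where Sample stands in for any generated protobuf class on the classpath:

    TypedProperties props = new TypedProperties();
    props.put(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
    // With the default ByteArrayDeserializer, records are parsed via the generated class.
    SchemaProvider schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc);
    ProtoKafkaSource source = new ProtoKafkaSource(props, jsc, spark, schemaProvider, metrics);
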
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
index cf8532d65c8..c16c7e085cb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
@@ -17,15 +17,18 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import com.google.protobuf.BoolValue;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DoubleValue;
+import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.FloatValue;
 import com.google.protobuf.Int32Value;
 import com.google.protobuf.Int64Value;
@@ -56,7 +59,12 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH;
+import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS;
+import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS;
 
 /**
  * A utility class to help translate from Proto to Avro.
@@ -74,6 +82,17 @@ public class ProtoConversionUtil {
     return new AvroSupport(schemaConfig).getSchema(clazz);
   }
 
+  /**
+   * Creates an Avro {@link Schema} for the provided {@link Descriptors.Descriptor}.
+   * Intended for use when the descriptor is provided by an external registry.
+   * @param descriptor The protobuf descriptor
+   * @param schemaConfig configuration used to determine how to handle particular cases when converting from the proto schema
+   * @return An Avro schema
+   */
+  public static Schema getAvroSchemaForMessageDescriptor(Descriptors.Descriptor descriptor, SchemaConfig schemaConfig) {
+    return new AvroSupport(schemaConfig).getSchema(descriptor);
+  }
+
   /**
    * Converts the provided {@link Message} into an avro {@link GenericRecord} with the provided schema.
    * @param schema target schema to convert into
@@ -101,6 +120,13 @@ public class ProtoConversionUtil {
       this.timestampsAsRecords = timestampsAsRecords;
     }
 
+    public static SchemaConfig fromProperties(TypedProperties props) {
+      boolean wrappedPrimitivesAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS);
+      int maxRecursionDepth = getIntWithAltKeys(props, PROTO_SCHEMA_MAX_RECURSION_DEPTH);
+      boolean timestampsAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS);
+      return new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords);
+    }
+
     public boolean isWrappedPrimitivesAsRecords() {
       return wrappedPrimitivesAsRecords;
     }
@@ -157,11 +183,11 @@ public class ProtoConversionUtil {
       this.timestampsAsRecords = schemaConfig.isTimestampsAsRecords();
     }
 
-    public static GenericRecord convert(Schema schema, Message message) {
+    static GenericRecord convert(Schema schema, Message message) {
       return (GenericRecord) convertObject(schema, message);
     }
 
-    public Schema getSchema(Class c) {
+    Schema getSchema(Class c) {
       return SCHEMA_CACHE.computeIfAbsent(new SchemaCacheKey(c, wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords), key -> {
         try {
           Object descriptor = c.getMethod("getDescriptor").invoke(null);
@@ -177,6 +203,16 @@ public class ProtoConversionUtil {
       });
     }
 
+    /**
+     * Translates a Proto Message descriptor into an Avro Schema.
+     * Does not cache since external system may evolve the schema and that can result in a stale version of the avro schema.
+     * @param descriptor the descriptor for the proto message
+     * @return an avro schema
+     */
+    Schema getSchema(Descriptors.Descriptor descriptor) {
+      return getMessageSchema(descriptor, new CopyOnWriteMap<>(), getNamespace(descriptor.getFullName()));
+    }
+
     private Schema getEnumSchema(Descriptors.EnumDescriptor enumDescriptor) {
       List<String> symbols = new ArrayList<>(enumDescriptor.getValues().size());
       for (Descriptors.EnumValueDescriptor valueDescriptor : enumDescriptor.getValues()) {
@@ -402,7 +438,21 @@ public class ProtoConversionUtil {
           if (value instanceof Message) {
             // check if this is a Timestamp
             if (LogicalTypes.timestampMicros().equals(schema.getLogicalType())) {
-              return Timestamps.toMicros((Timestamp) value);
+              if (value instanceof Timestamp) {
+                return Timestamps.toMicros((Timestamp) value);
+              } else if (value instanceof DynamicMessage) {
+                Timestamp.Builder builder = Timestamp.newBuilder();
+                ((DynamicMessage) value).getAllFields().forEach((fieldDescriptor, fieldValue) -> {
+                  if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.seconds")) {
+                    builder.setSeconds((Long) fieldValue);
+                  } else if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.nanos")) {
+                    builder.setNanos((Integer) fieldValue);
+                  }
+                });
+                return Timestamps.toMicros(builder.build());
+              } else {
+                throw new HoodieSchemaException("Unexpected message type while handling timestamps: " + value.getClass().getName());
+              }
             } else {
               tmpValue = getWrappedValue(value);
             }
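
The existing proto schema tuning options flow through SchemaConfig.fromProperties(), so the class-based provider and the new registry-based converter read them the same way; a small sketch of the relevant keys (the values shown are arbitrary):

    TypedProperties props = new TypedProperties();
    props.put(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
    props.put(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), "3");
    props.put(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS.key(), "true");
    ProtoConversionUtil.SchemaConfig schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(props);
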
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
index 16d190ac45d..4fa582209ae 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
@@ -93,7 +93,7 @@ public class TestKafkaAvroSchemaDeserializer {
   }
 
   /**
-   * Tests {@link KafkaAvroSchemaDeserializer#deserialize(Boolean, String, Boolean, byte[], Schema)}.
+   * Tests {@link KafkaAvroSchemaDeserializer#deserialize(String, Boolean, byte[], Schema)}.
    */
   @Test
   public void testKafkaAvroSchemaDeserializer() {
@@ -105,7 +105,7 @@ public class TestKafkaAvroSchemaDeserializer {
     avroDeserializer.configure(new HashMap(config), false);
     bytesOrigRecord = avroSerializer.serialize(topic, avroRecord);
     // record is serialized in orig schema and deserialized using same schema.
-    assertEquals(avroRecord, avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema));
+    assertEquals(avroRecord, avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema));
 
     IndexedRecord avroRecordWithAllField = createExtendUserRecord();
     byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);
@@ -115,12 +115,12 @@ public class TestKafkaAvroSchemaDeserializer {
     avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
     avroDeserializer.configure(new HashMap(config), false);
     // record is serialized w/ evolved schema, and deserialized w/ evolved schema
-    IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesExtendedRecord, evolSchema);
+    IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesExtendedRecord, evolSchema);
     assertEquals(avroRecordWithAllField, avroRecordWithAllFieldActual);
     assertEquals(avroRecordWithAllFieldActual.getSchema(), evolSchema);
 
     // read old record w/ evolved schema.
-    IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema);
+    IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema);
     // record won't be equal to original record as we read w/ evolved schema. "age" will be added w/ default value of null
     assertNotEquals(avroRecord, actualRec);
     GenericRecord genericRecord = (GenericRecord) actualRec;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
new file mode 100644
index 00000000000..fed4bc5e0ed
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema.converter;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
+import org.apache.hudi.utilities.test.proto.Parent;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestProtoSchemaToAvroSchemaConverter {
+  @Test
+  void testConvert() throws Exception {
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
+    Schema.Parser parser = new Schema.Parser();
+    String actual = new ProtoSchemaToAvroSchemaConverter(properties).convert(getProtoSchemaString());
+    Schema actualSchema = new Schema.Parser().parse(actual);
+
+    Schema expectedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive_default_limit.avsc"));
+    assertEquals(expectedSchema, actualSchema);
+  }
+
+  private String getProtoSchemaString() throws IOException, URISyntaxException {
+    return new String(Files.readAllBytes(Paths.get(getClass().getClassLoader().getResource("schema-provider/proto/recursive.proto").toURI())));
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index 662cd1dd985..b63c7c29a24 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -24,6 +24,7 @@ import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
 import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.test.proto.Nested;
@@ -37,10 +38,14 @@ import com.google.protobuf.DoubleValue;
 import com.google.protobuf.FloatValue;
 import com.google.protobuf.Int32Value;
 import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
 import com.google.protobuf.StringValue;
 import com.google.protobuf.UInt32Value;
 import com.google.protobuf.UInt64Value;
+import com.google.protobuf.util.JsonFormat;
 import com.google.protobuf.util.Timestamps;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -55,6 +60,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -64,13 +70,16 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Tests against {@link ProtoKafkaSource}.
  */
 public class TestProtoKafkaSource extends BaseTestKafkaSource {
+  private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();
   private static final Random RANDOM = new Random();
+  private static final String MOCK_REGISTRY_URL = "mock://127.0.0.1:8081";
 
   protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
     TypedProperties props = new TypedProperties();
@@ -93,6 +102,28 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
     return new SourceFormatAdapter(protoKafkaSource);
   }
 
+  @Test
+  public void testProtoKafkaSourceWithConfluentProtoDeserialization() {
+    final String topic = TEST_TOPIC_PREFIX + "testProtoKafkaSourceWithConfluentDeserializer";
+    testUtils.createTopic(topic, 2);
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+    props.put(KAFKA_PROTO_VALUE_DESERIALIZER_CLASS.key(),
+        "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
+    props.put("schema.registry.url", MOCK_REGISTRY_URL);
+    props.put("hoodie.streamer.schemaprovider.registry.url", MOCK_REGISTRY_URL);
+    props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
+    // class name is not required so we'll remove it
+    props.remove(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
+    SchemaProvider schemaProvider = new SchemaRegistryProvider(props, jsc());
+    ProtoKafkaSource protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    List<Sample> messages = createSampleMessages(1000);
+    sendMessagesToKafkaWithConfluentSerializer(topic, 2, messages);
+    // Assert messages are read correctly
+    JavaRDD<Message> messagesRead = protoKafkaSource.fetchNext(Option.empty(), 1000).getBatch().get();
+    assertEquals(messages.stream().map(this::protoToJson).collect(Collectors.toSet()),
+        new HashSet<>(messagesRead.map(message -> PRINTER.print(message)).collect()));
+  }
+
   @Test
   public void testProtoKafkaSourceWithFlattenWrappedPrimitives() {
 
@@ -196,7 +227,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
   @Override
   protected void sendMessagesToKafka(String topic, int count, int numPartitions) {
     List<Sample> messages = createSampleMessages(count);
-    try (Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProperties())) {
+    try (Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProperties(false))) {
       for (int i = 0; i < messages.size(); i++) {
         // use consistent keys to get even spread over partitions for test expectations
         producer.send(new ProducerRecord<>(topic, Integer.toString(i % numPartitions), messages.get(i).toByteArray()));
@@ -204,14 +235,38 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
     }
   }
 
-  private Properties getProducerProperties() {
+  private void sendMessagesToKafkaWithConfluentSerializer(String topic, int numPartitions, List<Sample> messages) {
+    try (Producer<String, Message> producer = new KafkaProducer<>(getProducerProperties(true))) {
+      for (int i = 0; i < messages.size(); i++) {
+        // use consistent keys to get even spread over partitions for test expectations
+        producer.send(new ProducerRecord<>(topic, Integer.toString(i % numPartitions), messages.get(i)));
+      }
+    }
+  }
+
+  private Properties getProducerProperties(boolean useConfluentProtobufSerializer) {
     Properties props = new Properties();
     props.put("bootstrap.servers", testUtils.brokerAddress());
-    props.put("value.serializer", ByteArraySerializer.class.getName());
-    // Key serializer is required.
+    if (useConfluentProtobufSerializer) {
+      props.put("value.serializer", KafkaProtobufSerializer.class.getName());
+      props.put("value.deserializer", KafkaProtobufDeserializer.class.getName());
+      props.put("schema.registry.url", MOCK_REGISTRY_URL);
+      props.put("auto.register.schemas", "true");
+    } else {
+      props.put("value.serializer", ByteArraySerializer.class.getName());
+      // Key serializer is required.
+    }
     props.put("key.serializer", StringSerializer.class.getName());
     // wait for all in-sync replicas to ack sends
     props.put("acks", "all");
     return props;
   }
+
+  private String protoToJson(Message input) {
+    try {
+      return PRINTER.print(input);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to convert proto to json", e);
+    }
+  }
 }
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index b992e5bbeb8..10c1cccd4e2 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -133,6 +133,7 @@
                   <include>io.confluent:common-config</include>
                   <include>io.confluent:common-utils</include>
                   <include>io.confluent:kafka-schema-registry-client</include>
+                  <include>io.confluent:kafka-protobuf-serializer</include>
                   <include>io.dropwizard.metrics:metrics-core</include>
                   <include>io.dropwizard.metrics:metrics-graphite</include>
                   <include>io.dropwizard.metrics:metrics-jmx</include>
diff --git a/packaging/hudi-utilities-slim-bundle/pom.xml b/packaging/hudi-utilities-slim-bundle/pom.xml
index 3919b103465..0a2c271e6ce 100644
--- a/packaging/hudi-utilities-slim-bundle/pom.xml
+++ b/packaging/hudi-utilities-slim-bundle/pom.xml
@@ -119,6 +119,7 @@
                   <include>io.confluent:common-config</include>
                   <include>io.confluent:common-utils</include>
                   <include>io.confluent:kafka-schema-registry-client</include>
+                  <include>io.confluent:kafka-protobuf-serializer</include>
                   <include>io.dropwizard.metrics:metrics-core</include>
                   <include>io.dropwizard.metrics:metrics-graphite</include>
                   <include>io.dropwizard.metrics:metrics-jmx</include>
diff --git a/pom.xml b/pom.xml
index 3b9cc55562a..2fd99672a2f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
     <pulsar.spark.scala11.version>2.4.5</pulsar.spark.scala11.version>
     <pulsar.spark.scala12.version>3.1.1.4</pulsar.spark.scala12.version>
     <pulsar.spark.scala13.version>3.4.1.1</pulsar.spark.scala13.version>
-    <confluent.version>5.3.4</confluent.version>
+    <confluent.version>5.5.0</confluent.version>
     <glassfish.version>2.17</glassfish.version>
     <glassfish.el.version>3.0.1-b12</glassfish.el.version>
     <parquet.version>1.10.1</parquet.version>
@@ -934,6 +934,11 @@
         <version>${glassfish.el.version}</version>
         <scope>provided</scope>
       </dependency>
+      <dependency>
+        <groupId>org.glassfish.jersey.ext</groupId>
+        <artifactId>jersey-bean-validation</artifactId>
+        <version>${glassfish.version}</version>
+      </dependency>
 
       <!-- Avro -->
       <dependency>
@@ -1772,6 +1777,33 @@
           </exclusion>
         </exclusions>
       </dependency>
+
+      <!-- Confluent -->
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>kafka-avro-serializer</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>common-config</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>common-utils</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>kafka-schema-registry-client</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>kafka-protobuf-serializer</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <repositories>

