This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bf5a52e  [HUDI-2320] Add support ByteArrayDeserializer in AvroKafkaSource (#3502)
bf5a52e is described below

commit bf5a52e51bbeaa089995335a0a4c55884792e505
Author: 董可伦 <[email protected]>
AuthorDate: Mon Aug 30 10:01:15 2021 +0800

    [HUDI-2320] Add support ByteArrayDeserializer in AvroKafkaSource (#3502)
---
 hudi-utilities/pom.xml                             |  2 +-
 .../hudi/utilities/sources/AvroKafkaSource.java    | 22 ++++++++++++++++++----
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 4dcc966..089b780 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -254,7 +254,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>bijection-avro_${scala.binary.version}</artifactId>
-      <version>0.9.3</version>
+      <version>0.9.7</version>
     </dependency>
 
     <!-- Kafka -->
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 500c412..ff8ea5a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -26,11 +26,13 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -55,13 +57,15 @@ public class AvroKafkaSource extends AvroSource {
 
   private final KafkaOffsetGen offsetGen;
   private final HoodieDeltaStreamerMetrics metrics;
+  private final SchemaProvider schemaProvider;
+  private final String deserializerClassName;
 
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, 
SparkSession sparkSession,
       SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
 
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
-    String deserializerClassName = 
props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
+    deserializerClassName = 
props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
             
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
 
     try {
@@ -78,6 +82,7 @@ public class AvroKafkaSource extends AvroSource {
       throw new HoodieException(error, e);
     }
 
+    this.schemaProvider = schemaProvider;
     this.metrics = metrics;
     offsetGen = new KafkaOffsetGen(props);
   }
@@ -91,12 +96,21 @@ public class AvroKafkaSource extends AvroSource {
       return new InputBatch<>(Option.empty(), 
CheckpointUtils.offsetsToStr(offsetRanges));
     }
     JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
-    return new InputBatch<>(Option.of(newDataRDD), 
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+    return new InputBatch<>(Option.of(newDataRDD), 
CheckpointUtils.offsetsToStr(offsetRanges));
   }
 
   private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
-    return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), 
offsetRanges,
-            LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) 
obj.value());
+    if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
+      if (schemaProvider == null) {
+        throw new HoodieException("Please provide a valid schema provider 
class when use ByteArrayDeserializer!");
+      }
+      AvroConvertor convertor = new 
AvroConvertor(schemaProvider.getSourceSchema());
+      return KafkaUtils.<String, byte[]>createRDD(sparkContext, 
offsetGen.getKafkaParams(), offsetRanges,
+              LocationStrategies.PreferConsistent()).map(obj -> 
convertor.fromAvroBinary(obj.value()));
+    } else {
+      return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), 
offsetRanges,
+              LocationStrategies.PreferConsistent()).map(obj -> 
(GenericRecord) obj.value());
+    }
   }
 
   @Override

Reply via email to