This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf5a52e [HUDI-2320] Add support for ByteArrayDeserializer in
AvroKafkaSource (#3502)
bf5a52e is described below
commit bf5a52e51bbeaa089995335a0a4c55884792e505
Author: 董可伦 <[email protected]>
AuthorDate: Mon Aug 30 10:01:15 2021 +0800
[HUDI-2320] Add support for ByteArrayDeserializer in AvroKafkaSource (#3502)
---
hudi-utilities/pom.xml | 2 +-
.../hudi/utilities/sources/AvroKafkaSource.java | 22 ++++++++++++++++++----
2 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 4dcc966..089b780 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -254,7 +254,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>bijection-avro_${scala.binary.version}</artifactId>
- <version>0.9.3</version>
+ <version>0.9.7</version>
</dependency>
<!-- Kafka -->
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 500c412..ff8ea5a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -26,11 +26,13 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -55,13 +57,15 @@ public class AvroKafkaSource extends AvroSource {
private final KafkaOffsetGen offsetGen;
private final HoodieDeltaStreamerMetrics metrics;
+ private final SchemaProvider schemaProvider;
+ private final String deserializerClassName;
public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext,
SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
- String deserializerClassName =
props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
+ deserializerClassName =
props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
try {
@@ -78,6 +82,7 @@ public class AvroKafkaSource extends AvroSource {
throw new HoodieException(error, e);
}
+ this.schemaProvider = schemaProvider;
this.metrics = metrics;
offsetGen = new KafkaOffsetGen(props);
}
@@ -91,12 +96,21 @@ public class AvroKafkaSource extends AvroSource {
return new InputBatch<>(Option.empty(),
CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
- return new InputBatch<>(Option.of(newDataRDD),
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+ return new InputBatch<>(Option.of(newDataRDD),
CheckpointUtils.offsetsToStr(offsetRanges));
}
private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
- return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(),
offsetRanges,
- LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord)
obj.value());
+ if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
+ if (schemaProvider == null) {
+ throw new HoodieException("Please provide a valid schema provider
class when use ByteArrayDeserializer!");
+ }
+ AvroConvertor convertor = new
AvroConvertor(schemaProvider.getSourceSchema());
+ return KafkaUtils.<String, byte[]>createRDD(sparkContext,
offsetGen.getKafkaParams(), offsetRanges,
+ LocationStrategies.PreferConsistent()).map(obj ->
convertor.fromAvroBinary(obj.value()));
+ } else {
+ return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(),
offsetRanges,
+ LocationStrategies.PreferConsistent()).map(obj ->
(GenericRecord) obj.value());
+ }
}
@Override