yihua commented on code in PR #11070:
URL: https://github.com/apache/hudi/pull/11070#discussion_r1597357603
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java:
##########
@@ -63,11 +67,18 @@ public ProtoKafkaSource(TypedProperties props,
JavaSparkContext sparkContext, Sp
public ProtoKafkaSource(TypedProperties properties, JavaSparkContext
sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics,
StreamContext streamContext) {
super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics,
new
DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(),
properties, sparkContext), streamContext.getSourceProfileSupplier()));
- checkRequiredConfigProperties(props, Collections.singletonList(
- ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
- props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
- props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP,
ByteArrayDeserializer.class);
- className = getStringWithAltKeys(props,
ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME);
+ this.deserializerName = ConfigUtils.getStringWithAltKeys(props,
KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS, true);
+ if (!deserializerName.equals(ByteArrayDeserializer.class.getName()) &&
!deserializerName.equals(KafkaProtobufDeserializer.class.getName())) {
+ throw new HoodieReadFromSourceException("Only ByteArrayDeserializer and
KafkaProtobufDeserializer are supported for ProtoKafkaSource");
+ }
+ if (deserializerName.equals(ByteArrayDeserializer.class.getName())) {
+ checkRequiredConfigProperties(props,
Collections.singletonList(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
+ className = getStringWithAltKeys(props,
ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME);
+ } else {
+ className = null;
Review Comment:
Avoid using `null`
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java:
##########
@@ -64,21 +69,24 @@
import java.util.stream.IntStream;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Tests against {@link ProtoKafkaSource}.
*/
public class TestProtoKafkaSource extends BaseTestKafkaSource {
+ private static final JsonFormat.Printer PRINTER =
JsonFormat.printer().omittingInsignificantWhitespace();
private static final Random RANDOM = new Random();
+ private static final String MOCK_REGISTRY_URL = "mock://127.0.0.1:8081";
protected TypedProperties createPropsForKafkaSource(String topic, Long
maxEventsToReadFromKafkaSource, String resetStrategy) {
TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.streamer.source.kafka.topic", topic);
+ props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
 Review Comment:
 nit: this property key should not be changed — keep the original `hoodie.streamer.source.kafka.topic`.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java:
##########
@@ -158,7 +187,7 @@ private static List<Sample> createSampleMessages(int count)
{
.setPrimitiveFixedSignedLong(RANDOM.nextLong())
.setPrimitiveBoolean(RANDOM.nextBoolean())
.setPrimitiveString(UUID.randomUUID().toString())
-
.setPrimitiveBytes(ByteString.copyFrom(getUTF8Bytes(UUID.randomUUID().toString())));
+
.setPrimitiveBytes(ByteString.copyFrom(UUID.randomUUID().toString().getBytes()));
 Review Comment:
 similar here: this change should not be made — keep the original `getUTF8Bytes(...)` call (the bare `getBytes()` uses the platform default charset).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]