YousifS7 opened a new issue, #12838:
URL: https://github.com/apache/hudi/issues/12838
Hello,
We are using the `org.apache.hudi.utilities.streamer.HoodieStreamer` class to
extract data from Kafka and write it to a Hudi table. The Kafka topic is populated
by Debezium from a SQL Server table, with Debezium configured to use the Avro
converter. We run Spark-Submit on EMR 7.6.0.
This works perfectly with Hudi 0.15.0. However, after switching to
Hudi 1.0.1, we started encountering this error:
```
java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
```
**To Reproduce**
Steps to reproduce the behavior:
1. Sync SQL Server table to Kafka via Debezium (AvroConverter)
2. Provision EMR 7.6.0
3. Run below Spark-Submit:
> -- Spark-Submit
```
spark-submit \
  --deploy-mode cluster \
  --master yarn \
  --conf spark.rdd.compress=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.kryoserializer.buffer.max=512m \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.default.parallelism=200 \
  --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
  --jars s3://some_bucket/libs/hudi-aws-bundle-1.0.1.jar,s3://some_bucket/libs/hudi-spark3.5-bundle_2.12-1.0.1.jar \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  s3://some_bucket/libs/hudi-utilities-slim-bundle_2.12-1.0.1.jar \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props s3://path/to/file.properties \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field ts \
  --target-base-path s3://path/to/file \
  --target-table some_table \
  --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
  --op UPSERT \
  --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer \
  --enable-sync \
  --table-type COPY_ON_WRITE
```
> -- Properties File
```
hoodie.streamer.transformer.sql.file=s3://path/to/file.sql
hoodie.streamer.schemaprovider.registry.url=some_url
hoodie.streamer.schemaprovider.registry.targetUrl=some_url
schema.registry.url=some_ip:8081
hoodie.streamer.source.kafka.topic=some_topic
bootstrap.servers=some_ip:9092
auto.offset.reset=earliest
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.datasource.write.partitionpath.field=partition_path
hoodie.datasource.write.recordkey.field=hudi_key
hoodie.datasource.write.precombine.field=ts
hoodie.datasource.hive_sync.database=some_db_name
hoodie.streamer.kafka.source.maxEvents=1000000000000
hoodie.datasource.hive_sync.support_timestamp=true
hoodie.parquet.max.file.size=18874368
hoodie.copyonwrite.record.size.estimate=2048
hoodie.parquet.small.file.limit=104857600
hoodie.cleaner.fileversions.retained=5
```
> -- SQL Transformer File
```
CACHE TABLE dbz_filtered AS
SELECT
  CONCAT(source.commit_lsn, ':', ifnull(source.change_lsn, 0), ':',
         ifnull(source.event_serial_no, 0)) AS ts,
  CASE WHEN op = 'd' THEN before ELSE after END AS source_fields,
  CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted
FROM <SRC>
WHERE op IN ('d', 'u', 'c', 'r');

SELECT
  ts,
  is_deleted,
  source_fields.*,
  CONCAT(trim(source_fields.some_col1), source_fields.some_col2, latest) AS hudi_key,
  from_unixtime(source_fields.some_col2/1000, 'yyyyMM') AS partition_path
FROM dbz_filtered;
```
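For context, the transformer above operates on the standard Debezium change-event envelope. A rough sketch of the fields it reads from each event (values elided; this reflects our understanding of the Debezium SQL Server envelope, not an actual captured record):
```
{
  "op": "u",                  -- 'c' insert, 'u' update, 'd' delete, 'r' snapshot read
  "before": { ... },          -- row image before the change (null for inserts)
  "after": { ... },           -- row image after the change (null for deletes)
  "source": {
    "commit_lsn": "...",      -- used as the first part of our ordering field ts
    "change_lsn": "...",
    "event_serial_no": ...
  }
}
```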
**Environment Description**
* Hudi version : 1.0.1
* Spark version : 3.5.3
* Hive version : Glue Metastore
* Hadoop version : N/A
* Storage : S3
* Running on Docker? : No
**Error Message**
```
ERROR Client: Application diagnostics message: User class threw exception:
java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider
	at org.apache.hudi.utilities.schema.SchemaRegistryProvider.lambda$new$1e9d4812$1(SchemaRegistryProvider.java:113)
	at org.apache.hudi.utilities.schema.SchemaRegistryProvider.fetchSchemaFromRegistry(SchemaRegistryProvider.java:173)
	at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:141)
	at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:252)
	at org.apache.hudi.utilities.streamer.SourceFormatAdapter.avroDataInRowFormat(SourceFormatAdapter.java:212)
	at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:238)
	at org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:639)
	at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:582)
	at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:554)
	at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:464)
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:911)
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:226)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:646)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)
Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
```
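Since the failure is a plain classpath miss, one quick way to confirm whether any of the jars we ship actually contains the Confluent protobuf provider class is to scan their entries. A minimal sketch (the jar paths in the comment are placeholders for the bundles from the spark-submit above):

```python
import zipfile

def jars_containing_class(jar_paths, class_name):
    """Return the jars from jar_paths whose entries include the given
    fully-qualified class (dots mapped to the .class entry path)."""
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for path in jar_paths:
        with zipfile.ZipFile(path) as jar:
            if entry in jar.namelist():
                hits.append(path)
    return hits

# Usage against the bundles from the spark-submit above, e.g.:
# jars_containing_class(
#     ["hudi-utilities-slim-bundle_2.12-1.0.1.jar"],
#     "io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider")
```

In our case this would tell us whether the class is genuinely absent from all three bundles or merely shadowed by classloader ordering.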
We are not using Protobuf anywhere in the pipeline, so we are not sure why
this version complains about it. Switching back to 0.15.0 makes the error go
away. Any help would be appreciated.
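For what it's worth, the missing class ships in Confluent's `kafka-protobuf-provider` artifact. We have not verified this, but presumably adding that jar (and whatever it transitively needs) to the classpath could work around the lookup, e.g. extending `--jars` along these lines (the version is a placeholder, not something we tested):
```
--jars s3://some_bucket/libs/hudi-aws-bundle-1.0.1.jar,s3://some_bucket/libs/hudi-spark3.5-bundle_2.12-1.0.1.jar,s3://some_bucket/libs/kafka-protobuf-provider-<version>.jar
```
That said, since nothing in our pipeline uses Protobuf, we would rather understand why 1.0.1 requires the class at all.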
Thank you
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]