YousifS7 opened a new issue, #12838:
URL: https://github.com/apache/hudi/issues/12838

   Hello,
   We are using the `org.apache.hudi.utilities.streamer.HoodieStreamer` class to extract data from Kafka and write it to a Hudi table. The Kafka topic is populated by Debezium (using the Avro converter) from a SQL Server table. We run spark-submit on EMR 7.6.0.
   This works perfectly with Hudi 0.15.0. However, after upgrading to Hudi 1.0.1, we started encountering this error:
   
   ```
   java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
   ```
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Sync SQL Server table to Kafka via Debezium (AvroConverter)
   2. Provision EMR 7.6.0
   3. Run below Spark-Submit:
   
   > -- Spark-Submit
   ```
   spark-submit \
   --deploy-mode cluster \
   --master yarn \
   --conf spark.rdd.compress=true \
   --conf spark.shuffle.service.enabled=true \
   --conf spark.kryoserializer.buffer.max=512m \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --conf spark.sql.shuffle.partitions=200 \
   --conf spark.default.parallelism=200 \
   --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
   --jars s3://some_bucket/libs/hudi-aws-bundle-1.0.1.jar,s3://some_bucket/libs/hudi-spark3.5-bundle_2.12-1.0.1.jar \
   --class org.apache.hudi.utilities.streamer.HoodieStreamer \
   s3://some_bucket/libs/hudi-utilities-slim-bundle_2.12-1.0.1.jar \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --props s3://path/to/file.properties \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --source-ordering-field ts \
   --target-base-path s3://path/to/file \
   --target-table some_table \
   --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
   --op UPSERT \
   --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer \
   --enable-sync \
   --table-type COPY_ON_WRITE
   ```
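
   One way to confirm the class is actually absent from the jars we ship (bucket paths as above; a local check with the standard JDK `jar` tool):

   ```
   # Download the slim utilities bundle locally and list its entries, looking
   # for the Confluent protobuf schema provider; no match means it is not bundled.
   aws s3 cp s3://some_bucket/libs/hudi-utilities-slim-bundle_2.12-1.0.1.jar .
   jar tf hudi-utilities-slim-bundle_2.12-1.0.1.jar | grep 'schemaregistry/protobuf' \
     || echo "ProtobufSchemaProvider not bundled"
   ```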
   
   > -- Properties File
   ```
   hoodie.streamer.transformer.sql.file=s3://path/to/file.sql
   hoodie.streamer.schemaprovider.registry.url=some_url
   hoodie.streamer.schemaprovider.registry.targetUrl=some_url
   schema.registry.url=some_ip:8081
   hoodie.streamer.source.kafka.topic=some_topic
   bootstrap.servers=some_ip:9092
   auto.offset.reset=earliest
   hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
   hoodie.datasource.write.partitionpath.field=partition_path
   hoodie.datasource.write.recordkey.field=hudi_key
   hoodie.datasource.write.precombine.field=ts
   hoodie.datasource.hive_sync.database=some_db_name
   hoodie.streamer.kafka.source.maxEvents=1000000000000
   hoodie.datasource.hive_sync.support_timestamp=true
   hoodie.parquet.max.file.size=18874368
   hoodie.copyonwrite.record.size.estimate=2048
   hoodie.parquet.small.file.limit=104857600
   hoodie.cleaner.fileversions.retained=5
   ```
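
   To rule out a registry-side issue, the subject can be checked directly against the Confluent Schema Registry REST API (host and topic are placeholders; Confluent omits the `schemaType` field for Avro, its default type):

   ```
   # Fetch the latest registered schema for the topic's value subject.
   # An Avro subject has no "schemaType" field in the response (AVRO is the default).
   curl -s http://some_ip:8081/subjects/some_topic-value/versions/latest \
     | grep -o '"schemaType": *"[^"]*"' || echo "schemaType: AVRO (default)"
   ```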
   
   > -- SQL Transformer File
   ```
   CACHE TABLE dbz_filtered AS
   SELECT
     CONCAT(source.commit_lsn, ':', ifnull(source.change_lsn, 0), ':', ifnull(source.event_serial_no, 0)) AS ts,
     CASE WHEN op = 'd' THEN before ELSE after END AS source_fields,
     CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted
   FROM <SRC>
   WHERE op IN ('d', 'u', 'c', 'r');

   SELECT
     ts,
     is_deleted,
     source_fields.*,
     CONCAT(trim(source_fields.some_col1), source_fields.some_col2, latest) AS hudi_key,
     from_unixtime(source_fields.some_col2/1000, 'yyyyMM') AS partition_path
   FROM dbz_filtered;
   ```
   
   **Environment Description**
   
   * Hudi version : 1.0.1
   
   * Spark version : 3.5.3
   
   * Hive version : Glue Metastore
   
   * Hadoop version : N/A
   
   * Storage : S3
   
   * Running on Docker? : No
   
   **Error Message**
   
   ```
   ERROR Client: Application diagnostics message: User class threw exception: java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider
        at org.apache.hudi.utilities.schema.SchemaRegistryProvider.lambda$new$1e9d4812$1(SchemaRegistryProvider.java:113)
        at org.apache.hudi.utilities.schema.SchemaRegistryProvider.fetchSchemaFromRegistry(SchemaRegistryProvider.java:173)
        at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:141)
        at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:252)
        at org.apache.hudi.utilities.streamer.SourceFormatAdapter.avroDataInRowFormat(SourceFormatAdapter.java:212)
        at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:238)
        at org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:639)
        at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:582)
        at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:554)
        at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:464)
        at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:911)
        at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
        at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:226)
        at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:646)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)
   Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
   ```
   
   We do not use protobuf anywhere in this pipeline, so we are not sure why this version is trying to load a protobuf class. Switching back to 0.15.0 makes the error go away. Any help would be appreciated.
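
   Our reading of the stack trace (not confirmed against the 1.0.1 source) is that `SchemaRegistryProvider` now constructs the registry client with Avro, JSON, and protobuf schema providers up front, so the Confluent protobuf class is loaded even for Avro-only pipelines. If that is right, a possible workaround is to supply Confluent's `kafka-protobuf-provider` artifact explicitly; the 7.4.0 version below is an assumption and should match the schema-registry client shipped in the bundle:

   ```
   # Hypothetical workaround: add the Confluent protobuf provider jar (plus any
   # transitive dependencies it needs) alongside the Hudi bundles on --jars.
   --jars s3://some_bucket/libs/hudi-aws-bundle-1.0.1.jar,s3://some_bucket/libs/hudi-spark3.5-bundle_2.12-1.0.1.jar,s3://some_bucket/libs/kafka-protobuf-provider-7.4.0.jar
   ```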
   
   Thank you
   

