soumilshah1995 opened a new issue, #10174:
URL: https://github.com/apache/hudi/issues/10174

   
   I hope this message finds you well. I am currently encountering an issue with the Hudi DeltaStreamer, and I am seeking your assistance in resolving it.
   
   The error I am facing is as follows:
   
    ```
    org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to parse schema from registry: {"type":"record","name":"Order","fields":[{"name":"order_id","type":"string"},{"name":"name","type":"string"},{"name":"order_value","type":"string"},{"name":"priority","type":"string"},{"name":"order_date","type":"string"},{"name":"customer_id","type":"string"},{"name":"ts","type":"string"}]}
            at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:105)
            at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:196)
            ... 18 more
    Caused by: java.lang.IllegalArgumentException: Property hoodie.streamer.schemaprovider.registry.schemaconverter not found
            at org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys(ConfigUtils.java:334)
            at org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys(ConfigUtils.java:308)
            at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:99)
            ... 19 more
   ```
   
   Here is a summary of my setup:
   Kafka is deployed using a Docker Compose file, and the services include ZooKeeper, Schema Registry, and a Kafka broker.
   Docker Compose file:
   
    ```
   version: '3'
   services:
     zookeeper:
       image: confluentinc/cp-zookeeper:latest
       container_name: zookeeper
       environment:
         ZOOKEEPER_CLIENT_PORT: 2181
         ZOOKEEPER_TICK_TIME: 2000
       ports:
         - "2181:2181"
   
     schema-registry:
       image: confluentinc/cp-schema-registry:latest
       hostname: schema-registry
       depends_on:
         - kafka-broker-1
       ports:
         - "8081:8081"
       environment:
         SCHEMA_REGISTRY_HOST_NAME: schema-registry
         SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
         SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081/
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: plaintext://kafka-broker-1:9092,PLAINTEXT_internal://localhost:19092
         SCHEMA_REGISTRY_DEBUG: 'true'
   
     kafka-broker-1:
       image: confluentinc/cp-kafka:latest
       hostname: kafka-broker-1
       ports:
         - "19092:19092"
       depends_on:
         - zookeeper
       environment:
         KAFKA_BROKER_ID: 1
         KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: plaintext://kafka-broker-1:9092,PLAINTEXT_internal://localhost:19092
         KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
   ```
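   For what it's worth, a quick way I check that the Schema Registry container is reachable from the host is a small script like the one below (a sketch of my own, not part of the setup; the URL matches the port mapping above, and `orders-value` only shows up after the producer has run):

   ```python
   import json
   import urllib.request

   # Registry URL as mapped in the Compose file above.
   REGISTRY_URL = "http://localhost:8081"

   def subjects_endpoint(base_url: str) -> str:
       """Build the Schema Registry endpoint that lists registered subjects."""
       return base_url.rstrip("/") + "/subjects"

   def list_subjects(base_url: str = REGISTRY_URL) -> list:
       """Return the registered subjects, e.g. ['orders-value']."""
       with urllib.request.urlopen(subjects_endpoint(base_url)) as resp:
           return json.loads(resp.read().decode("utf-8"))

   # Usage (with the stack running):
   #   print(list_subjects())
   ```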
   
   Publishing Messages
    ```
   from faker import Faker
   from time import sleep
   import random
   import uuid
   from datetime import datetime
   from kafka_schema_registry import prepare_producer
   
   # Configuration
   KAFKA_BOOTSTRAP_SERVERS = ['localhost:19092']
   SCHEMA_REGISTRY_URL = 'http://localhost:8081'
   TOPIC_NAME = 'orders'
   NUM_MESSAGES = 20
   SLEEP_INTERVAL = 1
   
   # Avro Schema
   SAMPLE_SCHEMA = {
       "type": "record",
       "name": "Order",
       "fields": [
           {"name": "order_id", "type": "string"},
           {"name": "name", "type": "string"},
           {"name": "order_value", "type": "string"},
           {"name": "priority", "type": "string" },
           {"name": "order_date", "type": "string"},
           {"name": "customer_id", "type": "string"},
           {"name": "ts", "type": "string"}
       ]
   }
   
   # Kafka Producer
    # Kafka producer; prepare_producer also creates the topic and registers the schema
    producer = prepare_producer(
        KAFKA_BOOTSTRAP_SERVERS,
        SCHEMA_REGISTRY_URL,
        TOPIC_NAME,
        1,  # number of partitions
        1,  # replication factor
        value_schema=SAMPLE_SCHEMA,
    )
   
   # Faker instance
   faker = Faker()
   
   
   class DataGenerator:
       @staticmethod
       def get_orders_data():
           return {
               "order_id": str(uuid.uuid4()),
               "name": faker.text(max_nb_chars=20),
               "order_value": str(random.randint(10, 1000)),
               "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
                "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
               "customer_id": str(uuid.uuid4()),
               "ts": str(datetime.now().timestamp()),
           }
   
       @staticmethod
       def produce_avro_message(producer, topic, data):
           producer.send(topic, data)
   
   
   # Generate and send order data
   for _ in range(NUM_MESSAGES):
       order_data = DataGenerator.get_orders_data()
       DataGenerator.produce_avro_message(producer, TOPIC_NAME, order_data)
       print("Order Payload:", order_data)
       sleep(SLEEP_INTERVAL)
   
   ```
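   As a sanity check on the producer side, the schema registered under `orders-value` should carry exactly the fields of `SAMPLE_SCHEMA`. A small helper of my own (hypothetical, not part of the library) to compare field names:

   ```python
   # Field names the producer script above writes, in declaration order.
   EXPECTED_FIELDS = ["order_id", "name", "order_value", "priority",
                      "order_date", "customer_id", "ts"]

   def field_names(avro_record_schema: dict) -> list:
       """Return the field names of an Avro record schema, in order."""
       return [f["name"] for f in avro_record_schema.get("fields", [])]

   def matches_expected(avro_record_schema: dict) -> bool:
       """True if the schema's fields match what the producer sends."""
       return field_names(avro_record_schema) == EXPECTED_FIELDS
   ```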
   
    DeltaStreamer Job
    ```
    spark-submit \
        --class org.apache.hudi.utilities.streamer.HoodieStreamer \
        --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0 \
        --properties-file spark-config.properties \
        --master 'local[*]' \
        --executor-memory 1g \
        jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
        --table-type COPY_ON_WRITE \
        --op UPSERT \
        --source-ordering-field ts \
        --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
        --target-base-path file:///Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E6/hudidb/orders \
        --target-table orders \
        --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
        --props hudi_tbl.props
    ```
   
   Hudi CONF
   ```
    hoodie.datasource.write.recordkey.field=order_id
    hoodie.datasource.write.partitionpath.field=order_date
    hoodie.datasource.write.precombine.field=ts
    bootstrap.servers=localhost:19092
    auto.offset.reset=earliest
    hoodie.deltastreamer.source.kafka.topic=orders
    hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
    schema.registry.url=https://localhost:8081/
    # hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter
    hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders-value/versions/latest
    ```
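   For reference, my understanding of what the schema provider does with the registry URL on the last line (a rough sketch in Python, not Hudi's actual Java code): it fetches the latest subject version and unwraps the Avro schema, which the registry returns as a JSON-encoded string under the `schema` key. Running this by hand helps confirm the URL returns a parseable schema:

   ```python
   import json
   import urllib.request

   # URL taken from the last line of the props file above.
   REGISTRY_URL = "http://localhost:8081/subjects/orders-value/versions/latest"

   def unwrap_schema(response_body: str) -> dict:
       """The registry nests the Avro schema as a JSON string under 'schema'."""
       return json.loads(json.loads(response_body)["schema"])

   def fetch_latest_schema(url: str = REGISTRY_URL) -> dict:
       """GET the latest subject version and return the Avro schema as a dict."""
       with urllib.request.urlopen(url) as resp:
           return unwrap_schema(resp.read().decode("utf-8"))

   # Usage (with the registry running):
   #   fetch_latest_schema()["name"]
   ```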
   
   ### Quick Note
   If I uncomment `hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter`, then I get a different error:
    ```
     09:54:44 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition orders-0 at offset 0. If needed, please seek past the record to continue consumption.
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
    Caused by: javax.net.ssl.SSLException: Unsupported or unrecognized SSL message
            at java.base/sun.security.ssl.SSLSocketInputRecord.handleUnknownRecord(SSLSocketInputRecord.java:457)
            at java.base/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:175)
            at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:111)
            at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1511)
            at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1421)
            at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:456)
            at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:427)
            at java.base/sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:580)
            at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:201)
            at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1592)
            at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
            at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
            at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
            at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:218)
            at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
            at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:495)
            at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:488)
            at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:177)
            at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:256)
            at io.confluent
   ```
   
   
   
   Despite my efforts, I am unable to identify the root cause of this error. Could you please review the provided information and suggest possible solutions or areas that might need adjustment?
   
   Your guidance and expertise in resolving this matter would be greatly appreciated. I have also gone through several related GitHub issues and am now hoping for help from the experts:
   
   GH: 9132
   GH: 8519
   GH: 8521
   
   If needed, I can create a GH issue.
   Thank you for your time and assistance.
   
   
   

