soumilshah1995 opened a new issue, #10174:
URL: https://github.com/apache/hudi/issues/10174
I hope this message finds you well. I am currently encountering an issue with the Hudi DeltaStreamer and am seeking your assistance in resolving it. The error I am facing is as follows:
```
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to parse schema from registry:
{"type":"record","name":"Order","fields":[{"name":"order_id","type":"string"},{"name":"name","type":"string"},{"name":"order_value","type":"string"},{"name":"priority","type":"string"},{"name":"order_date","type":"string"},{"name":"customer_id","type":"string"},{"name":"ts","type":"string"}]}
    at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:105)
    at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:196)
    ... 18 more
Caused by: java.lang.IllegalArgumentException: Property hoodie.streamer.schemaprovider.registry.schemaconverter not found
    at org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys(ConfigUtils.java:334)
    at org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys(ConfigUtils.java:308)
    at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:99)
    ... 19 more
```
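For context on the "Failed to parse schema from registry" line above: the Confluent registry's `/subjects/<name>/versions/latest` endpoint does not return the Avro schema directly, but a JSON envelope in which the schema is an escaped JSON string under the `"schema"` key, so it has to be parsed twice. A minimal sketch of that unwrapping (the sample response here is fabricated for illustration):

```python
import json

# Fabricated example of what GET /subjects/orders-value/versions/latest
# typically returns: the Avro schema is a JSON *string* inside the envelope.
sample_response = json.dumps({
    "subject": "orders-value",
    "version": 1,
    "id": 1,
    "schema": json.dumps({
        "type": "record",
        "name": "Order",
        "fields": [{"name": "order_id", "type": "string"}],
    }),
})

envelope = json.loads(sample_response)       # first parse: the envelope
avro_schema = json.loads(envelope["schema"])  # second parse: the schema itself
print(avro_schema["name"])  # Order
```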
Here is a summary of my setup:
Kafka is deployed using a Docker Compose file, and the services include
Zookeeper, Schema Registry, and a Kafka broker.
Docker Compose file:
```
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema-registry
    depends_on:
      - kafka-broker-1
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081/
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: plaintext://kafka-broker-1:9092,PLAINTEXT_internal://localhost:19092
      SCHEMA_REGISTRY_DEBUG: 'true'

  kafka-broker-1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-broker-1
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: plaintext://kafka-broker-1:9092,PLAINTEXT_internal://localhost:19092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
Publishing Messages
```
from faker import Faker
from time import sleep
import random
import uuid
from datetime import datetime

from kafka_schema_registry import prepare_producer

# Configuration
KAFKA_BOOTSTRAP_SERVERS = ['localhost:19092']
SCHEMA_REGISTRY_URL = 'http://localhost:8081'
TOPIC_NAME = 'orders'
NUM_MESSAGES = 20
SLEEP_INTERVAL = 1

# Avro schema
SAMPLE_SCHEMA = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "order_value", "type": "string"},
        {"name": "priority", "type": "string"},
        {"name": "order_date", "type": "string"},
        {"name": "customer_id", "type": "string"},
        {"name": "ts", "type": "string"},
    ],
}

# Kafka producer (registers the schema with the registry and returns a producer)
producer = prepare_producer(
    KAFKA_BOOTSTRAP_SERVERS,
    SCHEMA_REGISTRY_URL,
    TOPIC_NAME,
    1,
    1,
    value_schema=SAMPLE_SCHEMA,
)

# Faker instance
faker = Faker()


class DataGenerator:
    @staticmethod
    def get_orders_data():
        return {
            "order_id": str(uuid.uuid4()),
            "name": faker.text(max_nb_chars=20),
            "order_value": str(random.randint(10, 1000)),
            "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
            "order_date": faker.date_between(
                start_date='-30d', end_date='today'
            ).strftime('%Y-%m-%d'),
            "customer_id": str(uuid.uuid4()),
            "ts": str(datetime.now().timestamp()),
        }

    @staticmethod
    def produce_avro_message(producer, topic, data):
        producer.send(topic, data)


# Generate and send order data
for _ in range(NUM_MESSAGES):
    order_data = DataGenerator.get_orders_data()
    DataGenerator.produce_avro_message(producer, TOPIC_NAME, order_data)
    print("Order Payload:", order_data)
    sleep(SLEEP_INTERVAL)
```
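One detail worth keeping in mind when reading the second error below ("Error deserializing Avro message for id 1"): a registry-aware producer frames each record in Confluent's wire format, a magic byte `0` followed by a 4-byte big-endian schema id, before the Avro payload. That embedded id is why the consumer side (here `KafkaAvroSchemaDeserializer`) must contact the registry at read time. A small sketch of the framing (helper names are mine, not part of any library):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format starts every framed message with 0

def encode_confluent_header(schema_id: int) -> bytes:
    # 1 magic byte + 4-byte big-endian schema id; the Avro bytes follow.
    return struct.pack(">bI", MAGIC_BYTE, schema_id)

def decode_confluent_header(payload: bytes) -> int:
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Confluent-framed message")
    return schema_id  # the consumer resolves this id via the registry

header = encode_confluent_header(1)
print(decode_confluent_header(header + b"<avro bytes>"))  # 1
```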
Delta Streamer Job
```
spark-submit \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0 \
  --properties-file spark-config.properties \
  --master 'local[*]' \
  --executor-memory 1g \
  jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
  --table-type COPY_ON_WRITE \
  --op UPSERT \
  --source-ordering-field ts \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --target-base-path file:///Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E6/hudidb/orders \
  --target-table orders \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props hudi_tbl.props
```
Hudi CONF
```
hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.write.precombine.field=ts
bootstrap.servers=localhost:19092
auto.offset.reset=earliest
hoodie.deltastreamer.source.kafka.topic=orders
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
schema.registry.url=https://localhost:8081/
# hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders-value/versions/latest
```
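One observation about the props above: `schema.registry.url` is configured with `https://` while `hoodie.deltastreamer.schemaprovider.registry.url` uses plain `http://`, even though both point at the same registry on port 8081. A trivial check makes the scheme mismatch visible:

```python
from urllib.parse import urlparse

# The two registry URLs copied from the props file above.
deser_url = "https://localhost:8081/"
schema_provider_url = "http://localhost:8081/subjects/orders-value/versions/latest"

print(urlparse(deser_url).scheme)            # https
print(urlparse(schema_provider_url).scheme)  # http
```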
### Quick Note
If I uncomment the `hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter` line, then I get a different error:
```
09:54:44 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition orders-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: javax.net.ssl.SSLException: Unsupported or unrecognized SSL message
    at java.base/sun.security.ssl.SSLSocketInputRecord.handleUnknownRecord(SSLSocketInputRecord.java:457)
    at java.base/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:175)
    at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:111)
    at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1511)
    at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1421)
    at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:456)
    at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:427)
    at java.base/sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:580)
    at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:201)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1592)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
    at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
    at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:218)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:495)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:488)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:177)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:256)
    at io.confluent
```
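If it helps anyone reading the trace: as far as I understand, `Unsupported or unrecognized SSL message` is what the JDK's `handleUnknownRecord` raises when an `https://` connection receives bytes that do not start with a valid TLS record type (a TLS handshake record begins with byte `0x16`), which is what a plain-HTTP listener would send back. A quick illustration of that first-byte check (my own sketch, not JDK code):

```python
# A TLS handshake record begins with content-type byte 0x16; a plain-HTTP
# server instead replies with ASCII text such as "HTTP/1.1 ...".
TLS_HANDSHAKE = 0x16

plain_http_reply = b"HTTP/1.1 200 OK\r\n"
print(hex(plain_http_reply[0]))               # first byte is 'H', not 0x16
print(plain_http_reply[0] == TLS_HANDSHAKE)   # False
```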
Despite my efforts, I am unable to identify the root cause of this error.
Could you please review the provided information and suggest any possible
solutions or areas that might need adjustment?
Your guidance and expertise in resolving this matter would be greatly
appreciated.
I have also gone through several related GitHub issues (#9132, #8519, #8521) and am now hoping for help from the experts. If needed, I can file a new GitHub issue.
Thank you for your time and assistance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]