luongngochoa opened a new issue, #9132:
URL: https://github.com/apache/hudi/issues/9132

   I am trying to read data from Kafka whose value schema is stored as JSON in the schema registry, using the configuration below. Please tell me where I'm going wrong with the configuration:
       - --checkpoint
       - "OCR.VEHICLE.REGISTRATION,0:10"
       - --schemaprovider-class
       - org.apache.hudi.utilities.schema.SchemaRegistryProvider
       - --source-class
       - org.apache.hudi.utilities.sources.JsonKafkaSource
       - --hoodie-conf
       - 
"hoodie.deltastreamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter"
       - --table-type
       - COPY_ON_WRITE
       - --source-ordering-field
       - plate
       - --target-base-path
       - hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest
       - --target-table
       - ocr_vehicle_registration_cow_latest
       - --op
       - INSERT
       - --props
       - 
file:///var/hoodie/ws/properties/ocr_vehicle_registration_cow_latest.properties
       - --min-sync-interval-seconds
       - "1"
       - --continuous
       - --enable-hive-sync
       - --source-limit
       - "1000"
   
   This is my properties file:
       hoodie.datasource.write.recordkey.field=plate
       
hoodie.deltastreamer.schemaprovider.registry.url=http://confluent-schema-registry.kafka.svc:8081/subjects/OCR.VEHICLE.REGISTRATION-value/versions/latest
       hoodie.deltastreamer.source.kafka.topic=OCR.VEHICLE.REGISTRATION
       hoodie.datasource.hive_sync.table=ocr_vehicle_registration_cow_latest
       hoodie.datasource.hive_sync.database=raw
       
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
       
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
   
   This is my schema in the registry:
   ```json
   {
        "type": "object",
        "additionalProperties": false,
        "title": "ocr vehicle registration",
        "description": "ocr vehicle registration",
        "properties": {
                "filename": {
                        "type": "string"
                },
                "type": {
                        "type": "number"
                },
                "name": {
                        "type": "string"
                },
                "address": {
                        "type": "string"
                },
                "brand": {
                        "type": "string"
                },
                "model": {
                        "type": "string"
                },
                "engine": {
                        "type": "string"
                },
                "chassis": {
                        "type": "string"
                },
                "color": {
                        "type": "string"
                },
                "sit": {
                        "type": "number"
                },
                "type_": {
                        "type": "string"
                },
                "capacity": {
                        "type": "string"
                },
                "day": {
                        "type": "number"
                },
                "month": {
                        "type": "number"
                },
                "year": {
                        "type": "number"
                },
                "plate": {
                        "type": "string"
                },
                "expired_date": {
                        "type": "string"
                }
        }
   }
   ```
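
   For reference, this is roughly the shape of Avro schema I would expect the converter to produce from the JSON Schema above. This is a hand-rolled sketch; the record name and the `number`-to-`double` mapping are my assumptions for illustration, not the documented output of `JsonToAvroSchemaConverter`:

   ```python
   import json

   # Assumed JSON Schema -> Avro primitive mapping, for illustration only;
   # the real JsonToAvroSchemaConverter may map types differently.
   TYPE_MAP = {"string": "string", "number": "double",
               "integer": "long", "boolean": "boolean"}

   def json_schema_to_avro(json_schema: dict, record_name: str) -> dict:
       """Turn a flat JSON Schema object into an Avro record schema dict."""
       fields = [
           {"name": name, "type": TYPE_MAP[prop["type"]]}
           for name, prop in json_schema["properties"].items()
       ]
       return {"type": "record", "name": record_name, "fields": fields}

   # A two-field excerpt of the registry schema above.
   excerpt = {"type": "object",
              "properties": {"plate": {"type": "string"},
                             "sit": {"type": "number"}}}
   print(json.dumps(json_schema_to_avro(excerpt, "ocr_vehicle_registration")))
   ```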
   
   **Environment Description**
   
   * Hudi version : 0.13.0
   
   * Spark version : 3.2.1
   
   * Hive version : 3.1.0
   
   * Hadoop version : 3.1.1.3.1
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
    ```
    ++ id -u
   + myuid=0
   ++ id -g
   + mygid=0
   + set +e
   ++ getent passwd 0
   + uidentry=root:x:0:0:root:/root:/bin/bash
   + set -e
   + '[' -z root:x:0:0:root:/root:/bin/bash ']'
   + SPARK_CLASSPATH=':/opt/spark/jars/*'
   + env
   + grep SPARK_JAVA_OPT_
   + sort -t_ -k4 -n
   + sed 's/[^=]*=\(.*\)/\1/g'
   + readarray -t SPARK_EXECUTOR_JAVA_OPTS
   + '[' -n '' ']'
   + '[' -z ']'
   + '[' -z ']'
   + '[' -n '' ']'
   + '[' -z ']'
   + '[' -z x ']'
   + SPARK_CLASSPATH='/etc/spark/conf::/opt/spark/jars/*'
   + case "$1" in
   + shift 1
   + CMD=("$SPARK_HOME/bin/spark-submit" --conf 
"spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
   + exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf 
spark.driver.bindAddress=10.233.104.236 --deploy-mode client --properties-file 
/opt/spark/conf/spark.properties --class 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer 
local:///app/var/hoodie/ws/packaging/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar
 --checkpoint OCR.VEHICLE.REGISTRATION,0:10 --schemaprovider-class 
org.apache.hudi.utilities.schema.SchemaRegistryProvider --source-class 
org.apache.hudi.utilities.sources.JsonKafkaSource --hoodie-conf 
hoodie.deltastreamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter
 --table-type COPY_ON_WRITE --source-ordering-field plate --target-base-path 
hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest --target-table 
ocr_vehicle_registration_cow_latest --op INSERT --props 
file:///var/hoodie/ws/properties/ocr_vehicle_registration_cow_latest.properties 
--min-sync-interval-seconds 1 --continuous --enable-hiv
 e-sync --source-limit 1000
   23/07/06 06:48:57 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
   log4j:WARN No appenders could be found for logger 
(org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator).
   log4j:WARN Please initialize the log4j system properly.
   log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for 
more info.
   Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties
   23/07/06 06:48:58 INFO SparkContext: Running Spark version 3.2.1
   23/07/06 06:48:58 INFO ResourceUtils: 
==============================================================
   23/07/06 06:48:58 INFO ResourceUtils: No custom resources configured for 
spark.driver.
   23/07/06 06:48:58 INFO ResourceUtils: 
==============================================================
   23/07/06 06:48:58 INFO SparkContext: Submitted application: 
delta-streamer-ocr_vehicle_registration_cow_latest
   23/07/06 06:48:58 INFO ResourceProfile: Default ResourceProfile created, 
executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , 
memory -> name: memory, amount: 500, script: , vendor: , offHeap -> name: 
offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: 
cpus, amount: 1.0)
   23/07/06 06:48:58 INFO ResourceProfile: Limiting resource is cpus at 1 tasks 
per executor
   23/07/06 06:48:58 INFO ResourceProfileManager: Added ResourceProfile id: 0
   23/07/06 06:48:59 INFO SecurityManager: Changing view acls to: root,hdfs
   23/07/06 06:48:59 INFO SecurityManager: Changing modify acls to: root,hdfs
   23/07/06 06:48:59 INFO SecurityManager: Changing view acls groups to: 
   23/07/06 06:48:59 INFO SecurityManager: Changing modify acls groups to: 
   23/07/06 06:48:59 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(root, hdfs); 
groups with view permissions: Set(); users  with modify permissions: Set(root, 
hdfs); groups with modify permissions: Set()
   23/07/06 06:48:59 INFO deprecation: mapred.output.compression.codec is 
deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
   23/07/06 06:48:59 INFO deprecation: mapred.output.compression.type is 
deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
   23/07/06 06:48:59 INFO deprecation: mapred.output.compress is deprecated. 
Instead, use mapreduce.output.fileoutputformat.compress
   23/07/06 06:49:00 INFO Utils: Successfully started service 'sparkDriver' on 
port 7078.
   23/07/06 06:49:00 INFO SparkEnv: Registering MapOutputTracker
   23/07/06 06:49:00 INFO SparkEnv: Registering BlockManagerMaster
   23/07/06 06:49:00 INFO BlockManagerMasterEndpoint: Using 
org.apache.spark.storage.DefaultTopologyMapper for getting topology information
   23/07/06 06:49:00 INFO BlockManagerMasterEndpoint: 
BlockManagerMasterEndpoint up
   23/07/06 06:49:00 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
   23/07/06 06:49:00 INFO DiskBlockManager: Created local directory at 
/var/data/spark-a32e287f-3ad6-4bb9-8b35-af6bb8445b85/blockmgr-2530d594-5eea-4225-bd95-202f31a698cc
   23/07/06 06:49:00 INFO MemoryStore: MemoryStore started with capacity 110.0 
MiB
   23/07/06 06:49:00 INFO SparkEnv: Registering OutputCommitCoordinator
   23/07/06 06:49:01 INFO Utils: Successfully started service 'SparkUI' on port 
8090.
   23/07/06 06:49:02 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
http://ocr-vehicle-registration-src-9b7f828929f5e5ad-driver-svc.spark.svc:8090
   23/07/06 06:49:02 INFO SparkContext: Added JAR 
local:///app/var/hoodie/ws/packaging/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar
 at 
file:/app/var/hoodie/ws/packaging/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar
 with timestamp 1688626138612
   23/07/06 06:49:02 INFO SparkContext: The JAR 
local:///app/var/hoodie/ws/packaging/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar
 at 
file:/app/var/hoodie/ws/packaging/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar
 has been added already. Overwriting of added jar is not supported in the 
current version.
   23/07/06 06:49:02 INFO SparkContext: The JAR 
local:///app/var/hoodie/ws/packaging/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar
 at 
file:/app/var/hoodie/ws/packaging/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar
 has been added already. Overwriting of added jar is not supported in the 
current version.
   23/07/06 06:49:02 INFO SparkKubernetesClientFactory: Auto-configuring K8S 
client using current context from users K8S config file
   23/07/06 06:49:09 INFO ExecutorPodsAllocator: Going to request 1 executors 
from Kubernetes for ResourceProfile Id: 0, target: 1, known: 0, 
sharedSlotFromPendingPods: 2147483647.
   23/07/06 06:49:09 INFO KubernetesClientUtils: Spark configuration files 
loaded from Some(/etc/spark/conf) : core-site.xml,hdfs-site.xml,hive-site.xml
   23/07/06 06:49:10 INFO KubernetesClientUtils: Spark configuration files 
loaded from Some(/etc/spark/conf) : core-site.xml,hdfs-site.xml,hive-site.xml
   23/07/06 06:49:10 INFO BasicExecutorFeatureStep: Decommissioning not 
enabled, skipping shutdown script
   23/07/06 06:49:12 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 7079.
   23/07/06 06:49:12 INFO NettyBlockTransferService: Server created on 
ocr-vehicle-registration-src-9b7f828929f5e5ad-driver-svc.spark.svc:7079
   23/07/06 06:49:12 INFO BlockManager: Using 
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
policy
   23/07/06 06:49:12 INFO BlockManagerMaster: Registering BlockManager 
BlockManagerId(driver, 
ocr-vehicle-registration-src-9b7f828929f5e5ad-driver-svc.spark.svc, 7079, None)
   23/07/06 06:49:12 INFO BlockManagerMasterEndpoint: Registering block manager 
ocr-vehicle-registration-src-9b7f828929f5e5ad-driver-svc.spark.svc:7079 with 
110.0 MiB RAM, BlockManagerId(driver, 
ocr-vehicle-registration-src-9b7f828929f5e5ad-driver-svc.spark.svc, 7079, None)
   23/07/06 06:49:12 INFO BlockManagerMaster: Registered BlockManager 
BlockManagerId(driver, 
ocr-vehicle-registration-src-9b7f828929f5e5ad-driver-svc.spark.svc, 7079, None)
   23/07/06 06:49:12 INFO BlockManager: Initialized BlockManager: 
BlockManagerId(driver, 
ocr-vehicle-registration-src-9b7f828929f5e5ad-driver-svc.spark.svc, 7079, None)
   23/07/06 06:49:18 INFO 
KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Registered executor 
NettyRpcEndpointRef(spark-client://Executor) (10.233.104.254:53680) with ID 1,  
ResourceProfileId 0
   23/07/06 06:49:18 INFO KubernetesClusterSchedulerBackend: SchedulerBackend 
is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
   23/07/06 06:49:18 WARN HoodieDeltaStreamer: --enable-hive-sync will be 
deprecated in a future release; please use --enable-sync instead for Hive 
syncing
   23/07/06 06:49:18 INFO BlockManagerMasterEndpoint: Registering block manager 
10.233.104.254:41773 with 110.0 MiB RAM, BlockManagerId(1, 10.233.104.254, 
41773, None)
   23/07/06 06:49:20 WARN DomainSocketFactory: The short-circuit local reads 
feature cannot be used because libhadoop cannot be loaded.
   23/07/06 06:49:20 WARN DFSPropertiesConfiguration: Cannot find 
HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
   23/07/06 06:49:20 WARN DFSPropertiesConfiguration: Properties file 
file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
   23/07/06 06:49:20 INFO UtilHelpers: Adding overridden properties to file 
properties.
   23/07/06 06:49:21 WARN SparkContext: Using an existing SparkContext; some 
configuration may not take effect.
   23/07/06 06:49:21 INFO HoodieDeltaStreamer: Creating delta streamer with 
configs:
   auto.offset.reset: earliest
   bootstrap.servers: carpla-dev-kafka-brokers.kafka.svc.cluster.local:9093
   hoodie.auto.adjust.lock.configs: true
   hoodie.bulkinsert.shuffle.parallelism: 1
   hoodie.clean.async: true
   hoodie.clean.automatic: true
   hoodie.cleaner.hours.retained: 72
   hoodie.cleaner.parallelism: 200
   hoodie.cleaner.policy: KEEP_LATEST_BY_HOURS
   hoodie.combine.before.upsert: true
   hoodie.compact.inline: false
   hoodie.copyonwrite.insert.auto.split: false
   hoodie.copyonwrite.insert.split.size: 10000000
   hoodie.copyonwrite.record.size.estimate: 100
   hoodie.datasource.hive_sync.database: raw
   hoodie.datasource.hive_sync.enable: true
   hoodie.datasource.hive_sync.jdbcurl: jdbc:hive2://10.0.9.131:10000
   hoodie.datasource.hive_sync.metastore.uris: thrift://10.0.9.131:9083
   hoodie.datasource.hive_sync.mode: hms
   hoodie.datasource.hive_sync.partition_extractor_class: 
org.apache.hudi.hive.NonPartitionedExtractor
   hoodie.datasource.hive_sync.password: carpla@123
   hoodie.datasource.hive_sync.table: ocr_vehicle_registration_cow_latest
   hoodie.datasource.hive_sync.use_jdbc: false
   hoodie.datasource.hive_sync.username: hive
   hoodie.datasource.write.keygenerator.class: 
org.apache.hudi.keygen.NonpartitionedKeyGenerator
   hoodie.datasource.write.operation: upsert
   hoodie.datasource.write.reconcile.schema: false
   hoodie.datasource.write.recordkey.field: plate
   hoodie.delete.shuffle.parallelism: 1
   hoodie.deltastreamer.schemaprovider.registry.schemaconverter: 
org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter
   hoodie.deltastreamer.schemaprovider.registry.url: 
http://confluent-schema-registry.kafka.svc:8081/subjects/OCR.VEHICLE.REGISTRATION-value/versions/latest
   hoodie.deltastreamer.source.kafka.topic: OCR.VEHICLE.REGISTRATION
   hoodie.embed.timeline.server: true
   hoodie.filesystem.view.type: EMBEDDED_KV_STORE
   hoodie.insert.shuffle.parallelism: 1
   hoodie.metadata.enable: false
   hoodie.parquet.max.file.size: 125829120
   hoodie.parquet.small.file.limit: 104857600
   hoodie.upsert.shuffle.parallelism: 1
   schema.registry.url: 
http://confluent-schema-registry.kafka.svc.cluster.local:8081
   security.protocol: SSL
   ssl.keystore.location: /var/hoodie/ws/keystore/user.p12
   ssl.keystore.password: XkSWMeRTARYSrpOVvxKpPBQHW7ZHkdZP
   ssl.keystore.type: PKCS12
   ssl.truststore.location: /var/hoodie/ws/truststore/ca.p12
   ssl.truststore.password: FFLk1JPWDWx0
   ssl.truststore.type: PKCS12
   
   23/07/06 06:49:21 INFO HoodieTableMetaClient: Initializing 
hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest as hoodie table 
hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest
   23/07/06 06:49:22 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest
   23/07/06 06:49:22 INFO HoodieTableConfig: Loading table properties from 
hdfs:/dwh/raw/ocr/ocr_vehicle_registration_cow_latest/.hoodie/hoodie.properties
   23/07/06 06:49:23 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest
   23/07/06 06:49:23 INFO HoodieTableMetaClient: Finished initializing Table of 
type COPY_ON_WRITE from hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest
   23/07/06 06:49:23 INFO SparkUI: Stopped Spark web UI at 
http://ocr-vehicle-registration-src-9b7f828929f5e5ad-driver-svc.spark.svc:8090
   23/07/06 06:49:23 INFO KubernetesClusterSchedulerBackend: Shutting down all 
executors
   23/07/06 06:49:23 INFO 
KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each 
executor to shut down
   23/07/06 06:49:23 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client 
has been closed.
   23/07/06 06:49:23 INFO MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!
   23/07/06 06:49:23 INFO MemoryStore: MemoryStore cleared
   23/07/06 06:49:23 INFO BlockManager: BlockManager stopped
   23/07/06 06:49:23 INFO BlockManagerMaster: BlockManagerMaster stopped
   23/07/06 06:49:23 INFO 
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
OutputCommitCoordinator stopped!
   23/07/06 06:49:23 INFO SparkContext: Successfully stopped SparkContext
   Exception in thread "main" org.apache.avro.SchemaParseException: Type not 
supported: object
        at org.apache.avro.Schema.parse(Schema.java:1734)
        at org.apache.avro.Schema$Parser.parse(Schema.java:1430)
        at org.apache.avro.Schema$Parser.parse(Schema.java:1418)
        at 
org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSchema(SchemaRegistryProvider.java:100)
        at 
org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:107)
        at 
org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:42)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:911)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:243)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:680)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:148)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:121)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:573)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   23/07/06 06:49:23 INFO ShutdownHookManager: Shutdown hook called
   23/07/06 06:49:23 INFO ShutdownHookManager: Deleting directory 
/var/data/spark-a32e287f-3ad6-4bb9-8b35-af6bb8445b85/spark-fb2e8bd9-b281-4391-87d3-75f552ae9e7f
   23/07/06 06:49:23 INFO ShutdownHookManager: Deleting directory 
/tmp/spark-65b0ea58-5e2b-47a3-97fc-d573497d4115
   ```
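   
   The `Type not supported: object` exception is thrown by Avro's own parser, which suggests the raw JSON Schema returned by the registry reached `Schema.parse` without being converted first. A quick sanity check of what a registry payload contains (a pure-Python sketch; the type names are copied from the Avro specification) shows why the parser rejects it: the top-level `"type": "object"` is a JSON Schema construct, not a valid Avro type name:

   ```python
   import json

   # Avro type names per the Avro specification; "object" is not among them,
   # which is why org.apache.avro.Schema.parse fails on a JSON Schema payload.
   AVRO_TYPES = {"null", "boolean", "int", "long", "float", "double", "bytes",
                 "string", "record", "enum", "array", "map", "fixed"}

   def looks_like_avro(schema_text: str) -> bool:
       """Return True if the top-level "type" is a known Avro type name."""
       schema = json.loads(schema_text)
       return isinstance(schema, dict) and schema.get("type") in AVRO_TYPES

   json_schema = '{"type": "object", "properties": {"plate": {"type": "string"}}}'
   avro_schema = ('{"type": "record", "name": "ocr", '
                  '"fields": [{"name": "plate", "type": "string"}]}')

   print(looks_like_avro(json_schema))  # False: JSON Schema, not Avro
   print(looks_like_avro(avro_schema))  # True
   ```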
   
   

