ksrihari93 opened a new issue, #5361:
URL: https://github.com/apache/hudi/issues/5361
Hi Team,
We are running CDC pipelines with MySQL, Kafka Connect, Debezium, and Hudi (DeltaStreamer), with schemas managed in Schema Registry.
For the streaming pipelines, the Hudi job runs fine, with proper schema integration between MySQL and the Kafka topic.
However, while backfilling data using the DeltaStreamer JDBC source, we see a mismatch between the Debezium-generated schema in Schema Registry and the Hudi-generated one, which makes the job fail.
mysql> desc ev_test;
+--------------+---------------------+------+-----+----------------------+----------------+
| Field        | Type                | Null | Key | Default              | Extra          |
+--------------+---------------------+------+-----+----------------------+----------------+
| id           | bigint(20) unsigned | NO   | PRI | NULL                 | auto_increment |
| Name         | varchar(20)         | NO   |     | NULL                 |                |
| created_at   | timestamp           | NO   |     | CURRENT_TIMESTAMP    |                |
| date_entered | datetime            | NO   |     | CURRENT_TIMESTAMP    |                |
| dt1          | datetime(3)         | NO   |     | CURRENT_TIMESTAMP(3) |                |
| ts1          | timestamp(3)        | NO   |     | CURRENT_TIMESTAMP(3) |                |
| time1        | datetime            | NO   |     | CURRENT_TIMESTAMP    |                |
+--------------+---------------------+------+-----+----------------------+----------------+
Hudi-generated schema for the table:
{
"type": "record",
"name": "hoodie_source",
"namespace": "hoodie.source",
"fields": [
{
"name": "id",
"type": [
"null",
{
"type": "fixed",
"name": "fixed",
"namespace": "hoodie.source.hoodie_source.id",
"size": 9,
"logicalType": "decimal",
"precision": 20,
"scale": 0
}
],
"default": null
},
{
"name": "Name",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "created_at",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-micros"
}
],
"default": null
},
{
"name": "date_entered",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-micros"
}
],
"default": null
},
{
"name": "dt1",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-micros"
}
],
"default": null
},
{
"name": "ts1",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-micros"
}
],
"default": null
},
{
"name": "time1",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-micros"
}
],
"default": null
}
]
}
Debezium-generated schema:
{
"type": "record",
"name": "Value",
"namespace": "ev_orders_test.blackbox_prod.ev_test",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "Name",
"type": "string"
},
{
"name": "created_at",
"type": [
{
"type": "string",
"connect.default": "1970-01-01 05:30:00"
},
"null"
],
"default": "1970-01-01 05:30:00"
},
{
"name": "date_entered",
"type": [
{
"type": "string",
"connect.default": "0"
},
"null"
],
"default": "0"
},
{
"name": "dt1",
"type": [
{
"type": "string",
"connect.default": "0"
},
"null"
],
"default": "0"
},
{
"name": "ts1",
"type": [
{
"type": "string",
"connect.default": "1970-01-01 05:30:00"
},
"null"
],
"default": "1970-01-01 05:30:00"
},
{
"name": "time1",
"type": [
{
"type": "string",
"connect.default": "0"
},
"null"
],
"default": "0"
},
{
"name": "__op",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "__ts_ms",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "__source_ts_ms",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "__source_connector",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "__source_snapshot",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.parameters": {
"allowed": "true,last,false"
},
"connect.name": "io.debezium.data.Enum"
}
],
"default": null
},
{
"name": "__source_query",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "__deleted",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "ev_test"
}
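Putting the two schemas side by side, every shared column except Name resolves to a different Avro type. A quick hand-transcribed comparison (a sketch — the type labels below are simplified from the JSON above, and Debezium's __-prefixed metadata fields are omitted):

```python
# Field types hand-transcribed from the two schemas above (simplified labels,
# shared columns only; union nullability omitted).
hudi_types = {
    "id": "fixed/decimal(20,0)",
    "Name": "string",
    "created_at": "long/timestamp-micros",
    "date_entered": "long/timestamp-micros",
    "dt1": "long/timestamp-micros",
    "ts1": "long/timestamp-micros",
    "time1": "long/timestamp-micros",
}
debezium_types = {
    "id": "long",
    "Name": "string",
    "created_at": "string",
    "date_entered": "string",
    "dt1": "string",
    "ts1": "string",
    "time1": "string",
}

# Collect every shared column whose Avro type differs between the two schemas.
mismatched = sorted(f for f in hudi_types if hudi_types[f] != debezium_types[f])
print(mismatched)
# → ['created_at', 'date_entered', 'dt1', 'id', 'time1', 'ts1']
```

Every column except Name differs, so binary records encoded with one schema cannot be decoded with the other.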
When the Hoodie DeltaStreamer job is pointed at the Debezium-generated schema, it fails with the error below. Can someone please help with maintaining schema integrity between the streaming and backfilling pipelines?
22514 [consumer-thread-1] ERROR org.apache.hudi.io.HoodieWriteHandle - Error writing record HoodieRecord{key=HoodieKey { recordKey=106 partitionPath=dt=2021-12-28}, currentLocation='null', newLocation='null'}
java.io.EOFException
	at org.apache.avro.io.BinaryDecoder$ByteArrayByteSource.readRaw(BinaryDecoder.java:944)
	at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:349)
	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:137)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:127)
	at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75)
	at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105)
	at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
	at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:105)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
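The EOFException is the classic symptom of decoding Avro binary data with a schema that does not match the writer's: Avro bytes carry no field tags, so a value written as a long (e.g. timestamp-micros) gets misread as a string length when the reader schema says string, and the decoder then tries to read far more bytes than exist. A minimal pure-Python illustration of the two wire encodings involved (a sketch of Avro's zig-zag varint format, not the real Avro library):

```python
def zigzag_encode(n: int) -> bytes:
    """Encode an Avro long as a zig-zag varint (Avro binary encoding)."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)  # continuation bit set, more bytes follow
        else:
            out.append(b)
            return bytes(out)

def read_string(buf: bytes, pos: int):
    """Read an Avro string: zig-zag varint length prefix, then UTF-8 bytes."""
    shift, acc = 0, 0
    while True:  # decode the varint length prefix
        b = buf[pos]
        pos += 1
        acc |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    length = (acc >> 1) ^ -(acc & 1)  # undo zig-zag
    if pos + length > len(buf):
        # This is the situation Avro's BinaryDecoder hits as java.io.EOFException.
        raise EOFError(f"string length {length} exceeds remaining bytes")
    return buf[pos:pos + length].decode("utf-8"), pos + length

# A timestamp-micros value written as an Avro long...
payload = zigzag_encode(1_640_649_600_000_000)
# ...misread under a reader schema that says "string": the first varint
# becomes an absurd length, and decoding fails exactly like the trace above.
try:
    read_string(payload, 0)
except EOFError as e:
    print("EOFError:", e)
```

This is why the failure surfaces deep inside GenericDatumReader rather than as a clean schema-compatibility error: the bytes are simply reinterpreted until the buffer runs out.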
Job conf:
--target-base-path
/Users/hudi_op55/
--table-type
COPY_ON_WRITE
--target-table
test
--source-class
org.apache.hudi.utilities.sources.JdbcSource
--op
BULK_INSERT
--source-ordering-field
ts1
--props
/Users/jdbcF.properties
--schemaprovider-class
org.apache.hudi.utilities.schema.SchemaRegistryProvider
Properties File:
hoodie.deltastreamer.jdbc.url=jdbc:mysql://localhost:3306/sys
hoodie.deltastreamer.jdbc.driver.class=com.mysql.cj.jdbc.Driver
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=created_at
hoodie.deltastreamer.jdbc.incr.pull=false
hoodie.deltastreamer.jdbc.user=
hoodie.deltastreamer.jdbc.password=
hoodie.deltastreamer.jdbc.table.name=blackbox_prod.ev_test
hoodie.deltastreamer.keygen.timebased.timestamp.type=SCALAR
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd hh:mm:ss
hoodie.deltastreamer.keygen.timebased.timezone=IST
hoodie.deltastreamer.keygen.timebased.output.dateformat='dt='yyyy-MM-dd
hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit=MICROSECONDS
schema.registry.url=http://localhost:9000/
hoodie.deltastreamer.schemaprovider.registry.url=
hoodie.deltastreamer.schemaprovider.registry.targetUrl=
hoodie.deltastreamer.schemaprovider.schema_post_processor=org.apache.hudi.utilities.schema.SparkAvroPostProcessor
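For reference, the two registry URL properties above are blank; Hudi's SchemaRegistryProvider expects the full Confluent-style URL of a concrete schema version. A hypothetical example — the subject name is assumed to follow Debezium's <server>.<database>.<table>-value convention, and the host/port are copied from schema.registry.url above; adjust both to your setup:

```properties
# Hypothetical values -- subject name, host and port are assumptions.
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:9000/subjects/ev_orders_test.blackbox_prod.ev_test-value/versions/latest
hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://localhost:9000/subjects/ev_orders_test.blackbox_prod.ev_test-value/versions/latest
```

Note that pointing the JDBC backfill at the Debezium subject does not by itself make the bytes compatible, as the failure above shows: the source-side and target-side schemas still have to agree field by field.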