bksrepo commented on issue #10609:
URL: https://github.com/apache/hudi/issues/10609#issuecomment-1991619367
I am using Spark 3.4.1 with the Hudi bundle
'hudi-spark3.4-bundle_2.12-0.14.0.jar'; Hadoop is 3.3.6 and the source database is
MySQL version 8.0.36.
The reported ERROR occurs at the time of saving the data frame; up to df.show()
the code works fine.
=================================================================================================================
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import (
    BooleanType,
    DateType,
    DecimalType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)
# SparkSession wired for Hudi on YARN/HDFS with Hive support.
# NOTE: the original paste used backslash continuations that were mangled by
# line-wrapping (bare '\' lines), which is a SyntaxError; a parenthesized
# builder chain needs no continuations and cannot break this way.
spark = (
    SparkSession.builder
    .appName('Sample_COW')
    .config("spark.yarn.jars", "/opt/spark-3.4.1-bin-hadoop3/jars/*.jar")
    # Hudi SQL extension + catalog so Spark SQL understands Hudi tables.
    .config('spark.sql.extensions',
            'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog',
            'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
    # Hudi requires Kryo serialization with its own registrator.
    .config('spark.kryo.registrator',
            'org.apache.spark.HoodieSparkKryoRegistrar')
    .config('spark.serializer',
            'org.apache.spark.serializer.KryoSerializer')
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config('spark.sql.warehouse.dir', 'hdfs://nn:8020/mnt/hive/warehouse')
    .config('spark.sql.debug.maxToStringFields', '200')
    .config('spark.hadoop.fs.defaultFS', 'hdfs://Name-Node-Server:8020')
    # Pin a single jackson-databind on both executor and driver classpaths
    # (presumably to avoid a version clash with the Hudi bundle — confirm).
    .config('spark.executor.extraClassPath',
            '/opt/spark-3.4.1-bin-hadoop3/jars/jackson-databind-2.14.2.jar')
    .config('spark.driver.extraClassPath',
            '/opt/spark-3.4.1-bin-hadoop3/jars/jackson-databind-2.14.2.jar')
    .config('spark.hadoop.yarn.resourcemanager.hostname', 'Name-Node-Server')
    .config("spark.sql.hive.convertMetastoreParquet", "true")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version",
            "2")
    .config("spark.hadoop.fs.replication", "1")
    .enableHiveSupport()
    .getOrCreate()
)
# Define MySQL connection properties along with the whole table as a dump, or
# selective columns with a where clause.
# NOTE: the original paste wrapped this comment and the dbtable string literal
# across raw lines (a SyntaxError); the query is rebuilt with implicit string
# concatenation inside parentheses.
mysql_props = {
    "url": "jdbc:mysql://localhost:3306/XXXX",
    "driver": "com.mysql.cj.jdbc.Driver",
    "user": "XXXXXx",
    # SECURITY: credentials are hard-coded here; prefer an external secret
    # store or config file in production.
    "password": "XXXXX",
    # Pushdown subquery: timestamp columns are converted to epoch seconds on
    # the MySQL side (UNIX_TIMESTAMP / CONVERT_TZ) so Spark reads plain ints.
    "dbtable": (
        "(select id, pid, center_id, center_code, visit_type, "
        "create_price_list_id, gender, age, age_frequency, clinical_detail, "
        "clinical_history_file, sample_drawn_date, sample_drawn_time_hrs, "
        "sample_drawn_time_min, referal_doctor_id, referal_doctor, "
        "referal_customer_id, referal_customer, department_id, profile_ids, "
        "test_ids, amount, discount, total_amount, mrp, payment_mode, "
        "amount_paid, amount_balance, test_status_code, "
        "UNIX_TIMESTAMP(log_date_created) AS log_date_created, created_by, "
        "deleted, sample_status, other_comments, team_lead_id, tech_lead_id, "
        "pathologist_id, tele_pathologist_id, Graph_path, "
        "UNIX_TIMESTAMP(CONVERT_TZ(authentication_date,'+05:30','+00:00')) "
        "AS authentication_date, reference_patient_id, protocol_id, "
        "visit_info, ref_center, investigator_details, month_year, "
        "UNIX_TIMESTAMP(CONVERT_TZ(sample_collection_datetime_at_source,"
        "'+05:30','+00:00')) AS sample_collection_timestamp "
        "FROM sample) as sample"
    ),
}
# Read data from MySQL
df = spark.read.format("jdbc").options(**mysql_props).load()
# Define the Hudi table schema explicitly to avoid any automatic field-type
# conversion and casting issues.
# NOTE(review): log_date_created, authentication_date and
# sample_collection_timestamp hold epoch seconds from UNIX_TIMESTAMP;
# IntegerType is kept as in the original, but it overflows after 2038 —
# LongType may be safer. Confirm before changing.
hoodie_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("pid", StringType(), True),
    StructField("center_id", IntegerType(), True),
    StructField("center_code", StringType(), True),
    StructField("visit_type", StringType(), True),
    StructField("create_price_list_id", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("age_frequency", StringType(), True),
    StructField("clinical_detail", StringType(), True),
    StructField("clinical_history_file", StringType(), True),
    StructField("sample_drawn_date", DateType(), True),
    StructField("sample_drawn_time_hrs", StringType(), True),
    StructField("sample_drawn_time_min", StringType(), True),
    StructField("referal_doctor_id", StringType(), True),
    StructField("referal_doctor", StringType(), True),
    StructField("referal_customer_id", StringType(), True),
    StructField("referal_customer", StringType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("profile_ids", StringType(), True),
    StructField("test_ids", StringType(), True),
    StructField("amount", DecimalType(precision=11, scale=2), True),
    StructField("discount", DecimalType(precision=11, scale=2), True),
    StructField("total_amount", DecimalType(precision=11, scale=2), True),
    StructField("mrp", DecimalType(precision=11, scale=2), True),
    StructField("payment_mode", StringType(), True),
    StructField("amount_paid", DecimalType(precision=11, scale=2), True),
    StructField("amount_balance", DecimalType(precision=11, scale=2), True),
    StructField("test_status_code", StringType(), True),
    StructField("log_date_created", IntegerType(), True),
    StructField("created_by", StringType(), True),
    StructField("deleted", BooleanType(), True),
    StructField("sample_status", StringType(), True),
    StructField("other_comments", StringType(), True),
    StructField("team_lead_id", IntegerType(), True),
    StructField("tech_lead_id", IntegerType(), True),
    StructField("pathologist_id", IntegerType(), True),
    StructField("tele_pathologist_id", IntegerType(), True),
    StructField("graph_path", StringType(), True),
    StructField("authentication_date", IntegerType(), True),
    StructField("reference_patient_id", StringType(), True),
    StructField("protocol_id", StringType(), True),
    StructField("visit_info", StringType(), True),
    StructField("ref_center", StringType(), True),
    StructField("investigator_details", StringType(), True),
    StructField("month_year", StringType(), True),
    StructField("sample_collection_timestamp", IntegerType(), True),
])
# Hudi write configuration: a non-partitioned COPY_ON_WRITE table keyed on
# "id", loaded via the bulk_insert operation at parallelism 2.
hudi_options = {
    # Table layout / type
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.table.name": "np_sample_registration_2023",
    "hoodie.datasource.write.schema": hoodie_schema.json(),
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    # Write operation and key generation (no partition column)
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.datasource.write.keygenerator.class":
        "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    # Shuffle parallelism
    "hoodie.upsert.shuffle.parallelism": "2",
    "hoodie.insert.shuffle.parallelism": "2",
    "hoodie.table.name": "sample_cow",
    "path": "/datalake/sample/etl/sample_cow/np/cow/",
}
df.show()
# Write data to Hudi COW table in Parquet format
hudi_writer = (
    df.write
    .format("org.apache.hudi")
    .options(**hudi_options)
    .mode("overwrite")
)
hudi_writer.save()  # Save to HDFS
spark.stop()
=================================================================================================================
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]