ad1happy2go commented on issue #8625:
URL: https://github.com/apache/hudi/issues/8625#issuecomment-1540021140
@chinmay-032 I just tried to reproduce this with the OSS version. It is clear
that OSS Hudi 0.12.2 does not ship the partial-update payload class:
I got a ClassNotFoundException for PartialUpdateAvroPayload. Code pasted below.
I now understand your scenario: you are saying the AWS-provided build includes
this class. We need to check with the AWS team on that. In the meantime I
recommend using OSS Hudi 0.13.0, which should work.
```
from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import Row
from datetime import date
# Local Spark session wired up with the Hudi 0.12.2 bundle, the Hudi SQL
# extension/catalog, and Kryo serialization.
_spark_conf = {
    "spark.driver.memory": "8g",
    "spark.jars.packages":
        "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.2",
    "spark.sql.extensions":
        "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    "spark.sql.catalog.spark_catalog":
        "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.serializer":
        "org.apache.spark.serializer.KryoSerializer",
}
_builder = SparkSession.builder.master("local[1]")
for _key, _value in _spark_conf.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()
# Hudi write options shared by every write in this reproduction.
common_config = {
    # Table layout and keys.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "timestamp",
    "hoodie.datasource.write.partitionpath.field": "timestamp__date_",
    "hoodie.table.name": "issue_8625",
    # Global bloom index; keep records in their original partition on update.
    "hoodie.index.type": "GLOBAL_BLOOM",
    "hoodie.bloom.index.update.partition.path": "false",
    # Timestamp-based key generator: scalar DAYS rendered as yyyy/MM/dd (GMT).
    "hoodie.datasource.write.keygenerator.class":
        "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
    "hoodie.deltastreamer.keygen.timebased.timestamp.type": "SCALAR",
    "hoodie.deltastreamer.keygen.timebased.output.dateformat": "yyyy/MM/dd",
    "hoodie.deltastreamer.keygen.timebased.timezone": "GMT",
    "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit": "DAYS",
    # Partial-update payload — the class this issue is about.
    "hoodie.datasource.write.payload.class":
        "org.apache.hudi.common.model.PartialUpdateAvroPayload",
    "hoodie.compaction.payload.class":
        "org.apache.hudi.common.model.PartialUpdateAvroPayload",
    "hoodie.datasource.compaction.async.enable": "false",
    "hoodie.payload.ordering.field": "timestamp",
}
def get_structfield(fieldname, typestring):
    """Build a nullable StructField for *fieldname* from a logical type name.

    Unknown type names fall back to StringType, matching the original
    if/elif chain's default branch.
    """
    type_by_name = {
        "NUMBER": DoubleType(),
        "BOOLEAN": BooleanType(),
        "RELATIVE_TIME": TimestampType(),
        "LIST_DOUBLE": ArrayType(DoubleType(), False),
        "LIST_STRING": ArrayType(StringType(), False),
    }
    spark_type = type_by_name.get(typestring, StringType())
    return StructField(fieldname, spark_type, True)
def write_to_hudi(df, base_path="s3a://path-to-private-table/"):
    """Append *df* to the Hudi table ``issue_8625`` under *base_path*.

    Parameters:
        df: a Spark DataFrame to write.
        base_path: base URI the table directory is appended to
            (defaults to the original hard-coded S3 location).

    Returns:
        True when the write succeeds, False when it raises.

    Fixes over the original: the failure path used to fall through and
    print the success/timing message and return True even after an error;
    now a failed write reports the error and returns False.
    """
    table_name = "issue_8625"
    start = time.time()
    try:
        df.write.format("org.apache.hudi") \
            .options(**common_config) \
            .option("hoodie.table.name", table_name) \
            .option("hoodie.datasource.hive_sync.table", table_name) \
            .mode("append") \
            .save(base_path + table_name + "/")
    except Exception as e:  # top-level boundary: report and signal failure
        print(f"Table {table_name}: Error while writing. Job aborted {str(e)}")
        return False
    elapsed = time.time() - start
    print(f"Table {table_name}: Writing to hudi completed in {elapsed:.2f} seconds")
    return True
# Schema for the sample records: integer record key plus assorted columns,
# with "timestamp" serving as both precombine field and partition source.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("b", IntegerType(), True),
    StructField("c", StringType(), True),
    StructField("d", DateType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("f", StringType(), True),
])

_TS_FMT = "%Y-%m-%d %H:%M:%S"
_base_ts = datetime.strptime("2000-01-01 12:00:00", _TS_FMT)

# Initial load: three rows sharing the same event timestamp.
df = spark.createDataFrame(
    [
        Row(id=1, b=2, c="string1", d=date(2000, 1, 1), timestamp=_base_ts,
            f="2000-01-01 12:00:00"),
        Row(id=2, b=3, c="string2", d=date(2000, 2, 1), timestamp=_base_ts,
            f="2000-01-01 12:21:20"),
        Row(id=4, b=5, c="string3", d=date(2000, 3, 1), timestamp=_base_ts,
            f="2000-01-03 12:00:00"),
    ],
    schema,
)
df.write.format("org.apache.hudi") \
    .options(**common_config) \
    .mode("append") \
    .save("file:///tmp/issue_8625_2")

# Partial update for id=4: only b and timestamp carry values; the remaining
# columns are None so the payload class should keep their previous values.
newdf = spark.createDataFrame(
    [
        Row(id=4, b=16, c=None, d=None,
            timestamp=datetime.strptime("2000-01-01 12:35:00", _TS_FMT),
            f=None),
    ],
    schema,
)
newdf.show()
newdf.write.format("org.apache.hudi") \
    .options(**common_config) \
    .mode("append") \
    .save("file:///tmp/issue_8625_2")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]