chinmay-032 commented on issue #8625:
URL: https://github.com/apache/hudi/issues/8625#issuecomment-1534283709

   **Another Update:**
   I think we have pinpointed where the error occurs. When the DataFrame is 
created from JSON strings (read with the `StructType` schema `json_schema`), 
the partial update fails. The code we used is below and may help reproduce 
the issue: 
   ```
   from datetime import datetime
   from pyspark.sql import Row
   from pyspark.sql.functions import to_date
   
   ### First we try using pyspark.sql.Row API: 
   df = spark.createDataFrame([Row(profile_id="172597", 
timestamp=datetime.fromtimestamp(1683010485669/1000), 
id="4c33de14-986c-444d-8b94-e2c8905c83ca", Enjoy=None, DOB=None, zip="560037", 
country="India", email_vendor="Mailchimp", city="Bangalore", 
active_audience=57890.0, last_name="Guy", migrated_from="Moengage", 
product_range=["Laptop", "Mobile", "Headphones"], email_sub=True, 
sms_vendor="Serfo", audience_count=80000.0, whatsapp_vendor="Gupshup", 
first_name="Some")] , json_schema)
   df = df.withColumn("timestamp__date_", to_date(df["timestamp"]))
   table_name = "row_api_table"
   df.write.format("org.apache.hudi") \
               .options(**common_config) \
               .option('hoodie.table.name', table_name) \
               .option('hoodie.datasource.hive_sync.table', table_name) \
               .mode("append") \
               .save("s3a://path-to-bucket/" + table_name + "/")
   
   newDf = spark.createDataFrame([Row(profile_id="172597", 
timestamp=datetime.fromtimestamp(1683016101429/1000), 
id="39cff44b-22c7-41d1-bc66-7d03ff38e4b9", Enjoy=None, DOB=None, zip="560037", 
country="India", email_vendor=None, city="Bangalore", active_audience=75000.0, 
last_name="Guy", migrated_from=None, product_range=None, email_sub=True, 
sms_vendor=None, audience_count=95000.0, whatsapp_vendor=None, 
first_name="Some")] , json_schema)
   table_name = "row_api_table"
   newDf = newDf.withColumn("timestamp__date_", to_date(newDf["timestamp"]))
   newDf.write.format("org.apache.hudi") \
               .options(**common_config) \
               .option('hoodie.table.name', table_name) \
               .option('hoodie.datasource.hive_sync.table', table_name) \
               .mode("append") \
               .save("s3a://path-to-bucket/" + table_name + "/")
   
   ## The above makes the partial update!
   
   ### Using json string: 
   json_string1 = """
   {
     "zip": "560037",
     "country": "India",
     "email_vendor": "Mailchimp",
     "city": "Bangalore",
     "active_audience": 57890,
     "last_name": "Guy",
     "migrated_from": "Moengage",
     "product_range": [
       "Laptop",
       "Mobile",
       "Headphones"
     ],
     "email_sub": true,
     "profile_id": "172597",
     "audience_count": 80000,
     "sms_vendor": "Serfo",
     "whatsapp_vendor": "Gupshup",
     "id": "4c33de14-986c-444d-8b94-e2c8905c83ca",
     "first_name": "Some",
     "timestamp": 1683010485669
   }
   """
   json_df = spark.read.json(sc.parallelize([json_string1]), json_schema)
   json_df = json_df.withColumn("timestamp__date_", 
to_date(json_df["timestamp"]))
   table_name = "json_data_table"
   json_df.write.format("org.apache.hudi") \
               .options(**common_config) \
               .option('hoodie.table.name', table_name) \
               .option('hoodie.datasource.hive_sync.table', table_name) \
               .mode("append") \
               .save("s3a://path-to-bucket/" + table_name + "/")
   json_string2 = """
   {
     "zip": "560037",
     "country": "India",
     "email_sub": true,
     "city": "Bangalore",
     "profile_id": "172597",
     "audience_count": 95000,
     "active_audience": 77800,
     "last_name": "Guy",
     "id": "39cff44b-22c7-41d1-bc66-7d03ff38e4b9",
     "first_name": "Some",
     "timestamp": 1683016101429
   }
   """
   json_df1 = spark.read.json(sc.parallelize([json_string2]), json_schema)
   json_df1 = json_df1.withColumn("timestamp__date_", 
to_date(json_df1["timestamp"]))
   table_name = "json_data_table"
   json_df1.write.format("org.apache.hudi") \
               .options(**common_config) \
               .option('hoodie.table.name', table_name) \
               .option('hoodie.datasource.hive_sync.table', table_name) \
               .mode("append") \
               .save("s3a://path-to-bucket/" + table_name + "/")
   
   ## This write overwrote the existing data with null values (no partial update).
   ```
   
   The config used is the same as the one given in the initial comment.
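
For anyone reproducing this, it may help to see exactly which fields the second JSON payload leaves out, since a key that is absent from a JSON document comes back as null when Spark reads it with an explicit schema. The snippet below is only a rough sketch using plain Python (no Spark needed); the helper name `absent_keys` is made up for illustration, and the payloads are copied from the two JSON strings above.

```python
import json

# Payloads copied from json_string1 and json_string2 in the reproduction above.
first_payload = json.loads("""
{
  "zip": "560037", "country": "India", "email_vendor": "Mailchimp",
  "city": "Bangalore", "active_audience": 57890, "last_name": "Guy",
  "migrated_from": "Moengage",
  "product_range": ["Laptop", "Mobile", "Headphones"],
  "email_sub": true, "profile_id": "172597", "audience_count": 80000,
  "sms_vendor": "Serfo", "whatsapp_vendor": "Gupshup",
  "id": "4c33de14-986c-444d-8b94-e2c8905c83ca",
  "first_name": "Some", "timestamp": 1683010485669
}
""")

second_payload = json.loads("""
{
  "zip": "560037", "country": "India", "email_sub": true,
  "city": "Bangalore", "profile_id": "172597", "audience_count": 95000,
  "active_audience": 77800, "last_name": "Guy",
  "id": "39cff44b-22c7-41d1-bc66-7d03ff38e4b9",
  "first_name": "Some", "timestamp": 1683016101429
}
""")


def absent_keys(previous, incoming):
    """Return the keys present in `previous` but missing from `incoming`.

    Hypothetical helper: these are the fields that would be read as null
    for the incoming record when an explicit schema is applied.
    """
    return sorted(set(previous) - set(incoming))


dropped = absent_keys(first_payload, second_payload)
print(dropped)
# ['email_vendor', 'migrated_from', 'product_range', 'sms_vendor', 'whatsapp_vendor']
```

These are the same five fields that are passed as `None` in the second `Row` in the first example, so on paper both update paths carry nulls in the same columns. Comparing `newDf.printSchema()` with `json_df1.printSchema()` (and the actual column values via `show()`) before the write might help narrow down where the two paths start to differ.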

