Tyler-Rendina commented on issue #8974:
URL: https://github.com/apache/hudi/issues/8974#issuecomment-1599011030
@ad1happy2go Of course, let me add some more context, since the fix turned
out to be a Python cleaning step:
### The overall AWS-based ETL system developed before running into the unexpected behavior
1. ECS Fargate container listening for API data, producing to stream.
2. Kinesis stream consumer registered as a Lake Formation table.
3. Consumer Glue 4.0 script with the Hudi connector and 0.13.1 jars, creating
and then upserting to a Lake Formation Hudi table.
4. Local/EMR PySpark notebook leveraging readStream, to eventually be turned
into an EMR Serverless script.
(Credit to @soumilshah1995 for the demo on steps 2 and 3)
### Problem (unexpected behavior)
The readStream function returned the first batch of data correctly, but
subsequent batches returned the values of proceeding columns in place of the
expected ones. This happened because every new record in those batches was
missing one or more columns.
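To make that condition concrete, here is a minimal sketch of a check that reports which expected columns are absent from every record in a batch. The helper name `columns_missing_from_all` and the sample batch are hypothetical, not part of the original pipeline; it only illustrates how such a batch could be spotted before it is produced to the stream.

```python
def columns_missing_from_all(records, expected_columns):
    """Return the expected columns that appear in none of the records.

    Hypothetical diagnostic helper, not part of the original pipeline.
    Key names are compared in lower case.
    """
    seen = set()
    for record in records:
        seen.update(key.lower() for key in record)
    return [col for col in expected_columns if col not in seen]


# Sample batch in which no record carries the "or_add" field.
batch = [
    {"fields": 1, "to_keep": "a"},
    {"fields": 2, "to_keep": "b"},
]
missing = columns_missing_from_all(batch, ["fields", "to_keep", "or_add"])
print(missing)
```

A non-empty result for a batch would flag the situation described above.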
### Solution
In the ECS Fargate container I had already created a whitelist of columns,
since the API would return one-off extra fields, some containing structs
inconsistent with preceding schemas. This created issues upserting data from
Glue while initially developing the pipeline.
I added one more step to the whitelist filter that sets a null value for any
whitelisted field missing from the record:
```python
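def _whitelisted_fields(x):
    # Fields to keep (placeholder names), in lower case.
    whitelist = ["fields", "to_keep", "or_add"]
    # Keep only whitelisted keys, normalizing key names to lower case.
    x = {k.lower(): v for k, v in x.items() if k.lower() in whitelist}
    # Add an explicit null for any whitelisted field the record lacks.
    for key in whitelist:
        if key not in x:
            x[key] = None
    return x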
```
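As a usage illustration, the sketch below applies the same idea to a sample record. `normalize_record`, `WHITELIST`, and the sample payload are hypothetical names and data chosen for the example. Unlike the function above, this variant takes the whitelist as a parameter and always emits keys in whitelist order, which is an assumption on my part about what keeps records uniform, not something the pipeline is confirmed to need.

```python
def normalize_record(record, whitelist):
    """Keep only whitelisted fields and fill absent ones with None.

    Hypothetical variant of the filter: keys are lower-cased, and the
    result always contains every whitelisted key, in whitelist order.
    """
    allowed = set(whitelist)
    kept = {k.lower(): v for k, v in record.items() if k.lower() in allowed}
    # dict.get returns None for whitelisted keys the record did not carry.
    return {key: kept.get(key) for key in whitelist}


WHITELIST = ["fields", "to_keep", "or_add"]

# Sample API payload: mixed-case keys, one extra field, one missing field.
raw = {"Fields": 1, "TO_KEEP": "a", "one_off_extra": {"nested": True}}
cleaned = normalize_record(raw, WHITELIST)
print(cleaned)
```

The extra field is dropped and the missing `or_add` field comes back as `None`, so every record produced to the stream has the same set of keys.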
### Thoughts
I spent a ton of time looking for a solution to this, since non-incremental
queries did not have this issue, which led me to believe I had a problem with
my 0.13 installation or with my Hudi config. I'm not sure whether this actually
happens because the parquet file is picked up by readStream before some
cleaning can happen, but I thought the behavior was interesting enough to
mention to the community.