Tyler-Rendina opened a new issue, #8974: URL: https://github.com/apache/hudi/issues/8974
**Describe the problem you faced**

`readStream` shifts column values into another column when a preceding column is not present in the batch.

**To Reproduce**

Steps to reproduce the behavior:

1. An upstream Glue 4.0 job upserts Parquet CoW data from a Kinesis stream successfully, using a near-identical method to this [demo](https://github.com/soumilshah1995/Build-Glue-Spark-Streaming-pipeline-for-clicksstreams-and-power-data-lake-with-Apache-Hudi-and-Quer/tree/main).
2. Launch Spark in a notebook (ipykernel running pyspark):

```python
import os
import sys
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType
from pyspark.ml import PipelineModel

import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.functions import *

packages = [
    "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1",
    "org.apache.hadoop:hadoop-aws:3.3.2",
    "com.amazonaws:aws-java-sdk-bundle:1.12.477",
    "com.johnsnowlabs.nlp:spark-nlp-aarch64_2.12:4.4.3",
]

SUBMIT_ARGS = f"--packages={','.join(packages)} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
os.environ["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID")
os.environ["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY")
os.environ["AWS_REGION"] = "us-east-1"

spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config("spark.driver.memory", "10g")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .getOrCreate()
)
```

3. Launch the stream (note: all debug output modes have been tested, with the same result):

```python
def foreach_batch_function(df, epoch_id):
    df.show()

df_bw = (
    spark
    .readStream
    .format("hudi")
    .load("s3a://path/to/table")
    .writeStream
    .foreachBatch(foreach_batch_function)
    .start()
)
```

**Expected behavior**

(A `selectExpr` on some columns was added to highlight the issue.) Batches should have consistent data in each named column, i.e.:

```
4
+----------+--------------------+--------------------+
| published|          resourceid|            fulltext|
+----------+--------------------+--------------------+
|2023-06-14|c6e820d0640887665...|...audiences thro...|
+----------+--------------------+--------------------+

5
+----------+--------------------+--------------------+
| published|          resourceid|            fulltext|
+----------+--------------------+--------------------+
|2023-06-14|747473179dc94626d...|Indusladies Forum...|
|2023-06-14|05a0a824663209bd5...|Indusladies Forum...|
|2023-06-14|8984d7d17788562ad...|these, projected ...|
+----------+--------------------+--------------------+
```

But the next batch is returned as (note: `fulltext` is an initial column, so it wasn't distorted):

```
6
+--------------------+--------------------+--------------------+
|           published|          resourceid|            fulltext|
+--------------------+--------------------+--------------------+
|1c2fc53b259cb0b52...|[https://news.dove...| ...Pharmaceutica...|
|5c7aeb1014894b03f...|[https://going-pos...| ...Biontech lore...|
|91ed3fc50812240b7...|[http://www.allusa...| ...whos lorepsum...|
+--------------------+--------------------+--------------------+
```

**Environment Description**

* Hudi version : 0.13.1
* Spark version : 3.3.2
* Hive version :
* Hadoop version : 3.3.2
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker?
(yes/no) : yes - `jupyter/pyspark-notebook:spark-3.3.2` (used to develop EMR scripts)

**Additional context**

This issue doesn't seem to appear anywhere else and never occurs when I read the whole table. I suspect it has something to do with the upserted Parquet schema when columns are missing from a batch.

**Stacktrace**

When using the memory sink I get type mismatch errors, for obvious reasons.
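To make the suspected failure mode in "Additional context" concrete: if a batch's underlying file is missing an earlier column and its values are matched to the table schema by *position* rather than by *name*, everything after the gap shifts left, which matches the distorted batch 6 above. The sketch below is Spark-free and purely illustrative; the column names come from the output above, and nothing here reflects actual Hudi internals.

```python
# Hypothetical illustration: positional vs. name-based column mapping
# when a file's schema is missing an earlier column of the table schema.

table_schema = ["published", "resourceid", "fulltext"]

# A file written from a batch where "published" was absent.
file_schema = ["resourceid", "fulltext"]
file_row = ["1c2fc53b259cb0b52", "...Pharmaceutica..."]

# Positional mapping: wrong -- the "resourceid" value lands under "published",
# and "fulltext" slides into "resourceid".
by_position = dict(zip(table_schema, file_row))

# Name-based mapping: correct -- the missing column simply becomes None.
file_record = dict(zip(file_schema, file_row))
by_name = {col: file_record.get(col) for col in table_schema}

print(by_position)
# {'published': '1c2fc53b259cb0b52', 'resourceid': '...Pharmaceutica...'}
print(by_name)
# {'published': None, 'resourceid': '1c2fc53b259cb0b52', 'fulltext': '...Pharmaceutica...'}
```

If this is what happens, it would also explain why reading the whole table is fine (the full-table read reconciles schemas by name) while the incremental stream source does not.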
