Tyler-Rendina opened a new issue, #8974:
URL: https://github.com/apache/hudi/issues/8974

   **Describe the problem you faced**
   
   When reading with `readStream`, column values shift into the wrong (earlier) columns whenever a preceding column is not present in the micro-batch.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Upstream glue 4.0 job upserts parquet CoW data from kinesis stream 
successfully using a near identical method to this 
[demo](https://github.com/soumilshah1995/Build-Glue-Spark-Streaming-pipeline-for-clicksstreams-and-power-data-lake-with-Apache-Hudi-and-Quer/tree/main).
   2. Launch spark in notebook on ipykernel running pyspark
   ```python
   import os
   import sys
   import re
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import *
   from pyspark.sql.types import StringType, IntegerType
   from pyspark.ml import PipelineModel
   import sparknlp
   from sparknlp.annotator import *
   from sparknlp.base import *
   from sparknlp.functions import *
   
   packages = [
       "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1",
       "org.apache.hadoop:hadoop-aws:3.3.2",
       "com.amazonaws:aws-java-sdk-bundle:1.12.477",
       "com.johnsnowlabs.nlp:spark-nlp-aarch64_2.12:4.4.3"
   ]
   SUBMIT_ARGS = f"--packages={','.join(packages)} pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ["PYSPARK_PYTHON"] = sys.executable
   os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
   os.environ["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID")
   os.environ["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY")
   os.environ["AWS_REGION"] = "us-east-1"
   
    spark = (
        SparkSession.builder
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.hive.convertMetastoreParquet", "false")
        .config("spark.driver.memory", "10g")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
        .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
        .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
        .getOrCreate()
    )
    ```
    3. Launch the stream (note: all output modes have been tested, with the same result)
   ```python
   def foreach_batch_function(df, epoch_id):
       df.show()
   
   df_bw = (
       spark
       .readStream
       .format("hudi")
       .load("s3a://path/to/table")
       .writeStream
       .foreachBatch(foreach_batch_function)
       .start()
   )
   ```
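    As a diagnostic, the batch function can print the epoch id and schema alongside the rows, which would show whether the misaligned batches actually carry a different (narrower or reordered) schema. A minimal sketch; `foreach_batch_debug` is a hypothetical helper, not part of the original job:
    ```python
    def foreach_batch_debug(df, epoch_id):
        # Tag the output so each micro-batch can be matched to its rows
        print(f"epoch {epoch_id}")
        # If the distorted batches report fewer or reordered fields here,
        # the problem is in the underlying data files rather than in
        # foreachBatch itself
        df.printSchema()
        df.show()
    ```
    Swapping this in for `foreach_batch_function` in the stream above would show whether the schema of batch 6 differs from that of batches 4 and 5.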
   
   **Expected behavior**
   
    (A `selectExpr` over a few columns was added to highlight the issue.)
    Each batch should have consistent data in each named column, i.e.:
   
   4
   +----------+--------------------+--------------------+
   | published|          resourceid|            fulltext|
   +----------+--------------------+--------------------+
   |2023-06-14|c6e820d0640887665...|...audiences thro...|
   +----------+--------------------+--------------------+
   
   5
   +----------+--------------------+--------------------+
   | published|          resourceid|            fulltext|
   +----------+--------------------+--------------------+
   |2023-06-14|747473179dc94626d...|Indusladies Forum...|
   |2023-06-14|05a0a824663209bd5...|Indusladies Forum...|
   |2023-06-14|8984d7d17788562ad...|these, projected ...|
   +----------+--------------------+--------------------+
   
    But the next batch is returned as follows (note: fulltext is one of the table's initial columns, so it was not distorted):
   
   6
   +--------------------+--------------------+--------------------+
   |           published|          resourceid|            fulltext|
   +--------------------+--------------------+--------------------+
   |1c2fc53b259cb0b52...|[https://news.dove...|...Pharmaceutica...|
   |5c7aeb1014894b03f...|[https://going-pos...|...Biontech lore...|
   |91ed3fc50812240b7...|[http://www.allusa...|...whos lorepsum...|
   +--------------------+--------------------+--------------------+
   
   **Environment Description**
   
   * Hudi version : 0.13.1
   
   * Spark version : 3.3.2
   
   * Hive version :
   
   * Hadoop version : 3.3.2
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes - jupyter/pyspark-notebook:spark-3.3.2 
(using this to develop EMR scripts)
   
   
   **Additional context**
   
    This issue doesn't appear anywhere else and never occurs when I read the whole table. I suspect it is related to the parquet schema written by the upserts when columns are missing from a batch.
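    For illustration, the symptom in batch 6 is consistent with values being mapped to columns by position rather than by name: if a leading column is absent from a file, every later value slides one slot to the left. A toy sketch of that failure mode in plain Python (the column names echo the table above, but the values are made up, and this only models the hypothesis, not Hudi's actual reader):
    ```python
    # Expected full schema, in order
    schema = ["published", "resourceid", "fulltext"]

    def map_by_position(schema, values):
        # Positional mapping: values fill columns left to right, so a
        # missing leading value shifts everything one column over
        return dict(zip(schema, values))

    def map_by_name(schema, record):
        # Name-based mapping: absent columns stay null instead of shifting
        return {col: record.get(col) for col in schema}

    # A record whose file is missing the "published" column
    record = {"resourceid": "1c2fc53b", "fulltext": "...Pharma..."}

    shifted = map_by_position(schema, list(record.values()))
    correct = map_by_name(schema, record)

    print(shifted)  # resourceid's value lands in "published", as in batch 6
    print(correct)  # "published" is None, the other columns line up
    ```
    If this is what is happening, pinning the full table schema on the reader (or backfilling the missing columns upstream) would be the workaround to try.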
   
   **Stacktrace**
   
    When using the memory sink I get type mismatch errors, for obvious reasons: the shifted values no longer match their columns' types.
   

