stackfun opened a new issue #1860:
URL: https://github.com/apache/hudi/issues/1860


   **Describe the problem you faced**
   
   In one pyspark job, I'm appending 10 rows to a COW (copy-on-write) table in a loop.
   In another pyspark job, I'm running `select count(*)` against the same table in a loop.
   
   When querying through the Spark datasource API, the count is unpredictable: it sometimes returns the correct number of rows and sometimes fewer.
   When querying through Hive, the `select count(*)` query always returns the expected result.
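   For what it's worth, the symptom looks like the Spark datasource reader is counting against a file listing that is stale relative to the writer's latest commit. A plain-Python sketch of that suspicion (no Hudi involved; the directory and file names here are made up for illustration):
   
   ```python
   import glob
   import os
   import tempfile
   
   # Sketch only (an assumption on my part, not Hudi internals): the reader
   # takes a listing of data files at one point in time while the writer keeps
   # committing new files, so a count over the stale listing comes up short.
   base = tempfile.mkdtemp()
   for i in range(3):
       with open(os.path.join(base, f"part-{i}.parquet"), "w") as f:
           f.write("10 rows\n")
   
   # reader lists the table's files
   stale_listing = glob.glob(os.path.join(base, "part-*.parquet"))
   
   # writer commits one more file after the listing was taken
   with open(os.path.join(base, "part-3.parquet"), "w") as f:
       f.write("10 rows\n")
   
   print(len(stale_listing))                                    # 3 (stale)
   print(len(glob.glob(os.path.join(base, "part-*.parquet"))))  # 4 (fresh)
   ```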
   
   **To Reproduce**
   
   I'm running two pyspark jobs simultaneously in GCP using dataproc.
   
   Writer Job
   ```python
   from pyspark.sql import SparkSession, functions
   import time
   
   table_name = "hudi_trips_cow"
   hudi_options = {
       "hoodie.table.name": table_name,
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.write.partitionpath.field": "continent,country,city",
       "hoodie.datasource.write.table.name": table_name,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.datasource.hive_sync.enable": True,
       "hoodie.datasource.hive_sync.database": "default",
       "hoodie.datasource.hive_sync.table": table_name,
       "hoodie.datasource.hive_sync.username": "hive",
       "hoodie.datasource.hive_sync.password": "hive",
       "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://localhost:10000",
       "hoodie.datasource.hive_sync.partition_fields": "continent,country,city",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
   }
   
   def execute(spark: SparkSession, output_path: str):
       start_time = time.time()
       while time.time() < start_time + 60 * 15:
           df = generate_trips(spark)
           df.write.format("hudi").options(**hudi_options).mode("append").save(output_path)
   
   def generate_trips(spark):
       sc = spark.sparkContext
       dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
       inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
           dataGen.generateInserts(10)
       )
       # split partitionpath into continent/country/city, needed for hive sync
       df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
       split_col = functions.split(df["partitionpath"], "/")
       df = df.withColumn("continent", split_col.getItem(0))
       df = df.withColumn("country", split_col.getItem(1))
       return df.withColumn("city", split_col.getItem(2))
   
   spark = (
       SparkSession.builder.appName("test")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .enableHiveSupport()
       .getOrCreate()
   )
   execute(spark, "gs://random-gcs-folder-3adf/hudi-data")
   ```
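   The partition-path handling in `generate_trips` boils down to this plain-Python split (the sample path imitates the quickstart data generator's `continent/country/city` format; the exact value is made up):
   
   ```python
   # Plain-Python equivalent of the split done in generate_trips above.
   def split_partitionpath(partitionpath: str) -> dict:
       continent, country, city = partitionpath.split("/")
       return {"continent": continent, "country": country, "city": city}
   
   print(split_partitionpath("americas/united_states/san_francisco"))
   ```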
   
   Reader Job
   ```python
   from pyspark.sql import SparkSession, HiveContext
   import time
   
   def spark_query(spark: SparkSession, input_path: str):
       df = spark.read.format("org.apache.hudi").load(input_path + "/*/*/*/*")
       df.createOrReplaceTempView("trips_spark_temp")
       spark.catalog.refreshTable("trips_spark_temp")
       print("Spark Query:")
       spark.sql("select count(*) from trips_spark_temp").show()
   
   def hive_query(hive_context: HiveContext):
       hudi_trips_table = hive_context.table("default.hudi_trips_cow")
       hudi_trips_table.createOrReplaceTempView("trips_temp")
       hive_context.sql("REFRESH TABLE trips_temp")
       print("Hive Query:")
       hive_context.sql("select count(*) from trips_temp").show()
   
   def execute(spark: SparkSession, input_path: str):
       hive_context = HiveContext(spark.sparkContext)
   
       start_time = time.time()
       while time.time() < start_time + (15 * 60):
           spark_query(spark, input_path)
           hive_query(hive_context)
   
   spark = (
       SparkSession.builder.appName("test")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .enableHiveSupport()
       .getOrCreate()
   )
   execute(spark, "gs://random-gcs-folder-3adf/hudi-data")
   ```
   Output from Reader Job:
   ```
   Spark Query:
   +--------+
   |count(1)|
   +--------+
   |     545|
   +--------+
   
   Hive Query:
   +--------+
   |count(1)|
   +--------+
   |    1750|
   +--------+
   
   Spark Query:
   +--------+
   |count(1)|
   +--------+
   |    1760|
   +--------+
   
   Hive Query:
   +--------+
   |count(1)|
   +--------+
   |    1760|
   +--------+
   
   ```
   
   **Expected behavior**
   
   Counts returned through the Spark datasource API should match the results of the Hive queries.
   
   **Environment Description**
   
   * Hudi version : 0.5.3
   
   * Spark version : 2.4.5
   
   * Hive version : 2.3.7
   
   * Hadoop version : 2.10
   
   * Storage (HDFS/S3/GCS..) : GCS
   
   * Running on Docker? (yes/no) : no
   

