stackfun opened a new issue #1860:
URL: https://github.com/apache/hudi/issues/1860
**Describe the problem you faced**
In one pyspark job, I'm appending 10 rows at a time to a COW table in a loop.
In another pyspark job, I'm running `select count(*)` on the same table in a loop.
When querying through the Spark datasource API, the count is unpredictable,
only sometimes returning the right number of rows.
When querying through Hive, the `select count(*)` query returns the expected results.
**To Reproduce**
I'm running two pyspark jobs simultaneously on GCP using Dataproc.
Writer Job
```python
from pyspark.sql import SparkSession, functions
import time

table_name = "hudi_trips_cow"
hudi_options = {
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.partitionpath.field": "continent,country,city",
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.hive_sync.enable": True,
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": table_name,
    "hoodie.datasource.hive_sync.username": "hive",
    "hoodie.datasource.hive_sync.password": "hive",
    "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://localhost:10000",
    "hoodie.datasource.hive_sync.partition_fields": "continent,country,city",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}


def execute(spark: SparkSession, output_path: str):
    # Keep appending batches of 10 generated rows for 15 minutes.
    start_time = time.time()
    while time.time() < start_time + 60 * 15:
        df = generate_trips(spark)
        df.write.format("hudi").options(**hudi_options).mode("append").save(output_path)


def generate_trips(spark):
    sc = spark.sparkContext
    dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
    inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
        dataGen.generateInserts(10)
    )
    # Split partitionpath into separate columns, necessary to sync with hive.
    df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    split_col = functions.split(df["partitionpath"], "/")
    df = df.withColumn("continent", split_col.getItem(0))
    df = df.withColumn("country", split_col.getItem(1))
    return df.withColumn("city", split_col.getItem(2))


spark = (
    SparkSession.builder.appName("test")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .enableHiveSupport()
    .getOrCreate()
)
execute(spark, "gs://random-gcs-folder-3adf/hudi-data")
```
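For reference, the partition-path split done with `functions.split(...).getItem(n)` in `generate_trips` can be illustrated in plain Python. This is a hedged sketch: the helper name `split_partitionpath` and the sample path are mine, not part of the job above (the column names match the writer job).

```python
# Plain-Python illustration of the partitionpath split performed in
# generate_trips above. The helper name is hypothetical; the column
# names (continent, country, city) match the writer job.

def split_partitionpath(path: str) -> dict:
    """Split a 'continent/country/city' partition path into columns."""
    continent, country, city = path.split("/")
    return {"continent": continent, "country": country, "city": city}


print(split_partitionpath("asia/india/chennai"))
# → {'continent': 'asia', 'country': 'india', 'city': 'chennai'}
```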
Reader Job
```python
from pyspark.sql import SparkSession, HiveContext
import time


def spark_query(spark: SparkSession, input_path: str):
    df = spark.read.format("org.apache.hudi").load(input_path + "/*/*/*/*")
    df.createOrReplaceTempView("trips_spark_temp")
    spark.catalog.refreshTable("trips_spark_temp")
    print("Spark Query:")
    spark.sql("select count(*) from trips_spark_temp").show()


def hive_query(hive_context: HiveContext):
    hudi_trips_table = hive_context.table("default.hudi_trips_cow")
    hudi_trips_table.createOrReplaceTempView("trips_temp")
    hive_context.sql("REFRESH TABLE trips_temp")
    print("Hive Query:")
    hive_context.sql("select count(*) from trips_temp").show()


def execute(spark: SparkSession, input_path: str):
    # Query through both paths back to back for 15 minutes.
    hive_context = HiveContext(spark.sparkContext)
    start_time = time.time()
    while time.time() < start_time + (15 * 60):
        spark_query(spark, input_path)
        hive_query(hive_context)


spark = (
    SparkSession.builder.appName("test")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .enableHiveSupport()
    .getOrCreate()
)
execute(spark, "gs://random-gcs-folder-3adf/hudi-data")
```
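While debugging, the reader loop could flag a divergence directly instead of relying on eyeballing the two `.show()` tables. A minimal sketch, assuming the counts are collected with something like `spark.sql(...).collect()[0][0]` in the real job; the helper name and the sample values below are placeholders, not output from the job:

```python
# Compare the two counts programmatically and flag any divergence.
# The function name is hypothetical; the values passed below are
# placeholders for illustration, not real job output.

def compare_counts(spark_count: int, hive_count: int) -> str:
    if spark_count == hive_count:
        return f"OK: both report {spark_count} rows"
    return (f"MISMATCH: spark datasource sees {spark_count} rows, "
            f"hive sees {hive_count}")


print(compare_counts(545, 1750))   # → MISMATCH: spark datasource sees 545 rows, hive sees 1750
print(compare_counts(1760, 1760))  # → OK: both report 1760 rows
```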
Output from Reader Job:
```
Spark Query:
+--------+
|count(1)|
+--------+
| 545|
+--------+
Hive Query:
+--------+
|count(1)|
+--------+
| 1750|
+--------+
Spark Query:
+--------+
|count(1)|
+--------+
| 1760|
+--------+
Hive Query:
+--------+
|count(1)|
+--------+
| 1760|
+--------+
```
**Expected behavior**
Counts returned through the Spark datasource API should match the Hive query results.
**Environment Description**
* Hudi version : 0.5.3
* Spark version : 2.4.5
* Hive version : 2.3.7
* Hadoop version : 2.10
* Storage (HDFS/S3/GCS..) : GCS
* Running on Docker? (yes/no) : no