VIKASPATID commented on issue #4635:
URL: https://github.com/apache/hudi/issues/4635#issuecomment-1049491206
Hi @nsivabalan,
Here is the reproducible code
<details>
<summary> pyspark script </summary>
```python
# Reproduction script for apache/hudi#4635: concurrent bulk_insert with
# optimistic concurrency control and the Zookeeper lock provider.
#
# Bug fix: the original paste wrapped the multi-line `from pyspark.sql.functions
# import ...` statement without parentheses, which is a SyntaxError.
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, monotonically_increasing_id, to_date, when,
)
from pyspark.sql.types import *
import time
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, when, expr
import argparse
import threading

# Kryo serialization and convertMetastoreParquet=false are the standard
# settings for writing Hudi tables from Spark.
spark = (
    SparkSession.builder
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.hive.convertMetastoreParquet', 'false')
    .getOrCreate()
)
sc = spark.sparkContext
# Filled in from the CLI arguments by main().
table_name = None
table_path = None

# (name, type) pairs for the synthetic 68-column test schema A0..A67.
# Bug fix: the original paste broke the "INTEGER" literal for A52 across a
# line wrap (a SyntaxError); it is restored to a single token here.
header = [
    ["A0", "STRING"], ["A1", "STRING"], ["A2", "STRING"], ["A3", "STRING"],
    ["A4", "STRING"], ["A5", "INTEGER"], ["A6", "INTEGER"], ["A7", "SHORT"],
    ["A8", "INTEGER"], ["A9", "LONG"], ["A10", "DOUBLE"], ["A11", "INTEGER"],
    ["A12", "LONG"], ["A13", "DOUBLE"], ["A14", "LONG"], ["A15", "DOUBLE"],
    ["A16", "DOUBLE"], ["A17", "INTEGER"], ["A18", "SHORT"], ["A19", "DOUBLE"],
    ["A20", "INTEGER"], ["A21", "SHORT"], ["A22", "DOUBLE"], ["A23", "STRING"],
    ["A24", "STRING"], ["A25", "INTEGER"], ["A26", "INTEGER"], ["A27", "STRING"],
    ["A28", "INTEGER"], ["A29", "INTEGER"], ["A30", "STRING"], ["A31", "DOUBLE"],
    ["A32", "DOUBLE"], ["A33", "STRING"], ["A34", "DOUBLE"], ["A35", "INTEGER"],
    ["A36", "SHORT"], ["A37", "STRING"], ["A38", "DOUBLE"], ["A39", "STRING"],
    ["A40", "STRING"], ["A41", "STRING"], ["A42", "STRING"], ["A43", "STRING"],
    ["A44", "INTEGER"], ["A45", "LONG"], ["A46", "LONG"], ["A47", "LONG"],
    ["A48", "LONG"], ["A49", "LONG"], ["A50", "LONG"], ["A51", "INTEGER"],
    ["A52", "INTEGER"], ["A53", "INTEGER"], ["A54", "INTEGER"], ["A55", "INTEGER"],
    ["A56", "DOUBLE"], ["A57", "DOUBLE"], ["A58", "DOUBLE"], ["A59", "DOUBLE"],
    ["A60", "LONG"], ["A61", "STRING"], ["A62", "DOUBLE"], ["A63", "STRING"],
    ["A64", "DOUBLE"], ["A65", "DOUBLE"], ["A66", "LONG"], ["A67", "LONG"],
]
# Base Hudi writer options shared by every write path; name/path-dependent
# values are overwritten in main() after the CLI args are parsed.
common_config = {
'className' : 'org.apache.hudi',
# Multi-writer support: OCC with a Zookeeper-based lock provider.
'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.write.lock.provider':'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
'hoodie.cleaner.policy.failed.writes':'LAZY',
'hoodie.write.lock.zookeeper.url':'xxxxxxx',
'hoodie.write.lock.zookeeper.port':'2181',
# NOTE(review): table_name is still None when this module-level f-string is
# evaluated, so the lock key becomes the literal string "None", and main()
# never overwrites this key (it only updates table.name / hive_sync.table /
# path below) — confirm this is intended.
'hoodie.write.lock.zookeeper.lock_key': f"{table_name}",
'hoodie.write.lock.zookeeper.base_path':'/hudi',
'hoodie.datasource.write.row.writer.enable': 'false',
# Placeholder (None here); replaced with the real table name in main().
'hoodie.table.name': table_name,
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
# Composite record key and partition path, hence the ComplexKeyGenerator.
'hoodie.datasource.write.recordkey.field': 'A1,A9',
'hoodie.datasource.write.partitionpath.field': 'A2,A5',
'hoodie.datasource.write.keygenerator.class':
'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.precombine.field': "A5",
# Hive sync is disabled for this reproduction.
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.hive_sync.enable': 'false',
'hoodie.compaction.payload.class':
'org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload',
'hoodie.datasource.hive_sync.table': f"{table_name}",
'hoodie.datasource.hive_sync.partition_fields': 'A2,A5',
'hoodie.datasource.hive_sync.partition_extractor_class':
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.copyonwrite.record.size.estimate': 256,
# Lock-acquisition retry policy: 50 retries, 1s client wait between attempts.
'hoodie.write.lock.client.wait_time_ms': 1000,
'hoodie.write.lock.client.num_retries': 50
}
# Writer options applied only on the bulk-insert (initial load) path; merged
# over common_config inside bulk_insert().
init_load_config = {
    'hoodie.parquet.max.file.size': 1024 * 1024 * 1024,  # 1 GiB target files
    'hoodie.bulkinsert.shuffle.parallelism': 10,
    # NOTE(review): this key looks abbreviated; Hudi's documented option is
    # 'hoodie.parquet.small.file.limit' — confirm it is actually honored.
    'compactionSmallFileSize': 100 * 1024 * 1024,
    'hoodie.datasource.write.operation': 'bulk_insert',
    'hoodie.write.markers.type': "DIRECT",
}
# Writer options for the incremental (upsert/insert) path.  The name keeps
# the original spelling ("increamental") so external references still resolve.
increamental_config = {
    'hoodie.upsert.shuffle.parallelism': 1,
    'hoodie.insert.shuffle.parallelism': 1,
    # Aggressive retention to force frequent cleaning during the repro.
    'hoodie.cleaner.commits.retained': 1,
    'hoodie.clean.automatic': True,
}
def get_parameters():
    """Parse --table_path and --table_name from the command line.

    Returns:
        argparse.Namespace with ``table_path`` and ``table_name`` attributes.

    Bug fix: the original paste wrapped the ``description`` string literal
    across two lines (a SyntaxError); it is restored to one literal here.
    """
    parser = argparse.ArgumentParser(
        description='Usage: --table_path=<path of table> --table_name=<table_name>')
    parser.add_argument('--table_path', help='table_path', required=True)
    parser.add_argument('--table_name', help='table_name', required=True)
    # parse_known_args so unrelated spark-submit arguments are ignored
    # instead of causing an error.
    (args, unknown) = parser.parse_known_args()
    return args
def main():
    """Drive the reproduction: one initial bulk_insert, then 19 rounds of a
    5-partition frame that bulk_insert() writes with one thread per partition.
    """
    global table_path
    global table_name
    params = get_parameters()
    table_path = params.table_path
    table_name = params.table_name
    # The module-level config was built while table_name was still None, so
    # every name-dependent value must be overwritten here.
    common_config['hoodie.table.name'] = table_name
    common_config['hoodie.datasource.hive_sync.table'] = table_name
    # Bug fix: the ZK lock key was frozen as the literal string "None" at
    # import time and never updated, so all tables/runs would contend on the
    # same (wrong) lock key.
    common_config['hoodie.write.lock.zookeeper.lock_key'] = table_name
    common_config['path'] = table_path
    schema = ",".join([f"{field[0]} {field[1]}" for field in header])
    base_row = ['A','ABC','DEF','USA','1',20211215,1,2,3,-4,None,None,None,None,None,None,5.19,None,None,0.0,None,None,None,'0',None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,1,0,None,8,None,7,6,None,None,None,None,None,None,None,None,None,None,None,None,None]
    df = spark.createDataFrame([base_row], schema)
    bulk_insert(df)
    print("Wrote 1 file")
    # Five rows that differ only in A2 (index 2), the first partition-path
    # column, so bulk_insert() spawns one writer thread per row.
    records = []
    for a2 in ('DEF', 'EEF', 'FEF', 'GEF', 'HEF'):
        row = list(base_row)
        row[2] = a2
        records.append(row)
    df = spark.createDataFrame(records, schema)
    for i in range(1, 20):
        print(f"Writing file #{i+1}")
        bulk_insert(df)
    print("Job finished")
def bulk_insert(input_df):
    """Split *input_df* by its distinct A2 values and bulk-insert each slice
    from its own BulkWriterThread, then block until all threads finish
    (re-raising any writer failure via join())."""
    # One sub-frame per distinct A2 value.
    partition_keys = input_df.select("A2").distinct().rdd.flatMap(lambda row: row).collect()
    frames = {key: input_df.where(input_df.A2 == key) for key in partition_keys}
    write_conf = {**common_config, **init_load_config}
    print("Total frames: {}".format(len(frames)))
    print("Running bulk-insert in table {}:{}".format(table_path, table_name))
    workers = []
    for cnt, (key, frame) in enumerate(frames.items(), start=1):
        print("Writing dataframe: {}".format(cnt))
        worker = BulkWriterThread()
        worker.set(frame, key, write_conf)
        worker.start()
        workers.append(worker)
        # Stagger thread start-up slightly so writers don't all begin at once.
        time.sleep(1)
        print("Wrote")
    print("Waiting for finish")
    for worker in workers:
        worker.join()
    print("Wait finished")
    print("Write completed")
class BulkWriterThread(threading.Thread):
    """Thread that bulk-inserts one DataFrame slice into Hudi.

    Usage: construct, call set(), start(), then join(); join() re-raises any
    exception that run() captured, on the caller's thread.

    Bug fixes vs. the original:
      * a wrapped-comment paste left un-commented continuation lines inside
        run() (a SyntaxError) — removed;
      * join() now accepts ``timeout`` like threading.Thread.join, so callers
        passing a timeout still work;
      * ``exc`` is initialized in set() so join() cannot hit an AttributeError
        if it runs before run() has started.
    """

    def set(self, df, iso, conf):
        """Stash the frame, its partition key (iso) and the writer options."""
        self.df = df
        self.iso = iso
        self.conf = conf
        self.exc = None  # run() overwrites this on failure

    def run(self):
        name = threading.current_thread().name
        print(f"{name}: writing {self.iso} data")
        self.exc = None
        try:
            self.df.write.format('org.apache.hudi') \
                .option('hoodie.datasource.write.operation', 'bulk_insert') \
                .options(**self.conf).mode('append').save()
            print(f"{name}: {self.iso} data written")
        except Exception as e:
            # Remember the failure so join() can surface it to the caller.
            print(e)
            self.exc = e

    def join(self, timeout=None):
        """Wait for the thread to finish, then re-raise any captured error."""
        print("Joining")
        name = threading.current_thread().name
        threading.Thread.join(self, timeout)
        print("Joined")
        if self.exc:
            print(f"{name}: Error in writing {self.iso} data")
            raise self.exc
# Script entry point (e.g. via spark-submit rep.py ...).
# Bug fix: the wrapped paste lost the indentation of the call under the
# guard, which is a SyntaxError; restored here.
if __name__ == "__main__":
    main()
```
</details>
We are running it like this:
```
spark-submit --conf
"spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf
"spark.sql.hive.convertMetastoreParquet=false" --jars
s3://xxxx/jars/hudi-spark3-bundle_2.12-0.10.0.jar,/usr/lib/spark/external/lib/spark-avro.jar
rep.py --table_path s3://xxx/tables/rept5 --table_name=rept5
```
Please let me know if it's reproducible on your side or not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]