soumilshah1995 commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440153705
Here is the sample code; I am not sure how to actually call this in PySpark.
```
"""
%connections hudi-connection
%glue_version 3.0
%region us-east-1
%worker_type G.1X
%number_of_workers 3
%spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
%additional_python_modules Faker
"""
try:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.session import SparkSession
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp,
monotonically_increasing_id, to_date, when
from pyspark.sql.functions import *
from awsglue.utils import getResolvedOptions
from awsglueml.transforms import EntityDetector
from pyspark.sql.types import StringType
from pyspark.sql.types import *
from datetime import datetime
import boto3
from functools import reduce
except Exception as e:
print("Error ")
# Build (or reuse) the Spark session that backs the Glue job.
#
# `spark.sql.extensions` registers Hudi's SQL extension. Without it Spark's
# stock parser does not know the `CALL <procedure>(...)` statement and fails
# with "mismatched input 'call' expecting {...}".
# NOTE(review): this is a static setting; if a session already exists before
# this line runs (e.g. a pre-created notebook session), it likely has to be
# supplied at session start instead -- confirm for the target environment.
spark = (
    SparkSession.builder
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.extensions',
            'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
    .config('spark.sql.hive.convertMetastoreParquet', 'false')
    .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true')
    .getOrCreate()
)

# Glue wrappers around the Spark context: job handle and logger.
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()
import uuid
from faker import Faker
# Module-level Faker instance; DataGenerator.get_data() reads it as a global.
# (A `global` statement is a no-op at module scope, so a plain assignment
# binds the same name.)
faker = Faker()
class DataGenerator(object):
    """Produce synthetic employee rows for the demo table."""

    @staticmethod
    def get_data(count=100):
        """Return ``count`` fake employee records as a list of tuples.

        Each tuple holds eleven values, in this order: a UUID string, a
        name, a department, a state code, three stringified random
        integers (ranges 10000-150000, 18-60 and 0-100000), a stringified
        unix time, an e-mail address, an Amex card number and a date.

        Args:
            count: Number of rows to generate. Defaults to 100, the size
                that was previously hard-coded.

        Returns:
            A list with ``count`` tuples. Values come from the module-level
            ``faker`` instance, so the content differs on every call.
        """
        return [
            (
                str(uuid.uuid4()),
                faker.name(),
                faker.random_element(
                    elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(
                    elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                faker.date(),
            )
            for _ in range(count)
        ]
# Column names, in the same order as the tuples built by
# DataGenerator.get_data().
columns = [
    "emp_id",
    "employee_name",
    "department",
    "state",
    "salary",
    "age",
    "bonus",
    "ts",
    "email",
    "credit_card",
    "date",
]

# Generate the sample rows and load them into a Spark DataFrame.
data = DataGenerator.get_data()
spark_df = spark.createDataFrame(data=data, schema=columns)

# Hudi table settings used by the write and read steps below.
db_name = "hudidb"                 # Hive database name for the sync options
table_name = "hudi_table"          # Hudi / Hive table name
recordkey = 'emp_id'               # record key column
path = "s3://hudi-demos-emr-serverless-project-soumil/tmp/"  # table location
method = 'upsert'                  # Hudi write operation
table_type = "COPY_ON_WRITE"       # Hudi storage type
precombine = "ts"                  # precombine column
partiton_field = "date"            # name of the `date` column (identifier spelling kept as-is)
# Options handed to the Glue Hudi connector for the write below.
#
# The partition column is now passed to both the writer and Hive sync.
# Previously `partiton_field` was declared but never used, even though
# hive-style partitioning and a partition value extractor were enabled.
connection_options = {
    # Connector wiring
    "path": path,
    "connectionName": "hudi-connection",
    'className': 'org.apache.hudi',

    # Table identity and write behaviour
    "hoodie.datasource.write.storage.type": table_type,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.partitionpath.field': partiton_field,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.hive_style_partitioning': 'true',

    # Hive metastore sync
    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode": "hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_fields': partiton_field,
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.MultiPartKeysValueExtractor',
}
# Convert the Spark DataFrame to a Glue DynamicFrame and write it through the
# "marketplace.spark" connection using the options assembled above.
WriteDF = glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(spark_df, glueContext, "glue_df"),
    connection_type="marketplace.spark",
    connection_options=connection_options,
    transformation_ctx="glue_df",
)
# Read the table back from `path` with the Hudi data source.
df = spark.read.format("hudi").load(path)
#### tried
# Two attempts at calling the `show_commits` procedure through Spark SQL,
# run in the order they were tried. The ParseException quoted in the report
# is raised for the first statement, at the `call` keyword.
# NOTE(review): Hudi's documented CALL syntax uses comma-separated
# `name => value` arguments, which matches the second form -- confirm
# against the Hudi version in use.
for query in (
    """
call show_commits(table = 'hudi_table' limit 10 );
""",
    """
call show_commits(table => 'hudi_table', limit => 10);
""",
):
    spark.sql(query).show()
```
#### Error Message
```
ParseException:
mismatched input 'call' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE',
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS',
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST',
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE',
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE',
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 2, pos
0)
== SQL ==
call show_commits(table = 'hudi_table' limit 10 );
^^^
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]