luky777 opened a new issue, #9844:
URL: https://github.com/apache/hudi/issues/9844
**Describe the problem you faced**
Hi,
I can't get the Hudi procedures working in Glue version 4.
I am getting the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o96.sql. :
java.lang.AssertionError: assertion failed: It's not a Hudi table
Here is my Glue script:
```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import concat_ws, col, split, size, lit
from pyspark.sql.types import StringType, BooleanType, DateType
import pyspark.sql.functions as F
args = getResolvedOptions(sys.argv, ["JOB_NAME", "hudi_table_name",
"database_name", "s3_path_hudi", "s3_source", "id_column"])
spark =
SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer')
\
.config('spark.sql.hive.convertMetastoreParquet','false') \
.config('spark.sql.catalog.spark_catalog',
'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
.config('spark.sql.extensions','org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
\
.config('spark.sql.legacy.pathOptionBehavior.enabled',
'true').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
hudi_table_name = args["hudi_table_name"]
database_name = args["database_name"]
s3_path_hudi = args["s3_path_hudi"]
s3_source = args["s3_source"]
id_column = args["id_column"]
#Load S3 Raw Data
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
format_options={"multiline": True},
connection_type="s3",
format="parquet",
connection_options={
"paths": [
s3_source
],
"recurse": True,
},
transformation_ctx="S3bucket_node1",
)
# Hudi Settings
def get_hudi_connection_option(write_operation_type):
connection_params = {
"className": "org.apache.hudi",
"hoodie.table.name": hudi_table_name,
"path": s3_path_hudi,
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": write_operation_type,
"hoodie.datasource.write.precombine.field": "timestamp",
"hoodie.datasource.write.recordkey.field": id_column,
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.sync_as_datasource": "false",
"hoodie.datasource.hive_sync.database": database_name,
"hoodie.datasource.hive_sync.table": hudi_table_name,
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.partition_extractor_class":
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS" ,
"hoodie.cleaner.fileversions.retained": 1 ,
"hoodie.cleaner.parallelism": 200,
"hoodie.upsert.shuffle.parallelism": 200,
"hoodie.datasource.write.payload.class" :
"org.apache.hudi.payload.AWSDmsAvroPayload",
"hoodie.datasource.write.transformer.class"
:"org.apache.hudi.utilities.transform.AWSDmsTransformer",
}
return connection_params
#Save UPSERT to Hudi Table
if S3bucket_node1.count() > 0:
print("TOTAL: ", S3bucket_node1.count())
hudi = glueContext.write_dynamic_frame.from_options(
frame=S3bucket_node1,
connection_type="marketplace.spark",
connection_options=get_hudi_connection_option("upsert"),
transformation_ctx="hudi",
)
# Get the database and table names
db_name = database_name
table_name = hudi_table_name
# Create a query to call the Hudi Call Procedure
query = f"call show_commits('{db_name}.{table_name}', 5)"
# query = f"call help(cmd => 'show_commits')"
# query = "call show_savepoints('adpdb.hudi_raw_revenue_property')"
# Execute the query and show the results
spark_df_commits = spark.sql(query)
spark_df_commits.show()
job.commit()
```
This is what I have in the error log:
<html>
<body>
<!--StartFragment-->
2023-10-10 14:00:56,574 ERROR [main] glue.ProcessLauncher
(Logging.scala:logError(77)): Error from Python:Traceback (most recent call
last): File "/tmp/Hudi_CLI_Test.py", line 89, in <module> spark_df_commits
= spark.sql(query) File
"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034,
in sql return DataFrame(self._jsparkSession.sql(sqlQuery), self) File
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line
1321, in __call__ return_value = get_return_value( File
"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in
deco return f(*a, **kw) File
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line
326, in get_return_value raise Py4JJavaError(py4j.protocol.Py4JJavaError: An
error occurred while calling o96.sql.: java.lang.AssertionError: assertion
failed: It's not a Hudi table at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.sql.catalyst.catalog.HoodieCata
logTable.<init>(HoodieCatalogTable.scala:51) at
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:367)
at
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:363)
at
org.apache.hudi.HoodieCLIUtils$.getHoodieCatalogTable(HoodieCLIUtils.scala:70)
at
org.apache.spark.sql.hudi.command.procedures.ShowCommitsProcedure.call(ShowCommitsProcedure.scala:82)
at
org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand.run(CallProcedureHoodieCommand.scala:33)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
at org.apache.spark.sql.catalyst
.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.
spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
at
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$super$transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalP
lan.scala:30) at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222) at
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102) at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99) at
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622) at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) at
sun.reflect.NativeMethodAccessorI
mpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at
py4j.Gateway.invoke(Gateway.java:282) at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at
py4j.commands.CallCommand.execute(CallCommand.java:79) at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at
py4j.ClientServerConnection.run(ClientServerConnection.java:106) at
java.lang.Thread.run(Thread.java:750) | 2023-10-10 14:00:56,574 ERROR [main]
glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback
(most recent call last): File "/tmp/Hudi_CLI_Test.py", line 89, in <module>
spark_df_commits = spark.sql(query) File "/opt/amazon/spark/
python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql return
DataFrame(self._jsparkSession.sql(sqlQuery), self) File
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line
1321, in __call__ return_value = get_return_value( File
"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in
deco return f(*a, **kw) File
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line
326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An
error occurred while calling o96.sql. : java.lang.AssertionError: assertion
failed: It's not a Hudi table at scala.Predef$.assert(Predef.scala:223) at
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.<init>(HoodieCatalogTable.scala:51)
at
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:367)
at
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:363)
at org.apache.hudi.HoodieCLIUtils
$.getHoodieCatalogTable(HoodieCLIUtils.scala:70) at
org.apache.spark.sql.hudi.command.procedures.ShowCommitsProcedure.call(ShowCommitsProcedure.scala:82)
at
org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand.run(CallProcedureHoodieCommand.scala:33)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
at org.apache.spark.sql.execution.
SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139) at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
at
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
at org.apache.spark.sql.catalyst.trees.TreeNode
.$anonfun$transformDownWithPruning$1(TreeNode.scala:615) at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$super$transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommand
s(QueryExecution.scala:96) at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222) at
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102) at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99) at
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622) at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.Meth
odInvoker.invoke(MethodInvoker.java:244) at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at
py4j.Gateway.invoke(Gateway.java:282) at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at
py4j.commands.CallCommand.execute(CallCommand.java:79) at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at
py4j.ClientServerConnection.run(ClientServerConnection.java:106) at
java.lang.Thread.run(Thread.java:750)
-- | --
<!--EndFragment-->
</body>
</html>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]