luky777 opened a new issue, #9844:
URL: https://github.com/apache/hudi/issues/9844

   **Describe the problem you faced**
   Hi,
   I can't get Hudi SQL call procedures working in AWS Glue version 4.
   I am getting this error:
   py4j.protocol.Py4JJavaError: An error occurred while calling o96.sql. : 
java.lang.AssertionError: assertion failed: It's not a Hudi table
   
   
   Here is my Glue script:
   ```
   import sys
   from awsglue.transforms import *
   from awsglue.utils import getResolvedOptions
   from pyspark.context import SparkContext
   from pyspark.sql.session import SparkSession
   from awsglue.context import GlueContext
   from awsglue.job import Job
   from pyspark.sql.functions import concat_ws, col, split, size, lit
   from pyspark.sql.types import StringType, BooleanType, DateType
   import pyspark.sql.functions as F
   
   # Resolve the job parameters that the rest of the script reads from `args`.
   args = getResolvedOptions(
       sys.argv,
       [
           "JOB_NAME",
           "hudi_table_name",
           "database_name",
           "s3_path_hudi",
           "s3_source",
           "id_column",
       ],
   )

   # Build the Spark session with the Hudi catalog and SQL extension configured.
   # The builder chain is parenthesized (no backslash continuations) so the
   # statement stays valid Python regardless of how the lines are wrapped.
   spark = (
       SparkSession.builder
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
       .config('spark.sql.hive.convertMetastoreParquet', 'false')
       .config(
           'spark.sql.catalog.spark_catalog',
           'org.apache.spark.sql.hudi.catalog.HoodieCatalog',
       )
       .config(
           'spark.sql.extensions',
           'org.apache.spark.sql.hudi.HoodieSparkSessionExtension',
       )
       .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true')
       .getOrCreate()
   )

   # Wrap the session's SparkContext in a GlueContext and initialize the job.
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   job.init(args["JOB_NAME"], args)
   
   # Pull the individual job parameters out of the resolved arguments.
   hudi_table_name, database_name, s3_path_hudi, s3_source, id_column = (
       args[key]
       for key in (
           "hudi_table_name",
           "database_name",
           "s3_path_hudi",
           "s3_source",
           "id_column",
       )
   )

   # Load the raw Parquet data under `s3_source` as a DynamicFrame.
   S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
       connection_type="s3",
       connection_options={"paths": [s3_source], "recurse": True},
       format="parquet",
       format_options={"multiline": True},
       transformation_ctx="S3bucket_node1",
   )
   
   # Hudi Settings
   def get_hudi_connection_option(write_operation_type):
       """Return the connection options dict used for the Hudi write.

       Args:
           write_operation_type: Value stored under
               "hoodie.datasource.write.operation" (the script passes "upsert").

       Returns:
           A new dict of options. It reads the module-level names
           hudi_table_name, s3_path_hudi, id_column and database_name.
       """
       # Target table identity and location.
       target = {
           "className": "org.apache.hudi",
           "hoodie.table.name": hudi_table_name,
           "path": s3_path_hudi,
       }
       # Write behaviour: table type, operation, keys and partition style.
       write = {
           "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
           "hoodie.datasource.write.operation": write_operation_type,
           "hoodie.datasource.write.precombine.field": "timestamp",
           "hoodie.datasource.write.recordkey.field": id_column,
           "hoodie.datasource.write.hive_style_partitioning": "true",
       }
       # Catalog (hive sync) settings.
       hive_sync = {
           "hoodie.datasource.hive_sync.enable": "true",
           "hoodie.datasource.hive_sync.sync_as_datasource": "false",
           "hoodie.datasource.hive_sync.database": database_name,
           "hoodie.datasource.hive_sync.table": hudi_table_name,
           "hoodie.datasource.hive_sync.use_jdbc": "false",
           "hoodie.datasource.hive_sync.partition_extractor_class":
               "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       }
       # Cleaner policy and parallelism settings.
       tuning = {
           "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
           "hoodie.cleaner.fileversions.retained": 1,
           "hoodie.cleaner.parallelism": 200,
           "hoodie.upsert.shuffle.parallelism": 200,
       }
       # AWS DMS payload and transformer classes.
       dms = {
           "hoodie.datasource.write.payload.class":
               "org.apache.hudi.payload.AWSDmsAvroPayload",
           "hoodie.datasource.write.transformer.class":
               "org.apache.hudi.utilities.transform.AWSDmsTransformer",
       }
       # Merge in the original key order.
       return {**target, **write, **hive_sync, **tuning, **dms}
   
   # Save UPSERT to Hudi Table
   # Count once and reuse the result instead of evaluating count() twice.
   record_count = S3bucket_node1.count()
   if record_count > 0:
       print("TOTAL: ", record_count)
       hudi = glueContext.write_dynamic_frame.from_options(
           frame=S3bucket_node1,
           connection_type="marketplace.spark",
           connection_options=get_hudi_connection_option("upsert"),
           transformation_ctx="hudi",
       )

   # Get the database and table names
   db_name = database_name
   table_name = hudi_table_name

   # Create a query to call the Hudi Call Procedure
   # NOTE(review): the reported "It's not a Hudi table" assertion is raised
   # while this identifier is resolved through the catalog (HoodieCatalogTable
   # in the stack trace). Presumably the synced catalog entry is not registered
   # with the Hudi provider -- possibly related to
   # "hoodie.datasource.hive_sync.sync_as_datasource" being "false" in the
   # write options; TODO confirm.
   query = f"call show_commits('{db_name}.{table_name}', 5)"
   # query = f"call help(cmd => 'show_commits')"
   # query = "call show_savepoints('adpdb.hudi_raw_revenue_property')"

   # Execute the query and show the results
   spark_df_commits = spark.sql(query)
   spark_df_commits.show()

   job.commit()
   ```
   
   This is what I have in the error log:
   
   <html>
   <body>
   <!--StartFragment-->
   
   2023-10-10 14:00:56,574 ERROR [main] glue.ProcessLauncher 
(Logging.scala:logError(77)): Error from Python:Traceback (most recent call 
last):  File "/tmp/Hudi_CLI_Test.py", line 89, in <module>    spark_df_commits 
= spark.sql(query)  File 
"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, 
in sql    return DataFrame(self._jsparkSession.sql(sqlQuery), self)  File 
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 
1321, in __call__    return_value = get_return_value(  File 
"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in 
deco    return f(*a, **kw)  File 
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 
326, in get_return_value    raise Py4JJavaError(py4j.protocol.Py4JJavaError: An 
error occurred while calling o96.sql.: java.lang.AssertionError: assertion 
failed: It's not a Hudi table        at scala.Predef$.assert(Predef.scala:223)  
     at 
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.<init>(HoodieCatalogTable.scala:51)
        at 
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:367)
        at 
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:363)
        at 
org.apache.hudi.HoodieCLIUtils$.getHoodieCatalogTable(HoodieCLIUtils.scala:70)  
     at 
org.apache.spark.sql.hudi.command.procedures.ShowCommitsProcedure.call(ShowCommitsProcedure.scala:82)
        at 
org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand.run(CallProcedureHoodieCommand.scala:33)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
       at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
 at 
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
      at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
     at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
     at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
   at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
      at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
   at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
   at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
       at 
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
      at 
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
 at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
    at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$super$transformDownWithPruning(LogicalPlan.scala:30)
      at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
       at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
       at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)  
     at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
    at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)       at 
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)   at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at 
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)       at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)     at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at 
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)        at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)     at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)   
     at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)     at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)   at 
py4j.Gateway.invoke(Gateway.java:282)        at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
py4j.commands.CallCommand.execute(CallCommand.java:79)       at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at 
py4j.ClientServerConnection.run(ClientServerConnection.java:106)     at 
java.lang.Thread.run(Thread.java:750) | 2023-10-10 14:00:56,574 ERROR [main] 
glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback 
(most recent call last): File "/tmp/Hudi_CLI_Test.py", line 89, in <module> 
spark_df_commits = spark.sql(query) File 
"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql return 
DataFrame(self._jsparkSession.sql(sqlQuery), self) File 
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 
1321, in __call__ return_value = get_return_value( File 
"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in 
deco return f(*a, **kw) File 
"/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 
326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An 
error occurred while calling o96.sql. : java.lang.AssertionError: assertion 
failed: It's not a Hudi table at scala.Predef$.assert(Predef.scala:223) at 
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.<init>(HoodieCatalogTable.scala:51)
 at 
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:367)
 at 
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:363)
 at org.apache.hudi.HoodieCLIUtils$.getHoodieCatalogTable(HoodieCLIUtils.scala:70) at 
org.apache.spark.sql.hudi.command.procedures.ShowCommitsProcedure.call(ShowCommitsProcedure.scala:82)
 at 
org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand.run(CallProcedureHoodieCommand.scala:33)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
 at 
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
 at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
 at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
 at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
 at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
 at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
 at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
 at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
 at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
 at 
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
 at 
org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
 at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$super$transformDownWithPruning(LogicalPlan.scala:30)
 at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
 at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591) 
at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
 at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
 at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
 at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222) at 
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at 
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99) at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at 
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at 
py4j.Gateway.invoke(Gateway.java:282) at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at 
py4j.ClientServerConnection.run(ClientServerConnection.java:106) at 
java.lang.Thread.run(Thread.java:750)
   -- | --
   
   
   <!--EndFragment-->
   </body>
   </html>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to