soumilshah1995 opened a new issue, #8919:
URL: https://github.com/apache/hudi/issues/8919
Hello, hope all is well.
While playing with clustering, I tried a few ways to run it through the stored procedures and hit an error, so I'm opening this issue so it can be tracked.
```
try:
    import sys
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
except Exception as e:
    print("Modules are missing: {}".format(e))

# Get command-line arguments
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Create a Spark session with the Hudi catalog and extensions
spark = (SparkSession.builder
         .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
         .config('spark.sql.hive.convertMetastoreParquet', 'false')
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true')
         .getOrCreate())

# Create the Glue context and initialize the job
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()
job.init(args['JOB_NAME'], args)

db_name = "hudidb"
table_name = "customers"
path = "s3://soumilshah-hudi-demos/silver/table_name=customers/"

# Show the last 5 commits on the table
query_show_commits = f"call show_commits('{db_name}.{table_name}', 5)"
spark_df_commits = spark.sql(query_show_commits)
commits = list(map(lambda row: row[0], spark_df_commits.collect()))
spark_df_commits.show()

# Attempt 1: run clustering by table name
try:
    print("Trying clustering 1..")
    query_show_clustering = f"call run_clustering('{db_name}.{table_name}')"
    spark_df_clusterings = spark.sql(query_show_clustering)
    spark_df_clusterings.show()
    print("clustering 1 complete")
except Exception as e:
    print("Error 1", e)

# Attempt 2: show clustering by table name
try:
    print("Try show clustering 2")
    query = f"call show_clustering('{db_name}.{table_name}')"
    result_df = spark.sql(query)
    result_df.show()
    print("Complete show clustering 2")
except Exception as e:
    print("Error show clustering 2", e)

# Attempt 3: show clustering by base path
try:
    print("Try show clustering 1")
    query = f"call show_clustering('{path}')"
    result_df = spark.sql(query)
    result_df.show()
    print("Complete show clustering 1")
except Exception as e:
    print("Error show clustering 1", e)
```
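For completeness, the Hudi SQL procedures also accept named arguments, and `show_clustering` can take either a table name or a base path. The calls below are a sketch with parameter names taken from the Hudi procedures docs; I have not verified they change the behavior reported here:

```
# Named-argument variants of the same procedure calls (parameter names per
# the Hudi SQL procedures docs; shown here as a sketch only).
spark.sql(f"call run_clustering(table => '{db_name}.{table_name}')").show()
spark.sql(f"call show_clustering(table => '{db_name}.{table_name}')").show()
spark.sql(f"call show_clustering(path => '{path}')").show()
```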
# Output
```
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|      commit_time|total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|20230609120910005|           13079729|               30|                  0|                      30|                   72|                           0|           0|
|20230607130458189|                  0|                0|                  0|                       0|                    0|                           0|           0|
|20230607125355320|           18749045|               43|                  0|                      43|                  100|                           0|           0|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
+-----------------+----------------+---------+-------------------+
| timestamp|input_group_size| state|involved_partitions|
+-----------------+----------------+---------+-------------------+
|20230609121906926| 30|COMPLETED| *|
+-----------------+----------------+---------+-------------------+
```
# Output of show_clustering
```
An error occurred while calling o91.sql.
: java.util.NoSuchElementException: No value present in Option
	at org.apache.hudi.common.util.Option.get(Option.java:89)
	at org.apache.spark.sql.hudi.command.procedures.ShowClusteringProcedure.$anonfun$call$5(ShowClusteringProcedure.scala:79)
	at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
	at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
	at scala.collection.immutable.Stream.length(Stream.scala:312)
	at scala.collection.SeqLike.size(SeqLike.scala:108)
	at scala.collection.SeqLike.size$(SeqLike.scala:108)
	at scala.collection.AbstractSeq.size(Seq.scala:45)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:341)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConne
```
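Reading the trace, `Option.get` throws inside `ShowClusteringProcedure` while it maps over the clustering instants, which suggests one of the replacecommit instants on the timeline is missing the plan metadata the procedure expects. That is my guess, not confirmed. One way to inspect the timeline directly from the Glue job is to list the `.hoodie` files; a minimal sketch with boto3, assuming the bucket and prefix from the `path` variable above:

```
# Minimal diagnostic sketch (assumption: bucket/prefix match the `path`
# variable above). Clustering writes replacecommit instants to the timeline,
# so listing them shows which instants exist and in what state.
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket="soumilshah-hudi-demos",
    Prefix="silver/table_name=customers/.hoodie/",
)
for obj in resp.get("Contents", []):
    if "replacecommit" in obj["Key"]:
        print(obj["Key"])
```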