soumilshah1995 commented on issue #10110:
URL: https://github.com/apache/hudi/issues/10110#issuecomment-2023493106

   Here is the full code:
   ```
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, StringType, 
TimestampType, FloatType
   from datetime import datetime
   import os
   import sys
   
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, StringType, 
TimestampType, FloatType
   from datetime import datetime
   import os
   import sys
   
   
   HUDI_VERSION = '1.0.0-beta1'
   SPARK_VERSION = '3.4'
   
   os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
   SUBMIT_ARGS = f"--packages 
org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} 
pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   
   # Spark session
   spark = SparkSession.builder \
       .config('spark.serializer', 
'org.apache.spark.serializer.KryoSerializer') \
       .config('spark.sql.extensions', 
'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   
   data = [
       [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 
'driver-K', 19.10, 'san_francisco'],
       [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 
'driver-M', 27.70, 'san_francisco'],
   ]
   
   # Define schema for the DataFrame
   schema = StructType([
       StructField("ts", StringType(), True),
       StructField("transaction_id", StringType(), True),
       StructField("rider", StringType(), True),
       StructField("driver", StringType(), True),
       StructField("price", FloatType(), True),
       StructField("location", StringType(), True),
   ])
   
   # Create Spark DataFrame
   df = spark.createDataFrame(data, schema=schema)
   
   df.show()
   
   path = 'file:///Users/soumilshah/Desktop/hudidemo/'
   
   hudi_options = {
       'hoodie.table.name': 'hudi_table_func_index',
       'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
       'hoodie.datasource.write.operation': 'upsert',
       'hoodie.datasource.write.recordkey.field': 'transaction_id',
       'hoodie.datasource.write.precombine.field': 'ts',
       'hoodie.table.metadata.enable': 'true',
       'hoodie.datasource.write.partitionpath.field': 'location',
       'hoodie.parquet.small.file.limit':'0'
   }
   
   df.write.format("hudi").options(**hudi_options).option("path", 
path).mode("append").saveAsTable("table_name")
   spark.read.format("hudi").load(path)
   
   TABLE_NAME = "table_name"
   
   spark.sql(f"""SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM 
{TABLE_NAME}""").show()
   
   """
   +----------+
   |   datestr|
   +----------+
   |2023-09-19|
   |2023-09-19|
   +----------+
   
   
   """
   
   query = """
   CREATE INDEX hudi_table_func_index_datestr 
   ON table_name 
   USING column_stats(ts) 
   OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
   """
   
   spark.sql(query)
   
   ```
   
   # Error 
   ```
   24/03/27 14:14:10 WARN ScheduleIndexActionExecutor: Following partitions 
already exist or inflight: [files]. Going to schedule indexing of only these 
partitions: [func_index_]
   24/03/27 14:14:10 ERROR HoodieBackedTableMetadataWriter: Bootstrap on 
func_index_ partition failed for 
file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
   org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not 
exist: 
file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet.
        at 
org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
        at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
        at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
        at 
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at 
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
   ---------------------------------------------------------------------------
   Py4JJavaError                             Traceback (most recent call last)
   Cell In[14], line 8
         1 query = """
         2 CREATE INDEX hudi_table_func_index_datestr 
         3 ON table_name 
         4 USING column_stats(ts) 
         5 OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
         6 """
   ----> 8 spark.sql(query)
   
   File /opt/homebrew/lib/python3.11/site-packages/pyspark/sql/session.py:1440, 
in SparkSession.sql(self, sqlQuery, args, **kwargs)
      1438 try:
      1439     litArgs = {k: _to_java_column(lit(v)) for k, v in (args or 
{}).items()}
   -> 1440     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), 
self)
      1441 finally:
      1442     if len(kwargs) > 0:
   
   File /opt/homebrew/lib/python3.11/site-packages/py4j/java_gateway.py:1322, 
in JavaMember.__call__(self, *args)
      1316 command = proto.CALL_COMMAND_NAME +\
      1317     self.command_header +\
      1318     args_command +\
      1319     proto.END_COMMAND_PART
      1321 answer = self.gateway_client.send_command(command)
   -> 1322 return_value = get_return_value(
      1323     answer, self.gateway_client, self.target_id, self.name)
      1325 for temp_arg in temp_args:
      1326     if hasattr(temp_arg, "_detach"):
   
   File 
/opt/homebrew/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169,
 in capture_sql_exception.<locals>.deco(*a, **kw)
       167 def deco(*a: Any, **kw: Any) -> Any:
       168     try:
   --> 169         return f(*a, **kw)
       170     except Py4JJavaError as e:
       171         converted = convert_exception(e.java_exception)
   
   File /opt/homebrew/lib/python3.11/site-packages/py4j/protocol.py:326, in 
get_return_value(answer, gateway_client, target_id, name)
       324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
       325 if answer[1] == REFERENCE_TYPE:
   --> 326     raise Py4JJavaError(
       327         "An error occurred while calling {0}{1}{2}.\n".
       328         format(target_id, ".", name), value)
       329 else:
       330     raise Py4JError(
       331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
       332         format(target_id, ".", name, value))
   
   Py4JJavaError: An error occurred while calling o34.sql.
   : org.apache.hudi.exception.HoodieMetadataException: Failed to index 
partition [func_index_hudi_table_func_index_datestr]
        at 
org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:179)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.index(HoodieSparkCopyOnWriteTable.java:308)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.index(BaseHoodieWriteClient.java:1003)
        at 
org.apache.hudi.HoodieSparkFunctionalIndexClient.create(HoodieSparkFunctionalIndexClient.java:100)
        at 
org.apache.spark.sql.hudi.command.CreateIndexCommand.run(IndexCommands.scala:53)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.hudi.exception.HoodieMetadataException: func_index_ 
bootstrap failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:453)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:293)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:192)
        at 
org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:109)
        at 
org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:95)
        at 
org.apache.hudi.table.HoodieSparkTable.getMetadataWriter(HoodieSparkTable.java:103)
        at 
org.apache.hudi.table.HoodieTable.getIndexingMetadataWriter(HoodieTable.java:915)
        at 
org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:147)
        ... 47 more
   Caused by: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path 
does not exist: 
file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet.
        at 
org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
        at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
        at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
        at 
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at 
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to