soumilshah1995 commented on issue #10110:
URL: https://github.com/apache/hudi/issues/10110#issuecomment-2023493106
Here is the full code:
```
"""Reproduction script: creating a Hudi functional index fails.

Writes two rows to a partitioned copy-on-write Hudi table on the local
filesystem, registers it as ``table_name``, and then issues a
``CREATE INDEX ... USING column_stats(ts)`` statement with
``func='from_unixtime'``.  The final ``spark.sql(query)`` call is the one
that raises in the accompanying error output.
"""
import os
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    FloatType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

# Environment is configured before the session is built below.
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = (
    f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} "
    "pyspark-shell"
)
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = (
    SparkSession.builder
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
    .config('className', 'org.apache.hudi')
    .config('spark.sql.hive.convertMetastoreParquet', 'false')
    .getOrCreate()
)

# Two sample rows; the first column is an epoch-seconds value.
data = [
    [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A',
     'driver-K', 19.10, 'san_francisco'],
    [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C',
     'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
# NOTE: "ts" is declared as a string column although the rows above carry ints.
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilshah/Desktop/hudidemo/'

hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    # NOTE(review): the documented key appears to be 'hoodie.metadata.enable';
    # left exactly as reported so the reproduction is unchanged -- confirm.
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location',
    'hoodie.parquet.small.file.limit': '0',
}

# Write the rows to `path` and register the table in the catalog.
(
    df.write.format("hudi")
    .options(**hudi_options)
    .option("path", path)
    .mode("append")
    .saveAsTable("table_name")
)

# The returned DataFrame is discarded, as in the original report.
spark.read.format("hudi").load(path)

TABLE_NAME = "table_name"

# Sanity check: from_unixtime works on the column in a plain query.
spark.sql(
    f"SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM {TABLE_NAME}"
).show()
# Reported output of the query above:
# +----------+
# |   datestr|
# +----------+
# |2023-09-19|
# |2023-09-19|
# +----------+

query = """
CREATE INDEX hudi_table_func_index_datestr
ON table_name
USING column_stats(ts)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""

# This statement is the one that raises Py4JJavaError in the error output.
spark.sql(query)
```
# Error
```
24/03/27 14:14:10 WARN ScheduleIndexActionExecutor: Following partitions
already exist or inflight: [files]. Going to schedule indexing of only these
partitions: [func_index_]
24/03/27 14:14:10 ERROR HoodieBackedTableMetadataWriter: Bootstrap on
func_index_ partition failed for
file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not
exist:
file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet.
at
org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
at
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[14], line 8
1 query = """
2 CREATE INDEX hudi_table_func_index_datestr
3 ON table_name
4 USING column_stats(ts)
5 OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
6 """
----> 8 spark.sql(query)
File /opt/homebrew/lib/python3.11/site-packages/pyspark/sql/session.py:1440,
in SparkSession.sql(self, sqlQuery, args, **kwargs)
1438 try:
1439 litArgs = {k: _to_java_column(lit(v)) for k, v in (args or
{}).items()}
-> 1440 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs),
self)
1441 finally:
1442 if len(kwargs) > 0:
File /opt/homebrew/lib/python3.11/site-packages/py4j/java_gateway.py:1322,
in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File
/opt/homebrew/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169,
in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)
File /opt/homebrew/lib/python3.11/site-packages/py4j/protocol.py:326, in
get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o34.sql.
: org.apache.hudi.exception.HoodieMetadataException: Failed to index
partition [func_index_hudi_table_func_index_datestr]
at
org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:179)
at
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.index(HoodieSparkCopyOnWriteTable.java:308)
at
org.apache.hudi.client.BaseHoodieWriteClient.index(BaseHoodieWriteClient.java:1003)
at
org.apache.hudi.HoodieSparkFunctionalIndexClient.create(HoodieSparkFunctionalIndexClient.java:100)
at
org.apache.spark.sql.hudi.command.CreateIndexCommand.run(IndexCommands.scala:53)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hudi.exception.HoodieMetadataException: func_index_
bootstrap failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
at
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:453)
at
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:293)
at
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:192)
at
org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:109)
at
org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:95)
at
org.apache.hudi.table.HoodieSparkTable.getMetadataWriter(HoodieSparkTable.java:103)
at
org.apache.hudi.table.HoodieTable.getIndexingMetadataWriter(HoodieTable.java:915)
at
org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:147)
... 47 more
Caused by: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path
does not exist:
file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet.
at
org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
at
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]