serhii-donetskyi-datacubed opened a new issue, #12333:
URL: https://github.com/apache/hudi/issues/12333
**Describe the problem you faced**
When doing inline clustering, if the table's path contains a comma, Hudi
truncates the path (up to the first comma) and fails, since no such directory exists.
**To Reproduce**
Steps to reproduce the behavior:
1. Run the script:
```
import sys
import boto3
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import pyspark.sql.functions as F
spark = SparkSession.builder \
.config("spark.sql.legacy.parquet.datetimeRebaseModeInRead",
"CORRECTED") \
.config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite",
"CORRECTED") \
.config("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED") \
.config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED") \
.config('spark.sql.adaptive.enabled', 'true') \
.config('spark.sql.adaptive.coalescePartitions.enabled', 'true') \
.config('spark.serializer',
'org.apache.spark.serializer.KryoSerializer') \
.config('spark.sql.extensions',
'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
.getOrCreate()
hudi_options = {
'hoodie.database.name': 'test',
'hoodie.table.name': 'test',
'hoodie.enable.data.skipping': 'true',
'hoodie.datasource.write.recordkey.field': 'id1,id2',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.datasource.write.reconcile.schema': 'false',
'hoodie.datasource.write.schema.allow.auto.evolution.column.drop':
'true',
'hoodie.datasource.write.operation': 'insert',
# 'hoodie.datasource.hive_sync.enable': 'true',
# 'hoodie.datasource.hive_sync.use_jdbc': 'false',
# 'hoodie.datasource.hive_sync.database': curated_db,
# 'hoodie.datasource.hive_sync.table': curated_table,
# 'hoodie.datasource.hive_sync.support_timestamp': 'true',
# 'hoodie.datasource.hive_sync.mode': 'hms',
'hoodie.parquet.outputtimestamptype': 'TIMESTAMP_MICROS',
# 'hoodie.index.type': 'RECORD_INDEX',
'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
'hoodie.cleaner.commits.retained': 10,
'hoodie.parquet.small.file.limit': 0,
'hoodie.clustering.inline': 'true',
'hoodie.clustering.inline.max.commits': 2,
'format': 'hudi',
'mode': 'append',
'path': '/tmp/test/id1,id2/'
}
df = spark.sql('select 1 as id1, 1 as id2, now() as ts, 99 as col')
for _ in range(2):
df.write.save(**hudi_options)
```
**Expected behavior**
Data is appended to the Hudi table and the clustering service executes
without any errors.
**Environment Description**
Using [EMR
7.3.0](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-730-release.html),
which has Hudi version 0.15.0
* Hudi version : 0.15.0
* Spark version : 3.5.1
* Hive version : 3.1.3
* Hadoop version : 3.3.5
* Storage (HDFS/S3/GCS..) : ALL
* Running on Docker? (yes/no) : no
**Stacktrace**
```
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[3], line 2
1 for _ in range(2):
----> 2 df.write.save(**hudi_options)
File /usr/local/lib/python3.9/dist-packages/pyspark/sql/readwriter.py:1463,
in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
1461 self._jwrite.save()
1462 else:
-> 1463 self._jwrite.save(path)
File /usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py:1322, in
JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File
/usr/local/lib/python3.9/dist-packages/pyspark/errors/exceptions/captured.py:179,
in capture_sql_exception.<locals>.deco(*a, **kw)
177 def deco(*a: Any, **kw: Any) -> Any:
178 try:
--> 179 return f(*a, **kw)
180 except Py4JJavaError as e:
181 converted = convert_exception(e.java_exception)
File /usr/local/lib/python3.9/dist-packages/py4j/protocol.py:326, in
get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o59.save.
: java.util.concurrent.CompletionException:
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist:
file:/tmp/test/id1.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path
does not exist: file:/tmp/test/id1.
at
org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1500)
at
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:384)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at
java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]