serhii-donetskyi-datacubed opened a new issue, #12333:
URL: https://github.com/apache/hudi/issues/12333

   **Describe the problem you faced**
   
   When inline clustering runs on a table whose path contains a comma, the path is truncated at the first comma and the write fails because the truncated directory does not exist.
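
The truncation is consistent with a path string being treated as a comma-separated list somewhere on the clustering read path. This is only a guess at the cause and is not confirmed by anything in this report. The sketch below is plain Python, not Hudi or Spark code, and `split_paths` is a hypothetical helper; it just shows how such a split turns the table path from the script into the path named in the error.

```python
# Hypothetical illustration only: this is NOT Hudi's or Spark's implementation.
# It shows what happens if a single table path is parsed as a comma-separated list.
table_path = "/tmp/test/id1,id2/"


def split_paths(joined: str) -> list:
    """Split a comma-separated list of paths, dropping empty entries (assumed behavior)."""
    return [p for p in joined.split(",") if p]


candidates = split_paths(table_path)
print(candidates)     # ['/tmp/test/id1', 'id2/']
print(candidates[0])  # /tmp/test/id1
```

The first element matches the `file:/tmp/test/id1` path reported as missing in the stack trace.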
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Run the script:
   
   ```
   from pyspark.sql import SparkSession
   
   spark = SparkSession.builder \
       .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED") \
       .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") \
       .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED") \
       .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED") \
       .config('spark.sql.adaptive.enabled', 'true') \
       .config('spark.sql.adaptive.coalescePartitions.enabled', 'true') \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
       .getOrCreate()
   
   hudi_options = {
       'hoodie.database.name': 'test',
       'hoodie.table.name': 'test',
       'hoodie.enable.data.skipping': 'true',
       'hoodie.datasource.write.recordkey.field': 'id1,id2',
       'hoodie.datasource.write.precombine.field': 'ts',
       'hoodie.datasource.write.reconcile.schema': 'false',
       'hoodie.datasource.write.schema.allow.auto.evolution.column.drop': 'true',
       'hoodie.datasource.write.operation': 'insert',
   
       # 'hoodie.datasource.hive_sync.enable': 'true',
       # 'hoodie.datasource.hive_sync.use_jdbc': 'false',
       # 'hoodie.datasource.hive_sync.database': curated_db,
       # 'hoodie.datasource.hive_sync.table': curated_table,
       # 'hoodie.datasource.hive_sync.support_timestamp': 'true',
       # 'hoodie.datasource.hive_sync.mode': 'hms',
   
       'hoodie.parquet.outputtimestamptype': 'TIMESTAMP_MICROS',
       # 'hoodie.index.type': 'RECORD_INDEX',
   
       'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
       'hoodie.cleaner.commits.retained': 10,
   
       'hoodie.parquet.small.file.limit': 0,
       'hoodie.clustering.inline': 'true',
       'hoodie.clustering.inline.max.commits': 2,
   
       'format': 'hudi',
       'mode': 'append',
       'path': '/tmp/test/id1,id2/'
   }
   
   df = spark.sql('select 1 as id1, 1 as id2, now() as ts, 99 as col')
   
   for _ in range(2):
       df.write.save(**hudi_options)
   ```
   
   **Expected behavior**
   
   Data is appended to the Hudi table and the clustering service runs without errors.
   
   **Environment Description**
   Using [EMR 7.3.0](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-730-release.html), which has Hudi version 0.15.0
   
   * Hudi version : 0.15.0
   
   * Spark version : 3.5.1
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.5
   
   * Storage (HDFS/S3/GCS..) : ALL
   
   * Running on Docker? (yes/no) : no
   
   **Stacktrace**
   
   
   ```
   ---------------------------------------------------------------------------
   Py4JJavaError                             Traceback (most recent call last)
   Cell In[3], line 2
         1 for _ in range(2):
   ----> 2     df.write.save(**hudi_options)
   
   File /usr/local/lib/python3.9/dist-packages/pyspark/sql/readwriter.py:1463, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
      1461     self._jwrite.save()
      1462 else:
   -> 1463     self._jwrite.save(path)
   
   File /usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
      1316 command = proto.CALL_COMMAND_NAME +\
      1317     self.command_header +\
      1318     args_command +\
      1319     proto.END_COMMAND_PART
      1321 answer = self.gateway_client.send_command(command)
   -> 1322 return_value = get_return_value(
      1323     answer, self.gateway_client, self.target_id, self.name)
      1325 for temp_arg in temp_args:
      1326     if hasattr(temp_arg, "_detach"):
   
   File /usr/local/lib/python3.9/dist-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
       177 def deco(*a: Any, **kw: Any) -> Any:
       178     try:
   --> 179         return f(*a, **kw)
       180     except Py4JJavaError as e:
       181         converted = convert_exception(e.java_exception)
   
   File /usr/local/lib/python3.9/dist-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
       324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
       325 if answer[1] == REFERENCE_TYPE:
   --> 326     raise Py4JJavaError(
       327         "An error occurred while calling {0}{1}{2}.\n".
       328         format(target_id, ".", name), value)
       329 else:
       330     raise Py4JError(
       331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
       332         format(target_id, ".", name, value))
   
   Py4JJavaError: An error occurred while calling o59.save.
   : java.util.concurrent.CompletionException: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/tmp/test/id1.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/tmp/test/id1.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1500)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
        at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:384)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
   ```
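
To check whether a similar failure on another table is the same comma truncation, the path in the error message can be compared with the configured table path. The sketch below is a plain-Python diagnostic under stated assumptions: `reported_path` and `truncated_at_comma` are hypothetical helper names, and the message format is taken from the stack trace above.

```python
import re

# Values copied from the report: the configured table path and the error line.
configured_path = "/tmp/test/id1,id2/"
error_text = (
    "org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] "
    "Path does not exist: file:/tmp/test/id1."
)


def reported_path(message):
    """Extract the missing path from a PATH_NOT_FOUND message (hypothetical helper)."""
    match = re.search(r"Path does not exist: (\S+)", message)
    if match is None:
        return None
    path = match.group(1).rstrip(".")  # drop the sentence-ending period
    return path[len("file:"):] if path.startswith("file:") else path


def truncated_at_comma(configured, reported):
    """True if the reported path is the configured path cut at its first comma."""
    return "," in configured and reported == configured.split(",", 1)[0]


found = reported_path(error_text)
print(found)
print(truncated_at_comma(configured_path, found))
```

For the values in this report the check succeeds, which matches the described symptom; it says nothing about where in Hudi or Spark the split happens.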
   
   

