kyoungrok0517 opened a new issue, #37057: URL: https://github.com/apache/arrow/issues/37057
### Describe the usage question you have. Please include as many useful details as possible.

Hello. I'm trying to interact with HDFS storage from the driver and workers of a PySpark YARN cluster. Specifically, I'm using Hugging Face's `datasets` ([link](https://github.com/huggingface/datasets)) library, which relies on pyarrow to communicate with HDFS. Below is the error I'm encountering. Note that I've masked sensitive paths.

My code is sent from the driver container to the worker containers (Docker) and then executed. I confirmed that in both the driver and worker images I can connect to HDFS using pyarrow, since the environment variables are properly set, but strangely the connection fails when the same image runs on worker nodes.

These are some peculiarities of my environment that might have caused this issue.

* **The cluster requires Kerberos authentication**
  * But I think the error message implies that's not the problem in this case
* **The user that actually runs the worker process is different from the one that built the Docker image**
  * To avoid permission-related issues, I made all directories that the script accesses accessible to everyone
* **The PySpark part of my code has no problem interacting with HDFS.** Even pyarrow has no problem when I run the code in an interactive session of the same Docker image. The problem occurs only when it runs as the cluster's worker runtime.

I hope I can get some help. Thanks.

```bash
2023-08-08 18:51:19,638 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-08-08 18:51:20,280 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
23/08/08 18:51:22 WARN TaskSetManager: Lost task 0.0 in stage 142.0 (TID 9732) (ac3bax2062.bdp.bdata.ai executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000003/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000003/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
  File "/root/spark/python/pyspark/rdd.py", line 828, in func
  File "/opt/conda/lib/python3.11/site-packages/datasets/packaged_modules/spark/spark.py", line 130, in create_cache_and_write_probe
    open(probe_file, "a")
  File "/opt/conda/lib/python3.11/site-packages/datasets/streaming.py", line 74, in wrapper
    return function(*args, download_config=download_config, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/datasets/download/streaming_download_manager.py", line 496, in xopen
    file_obj = fsspec.open(file, mode=mode, *args, **kwargs).open()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 439, in open
    out = open_files(
          ^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 282, in open_files
    fs, fs_token, paths = get_fs_token_paths(
                          ^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 609, in get_fs_token_paths
    fs = filesystem(protocol, **inkwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/registry.py", line 267, in filesystem
    return cls(**storage_options)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 79, in __call__
    obj = super().__call__(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/implementations/arrow.py", line 278, in __init__
    fs = HadoopFileSystem(
         ^^^^^^^^^^^^^^^^^
  File "pyarrow/_hdfs.pyx", line 96, in pyarrow._hdfs.HadoopFileSystem.__init__
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: HDFS connection failed

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

23/08/08 18:51:24 WARN TaskSetManager: Lost task 0.1 in stage 142.0 (TID 9733) (ac3iax2079.bdp.bdata.ai executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000005/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000005/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
  File "/root/spark/python/pyspark/rdd.py", line 828, in func
  File "/opt/conda/lib/python3.11/site-packages/datasets/packaged_modules/spark/spark.py", line 130, in create_cache_and_write_probe
    open(probe_file, "a")
  File "/opt/conda/lib/python3.11/site-packages/datasets/streaming.py", line 74, in wrapper
    return function(*args, download_config=download_config, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/datasets/download/streaming_download_manager.py", line 496, in xopen
    file_obj = fsspec.open(file, mode=mode, *args, **kwargs).open()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 439, in open
    out = open_files(
          ^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 282, in open_files
    fs, fs_token, paths = get_fs_token_paths(
                          ^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 609, in get_fs_token_paths
    fs = filesystem(protocol, **inkwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/registry.py", line 267, in filesystem
    return cls(**storage_options)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 79, in __call__
    obj = super().__call__(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/implementations/arrow.py", line 278, in __init__
    fs = HadoopFileSystem(
         ^^^^^^^^^^^^^^^^^
  File "pyarrow/_hdfs.pyx", line 96, in pyarrow._hdfs.HadoopFileSystem.__init__
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: HDFS connection failed

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

23/08/08 18:51:38 WARN TaskSetManager: Lost task 0.2 in stage 142.0 (TID 9734) (<MASKED> executor 4): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000008/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000008/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
  File "/root/spark/python/pyspark/rdd.py", line 828, in func
  File "/opt/conda/lib/python3.11/site-packages/datasets/packaged_modules/spark/spark.py", line 130, in create_cache_and_write_probe
    open(probe_file, "a")
  File "/opt/conda/lib/python3.11/site-packages/datasets/streaming.py", line 74, in wrapper
    return function(*args, download_config=download_config, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/datasets/download/streaming_download_manager.py", line 496, in xopen
    file_obj = fsspec.open(file, mode=mode, *args, **kwargs).open()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 439, in open
    out = open_files(
          ^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 282, in open_files
    fs, fs_token, paths = get_fs_token_paths(
                          ^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 609, in get_fs_token_paths
    fs = filesystem(protocol, **inkwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/registry.py", line 267, in filesystem
    return cls(**storage_options)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 79, in __call__
    obj = super().__call__(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/implementations/arrow.py", line 278, in __init__
    fs = HadoopFileSystem(
         ^^^^^^^^^^^^^^^^^
  File "pyarrow/_hdfs.pyx", line 96, in pyarrow._hdfs.HadoopFileSystem.__init__
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: HDFS connection failed

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

### Component(s)

Integration, Parquet, Python
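
Since the failure shows up only inside the YARN worker runtime, one way to narrow it down might be to compare the HDFS-related environment that the interactive session sees with what a worker process sees. The following is a minimal sketch, not something taken from the report above: the helper names `hdfs_env_report` and `try_hdfs_connect` are hypothetical, and the variable list is an assumption based on the variables pyarrow's libhdfs driver is documented to read (`HADOOP_HOME`, `JAVA_HOME`, `ARROW_LIBHDFS_DIR`, `CLASSPATH`) plus `HADOOP_CONF_DIR` and the Kerberos ticket cache variable `KRB5CCNAME`.

```python
import getpass
import os

# Assumed list of variables that matter for a libhdfs-based connection;
# adjust it to whatever the cluster actually relies on.
HDFS_ENV_VARS = (
    "HADOOP_HOME",
    "HADOOP_CONF_DIR",
    "JAVA_HOME",
    "ARROW_LIBHDFS_DIR",
    "CLASSPATH",
    "KRB5CCNAME",
)


def hdfs_env_report(environ=None):
    """Return the HDFS-related environment seen by the current process."""
    environ = os.environ if environ is None else environ
    report = {name: environ.get(name) for name in HDFS_ENV_VARS}
    try:
        report["user"] = getpass.getuser()
    except Exception:
        # getuser() can fail when the runtime uid has no passwd entry,
        # which is plausible when the container runs as a different user.
        report["user"] = None
    return report


def try_hdfs_connect(host="default", port=0):
    """Try to open an HDFS connection; return (ok, error_message)."""
    try:
        # Imported lazily so hdfs_env_report() works without pyarrow installed.
        from pyarrow import fs

        # "default" / 0 ask libhdfs to use fs.defaultFS from core-site.xml.
        fs.HadoopFileSystem(host, port)
        return True, ""
    except Exception as exc:
        return False, repr(exc)
```

Calling `hdfs_env_report()` in the interactive session and then on the workers, for example through something like `sc.parallelize(range(2), 2).mapPartitions(lambda _: [hdfs_env_report()]).collect()` (where `sc` is assumed to be the active `SparkContext`), would show whether a variable or the effective user differs between the two runtimes.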
