kyoungrok0517 opened a new issue, #37057:
URL: https://github.com/apache/arrow/issues/37057

   ### Describe the usage question you have. Please include as many useful details as possible.
   
   
   Hello. I'm trying to access HDFS storage from the driver and workers of a 
PySpark YARN cluster. Specifically, I'm using Hugging Face's `datasets` 
([link](https://github.com/huggingface/datasets)) library, which relies on 
pyarrow to communicate with HDFS. 
   
   Below is the error I'm encountering. Note that I've masked sensitive paths. 
My code is sent from the driver container to the worker containers (Docker) and 
executed there. I confirmed that in both the driver and worker images I can 
connect to HDFS using pyarrow, since the environment variables are properly set, 
but strangely the connection fails when the same image runs on the worker nodes. 
   
   These are some peculiarities of my environment that might have caused this issue. 
   * **Cluster requires kerberos authentication**
     * But I think the error message implies that's not the problem in this case
   * **The user that actually runs the worker process is different from the 
one that built the Docker image**
     * To avoid permission-related issues, I made all directories that the 
script accesses accessible to everyone
   * **The PySpark part of my code has no problem interacting with HDFS.** Even 
pyarrow has no problem when I run the code in an interactive session 
of the same Docker image. The problem occurs only when it runs as the cluster's 
worker runtime.
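
Since the failure shows up only inside the executors, one way to narrow it down is to compare the environment an interactive session sees with the one a worker process sees. The following is a minimal sketch, not a confirmed diagnosis. `hdfs_env_report` and `missing_vars` are hypothetical helper names. `HADOOP_HOME`, `JAVA_HOME`, `ARROW_LIBHDFS_DIR` and `CLASSPATH` are the variables the pyarrow HDFS documentation lists; `HADOOP_CONF_DIR` and `KRB5CCNAME` are included on the assumption that Hadoop configuration and the Kerberos ticket cache matter in this cluster.

```python
import getpass
import os

# Variables named in the pyarrow HDFS documentation, plus two that are
# assumed (not confirmed) to be relevant on a Kerberized cluster.
HDFS_ENV_VARS = (
    "HADOOP_HOME",
    "JAVA_HOME",
    "ARROW_LIBHDFS_DIR",
    "CLASSPATH",
    "HADOOP_CONF_DIR",
    "KRB5CCNAME",
)


def hdfs_env_report(environ=None):
    """Return the HDFS-related variables and the current user as a dict."""
    environ = os.environ if environ is None else environ
    report = {name: environ.get(name) for name in HDFS_ENV_VARS}
    try:
        # May fail in a container whose uid has no passwd entry.
        report["user"] = getpass.getuser()
    except Exception:
        report["user"] = None
    return report


def missing_vars(report):
    """List the variables from HDFS_ENV_VARS that are unset or empty."""
    return sorted(name for name in HDFS_ENV_VARS if not report.get(name))


if __name__ == "__main__":
    current = hdfs_env_report()
    print("user:", current["user"])
    print("missing:", missing_vars(current))
```

Run once in the interactive session and once inside a task (for example `sc.parallelize([0], 1).map(lambda _: hdfs_env_report()).collect()`, where `sc` is assumed to be an existing SparkContext), the two reports can be compared side by side. A variable that is set interactively but missing in the executor, or a different `user`, would be a candidate explanation for `OSError: HDFS connection failed`.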
   
   I hope I can get some help. Thanks.
   
   ```bash
   2023-08-08 18:51:19,638 WARN util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
   2023-08-08 18:51:20,280 WARN shortcircuit.DomainSocketFactory: The 
short-circuit local reads feature cannot be used because libhadoop cannot be 
loaded.
   23/08/08 18:51:22 WARN TaskSetManager: Lost task 0.0 in stage 142.0 (TID 
9732) (ac3bax2062.bdp.bdata.ai executor 1): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
     File 
"<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000003/pyspark.zip/pyspark/worker.py",
 line 830, in main
       process()
     File 
"<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000003/pyspark.zip/pyspark/worker.py",
 line 820, in process
       out_iter = func(split_index, iterator)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/root/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
     File "/root/spark/python/pyspark/rdd.py", line 828, in func
     File 
"/opt/conda/lib/python3.11/site-packages/datasets/packaged_modules/spark/spark.py",
 line 130, in create_cache_and_write_probe
       open(probe_file, "a")
     File "/opt/conda/lib/python3.11/site-packages/datasets/streaming.py", line 
74, in wrapper
       return function(*args, download_config=download_config, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/conda/lib/python3.11/site-packages/datasets/download/streaming_download_manager.py",
 line 496, in xopen
       file_obj = fsspec.open(file, mode=mode, *args, **kwargs).open()
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 439, 
in open
       out = open_files(
             ^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 282, 
in open_files
       fs, fs_token, paths = get_fs_token_paths(
                             ^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 609, 
in get_fs_token_paths
       fs = filesystem(protocol, **inkwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/registry.py", line 
267, in filesystem
       return cls(**storage_options)
              ^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 79, in 
__call__
       obj = super().__call__(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/conda/lib/python3.11/site-packages/fsspec/implementations/arrow.py", line 
278, in __init__
       fs = HadoopFileSystem(
            ^^^^^^^^^^^^^^^^^
     File "pyarrow/_hdfs.pyx", line 96, in 
pyarrow._hdfs.HadoopFileSystem.__init__
     File "pyarrow/error.pxi", line 144, in 
pyarrow.lib.pyarrow_internal_check_status
     File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
   OSError: HDFS connection failed
   
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
        at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
        at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at 
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at 
org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at 
org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at 
org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
        at 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
   
   23/08/08 18:51:24 WARN TaskSetManager: Lost task 0.1 in stage 142.0 (TID 
9733) (ac3iax2079.bdp.bdata.ai executor 2): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
     File 
"<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000005/pyspark.zip/pyspark/worker.py",
 line 830, in main
       process()
     File 
"<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000005/pyspark.zip/pyspark/worker.py",
 line 820, in process
       out_iter = func(split_index, iterator)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/root/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
     File "/root/spark/python/pyspark/rdd.py", line 828, in func
     File 
"/opt/conda/lib/python3.11/site-packages/datasets/packaged_modules/spark/spark.py",
 line 130, in create_cache_and_write_probe
       open(probe_file, "a")
     File "/opt/conda/lib/python3.11/site-packages/datasets/streaming.py", line 
74, in wrapper
       return function(*args, download_config=download_config, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/conda/lib/python3.11/site-packages/datasets/download/streaming_download_manager.py",
 line 496, in xopen
       file_obj = fsspec.open(file, mode=mode, *args, **kwargs).open()
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 439, 
in open
       out = open_files(
             ^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 282, 
in open_files
       fs, fs_token, paths = get_fs_token_paths(
                             ^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 609, 
in get_fs_token_paths
       fs = filesystem(protocol, **inkwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/registry.py", line 
267, in filesystem
       return cls(**storage_options)
              ^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 79, in 
__call__
       obj = super().__call__(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/conda/lib/python3.11/site-packages/fsspec/implementations/arrow.py", line 
278, in __init__
       fs = HadoopFileSystem(
            ^^^^^^^^^^^^^^^^^
     File "pyarrow/_hdfs.pyx", line 96, in 
pyarrow._hdfs.HadoopFileSystem.__init__
     File "pyarrow/error.pxi", line 144, in 
pyarrow.lib.pyarrow_internal_check_status
     File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
   OSError: HDFS connection failed
   
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
        at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
        at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at 
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at 
org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at 
org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at 
org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
        at 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
   
   23/08/08 18:51:38 WARN TaskSetManager: Lost task 0.2 in stage 142.0 (TID 
9734) (<MASKED> executor 4): org.apache.spark.api.python.PythonException: 
Traceback (most recent call last):
     File 
"<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000008/pyspark.zip/pyspark/worker.py",
 line 830, in main
       process()
     File 
"<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000008/pyspark.zip/pyspark/worker.py",
 line 820, in process
       out_iter = func(split_index, iterator)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/root/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
     File "/root/spark/python/pyspark/rdd.py", line 828, in func
     File 
"/opt/conda/lib/python3.11/site-packages/datasets/packaged_modules/spark/spark.py",
 line 130, in create_cache_and_write_probe
       open(probe_file, "a")
     File "/opt/conda/lib/python3.11/site-packages/datasets/streaming.py", line 
74, in wrapper
       return function(*args, download_config=download_config, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/conda/lib/python3.11/site-packages/datasets/download/streaming_download_manager.py",
 line 496, in xopen
       file_obj = fsspec.open(file, mode=mode, *args, **kwargs).open()
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 439, 
in open
       out = open_files(
             ^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 282, 
in open_files
       fs, fs_token, paths = get_fs_token_paths(
                             ^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 609, 
in get_fs_token_paths
       fs = filesystem(protocol, **inkwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/registry.py", line 
267, in filesystem
       return cls(**storage_options)
              ^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 79, in 
__call__
       obj = super().__call__(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/conda/lib/python3.11/site-packages/fsspec/implementations/arrow.py", line 
278, in __init__
       fs = HadoopFileSystem(
            ^^^^^^^^^^^^^^^^^
     File "pyarrow/_hdfs.pyx", line 96, in 
pyarrow._hdfs.HadoopFileSystem.__init__
     File "pyarrow/error.pxi", line 144, in 
pyarrow.lib.pyarrow_internal_check_status
     File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
   OSError: HDFS connection failed
   
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
        at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
        at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at 
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at 
org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at 
org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at 
org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
        at 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
   ```
   
   
   ### Component(s)
   
   Integration, Parquet, Python


