[
https://issues.apache.org/jira/browse/SPARK-26019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692722#comment-16692722
]
Liang-Chi Hsieh commented on SPARK-26019:
-----------------------------------------
{{TCPServer}} begins to process requests only after we call its
{{serve_forever}} method. When the server goes to process a request, it will
instantiate {{RequestHandlerClass}}:
https://github.com/python/cpython/blob/2.7/Lib/SocketServer.py#L332
Since {{RequestHandlerClass}} in {{AccumulatorServer}} is
{{_UpdateRequestHandler}}, that is the moment when
{{_UpdateRequestHandler.handle}} is called and accesses {{auth_token}}.
All of the above should happen only after we call {{serve_forever}} in
{{_start_update_server}}:
{code}
def _start_update_server(auth_token):
    """Start a TCP server to receive accumulator updates in a daemon thread,
    and returns it"""
    server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler,
                               auth_token)
    thread = threading.Thread(target=server.serve_forever)
{code}
So I think that by then we have already finished instantiating
{{AccumulatorServer}}, and {{self.auth_token = auth_token}} has already run.
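A minimal sketch of the ordering described above. It uses Python 3's socketserver module rather than the Python 2.7 SocketServer linked earlier, and TokenServer, EchoTokenHandler and read_token_from_server are hypothetical names for illustration, not PySpark code. It only shows that the handler's handle method runs after the server constructor has returned and serve_forever has started, so an attribute set in the constructor is visible to the handler.

```python
import socket
import socketserver
import threading


class TokenServer(socketserver.TCPServer):
    """Hypothetical stand-in for AccumulatorServer."""

    def __init__(self, server_address, handler_class, auth_token):
        socketserver.TCPServer.__init__(self, server_address, handler_class)
        # Assigned after the base constructor returns, but still before
        # serve_forever is ever called on this instance.
        self.auth_token = auth_token


class EchoTokenHandler(socketserver.StreamRequestHandler):
    """Hypothetical stand-in for _UpdateRequestHandler."""

    def handle(self):
        # The handler is instantiated only while the server is processing
        # a request, so the server attribute is already set here.
        token = self.server.auth_token
        self.wfile.write(token.encode("utf-8"))


def read_token_from_server(auth_token):
    """Start the server in a daemon thread, connect once, return the reply."""
    server = TokenServer(("localhost", 0), EchoTokenHandler, auth_token)
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()
    try:
        with socket.create_connection(server.server_address) as conn:
            chunks = []
            while True:
                data = conn.recv(1024)
                if not data:  # server closed the connection after handle()
                    break
                chunks.append(data)
        return b"".join(chunks).decode("utf-8")
    finally:
        server.shutdown()
        server.server_close()
```

In this sketch the handler always sees the token that was passed to the constructor, which matches the reasoning above. It does not reproduce the reported {{NoneType}} error.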
> pyspark/accumulators.py: "TypeError: object of type 'NoneType' has no len()"
> in authenticate_and_accum_updates()
> ----------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-26019
> URL: https://issues.apache.org/jira/browse/SPARK-26019
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Ruslan Dautkhanov
> Priority: Major
>
> Started happening after 2.3.1 -> 2.3.2 upgrade.
>
> {code:python}
> Exception happened during processing of request from ('127.0.0.1', 43418)
> ----------------------------------------
> Traceback (most recent call last):
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
>     self.process_request(request, client_address)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 318, in process_request
>     self.finish_request(request, client_address)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 331, in finish_request
>     self.RequestHandlerClass(request, client_address, self)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 652, in __init__
>     self.handle()
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", line 263, in handle
>     poll(authenticate_and_accum_updates)
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", line 238, in poll
>     if func():
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", line 251, in authenticate_and_accum_updates
>     received_token = self.rfile.read(len(auth_token))
> TypeError: object of type 'NoneType' has no len()
>
> {code}
>
> The error happens here:
> https://github.com/apache/spark/blob/cb90617f894fd51a092710271823ec7d1cd3a668/python/pyspark/accumulators.py#L254
> The PySpark code was just running a simple pipeline of
> binary_rdd = sc.binaryRecords(full_file_path, record_length).map(lambda .. )
> and then converting it to a dataframe and running a count on it.
> The error seems to be flaky: it did not happen on the next rerun.