[ https://issues.apache.org/jira/browse/SPARK-26549 ]
Yuanjian Li updated SPARK-26549:
--------------------------------
    Description: 
During [the follow-up work|https://github.com/apache/spark/pull/23435#issuecomment-451079886] on the PySpark worker reuse scenario, we found that worker reuse takes no effect for `sc.parallelize(xrange(...))`.

This happens because the specialized rdd.parallelize path for xrange generates its data directly from xrange and never consumes the passed-in iterator. That, however, breaks the end-of-stream check in the Python worker and ultimately causes worker reuse to take no effect. The relevant code blocks are listed below.

The current specialized logic for xrange does not need the passed-in iterator, in context.py:
{code:python}
if isinstance(c, xrange):
    ...
    def f(split, iterator):
        return xrange(getStart(split), getStart(split + 1), step)
    ...
    return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
{code}

While checking for end of stream, the worker therefore reads an unexpected value, -1, which is END_OF_DATA_SECTION rather than END_OF_STREAM. See the code in worker.py:
{code:python}
# check end of stream
if read_int(infile) == SpecialLengths.END_OF_STREAM:
    write_int(SpecialLengths.END_OF_STREAM, outfile)
else:
    # write a different value to tell JVM to not reuse this worker
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    sys.exit(-1)
{code}

The code works well for parallelize(range) because END_OF_DATA_SECTION is consumed while the iterator is loaded from the socket stream; see the code in FramedSerializer:
{code:python}
def load_stream(self, stream):
    while True:
        try:
            yield self._read_with_length(stream)
        except EOFError:
            return

...

def _read_with_length(self, stream):
    length = read_int(stream)
    if length == SpecialLengths.END_OF_DATA_SECTION:
        # END_OF_DATA_SECTION raises EOFError here, which is caught in load_stream
        raise EOFError
    elif length == SpecialLengths.NULL:
        return None
    obj = stream.read(length)
    if len(obj) < length:
        raise EOFError
    return self.loads(obj)
{code}
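A minimal way to observe the symptom (a sketch only, assuming Python 2 and `spark.python.worker.reuse` left at its default of true; the job bodies and partition counts are purely illustrative):
{code:python}
# Sketch: compare Python worker PIDs across two jobs. If worker reuse
# works, the same PID should show up; with the xrange bug, it does not.
import os
from pyspark import SparkContext

sc = SparkContext("local[1]", "worker-reuse-check")

# parallelize(xrange(...)) takes the specialized path that skips the iterator
pid1 = sc.parallelize(xrange(10), 1).map(lambda _: os.getpid()).collect()[0]
pid2 = sc.parallelize(xrange(10), 1).map(lambda _: os.getpid()).collect()[0]

print(pid1 == pid2)  # expected True with reuse; False under this bug
sc.stop()
{code}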
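To see why the worker reads -1 instead of -4, here is a self-contained simulation of the framing protocol (plain Python, no Spark needed; `write_int`/`read_int` are simplified stand-ins for the helpers in pyspark/serializers.py, and the constants mirror SpecialLengths):
{code:python}
import io
import struct

# These mirror pyspark.serializers.SpecialLengths
END_OF_DATA_SECTION = -1
END_OF_STREAM = -4

def write_int(value, stream):
    stream.write(struct.pack("!i", value))

def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]

# The JVM ends the (empty) data section, then signals end of stream.
infile = io.BytesIO()
write_int(END_OF_DATA_SECTION, infile)
write_int(END_OF_STREAM, infile)
infile.seek(0)

# A task that drains load_stream consumes the -1 first, so the final
# end-of-stream check sees -4. The xrange path never touches the
# iterator, so the check reads the leftover -1 instead:
print(read_int(infile) == END_OF_STREAM)  # False -> reuse disabled
{code}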
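One possible fix (a sketch of the idea, not necessarily the exact committed patch): make `f` drain the empty iterator before returning the xrange, so that control enters FramedSerializer.load_stream at least once and END_OF_DATA_SECTION is consumed from the socket there. `getStart` and `step` below come from the surrounding context.py snippet above:
{code:python}
def f(split, iterator):
    # The iterator is empty here, but draining it forces load_stream /
    # _read_with_length to run, which consumes END_OF_DATA_SECTION from
    # the socket and leaves END_OF_STREAM for the worker's final check.
    for _ in iterator:
        pass
    return xrange(getStart(split), getStart(split + 1), step)
{code}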
> PySpark worker reuse take no effect for parallelize xrange
> ----------------------------------------------------------
>
>                 Key: SPARK-26549
>                 URL: https://issues.apache.org/jira/browse/SPARK-26549
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Major