Philippe Prados created SPARK-36600:
---------------------------------------

             Summary: Reduce memory consumption with PySpark createDataFrame
                 Key: SPARK-36600
                 URL: https://issues.apache.org/jira/browse/SPARK-36600
             Project: Spark
          Issue Type: Improvement
          Components: PySpark
    Affects Versions: 3.1.2
            Reporter: Philippe Prados


The Python method {{SparkSession._createFromLocal()}} starts by materializing the data, creating a list if it is not already an instance of list. But this is necessary only if the schema is not present.
{quote}# make sure data could consumed multiple times
 if not isinstance(data, list):
  data = list(data)
{quote}
If you call {{createDataFrame(data=_a_generator_,...)}}, all the data is first saved in memory in a list, then converted to rows in memory, then converted to a buffer in pickle format, etc.
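As a minimal standalone sketch (not Spark itself, just an illustration of the same pattern), the eager materialization looks like this:

```python
def eager_materialize(data):
    # Mirrors the current check in _createFromLocal(): anything that
    # is not already a list is copied into one up front, even when a
    # schema was supplied and no second pass over the data is needed.
    if not isinstance(data, list):
        data = list(data)
    return data

rows = ((i, str(i)) for i in range(3))  # a generator of rows
materialized = eager_materialize(rows)
print(materialized)  # the generator is fully drained into a list
```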

Two lists are therefore present in memory at the same time: the list created by 
_createFromLocal() and the list created later with
{quote}# convert python objects to sql data
data = [schema.toInternal(row) for row in data]
{quote}
The purpose of using a generator is to reduce the memory footprint when the 
data are built dynamically.
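A rough illustration of that footprint difference (container sizes only, not counting the row objects themselves):

```python
import sys

# A list keeps every row object alive at once; a generator keeps
# only its execution frame, whatever the row count.
rows_list = [(i, str(i)) for i in range(100_000)]
rows_gen = ((i, str(i)) for i in range(100_000))

print(sys.getsizeof(rows_list))  # grows with the number of rows
print(sys.getsizeof(rows_gen))   # small and constant
```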
{quote}def _createFromLocal(self, data, schema):
  """
  Create an RDD for DataFrame from a list or pandas.DataFrame, returns
  the RDD and schema.
  """

  if schema is None or isinstance(schema, (list, tuple)):
    *# make sure data could be consumed multiple times*
    *if inspect.isgenerator(data):*
      *data = list(data)*
    struct = self._inferSchemaFromList(data, names=schema)
    converter = _create_converter(struct)
    data = map(converter, data)
    if isinstance(schema, (list, tuple)):
      for i, name in enumerate(schema):
        struct.fields[i].name = name
        struct.names[i] = name
      schema = struct

    elif not isinstance(schema, StructType):
      raise TypeError("schema should be StructType or list or None, but got: %s" % schema)

  # convert python objects to sql data
  data = [schema.toInternal(row) for row in data]
  return self._sc.parallelize(data), schema{quote}
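Note that the value passed as {{data}} is a generator *object*, so the appropriate check is {{inspect.isgenerator()}}; {{inspect.isgeneratorfunction()}} only matches the function that produces it:

```python
import inspect

def row_source():
    # A generator function: calling it returns a generator object.
    yield (1, "a")
    yield (2, "b")

gen = row_source()
print(inspect.isgeneratorfunction(row_source))  # True
print(inspect.isgenerator(gen))                 # True
print(inspect.isgeneratorfunction(gen))         # False
```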
It is therefore worthwhile to accept a generator here.

 
{quote}The patch:

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 57c680fd04..0dba590451 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import inspect
 import sys
 import warnings
 from functools import reduce
@@ -504,11 +505,11 @@ class SparkSession(SparkConversionMixin):
         Create an RDD for DataFrame from a list or pandas.DataFrame, returns
         the RDD and schema.
         """
-        # make sure data could consumed multiple times
-        if not isinstance(data, list):
-            data = list(data)
 
         if schema is None or isinstance(schema, (list, tuple)):
+            # make sure data could be consumed multiple times
+            if inspect.isgenerator(data):
+                data = list(data)
             struct = self._inferSchemaFromList(data, names=schema)
             converter = _create_converter(struct)
             data = map(converter, data)
{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
