Dmitry Kravchuk created ARROW-10957:
---------------------------------------

             Summary: Expanding pyarrow buffer size beyond 2GB for pandas_udf functions
                 Key: ARROW-10957
                 URL: https://issues.apache.org/jira/browse/ARROW-10957
             Project: Apache Arrow
          Issue Type: Improvement
          Components: C++, Java, Python
    Affects Versions: 2.0.0
         Environment: Spark: 2.4.4

Python:
cycler (0.10.0)
glmnet-py (0.1.0b2)
joblib (1.0.0)
kiwisolver (1.3.1)
lightgbm (3.1.1)
matplotlib (3.0.3)
numpy (1.19.4)
pandas (1.1.5)
pip (9.0.3)
pyarrow (2.0.0)
pyparsing (2.4.7)
python-dateutil (2.8.1)
pytz (2020.4)
scikit-learn (0.23.2)
scipy (1.5.4)
setuptools (51.0.0)
six (1.15.0)
sklearn (0.0)
threadpoolctl (2.1.0)
venv-pack (0.2.0)
wheel (0.36.2)
            Reporter: Dmitry Kravchuk
             Fix For: 2.0.1


There is a 2GB limit on the amount of data that can be passed to any pandas_udf
function, and the aim of this issue is to raise that limit. The current buffer
size is far too small when pyspark is used to fit machine learning models.
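
As far as I understand, the limit exists because each group handed to a
GROUPED_MAP pandas_udf is serialized as a single Arrow batch whose length
fields are signed 32-bit integers, so a group whose serialized form reaches
2^31 bytes cannot be transferred. A minimal sketch for estimating a group's
on-the-wire size with plain pyarrow (no Spark required; estimate_ipc_size is
an illustrative helper, not an existing API):

{code:python}
import pandas as pd
import pyarrow as pa

def estimate_ipc_size(pdf):
    """Serialize a pandas DataFrame with the Arrow IPC stream writer
    and return the number of bytes produced (illustrative helper)."""
    table = pa.Table.from_pandas(pdf)
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().size

# A group must stay below 2**31 bytes once serialized, or the
# transfer to the Python worker fails.
pdf = pd.DataFrame({"c1": range(1_000_000), "c2": ["abcdefghij"] * 1_000_000})
print(estimate_ipc_size(pdf), "bytes for a 1M-row group")
{code}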

Steps to reproduce: run the following spark-submit command, which executes the
Python function shown after it.

{code:bash}
%sh
cd /home/zeppelin/code && \
export PYSPARK_DRIVER_PYTHON=/home/zeppelin/envs/env3/bin/python && \
export PYSPARK_PYTHON=./env3/bin/python && \
export ARROW_PRE_0_15_IPC_FORMAT=1 && \
spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 5 \
--executor-cores 5 \
--driver-memory 8G \
--executor-memory 8G \
--conf spark.executor.memoryOverhead=4G \
--conf spark.driver.memoryOverhead=4G \
--archives /home/zeppelin/env3.tar.gz#env3 \
--jars "/opt/deltalake/delta-core_2.11-0.5.0.jar" \
--py-files jobs.zip,"/opt/deltalake/delta-core_2.11-0.5.0.jar" main.py \
--job temp
{code}
 
{code:python|borderStyle=solid}
import pyspark
from pyspark.sql import functions as F, types as T
import pandas as pd

def analyze(spark):

    pdf1 = pd.DataFrame(
        [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
        columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
    )
    df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')

    pdf2 = pd.DataFrame(
        [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
        columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
    )
    df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
    df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')

    def myudf(df):
        # Identity UDF: returns each group unchanged. The env var has to be
        # set inside the worker process as well, so that Spark 2.4 keeps
        # using the pre-0.15 Arrow IPC format.
        import os
        os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
        return df

    df4 = df3 \
        .withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
        .withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
        .withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
        .withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
        .withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
        .withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
        .withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
        .withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
        .withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
        .withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))
    df4.printSchema()  # printSchema() prints the schema itself and returns None

    udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)

    df5 = df4.groupBy('df1_c1').apply(udf)
    # Every row shares the same df1_c1 value, so the whole join lands in a
    # single group and the count below fails once that group exceeds 2GB.
    print('df5.count()', df5.count())
{code}
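
Until the limit itself is lifted, the workaround I know of is to keep every
group's Arrow batch under 2GB by salting the grouping key, so oversized groups
are split across several UDF invocations. A sketch against the df4 from the
repro above (NUM_SALTS and the _salt column are illustrative names):

{code:python}
# Salt the grouping key so no single group's Arrow batch exceeds 2GB.
NUM_SALTS = 16  # choose so that (largest group / NUM_SALTS) serializes below 2GB

df_salted = df4.withColumn('_salt', (F.rand() * NUM_SALTS).cast('int'))
udf_salted = F.pandas_udf(df_salted.schema, F.PandasUDFType.GROUPED_MAP)(myudf)

df5 = (df_salted
       .groupBy('df1_c1', '_salt')  # each sub-group stays under the limit
       .apply(udf_salted)
       .drop('_salt'))
{code}

This is harmless for the identity UDF above, but it changes semantics for real
per-group model fitting, since each sub-group would be fitted separately.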

If you need more details, please let me know.


