[ https://issues.apache.org/jira/browse/SPARK-20392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979049#comment-15979049 ]

Barry Becker edited comment on SPARK-20392 at 4/21/17 4:46 PM:
---------------------------------------------------------------

Yes [~kiszk], I was able to create a simple program that will allow you to reproduce this problem.
You will need the blockbuster.csv dataset and the parquet pipeline (both attached to this jira).
The pipeline will need to be unzipped, and there are 2 lines in the program below that will need to be updated so that they point to those 2 files on your computer. When I ran locally, I used spark "2.1.1-SNAPSHOT".

Here is my test case:
{code}
  // Imports needed by this snippet (the import for our custom StringToIndexTransformer is omitted):
  import org.apache.spark.ml.PipelineModel
  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions.col
  import org.apache.spark.sql.types.{DoubleType, StringType}

  /**
   * This test has much slower performance than expected.
   * The blockbuster dataset has only 312 rows but 421 columns - tiny by most standards.
   * This test takes about 1 min 10 seconds on my 4 core laptop,
   * but I have seen similarly slow performance on a more powerful server.
   */
  test("apply persisted parquet model pipeline to blockbuster dataset to get prediction on \"DAYPOP\"") {

    // first load the blockbuster data into dataframe
    sqlContext.sql("set spark.sql.caseSensitive=true")
    val blockbusterDf = sqlContext.read
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .option("header", "true") // Use first line as header
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .option("mode", "FAILFAST")
      .option("parserLib", "commons")
      .option("quote", "\"")
      .option("nullValue", "?")
      .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss")
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
      .csv(sys.env("SPARK_DEV_HOME").replaceAll("\\\\", "/") + "/src/test/resources/data/blockbuster.csv") // update this path

    // simulate cleaning the data
    val cleanDf = cleanData(blockbusterDf)

    // load pipeline from disk (update path to point to unzipped pipeline)
    val pipeline: PipelineModel = PipelineModel.load(s"file:///$FILE_PREFIX/models/model_9754.class")
    println("pipeline = " + pipeline.stages.mkString(", "))

    // now apply the persisted parquet pipeline to it
    val startTime = System.currentTimeMillis()
    val newDf = pipeline.transform(cleanDf) // transform itself is lazy
    newDf.show(5)                           // show() triggers the actual computation
    println("time to apply the pipeline = " + (System.currentTimeMillis() - startTime)) // about 1 minute
  }

  /** Normally I do a bunch of cleaning here, but blockbuster does not need it.
    * Basically this just creates new *_CLEANED columns and casts ints to doubles.
    */
  private def cleanData(data: DataFrame): DataFrame = {
    val CLEAN_SUFFIX = "_CLEANED__"
    val MISSING = StringToIndexTransformer.MISSING  // unused in this snippet; part of the usual cleaning
    val NUMERIC_NULL = Double.NaN                   // unused in this snippet

    val origCols = data.schema.fields

    val newCols: Seq[Column] =
      origCols.map(column => {
          val colName = column.name
          if (column.dataType == StringType)
            col(colName).as(colName + CLEAN_SUFFIX)
          else col(colName).cast(DoubleType).as(colName + CLEAN_SUFFIX)
        })

    val colsToSelect = newCols ++ origCols.map(c => col(c.name))

    data.select(colsToSelect:_*)
  }
{code}
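Note that newDf.show(5) only needs to compute enough of the result to print 5 rows. A minimal variant (not in the test above) that forces full materialization before measuring would be:
{code}
    val startTime = System.currentTimeMillis()
    val newDf = pipeline.transform(cleanDf)
    newDf.count()  // forces evaluation of every row rather than just the first few
    println("time to apply the pipeline (fully materialized) = " + (System.currentTimeMillis() - startTime))
{code}
In this case the slowness already shows up with show(5) on a 312-row dataset, so it seems to be driven by the width of the plan rather than by data volume.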


> Slow performance when calling fit on ML pipeline for dataset with many columns but few rows
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20392
>                 URL: https://issues.apache.org/jira/browse/SPARK-20392
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.1.0
>            Reporter: Barry Becker
>         Attachments: blockbuster.csv, 
> giant_query_plan_for_fitting_pipeline.txt
>
>
> This started as a [question on stack overflow|http://stackoverflow.com/questions/43484006/why-is-it-slow-to-apply-a-spark-pipeline-to-dataset-with-many-columns-but-few-ro], but it seems like a bug.
> I am testing spark pipelines using a simple dataset (attached) with 312 (mostly numeric) columns, but only 421 rows. It is small, but it takes 3 minutes to apply my ML pipeline to it on a 24 core server with 60G of memory. This seems much too long for such a tiny dataset. Similar pipelines run quickly on datasets that have fewer columns and more rows. It's something about the number of columns that is causing the slow performance.
> Here is a list of the stages in my pipeline:
> {code}
> 000_strIdx_5708525b2b6c
> 001_strIdx_ec2296082913
> 002_bucketizer_3cbc8811877b
> 003_bucketizer_5a01d5d78436
> 004_bucketizer_bf290d11364d
> 005_bucketizer_c3296dfe94b2
> 006_bucketizer_7071ca50eb85
> 007_bucketizer_27738213c2a1
> 008_bucketizer_bd728fd89ba1
> 009_bucketizer_e1e716f51796
> 010_bucketizer_38be665993ba
> 011_bucketizer_5a0e41e5e94f
> 012_bucketizer_b5a3d5743aaa
> 013_bucketizer_4420f98ff7ff
> 014_bucketizer_777cc4fe6d12
> 015_bucketizer_f0f3a3e5530e
> 016_bucketizer_218ecca3b5c1
> 017_bucketizer_0b083439a192
> 018_bucketizer_4520203aec27
> 019_bucketizer_462c2c346079
> 020_bucketizer_47435822e04c
> 021_bucketizer_eb9dccb5e6e8
> 022_bucketizer_b5f63dd7451d
> 023_bucketizer_e0fd5041c841
> 024_bucketizer_ffb3b9737100
> 025_bucketizer_e06c0d29273c
> 026_bucketizer_36ee535a425f
> 027_bucketizer_ee3a330269f1
> 028_bucketizer_094b58ea01c0
> 029_bucketizer_e93ea86c08e2
> 030_bucketizer_4728a718bc4b
> 031_bucketizer_08f6189c7fcc
> 032_bucketizer_11feb74901e6
> 033_bucketizer_ab4add4966c7
> 034_bucketizer_4474f7f1b8ce
> 035_bucketizer_90cfa5918d71
> 036_bucketizer_1a9ff5e4eccb
> 037_bucketizer_38085415a4f4
> 038_bucketizer_9b5e5a8d12eb
> 039_bucketizer_082bb650ecc3
> 040_bucketizer_57e1e363c483
> 041_bucketizer_337583fbfd65
> 042_bucketizer_73e8f6673262
> 043_bucketizer_0f9394ed30b8
> 044_bucketizer_8530f3570019
> 045_bucketizer_c53614f1e507
> 046_bucketizer_8fd99e6ec27b
> 047_bucketizer_6a8610496d8a
> 048_bucketizer_888b0055c1ad
> 049_bucketizer_974e0a1433a6
> 050_bucketizer_e848c0937cb9
> 051_bucketizer_95611095a4ac
> 052_bucketizer_660a6031acd9
> 053_bucketizer_aaffe5a3140d
> 054_bucketizer_8dc569be285f
> 055_bucketizer_83d1bffa07bc
> 056_bucketizer_0c6180ba75e6
> 057_bucketizer_452f265a000d
> 058_bucketizer_38e02ddfb447
> 059_bucketizer_6fa4ad5d3ebd
> 060_bucketizer_91044ee766ce
> 061_bucketizer_9a9ef04a173d
> 062_bucketizer_3d98eb15f206
> 063_bucketizer_c4915bb4d4ed
> 064_bucketizer_8ca2b6550c38
> 065_bucketizer_417ee9b760bc
> 066_bucketizer_67f3556bebe8
> 067_bucketizer_0556deb652c6
> 068_bucketizer_067b4b3d234c
> 069_bucketizer_30ba55321538
> 070_bucketizer_ad826cc5d746
> 071_bucketizer_77676a898055
> 072_bucketizer_05c37a38ce30
> 073_bucketizer_6d9ae54163ed
> 074_bucketizer_8cd668b2855d
> 075_bucketizer_d50ea1732021
> 076_bucketizer_c68f467c9559
> 077_bucketizer_ee1dfc840db1
> 078_bucketizer_83ec06a32519
> 079_bucketizer_741d08c1b69e
> 080_bucketizer_b7402e4829c7
> 081_bucketizer_8adc590dc447
> 082_bucketizer_673be99bdace
> 083_bucketizer_77693b45f94c
> 084_bucketizer_53529c6b1ac4
> 085_bucketizer_6a3ca776a81e
> 086_bucketizer_6679d9588ac1
> 087_bucketizer_6c73af456f65
> 088_bucketizer_2291b2c5ab51
> 089_bucketizer_cb3d0fe669d8
> 090_bucketizer_e71f913c1512
> 091_bucketizer_156528f65ce7
> 092_bucketizer_f3ec5dae079b
> 093_bucketizer_809fab77eee1
> 094_bucketizer_6925831511e6
> 095_bucketizer_c5d853b95707
> 096_bucketizer_e677659ca253
> 097_bucketizer_396e35548c72
> 098_bucketizer_78a6410d7a84
> 099_bucketizer_e3ae6e54bca1
> 100_bucketizer_9fed5923fe8a
> 101_bucketizer_8925ba4c3ee2
> 102_bucketizer_95750b6942b8
> 103_bucketizer_6e8b50a1918b
> 104_bucketizer_36cfcc13d4ba
> 105_bucketizer_2716d0455512
> 106_bucketizer_9bcf2891652f
> 107_bucketizer_8c3d352915f7
> 108_bucketizer_0786c17d5ef9
> 109_bucketizer_f22df23ef56f
> 110_bucketizer_bad04578bd20
> 111_bucketizer_35cfbde7e28f
> 112_bucketizer_cf89177a528b
> 113_bucketizer_183a0d393ef0
> 114_bucketizer_467c78156a67
> 115_bucketizer_380345e651ab
> 116_bucketizer_0f39f6de1625
> 117_bucketizer_d8500b2c0c2f
> 118_bucketizer_dc5f1fd09ff1
> 119_bucketizer_eeaf9e6cdaef
> 120_bucketizer_5614cd4533d7
> 121_bucketizer_2f1230e2871e
> 122_bucketizer_f8bf9d47e57e
> 123_bucketizer_2df774393575
> 124_bucketizer_259320b7fc86
> 125_bucketizer_e334afc63030
> 126_bucketizer_f17d4d6b4d94
> 127_bucketizer_da7834230ecd
> 128_bucketizer_8dbb503f658e
> 129_bucketizer_e09e2eb2b181
> 130_bucketizer_faa04fa16f3c
> 131_bucketizer_d0bd348a5613
> 132_bucketizer_de6da796e294
> 133_bucketizer_0395526346ce
> 134_bucketizer_ea3b5eb6058f
> 135_bucketizer_ad83472038f7
> 136_bucketizer_4a17c440fd16
> 137_bucketizer_d468637d4b86
> 138_bucketizer_4fc473a72f1d
> 139_vecAssembler_bd87cd105650
> 140_nb_f134e0890a0d
> 141_sql_a8590b83c826
> {code}
> There are 2 string columns that are converted to ints with StringIndexerModel. Then there are bucketizers that bin all the numeric columns into 2 or 3 bins each. Is there a way to bin many columns at once with a single stage? I did not see a way. Next there is a VectorAssembler to combine all the columns into one for the NaiveBayes classifier. Lastly, there is a simple SQLTransformer to cast the prediction column to an int.
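> To make the shape of this pipeline concrete, here is a rough sketch (mine, not code from this issue) of how a pipeline with this structure could be assembled. numericCols and cleanedDf are placeholders, the example splits are made up, and the column-name suffixes follow the metadata excerpts below:
> {code}
> import org.apache.spark.ml.{Pipeline, PipelineStage}
> import org.apache.spark.ml.classification.NaiveBayes
> import org.apache.spark.ml.feature.{Bucketizer, SQLTransformer, StringIndexer, VectorAssembler}
> 
> val numericCols: Seq[String] = Seq("HH_02", "HH_97" /* ... one entry per numeric column ... */)
> 
> // one StringIndexer per string column
> val indexers = Seq("ADI", "State").map { c =>
>   new StringIndexer()
>     .setInputCol(c + "_CLEANED__")
>     .setOutputCol(c + "_IDX__")
>     .setHandleInvalid("skip")
> }
> 
> // one Bucketizer per numeric column (real splits would come from the data)
> val bucketizers = numericCols.map { c =>
>   new Bucketizer()
>     .setInputCol(c + "_CLEANED__")
>     .setOutputCol(c + "_BINNED__")
>     .setSplits(Array(Double.NegativeInfinity, 0.0, 100.0, Double.PositiveInfinity))
>     .setHandleInvalid("keep")
> }
> 
> // combine the indexed and binned columns into one features vector
> val assembler = new VectorAssembler()
>   .setInputCols((indexers.map(_.getOutputCol) ++ bucketizers.map(_.getOutputCol)).toArray)
>   .setOutputCol("_features_column__")
> 
> val nb = new NaiveBayes()
>   .setModelType("multinomial")
>   .setFeaturesCol("_features_column__")
>   .setLabelCol("DAYPOP_BINNED__")
>   .setPredictionCol("_prediction_column_")
> 
> // cast the prediction to an int (output column name simplified from the SQLTransformer metadata below)
> val castPrediction = new SQLTransformer()
>   .setStatement("SELECT *, CAST(_prediction_column_ AS INT) AS prediction_label FROM __THIS__")
> 
> val stages: Array[PipelineStage] =
>   (indexers ++ bucketizers ++ Seq(assembler, nb, castPrediction)).toArray
> val pipeline = new Pipeline().setStages(stages)
> 
> // fitting and applying this pipeline are both slow once there are a few hundred columns
> val model = pipeline.fit(cleanedDf)
> {code}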
> Here is what the metadata for the two StringIndexerModels looks like:
> {code}
> {"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1492551461778,"sparkVersion":"2.1.1","uid":"strIdx_5708525b2b6c","paramMap":{"outputCol":"ADI_IDX__","handleInvalid":"skip","inputCol":"ADI_CLEANED__"}}
> {"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1492551462004,"sparkVersion":"2.1.1","uid":"strIdx_ec2296082913","paramMap":{"outputCol":"State_IDX__","inputCol":"State_CLEANED__","handleInvalid":"skip"}}
> {code}
> The bucketizers all look very similar. Here is what the metadata for a few of them looks like:
> {code}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462636,"sparkVersion":"2.1.1","uid":"bucketizer_bd728fd89ba1","paramMap":{"outputCol":"HH_02_BINNED__","inputCol":"HH_02_CLEANED__","handleInvalid":"keep","splits":["-Inf",7521.0,12809.5,20299.0,"Inf"]}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462711,"sparkVersion":"2.1.1","uid":"bucketizer_e1e716f51796","paramMap":{"splits":["-Inf",6698.0,13690.5,"Inf"],"handleInvalid":"keep","outputCol":"HH_97_BINNED__","inputCol":"HH_97_CLEANED__"}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462784,"sparkVersion":"2.1.1","uid":"bucketizer_38be665993ba","paramMap":{"splits":["-Inf",4664.0,7242.5,11770.0,14947.0,"Inf"],"outputCol":"HH_90_BINNED__","handleInvalid":"keep","inputCol":"HH_90_CLEANED__"}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462858,"sparkVersion":"2.1.1","uid":"bucketizer_5a0e41e5e94f","paramMap":{"splits":["-Inf",6107.5,10728.5,"Inf"],"outputCol":"HH_80_BINNED__","inputCol":"HH_80_CLEANED__","handleInvalid":"keep"}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462931,"sparkVersion":"2.1.1","uid":"bucketizer_b5a3d5743aaa","paramMap":{"outputCol":"HHPG9702_BINNED__","splits":["-Inf",8.895000457763672,"Inf"],"handleInvalid":"keep","inputCol":"HHPG9702_CLEANED__"}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551463004,"sparkVersion":"2.1.1","uid":"bucketizer_4420f98ff7ff","paramMap":{"splits":["-Inf",54980.5,"Inf"],"outputCol":"MEDHI97_BINNED__","handleInvalid":"keep","inputCol":"MEDHI97_CLEANED__"}}
> {code}
> Here is the metadata for the NaiveBayes model:
> {code}
> {"class":"org.apache.spark.ml.classification.NaiveBayesModel","timestamp":1492551472568,"sparkVersion":"2.1.1","uid":"nb_f134e0890a0d","paramMap":{"modelType":"multinomial","probabilityCol":"_class_probability_column__","smoothing":1.0,"predictionCol":"_prediction_column_","rawPredictionCol":"rawPrediction","featuresCol":"_features_column__","labelCol":"DAYPOP_BINNED__"}}
> {code}
> and for the final SQLTransformer
> {code}
> {"class":"org.apache.spark.ml.feature.SQLTransformer","timestamp":1492551472804,"sparkVersion":"2.1.1","uid":"sql_a8590b83c826","paramMap":{"statement":"SELECT
>  *, CAST(_prediction_column_ AS INT) AS `_*_prediction_label_column_*__` FROM 
> __THIS__"}}
> {code}
> Why does performance get extremely slow with more than a couple hundred columns (and only a few rows), while millions of rows (with fewer columns) perform fine? In addition to being slow to apply this pipeline, it is also slow to create it: the fit and evaluate steps take a few minutes each. Is there anything that can be done to make it faster?
> I get similar results using 2.1.1RC, 2.1.2(tip) and 2.2.0(tip). Spark 2.1.0 
> gives a Janino 64k limit error when trying to build this pipeline (see 
> https://issues.apache.org/jira/browse/SPARK-16845).
> I stepped through in the debugger when pipeline.fit was called and noticed that the query plan is a huge nested structure. I don't know how to interpret this plan, but it is likely related to the performance problem. It is attached.
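> For anyone who wants to inspect the plan without attaching a debugger, a quick way to dump it (my suggestion, not something from the original report, reusing pipeline and cleanDf from the test case earlier in this thread) is:
> {code}
> // prints the parsed, analyzed, optimized, and physical plans for the transformed DataFrame
> val transformed = pipeline.transform(cleanDf)
> transformed.explain(true)
> {code}
> Each single-column stage (StringIndexerModel, Bucketizer) transforms by adding one more column on top of the previous DataFrame, so a fitted pipeline with ~140 stages produces a correspondingly deep chain of projections, which is consistent with the huge nested plan attached here.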


