Shuffle write explosion

2018-11-04 Thread Yichen Zhou
Hi All,

When running a Spark job, I have 100 MB+ of input but get more than 700 GB of
shuffle write, which is really weird, and the job finally ends up with an OOM
error. Does anybody know why this happens?
[image: Screen Shot 2018-11-05 at 15.20.35.png]
My code is like:

JavaPairRDD<Text, Text> inputRDD =
        sc.sequenceFile(inputPath, Text.class, Text.class);

inputRDD.repartition(partitionNum)
        .mapToPair(...)
        .saveAsNewAPIHadoopDataset(job.getConfiguration());
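A rough PySpark sketch (the original job is Java) of how one might check whether the
shuffle write comes from the repartition(): repartition always shuffles every record,
while coalesce only merges partitions when reducing their number, so comparing the
"Shuffle Write" column in the Spark UI for the two variants isolates that step. The
paths, partition count and text output below are placeholders, not the original job's
configuration:

from pyspark import SparkContext

sc = SparkContext(appName="shuffle-write-check")
partition_num = 200                          # placeholder
rdd = sc.sequenceFile("hdfs:///data/input")  # Text keys/values arrive as strings

# Variant A: full shuffle, as in the original pipeline.
rdd.repartition(partition_num) \
   .map(lambda kv: (kv[0], kv[1])) \
   .saveAsTextFile("hdfs:///data/out_repartition")

# Variant B: no full shuffle; compare its "Shuffle Write" against variant A.
rdd.coalesce(partition_num) \
   .map(lambda kv: (kv[0], kv[1])) \
   .saveAsTextFile("hdfs:///data/out_coalesce")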


Environment:

CPU: 32 cores; Memory: 256 GB; Storage: 7.5 G
CentOS 7.5
Java version "1.8.0_162"
Spark 2.1.2

Any help is greatly appreciated.

Regards,
Yichen


Re: [Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

2018-11-04 Thread Bhaskar Ebbur
Here's some sample code.

self.session = SparkSession \
    .builder \
    .appName(self.app_name) \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("hive.load.dynamic.partitions.thread", "10") \
    .config("hive.mv.files.thread", "30") \
    .config("fs.trash.interval", "0") \
    .enableHiveSupport() \
    .getOrCreate()

columns_with_default = "col1, NULL as col2, col2, col4, NULL as col5, " \
    "partition_col1, partition_col2"

source_data_df_to_write = self.session.sql(
    "SELECT %s, %s, %s as %s, %s as %s FROM TEMP_VIEW" % (columns_with_default))
source_data_df_to_write \
    .coalesce(50) \
    .createOrReplaceTempView("TEMP_VIEW")

table_name_abs = "%s.%s" % (self.database, self.target_table)
self.session.sql(
    "INSERT OVERWRITE TABLE %s "
    "PARTITION (%s) "
    "SELECT %s FROM TEMP_VIEW" % (
        table_name_abs, "partition_col1, partition_col2",
        columns_with_default))


On Sun, Nov 4, 2018 at 11:08 PM Jörn Franke  wrote:

> Can you share some relevant source code?


Re: [Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

2018-11-04 Thread Jörn Franke
Can you share some relevant source code?


> On 05.11.2018 at 07:58, ehbhaskar wrote:
>
> I have a PySpark job that inserts data into a Hive partitioned table using
> an `INSERT OVERWRITE` statement.
>
> The Spark job loads data quickly (in about 15 minutes) to a temp directory
> (~/.hive-***) in S3, but it is very slow in moving the data from the temp
> directory to the target path; that step alone takes more than 40 minutes.
>
> I set the option mapreduce.fileoutputcommitter.algorithm.version=2 (the
> default is 1), but I still see no change.
>
> *Are there any ways to improve the performance of a Hive INSERT OVERWRITE
> query from Spark?*
>
> I also noticed that this behavior is even worse (i.e. the job takes even
> more time) with a Hive table that has many existing partitions; the data
> loads relatively fast into tables that have fewer existing partitions.
>
> *Some additional details:*
> * The table is dynamically partitioned.
> * Spark version - 2.3.0
> * Hive version - 2.3.2-amzn-2
> * Hadoop version - 2.8.3-amzn-0
>
> PS: Other config options I have tried that didn't have much effect on job
> performance:
> * "hive.load.dynamic.partitions.thread" - "10"
> * "hive.mv.files.thread" - "30"
> * "fs.trash.interval" - "0"



[Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

2018-11-04 Thread ehbhaskar
I have a PySpark job that inserts data into a Hive partitioned table using an
`INSERT OVERWRITE` statement.

The Spark job loads data quickly (in about 15 minutes) to a temp directory
(~/.hive-***) in S3, but it is very slow in moving the data from the temp
directory to the target path; that step alone takes more than 40 minutes.

I set the option mapreduce.fileoutputcommitter.algorithm.version=2 (the default
is 1), but I still see no change.
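A minimal sketch of how a setting like this is typically passed through a PySpark
session, using the spark.hadoop. prefix so it reaches the Hadoop Configuration read
by the file output committer; the app name and extra settings below are placeholders
taken from this thread, not the job's actual code:

from pyspark.sql import SparkSession

# Sketch only: spark.hadoop.* entries are copied into the Hadoop Configuration
# that Spark's file output committers read; hive.exec.dynamic.partition.mode
# mirrors the sample code earlier in this thread.
spark = SparkSession \
    .builder \
    .appName("insert-overwrite-job") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()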

*Are there any ways to improve the performance of a Hive INSERT OVERWRITE
query from Spark?*

I also noticed that this behavior is even worse (i.e. the job takes even more
time) with a Hive table that has many existing partitions; the data loads
relatively fast into tables that have fewer existing partitions.

*Some additional details:*
* The table is dynamically partitioned.
* Spark version - 2.3.0
* Hive version - 2.3.2-amzn-2
* Hadoop version - 2.8.3-amzn-0

PS: Other config options I have tried that didn't have much effect on job
performance:
* "hive.load.dynamic.partitions.thread" - "10"
* "hive.mv.files.thread" - "30"
* "fs.trash.interval" - "0"






Re: how to use cluster sparkSession like localSession

2018-11-04 Thread Sumedh Wale

  
  
Hi,

I think what you need is a long-running Spark cluster to which you can submit
jobs dynamically.

For SQL, you can start Spark's HiveServer2 (the Thrift JDBC/ODBC server):
https://spark.apache.org/docs/latest/sql-programming-guide.html#distributed-sql-engine
This starts a long-running Spark cluster with a fixed configuration (executors,
cores, etc.) and lets Spark act more like a regular database. You can then open
jdbc:hive2:// JDBC connections from your app and run SQL queries/DDLs.
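A minimal sketch of the application side, assuming the Thrift server is already
running and using the PyHive client (which speaks the HiveServer2 protocol); the
host, port, user and query below are placeholders:

from pyhive import hive  # pip install 'pyhive[hive]'

# Open a connection to the long-running Thrift server and run a query;
# a web app would typically reuse/pool connections like this one per request.
conn = hive.Connection(host="thrift-server-host", port=10000, username="app_user")
cursor = conn.cursor()
cursor.execute("SELECT col1, COUNT(*) FROM some_table GROUP BY col1")
for row in cursor.fetchall():
    print(row)
cursor.close()
conn.close()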

For other components (or even SQL), you can start a Spark jobserver:
https://github.com/spark-jobserver/spark-jobserver
This will again start a long running Spark cluster. It also allows
you create new SparkContexts on-the-fly though that should not be
done from a web app rather configured separately by admin if
required. It will require you to implement your job as a
SparkJob/SparkSessionJob that will be provided pre-created
SparkContext/SparkSession, and these take parameters that can be
read dynamically in your implementation. You register your classes
in jars separately before-hand. Then you can call those methods
using REST API from your application providing it the required
parameters like a remote procedure call.
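A rough sketch of what those REST calls could look like from a Python web
application; the jobserver host, jar, app name, job class and parameters are
placeholders, and the endpoint shapes follow the spark-jobserver documentation:

import requests

JOBSERVER = "http://jobserver-host:8090"

# 1. Register the jar containing the SparkJob/SparkSessionJob implementation (done once).
with open("my-spark-jobs.jar", "rb") as jar:
    requests.post(JOBSERVER + "/jars/myapp", data=jar.read())

# 2. Trigger a run of a registered job class, passing parameters in the request body.
resp = requests.post(
    JOBSERVER + "/jobs",
    params={"appName": "myapp", "classPath": "com.example.MyFilterJob", "sync": "true"},
    data="input.path = s3://bucket/data\nfilter.column = col1",
)
print(resp.json())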

Or you can try SnappyData, which provides both of these (and much more) out of
the box.

Regards,
Sumedh Wale
SnappyData (http://www.snappydata.io)

On 02/11/18 11:22, 崔苗(数据与人工智能产品开发部) wrote:
> then how about Spark SQL and Spark MLlib? We use them most of the time.
>
> On 11/2/2018 11:58, Daniel de Oliveira Mantovani wrote:
>> Please read about Spark Streaming or Spark Structured Streaming. Your web
>> application can easily communicate through some API and you won't have the
>> overhead of starting a new Spark job, which is pretty heavy.
>>
>> On Thu, Nov 1, 2018 at 23:01 崔苗(数据与人工智能产品开发部) <0049003...@znv.com> wrote:
>>> Hi,
>>> we want to execute Spark code without submitting an application.jar, like
>>> this code:
>>>
>>> public static void main(String args[]) throws Exception {
>>>     SparkSession spark = SparkSession
>>>             .builder()
>>>             .master("local[*]")
>>>             .appName("spark test")
>>>             .getOrCreate();
>>>
>>>     Dataset<Row> testData =
>>>         spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv");
>>>     testData.printSchema();
>>>     testData.show();
>>> }
>>>
>>> The above code works well in the IDE and does not need a jar file to be
>>> generated and submitted, but if we replace master("local[*]") with
>>> master("yarn") it doesn't work. So is there a way to use a cluster
>>> SparkSession like a local SparkSession? We need to dynamically execute
>>> Spark code in a web server according to the request, e.g. a filter request
>>> will call dataset.filter(), so there is no application.jar to submit.

RE: how to use cluster sparkSession like localSession

2018-11-04 Thread Sun, Keith
Hello,

I think you can try the below; the reason is that only yarn-client mode is
supported for your scenario.

master("yarn-client")
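In Spark 2.x the same client-mode setup is commonly written as a "yarn" master
plus spark.submit.deployMode=client; a minimal sketch (in Python for brevity,
and assuming HADOOP_CONF_DIR/YARN_CONF_DIR point at the cluster configuration
on the machine running the code):

from pyspark.sql import SparkSession

# Client mode: the driver runs in this process and talks to the YARN cluster,
# so the web server itself acts as the Spark driver.
spark = SparkSession \
    .builder \
    .master("yarn") \
    .config("spark.submit.deployMode", "client") \
    .appName("spark test") \
    .getOrCreate()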



Thanks very much.
Keith
From: 张万新 
Sent: Thursday, November 1, 2018 11:36 PM
To: 崔苗(数据与人工智能产品开发部) <0049003...@znv.com>
Cc: user 
Subject: Re: how to use cluster sparkSession like localSession

I think you should investigate Apache Zeppelin and Livy.

On Fri, Nov 2, 2018 at 11:01, 崔苗(数据与人工智能产品开发部) <0049003...@znv.com> wrote:

Hi,
we want to execute Spark code without submitting an application.jar, like this code:

public static void main(String args[]) throws Exception {
    SparkSession spark = SparkSession
            .builder()
            .master("local[*]")
            .appName("spark test")
            .getOrCreate();

    Dataset<Row> testData =
        spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv");
    testData.printSchema();
    testData.show();
}

The above code works well in the IDE and does not need a jar file to be generated
and submitted, but if we replace master("local[*]") with master("yarn") it doesn't
work. So is there a way to use a cluster SparkSession like a local SparkSession?
We need to dynamically execute Spark code in a web server according to the request,
e.g. a filter request will call dataset.filter(), so there is no application.jar
to submit.


0049003208

0049003...@znv.com



Spark 2.4.0 artifact in Maven repository

2018-11-04 Thread Bartosz Konieczny
Hi,

Today I wanted to set up a development environment for GraphX, and when I
visited the Maven central repository
(https://mvnrepository.com/artifact/org.apache.spark/spark-graphx) I saw that
it was already available in version 2.4.0. Does that mean the new version of
Apache Spark has been released? It seems quite surprising to me because I
didn't find any release announcement, and the 2.4.0 artifact was deployed on
29/10/2018. Maybe somebody here has an explanation for that?

Best regards,
Bartosz Konieczny.