Re: Performance Problems Migrating to S3A Committers

2021-08-05 Thread James Yu
See this ticket https://issues.apache.org/jira/browse/HADOOP-17201.  It may 
help your team.

From: Johnny Burns 
Sent: Tuesday, June 22, 2021 3:41 PM
To: user@spark.apache.org 
Cc: data-orchestration-team 
Subject: Performance Problems Migrating to S3A Committers

Hello.

I’m Johnny, I work at Stripe. We’re heavy Spark users and we’ve been exploring 
using s3 committers. Currently we first write the data to HDFS and then upload 
it to S3. However, now with S3 offering strong consistency guarantees, we are 
evaluating if we can write data directly to S3.

We’re having some trouble with performance, so we’re hoping someone might have 
guidance that can unblock this.

File Format
We are using Parquet as the file format. We do have Iceberg tables as well, and 
they are indeed able to commit directly to S3 (with minimal local disk usage). 
We can’t migrate all of our jobs to Iceberg right now, so we are looking 
for a committer that is performant and can write Parquet files directly to S3 
(with minimal local disk usage).

What have we tried?
We’ve tried using both the “magic” and “directory” committers. We're setting 
the following configs (in addition to the "magic"/"directory" value for 
spark.hadoop.fs.s3a.committer.name):

"spark.hadoop.fs.s3a.committer.magic.enabled": "true",
"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
"spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
"spark.sql.parquet.output.committer.class": "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",

Both committers have shown performance regressions on large jobs. We’re 
currently focused on trying to make the directory committer work because we’ve 
seen fewer slowdowns with that one, but I’ll describe the problems with each.

We’ve been testing the committers on a large job with 100k tasks (creating 
7.3TB of output).
Observations for magic committer

Using the magic committer, we see slowdowns in two places:


  *   S3 Writing (inside the task)
      *   The slowdown seems to occur just after the S3 multipart write. The
          finishedWrite function tries to do some cleanup and kicks off the
          deleteUnnecessaryFakeDirectories function.
      *   This causes 503s due to hitting AWS rate limits on
          com.amazonaws.services.s3.model.DeleteObjectsRequest.
      *   I'm not sure what directories are actually getting cleaned up here (I
          assume the _magic directories are still needed up until the job
          commit). See the config sketch after this list.

  *   Job Commit
      *   Have not dug into the details here, but assume it is something
          similar to what we’re seeing in the directory committer case below.
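
One knob that might be relevant to those 503s (an assumption, not something raised in the thread, and it requires Hadoop 3.3.1+ with marker-aware clients): directory marker retention, which skips the fake-directory cleanup that issues these delete requests. A sketch of the setting, in the same form as the configs above:

"spark.hadoop.fs.s3a.directory.marker.retention": "keep",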

Observations for directory committer

We’ve observed that the “directory” S3 committer’s performance is on par with our 
existing HDFS commit for task execution and task commit. The slowdowns we’re 
seeing are in the job commit phase.

The job commit happens almost instantaneously in the HDFS case, vs taking about 
an hour for the s3 directory committer.

We’ve enabled DEBUG logging for the S3 committer. It seems like that hour is 
mostly spent doing things you would expect (completing 100k 
delayedComplete S3 uploads). I've attached an example of some of the logs we 
see repeated over and over during the one-hour job commit (I redacted some of the 
directories and SHAs but the logs are otherwise unchanged).

One thing I notice is that we see object_delete_requests += 1 in the logs. I’m 
not sure whether that means it’s doing an S3 delete, or deleting the HDFS 
manifest files (to clean up the task).
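
Another setting that might matter for an hour-long job commit (again an assumption, not something verified in this thread): the committer's thread pool for parallel operations during job commit, fs.s3a.committer.threads, which defaults to a small number. A sketch, with an illustrative value:

"spark.hadoop.fs.s3a.committer.threads": "64",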

Alternatives - Should we check out directCommitter?
We’ve also considered using the directCommitter. We understand that the 
directCommitter is discouraged because it does not support speculative 
execution (and has problems in some failure cases). Given that we do not use 
speculative execution at Stripe, would the directCommitter be a viable option 
for us? What are the failure scenarios to consider?

Alternatives - Can S3FileIO work well with parquet files?

Netflix has a tool called S3FileIO. 
We’re wondering if it can be used with Spark, or only with Iceberg.


Re: [Spark Core, PySpark] Separate stage level scheduling for consecutive map functions

2021-08-05 Thread Sean Owen
Doesn't a persist break stages?

On Thu, Aug 5, 2021, 11:40 AM Tom Graves 
wrote:

> As Sean mentioned its only available at Stage level but you said you don't
> want to shuffle so splitting into stages doesn't help you.  Without more
> details it seems like you could "hack" this by just requesting an executor
> with 1 GPU (allowing 2 tasks per gpu) and 2 CPUs and the one task would use
> the GPU and the other could just use the CPU.  Perhaps that is too
> simplistic or brittle though.
>
> Tom
> On Saturday, July 31, 2021, 03:56:18 AM CDT, Andreas Kunft <
> andreas.ku...@gmail.com> wrote:
>
>
> I have a setup with two work intensive tasks, one map using GPU followed
> by a map using only CPU.
>
> Using stage level resource scheduling, I request a GPU node, but would
> also like to execute the consecutive CPU map on a different executor so
> that the GPU node is not blocked.
>
> However, spark will always combine the two maps due to the narrow
> dependency, and thus, I can not define two different resource requirements.
>
> So the question is: can I force the two map functions on different
> executors without shuffling or even better is there a plan to enable this
> by assigning different resource requirements.
>
> Best
>
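
For reference, a minimal sketch (not from the thread) of the kind of setup Tom describes, using the Spark 3.1+ stage-level scheduling API. The resource amounts, placeholder functions, and the assumption that dynamic allocation and a GPU discovery script are configured are all illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

object StageLevelSketch {
  def main(args: Array[String]): Unit = {
    // Assumes Spark 3.1+ with dynamic allocation enabled (required for
    // stage-level scheduling) and a GPU discovery script configured.
    val spark = SparkSession.builder().appName("stage-level-sketch").getOrCreate()

    // Executor shape Tom describes: 2 CPUs and 1 GPU, with each task asking for
    // 1 CPU and half a GPU so two tasks can run concurrently on that executor.
    val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 1)
    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.5)
    val profile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

    // Placeholder functions standing in for the GPU-heavy and CPU-only maps.
    val gpuHeavyFn: Int => Int = x => x * 2
    val cpuOnlyFn: Int => Int = x => x + 1

    val rdd = spark.sparkContext.parallelize(1 to 1000, numSlices = 10)
    val result = rdd.withResources(profile).map(gpuHeavyFn).map(cpuOnlyFn).collect()
    println(result.length)
    spark.stop()
  }
}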


Re: [Spark Core, PySpark] Separate stage level scheduling for consecutive map functions

2021-08-05 Thread Tom Graves
As Sean mentioned it's only available at the stage level, but you said you don't 
want to shuffle, so splitting into stages doesn't help you. Without more 
details it seems like you could "hack" this by just requesting an executor with 
1 GPU (allowing 2 tasks per GPU) and 2 CPUs, and the one task would use the GPU 
and the other could just use the CPU. Perhaps that is too simplistic or brittle 
though.

Tom

On Saturday, July 31, 2021, 03:56:18 AM CDT, Andreas Kunft wrote:

I have a setup with two work-intensive tasks, one map using GPU followed by a 
map using only CPU.

Using stage level resource scheduling, I request a GPU node, but would also 
like to execute the consecutive CPU map on a different executor so that the GPU 
node is not blocked.

However, Spark will always combine the two maps due to the narrow dependency, 
and thus I cannot define two different resource requirements.

So the question is: can I force the two map functions on different executors 
without shuffling, or even better, is there a plan to enable this by assigning 
different resource requirements?

Best

Re: How can transform RDD[Seq[String]] to RDD[ROW]

2021-08-05 Thread Artemis User
I am not sure why you need to create an RDD first.  You can create a 
data frame directly from a CSV file, for instance:


spark.read.format("csv").option("header","true").schema(yourSchema).load(ftpUrl)
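
A slightly fuller sketch of the same idea (the schema is copied from the original post; whether the ftp:// URL resolves depends on the Hadoop FTP filesystem being available to Spark):

import org.apache.spark.sql.types._

// Schema taken from the original post; reading the CSV directly makes the
// wholeTextFiles + split step unnecessary.
val yourSchema = StructType(List(
  StructField("id", StringType, true),
  StructField("name", StringType, true),
  StructField("year", IntegerType, true),
  StructField("city", StringType, true)))

val df = spark.read
  .format("csv")
  .option("header", "true")
  .schema(yourSchema)
  .load(ftpUrl)   // ftpUrl as in the original post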

-- ND

On 8/5/21 3:14 AM, igyu wrote:
val ftpUrl ="ftp://test:test@ip:21/upload/test/_temporary/0/_temporary/task_2019124756_0002_m_00_0/*; 
val rdd = spark.sparkContext.wholeTextFiles(ftpUrl)

val value = rdd.map(_._2).map(csv=>csv.split(",").toSeq)

val schemas =StructType(List(
 new StructField("id", DataTypes.StringType, true), new StructField("name", 
DataTypes.StringType, true), new StructField("year", DataTypes.IntegerType, true), new 
StructField("city", DataTypes.StringType, true)))
val DF = spark.createDataFrame(value,schemas)
How can I createDataFrame


igyu
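
For completeness, a minimal sketch (not from the reply above) of the conversion createDataFrame expects, assuming the quoted value and schemas are in scope and every row really has four comma-separated fields, with year parseable as an Int:

import org.apache.spark.sql.Row

// Sketch only: createDataFrame(RDD[Row], StructType) wants Row objects whose
// field types match the schema, so "year" is parsed to Int here.
val rows = value.map(fields => Row(fields(0), fields(1), fields(2).trim.toInt, fields(3)))
val df = spark.createDataFrame(rows, schemas)
df.show()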




Reading SPARK 3.1.x generated parquet in SPARK 2.4.x

2021-08-05 Thread Gourav Sengupta
Hi,

we are trying to migrate some of our data lake pipelines to run in SPARK
3.x, whereas the dependent pipelines using those tables will still be
running in SPARK 2.4.x for some time to come.

Does anyone know of any issues that can happen:
1. when reading Parquet files written in 3.1.x in SPARK 2.4
2. when some partitions in the data lake have parquet files written in
SPARK 2.4.x and some have parquet files written in SPARK 3.1.x.

Please note that there are no changes in schema, but later on we might end
up adding or removing some columns.

I will be really grateful for your kind help on this.

Regards,
Gourav Sengupta


Re: How can transform RDD[Seq[String]] to RDD[ROW]

2021-08-05 Thread suresh kumar pathak
Maybe this link will help you.
https://stackoverflow.com/questions/41898144/convert-rddstring-to-rddrow-to-dataframe-spark-scala

On Thu, Aug 5, 2021 at 12:46 PM igyu  wrote:

> val ftpUrl = 
> "ftp://test:test@ip:21/upload/test/_temporary/0/_temporary/task_2019124756_0002_m_00_0/*;
> val rdd = spark.sparkContext.wholeTextFiles(ftpUrl)
> val value = rdd.map(_._2).map(csv=>csv.split(",").toSeq)
>
> val schemas = StructType(List(
> new StructField("id", DataTypes.StringType, true),
> new StructField("name", DataTypes.StringType, true),
> new StructField("year", DataTypes.IntegerType, true),
> new StructField("city", DataTypes.StringType, true)))
> val DF = spark.createDataFrame(value,schemas)
>
> How can I createDataFrame
>
> --
> igyu
>


-- 
Thanks & Regards,
Suresh Kumar Pathak(+918884772233)


How can transform RDD[Seq[String]] to RDD[ROW]

2021-08-05 Thread igyu
val ftpUrl = "ftp://test:test@ip:21/upload/test/_temporary/0/_temporary/task_2019124756_0002_m_00_0/*"
val rdd = spark.sparkContext.wholeTextFiles(ftpUrl)
val value = rdd.map(_._2).map(csv=>csv.split(",").toSeq)

val schemas = StructType(List(
new StructField("id", DataTypes.StringType, true),
new StructField("name", DataTypes.StringType, true),
new StructField("year", DataTypes.IntegerType, true),
new StructField("city", DataTypes.StringType, true)))
val DF = spark.createDataFrame(value,schemas)
How can I createDataFrame



igyu