Re: More instances = slower Spark job

2017-10-01 Thread Steve Loughran

On 28 Sep 2017, at 15:27, Daniel Siegmann wrote:


Can you kindly explain how Spark uses parallelism for bigger (say 1GB) text 
file? Does it use InputFormat to create multiple splits and create 1 partition 
per split? Also, in case of S3 or NFS, how does the input split work? I 
understand for HDFS files are already pre-split so Spark can use dfs.blocksize 
to determine partitions. But how does it work other than HDFS?

S3 is similar to HDFS I think. I'm not sure off-hand how exactly it decides to 
split for the local filesystem. But it does. Maybe someone else will be able to 
explain the details.


HDFS files are split into blocks, each with a real block size and location, 
which is that created when the file was written/copied.

If you have a 100 MB file replicated on 3 machines with a block size of 64 MB, you 
will have two blocks for the file: 64 MB and 36 MB, with three replicas of each 
block. Blocks are placed across machines (normally, 2 hosts on one rack, 1 on 
a different rack, which gives you better resilience to failures of rack 
switches). There's no attempt to colocate blocks of the same file, *except* 
that HDFS will attempt to write every block onto the host where the program 
generating the data is running. So, space permitting, if the 100MB file is 
created on host 1, then host 1 will have block-1 replica-1, and 
block-2-replica-1, with the others scattered around the cluster.
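
A sketch of how to see this for yourself with the Hadoop FileSystem API, assuming a SparkSession named `spark` is in scope (the path is hypothetical):

```scala
import org.apache.hadoop.fs.Path

val conf = spark.sparkContext.hadoopConfiguration
val path = new Path("hdfs:///data/file-100mb.json")   // hypothetical file
val fs = path.getFileSystem(conf)
val status = fs.getFileStatus(path)

// One BlockLocation per block; each one lists the hosts holding a replica.
fs.getFileBlockLocations(status, 0, status.getLen).foreach { loc =>
  println(s"offset=${loc.getOffset} length=${loc.getLength} hosts=${loc.getHosts.mkString(",")}")
}
```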

The code is actually

https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L386


Because it's fixed in HDFS, you get the block size used at creation time; 
different formats may provide their own split information independent of that 
block size though. (This also means that if you have different block sizes for different 
files in the set of files you process, there may be different splits for each 
file, as well as different locations.)
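
The split sizing at the heart of that code boils down to one line; a sketch of it, with minSize and maxSize coming from mapreduce.input.fileinputformat.split.minsize/maxsize:

```scala
// splitSize = max(minSize, min(maxSize, blockSize)); the file is then chopped
// into chunks of roughly that size.
def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long =
  math.max(minSize, math.min(maxSize, blockSize))
```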

With HDFS replication, you get the bandwidth of all the hard disks serving up 
data. With a 100 MB file split in two, if those blocks were actually saved onto 
different physical hard disks (say SAS disks at 6 Gb/s), then you have 3 x 2 
x 6 Gb/s of bandwidth, for a max of 36 Gb/s (of course, there's other work 
competing for disk IO, so that's a maximum). If Spark schedules the work on those 
machines and you have the Hadoop native libraries installed (i.e. you don't get 
told off in the logs for not having them), then the HDFS client running in the 
Spark processes can talk directly to the HDFS datanode and get a native OS 
file handle to read those blocks: there isn't even a network stack to 
interfere. If you are working with remote data, then the network slows things 
down.

The S3A client just makes things up: you can configure the settings to lie 
about the block size. If you have 100 MB files and want to split the work five ways, 
then in that job, set

spark.hadoop.fs.s3a.block.size = 20971520

The other object stores have different options, but it's the same thing really. 
You get to choose, client-side, what Spark is told, which is then used by the 
driver to make its decisions about which splits to give to which workers for 
processing, the order, etc.

Unlike HDFS, the bandwidth you get off S3 for a single file is fixed, 
irrespective of how many blocks you tell the client there are. Declaring 
a lower block size, and so allowing more workers at the data, isn't going 
to guarantee more performance; you'll just be sharing the same IO rate.

...though, talking to S3, a big factor in performance when working with the data is 
actually the cost of breaking and recreating HTTP connections, which happens a lot 
if you have seek-heavy code reading large files. And the columnar formats, ORC 
and Parquet, are seek-heavy, provided they aren't gzipped. Reading these files 
has pretty awful performance until you run Hadoop 2.8+ and tell S3A that you 
are doing random IO (which kills .gz reading, so use it wisely):

spark.hadoop.fs.s3a.experimental.fadvise random
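
A sketch of wiring both settings up at session-build time (the 20 MB value is just the example from above; Hadoop 2.8+ is assumed for the fadvise option):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-read-tuning")
  .config("spark.hadoop.fs.s3a.block.size", (20 * 1024 * 1024).toString)  // pretend 20 MB "blocks"
  .config("spark.hadoop.fs.s3a.experimental.fadvise", "random")           // seek-friendly reads for ORC/Parquet
  .getOrCreate()
```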


All this stuff and more is in the source files; don't be afraid to look 
into them to see what's going on. I always recommend starting with the stack 
traces you get when things aren't working right. If you are using S3, that's 
all in: 
https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a

-steve




Re: More instances = slower Spark job

2017-10-01 Thread Steve Loughran

> On 28 Sep 2017, at 14:45, ayan guha  wrote:
> 
> Hi
> 
> Can you kindly explain how Spark uses parallelism for bigger (say 1GB) text 
> file? Does it use InputFormat to create multiple splits and create 1 
> partition per split?

Yes, input formats give you their splits; this is usually used to decide how to 
break things up. As to how that gets used: you'll have to look at the source, as 
I'd only get it wrong. Key point: it's part of the information which can be 
used to partition the work, but the number of available workers is the other 
big factor.
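
A sketch of nudging the partitioning from the caller's side, assuming a SparkContext `sc` is in scope and the path is hypothetical; the RDD text reader takes a minimum partition count on top of whatever the input format reports:

```scala
// minPartitions is a hint passed down to the input format; non-splittable
// files (e.g. .gz) still come back as a single partition.
val rdd = sc.textFile("s3a://bucket/big-file.txt", minPartitions = 40)
println(rdd.getNumPartitions)
```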



> Also, in case of S3 or NFS, how does the input split work? I understand for 
> HDFS files are already pre-split so Spark can use dfs.blocksize to determine 
> partitions. But how does it work other than HDFS?

There's invariably a config option to allow you to tell Spark what blocksize to 
work with, e.g. fs.s3a.block.size, which you set in the Spark defaults to 
something like

spark.hadoop.fs.s3a.block.size 67108864

to set it to 64MB. 
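
A quick sanity check of what that produces, assuming the setting above is in the Spark defaults and the path is hypothetical (the RDD API goes through the Hadoop input format, so it reflects the reported block size):

```scala
// With fs.s3a.block.size at 64 MB, a ~1 GB object should show up as ~16 splits.
val rdd = spark.sparkContext.textFile("s3a://bucket/1gb-file.json")
println(rdd.getNumPartitions)
```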

HDFS also provides locality information: where the data is. Other filesystems 
don't do that; they usually just say "localhost", which Spark recognises as 
"anywhere"... it schedules work on different parts of a file wherever there is 
free capacity.





Re: More instances = slower Spark job

2017-10-01 Thread Jeroen Miller
Vadim's "scheduling within an application" approach turned out to be
excellent, at least on a single node with the CPU usage reaching about
90%. I directly implemented the code template that Vadim kindly
provided:

parallel_collection_paths.foreach(
  path => {
    val lines = spark.read.textFile(path)
    val pattern = "my_simple_regex".r
    val filtered_lines = lines.filter(
      line => {
        val matched = pattern.findFirstMatchIn(line)
        matched match {
          case Some(m) => true
          case None => false
        }
      }
    )
    val output_dir = get_output_dir(path)
    filtered_lines.write.option("compression", "gzip").text(output_dir)
  }
)

For ForkJoinPool(parallelism), I simply used a parallelism value
equal to the number of executors. Each input JSON file was processed
on a single partition, and a unique output part- file was
generated for each input JSON file.

For some reason that I still have to investigate, this does not scale
as well when using multiple instances, though the performance is
still acceptable. Any idea why?

Another approach was to simply repartition the DataSet before filtering:

val lines = spark.read.textFile(path).repartition(n)
val filtered_lines = lines.filter(...)

That helped a lot (at least when running on a single node) but not as
much as Vadim's approach.

A minor remark: I'm filtering the JSON lines with a simple regex.
However, I had to declare the regex in the parallel collection's
foreach(), otherwise the Spark task would fail in the Spark shell with:

org.apache.spark.SparkException: Task not serializable

Why can't the scala.util.matching.Regex be serialized?
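
For what it's worth, scala.util.matching.Regex is itself marked Serializable in current Scala versions; the usual cause of this error is the closure capturing a non-serializable enclosing object (the shell's wrapper, for instance) that holds the pattern. A minimal sketch of the common workaround, with `lines` as in the snippet above and a hypothetical holder object:

```scala
// Keep the pattern in a small serializable holder (or local to the closure),
// so Spark never has to drag the outer shell/driver object into the task.
object Patterns extends Serializable {
  val simple = "my_simple_regex".r
}

val filtered_lines = lines.filter(line => Patterns.simple.findFirstMatchIn(line).isDefined)
```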

Jeroen

On Thu, Sep 28, 2017 at 9:16 PM, Vadim Semenov
 wrote:
> Instead of having one job, you can try processing each
> file in a separate job, but run multiple jobs in parallel
> within one SparkContext. Something like this should work for
> you, it'll submit N jobs from the driver, the jobs will run
> independently, but executors will dynamically work on
> different jobs, so you'll utilize executors at full.
>
> ```
> import org.apache.spark.sql.SparkSession
> import scala.collection.parallel.ForkJoinTaskSupport
>
> val spark: SparkSession
> val files: Seq[String]
> val filesParallelCollection = files.toParArray
> val howManyFilesToProcessInParallel = math.min(50, files.length)
>
> filesParallelCollection.tasksupport = new ForkJoinTaskSupport()(
>   new
> scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
> )
> filesParallelCollection.foreach(file => {
>   spark.read.text(file).filter(…)…
> })
> ```




Re: More instances = slower Spark job

2017-10-01 Thread Gourav Sengupta
Hi Jeroen,

I do not believe that I completely agree with the idea that you will be
spending more time and memory that way.

But if that were also the case, why are you not using data frames and a UDF?


Regards,
Gourav

On Sun, Oct 1, 2017 at 6:17 PM, Jeroen Miller 
wrote:

> On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta
>  wrote:
> > Why are you not using JSON reader of SPARK?
>
> Since the filter I want to perform is so simple, I do not want to
> spend time and memory to deserialise the JSON lines.
>
> Jeroen
>
>
>


Re: More instances = slower Spark job

2017-10-01 Thread Jeroen Miller
On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta
 wrote:
> Why are you not using JSON reader of SPARK?

Since the filter I want to perform is so simple, I do not want to
spend time and memory to deserialise the JSON lines.

Jeroen




Re: More instances = slower Spark job

2017-09-29 Thread Vadim Semenov
Hi Jeroen,

> However, am I correct in assuming that all the filtering will be then
> performed on the driver (since the .gz files are not splittable), albeit in
> several threads?

Filtering will not happen on the driver, it'll happen on executors, since
`spark.read.json(…).filter(…).write(…)` is a separate job. But you have to
submit each job in a separate thread, because each thread will get locked
until the corresponding job finishes; that's why you have to use
`parallel collections`. You could also just use Futures, but it's just
easier to use a `ParArray`.
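
A minimal sketch of that Futures variant, assuming a SparkSession `spark` and a Seq[String] of paths `files` are in scope; `outputDirFor` is a hypothetical helper and the filter is a stand-in for the real regex:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Bound the number of concurrent Spark jobs with a fixed-size thread pool.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

val jobs = files.map { path =>
  Future {
    spark.read.textFile(path)
      .filter(_.contains("needle"))
      .write.option("compression", "gzip").text(outputDirFor(path))
  }
}
Await.result(Future.sequence(jobs), Duration.Inf)
```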

Internally it will work this way: once one task finishes decompressing a
file, many tasks will get scheduled (based on `spark.default.parallelism`),
and the executor that decompressed the file will start processing lines
using all available threads, and after some time additional executors may
join (based on the locality levels), and then after filtering, you would
have to repartition back to 1 partition, so you could write just one
`.gzip` file.
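
That last step ("repartition back to 1 partition") is just a coalesce before the write; a sketch with hypothetical paths and a stand-in filter:

```scala
spark.read.textFile("s3a://bucket/in/file-1.json.gz")
  .filter(_.contains("needle"))
  .coalesce(1)                                        // one partition => one output part file
  .write.option("compression", "gzip").text("s3a://bucket/out/file-1")
```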

And for each file, there will be a separate job, but because they all run
within one Spark Context, executors will stay with the job, and will work
on all files simultaneously.
See more about scheduling within one application:
https://spark.apache.org/docs/2.2.0/job-scheduling.html#scheduling-within-an-application

On Fri, Sep 29, 2017 at 12:58 PM, Jeroen Miller 
wrote:

> On Thu, Sep 28, 2017 at 11:55 PM, Jeroen Miller 
> wrote:
> > On Thu, Sep 28, 2017 at 9:16 PM, Vadim Semenov
> >  wrote:
> >> Instead of having one job, you can try processing each file in a
> separate
> >> job, but run multiple jobs in parallel within one SparkContext.
>
> Hello Vadim,
>
> Today was a bit busy and I did not have the time to play with your
> idea. However, am I correct in assuming that all the filtering will be
> then performed on the driver (since the .gz files are not splittable),
> albeit in several threads?
>
> If this is correct, then I guess the proper way to tackle this task
> would be to run without any executors, but using all the cores and
> memory of the machine for the driver?
>
> I will keep you posted on my progress,
>
> Thanks,
>
> Jeroen
>


Re: More instances = slower Spark job

2017-09-29 Thread Gourav Sengupta
I think that the best option is to see whether the data frames option of
reading JSON files works or not.



On Fri, Sep 29, 2017 at 3:53 PM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> Does each gzip file look like this:
>
> {json1}
> {json2}
> {json3}
>
> meaning that each line is a separate json object?
>
> I process a similar large file batch and what I do is this:
>
> input.txt # each line in input.txt represents a path to a gzip file each
> containing a json object every line
> my_rdd = sc.parallelize(input.txt) # creates an rdd with each file_path as
> a row
> my_rdd = my_rdd.flatMap(open_files) # opens the files and yields them line
> by line
> my_rdd = my_rdd.map(do_something_with_files) # now do something with each
> line
>
> the important part at least in python is the yield, because it makes the
> function memory efficient. You could even go further and only yield a json
> if it matches the regex criteria saving you the map(). Maybe yield a
> (path,json) pair to later reconstruct which line goes into which file.
> Reduce the rdd and write out the file.
>
> If all files in input.txt are too big to be processed at once, consider
> dividing input.txt into smaller chunks and process each chunk individually.
>
> On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> I think that Vadim's response makes a lot of sense in terms of utilizing
>> SPARK. Why are you not using JSON reader of SPARK? Your input has to follow
>> a particular JSON style, but then it would be interesting to know whether
>> you have looked into it at all.
>>
>> If you are going to read them only once then there is really no need to
>> convert them and then read them.
>>
>> I will be really interested to hear in case you were able to using json
>> reader natively available in SPARK.
>>
>>
>> Regards,
>> Gourav
>>
>> On Thu, Sep 28, 2017 at 8:16 PM, Vadim Semenov <
>> vadim.seme...@datadoghq.com> wrote:
>>
>>> Instead of having one job, you can try processing each file in a
>>> separate job, but run multiple jobs in parallel within one SparkContext.
>>> Something like this should work for you, it'll submit N jobs from the
>>> driver, the jobs will run independently, but executors will dynamically
>>> work on different jobs, so you'll utilize executors at full.
>>>
>>> ```
>>> import org.apache.spark.sql.SparkSession
>>>
>>> import scala.collection.parallel.ForkJoinTaskSupport
>>>
>>> val spark: SparkSession
>>> val files: Seq[String]
>>> val filesParallelCollection = files.toParArray
>>> val howManyFilesToProcessInParallel = math.min(50, files.length)
>>>
>>> filesParallelCollection.tasksupport = new ForkJoinTaskSupport()(
>>>   new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
>>> )
>>> filesParallelCollection.foreach(file => {
>>>   spark.read.text(file).filter(…)…
>>> })
>>> ```
>>>
>>> On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller 
>>> wrote:
>>>
 More details on what I want to achieve. Maybe someone can suggest a
 course of action.

 My processing is extremely simple: reading .json.gz text
 files, filtering each line according a regex, and saving the surviving
 lines in a similarly named .gz file.

 Unfortunately changing the data format is impossible (we are dealing
 with terabytes here) so I need to process .gz files.

 Ideally, I would like to process a large number of such files in
 parallel, that is using n_e executors which would each take care of a
 fraction 1/n_e of all the files. The trouble is that I don't know how
 to process files in parallel without loading them in the driver first,
 which would result in a major bottleneck.

 Here was my naive approach in Scala-like pseudo-code:

 //
 // This happens on the driver
 //
 val files = List("s3://bckt/file-1.json.gz", ...,
 "s3://bckt/file-N.json.gz")
 val files_rdd = sc.parallelize(files, num_partitions)
 //
 // Now files_rdd (which only holds file names) is distributed across
 several executors
 // and/or nodes
 //

 files_rdd.foreach(
 //
 // It is my understanding that what is performed within the foreach
 method
 // will be parallelized on several executors / nodes
 //
 file => {
 //
 // This would happen on a given executor: a given input file
 is processed
 // entirely on a given executor
 //
 val lines = sc.read.text(file)
 val filtered_lines = lines.filter( // filter based on regex // )
 filtered_lines.write.option("compression",
 "gzip").text("a_filename_tbd")
 }
 )

 Unfortunately this is not possible since the Spark context sc is
 defined in the driver and cannot be shared.

 My problem would be entirely solved if I could manage to read files
not from the driver, but from a given executor.

Re: More instances = slower Spark job

2017-09-29 Thread Alexander Czech
Does each gzip file look like this:

{json1}
{json2}
{json3}

meaning that each line is a separate json object?

I process a similar large file batch and what I do is this:

input.txt # each line in input.txt represents a path to a gzip file each
containing a json object every line
my_rdd = sc.parallelize(input.txt) # creates an rdd with each file_path as a
row
my_rdd = my_rdd.flatMap(open_files) # opens the files and yields them line
by line
my_rdd = my_rdd.map(do_something_with_files) # now do something with each
line

the important part at least in python is the yield, because it makes the
function memory efficient. You could even go further and only yield a json
if it matches the regex criteria saving you the map(). Maybe yield a
(path,json) pair to later reconstruct which line goes into which file.
Reduce the rdd and write out the file.

If all files in input.txt are too big to be processed at once, consider
dividing input.txt into smaller chunks and process each chunk individually.

On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta  wrote:

> I think that Vadim's response makes a lot of sense in terms of utilizing
> SPARK. Why are you not using JSON reader of SPARK? Your input has to follow
> a particular JSON style, but then it would be interesting to know whether
> you have looked into it at all.
>
> If you are going to read them only once then there is really no need to
> convert them and then read them.
>
> I will be really interested to hear in case you were able to using json
> reader natively available in SPARK.
>
>
> Regards,
> Gourav
>
> On Thu, Sep 28, 2017 at 8:16 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> Instead of having one job, you can try processing each file in a separate
>> job, but run multiple jobs in parallel within one SparkContext.
>> Something like this should work for you, it'll submit N jobs from the
>> driver, the jobs will run independently, but executors will dynamically
>> work on different jobs, so you'll utilize executors at full.
>>
>> ```
>> import org.apache.spark.sql.SparkSession
>>
>> import scala.collection.parallel.ForkJoinTaskSupport
>>
>> val spark: SparkSession
>> val files: Seq[String]
>> val filesParallelCollection = files.toParArray
>> val howManyFilesToProcessInParallel = math.min(50, files.length)
>>
>> filesParallelCollection.tasksupport = new ForkJoinTaskSupport()(
>>   new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
>> )
>> filesParallelCollection.foreach(file => {
>>   spark.read.text(file).filter(…)…
>> })
>> ```
>>
>> On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller 
>> wrote:
>>
>>> More details on what I want to achieve. Maybe someone can suggest a
>>> course of action.
>>>
>>> My processing is extremely simple: reading .json.gz text
>>> files, filtering each line according a regex, and saving the surviving
>>> lines in a similarly named .gz file.
>>>
>>> Unfortunately changing the data format is impossible (we are dealing
>>> with terabytes here) so I need to process .gz files.
>>>
>>> Ideally, I would like to process a large number of such files in
>>> parallel, that is using n_e executors which would each take care of a
>>> fraction 1/n_e of all the files. The trouble is that I don't know how
>>> to process files in parallel without loading them in the driver first,
>>> which would result in a major bottleneck.
>>>
>>> Here was my naive approach in Scala-like pseudo-code:
>>>
>>> //
>>> // This happens on the driver
>>> //
>>> val files = List("s3://bckt/file-1.json.gz", ...,
>>> "s3://bckt/file-N.json.gz")
>>> val files_rdd = sc.parallelize(files, num_partitions)
>>> //
>>> // Now files_rdd (which only holds file names) is distributed across
>>> several executors
>>> // and/or nodes
>>> //
>>>
>>> files_rdd.foreach(
>>> //
>>> // It is my understanding that what is performed within the foreach
>>> method
>>> // will be parallelized on several executors / nodes
>>> //
>>> file => {
>>> //
>>> // This would happen on a given executor: a given input file
>>> is processed
>>> // entirely on a given executor
>>> //
>>> val lines = sc.read.text(file)
>>> val filtered_lines = lines.filter( // filter based on regex // )
>>> filtered_lines.write.option("compression",
>>> "gzip").text("a_filename_tbd")
>>> }
>>> )
>>>
>>> Unfortunately this is not possible since the Spark context sc is
>>> defined in the driver and cannot be shared.
>>>
>>> My problem would be entirely solved if I could manage to read files
>>> not from the driver, but from a given executor.
>>>
>>> Another possibility would be to read each .gz file in the driver
>>> (about 2GB each), materializing the whole resulting RDD on the driver
>>> (around 12GB) and then calling repartition on that RDD, but only the
>>> regex part would be parallelized, and the data shuffling will probably
>>> ruin the performance.

Re: More instances = slower Spark job

2017-09-28 Thread Gourav Sengupta
I think that Vadim's response makes a lot of sense in terms of utilizing
SPARK. Why are you not using JSON reader of SPARK? Your input has to follow
a particular JSON style, but then it would be interesting to know whether
you have looked into it at all.

If you are going to read them only once then there is really no need to
convert them and then read them.

I will be really interested to hear in case you were able to using json
reader natively available in SPARK.


Regards,
Gourav

On Thu, Sep 28, 2017 at 8:16 PM, Vadim Semenov 
wrote:

> Instead of having one job, you can try processing each file in a separate
> job, but run multiple jobs in parallel within one SparkContext.
> Something like this should work for you, it'll submit N jobs from the
> driver, the jobs will run independently, but executors will dynamically
> work on different jobs, so you'll utilize executors at full.
>
> ```
> import org.apache.spark.sql.SparkSession
>
> import scala.collection.parallel.ForkJoinTaskSupport
>
> val spark: SparkSession
> val files: Seq[String]
> val filesParallelCollection = files.toParArray
> val howManyFilesToProcessInParallel = math.min(50, files.length)
>
> filesParallelCollection.tasksupport = new ForkJoinTaskSupport()(
>   new scala.concurrent.forkjoin.ForkJoinPool(
> howManyFilesToProcessInParallel)
> )
> filesParallelCollection.foreach(file => {
>   spark.read.text(file).filter(…)…
> })
> ```
>
> On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller 
> wrote:
>
>> More details on what I want to achieve. Maybe someone can suggest a
>> course of action.
>>
>> My processing is extremely simple: reading .json.gz text
>> files, filtering each line according a regex, and saving the surviving
>> lines in a similarly named .gz file.
>>
>> Unfortunately changing the data format is impossible (we are dealing
>> with terabytes here) so I need to process .gz files.
>>
>> Ideally, I would like to process a large number of such files in
>> parallel, that is using n_e executors which would each take care of a
>> fraction 1/n_e of all the files. The trouble is that I don't know how
>> to process files in parallel without loading them in the driver first,
>> which would result in a major bottleneck.
>>
>> Here was my naive approach in Scala-like pseudo-code:
>>
>> //
>> // This happens on the driver
>> //
>> val files = List("s3://bckt/file-1.json.gz", ...,
>> "s3://bckt/file-N.json.gz")
>> val files_rdd = sc.parallelize(files, num_partitions)
>> //
>> // Now files_rdd (which only holds file names) is distributed across
>> several executors
>> // and/or nodes
>> //
>>
>> files_rdd.foreach(
>> //
>> // It is my understanding that what is performed within the foreach
>> method
>> // will be parallelized on several executors / nodes
>> //
>> file => {
>> //
>> // This would happen on a given executor: a given input file
>> is processed
>> // entirely on a given executor
>> //
>> val lines = sc.read.text(file)
>> val filtered_lines = lines.filter( // filter based on regex // )
>> filtered_lines.write.option("compression",
>> "gzip").text("a_filename_tbd")
>> }
>> )
>>
>> Unfortunately this is not possible since the Spark context sc is
>> defined in the driver and cannot be shared.
>>
>> My problem would be entirely solved if I could manage to read files
>> not from the driver, but from a given executor.
>>
>> Another possibility would be to read each .gz file in the driver
>> (about 2GB each), materializing the whole resulting RDD on the driver
>> (around 12GB) and then calling repartition on that RDD, but only the
>> regex part would be parallelized, and the data shuffling will probably
>> ruin the performance.
>>
>> Any idea?
>>
>>
>>
>


Re: More instances = slower Spark job

2017-09-28 Thread Jeroen Miller
On Thu, Sep 28, 2017 at 9:02 PM, Jörn Franke  wrote:
> It looks to me a little bit strange. First json.gz files are single threaded, 
> ie each file can only be processed by one thread (so it is good to have many 
> files of around 128 MB to 512 MB size each).

Indeed. Unfortunately, the files I have to work with are quite a bit larger.

> Then what you do in the code is already done by the data source. There is no 
> need to read the file directory and parallelize. Just provide the directory 
> containing the files to the data source and Spark automatically takes care to 
> read them from different executors.

Very true. My motivation behind my contrived idea is that I need to
replicate the same file tree structure after filtering -- that does
not seem easy if I build a huge RDD from all input files.

> In order  improve write Performance check if you can store them in Avro (or 
> parquet or orc) using their internal compression feature. Then you can have 
> even many threads/file.

Indeed, 50% of my processing time is spent uploading the results to S3.

Thank you for your input.

Jeroen




Re: More instances = slower Spark job

2017-09-28 Thread Vadim Semenov
Instead of having one job, you can try processing each file in a separate
job, but run multiple jobs in parallel within one SparkContext.
Something like this should work for you, it'll submit N jobs from the
driver, the jobs will run independently, but executors will dynamically
work on different jobs, so you'll utilize executors at full.

```
import org.apache.spark.sql.SparkSession

import scala.collection.parallel.ForkJoinTaskSupport

val spark: SparkSession
val files: Seq[String]
val filesParallelCollection = files.toParArray
val howManyFilesToProcessInParallel = math.min(50, files.length)

filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
)
filesParallelCollection.foreach(file => {
  spark.read.text(file).filter(…)…
})
```

On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller 
wrote:

> More details on what I want to achieve. Maybe someone can suggest a
> course of action.
>
> My processing is extremely simple: reading .json.gz text
> files, filtering each line according a regex, and saving the surviving
> lines in a similarly named .gz file.
>
> Unfortunately changing the data format is impossible (we are dealing
> with terabytes here) so I need to process .gz files.
>
> Ideally, I would like to process a large number of such files in
> parallel, that is using n_e executors which would each take care of a
> fraction 1/n_e of all the files. The trouble is that I don't know how
> to process files in parallel without loading them in the driver first,
> which would result in a major bottleneck.
>
> Here was my naive approach in Scala-like pseudo-code:
>
> //
> // This happens on the driver
> //
> val files = List("s3://bckt/file-1.json.gz", ...,
> "s3://bckt/file-N.json.gz")
> val files_rdd = sc.parallelize(files, num_partitions)
> //
> // Now files_rdd (which only holds file names) is distributed across
> several executors
> // and/or nodes
> //
>
> files_rdd.foreach(
> //
> // It is my understanding that what is performed within the foreach
> method
> // will be parallelized on several executors / nodes
> //
> file => {
> //
> // This would happen on a given executor: a given input file
> is processed
> // entirely on a given executor
> //
> val lines = sc.read.text(file)
> val filtered_lines = lines.filter( // filter based on regex // )
> filtered_lines.write.option("compression",
> "gzip").text("a_filename_tbd")
> }
> )
>
> Unfortunately this is not possible since the Spark context sc is
> defined in the driver and cannot be shared.
>
> My problem would be entirely solved if I could manage to read files
> not from the driver, but from a given executor.
>
> Another possibility would be to read each .gz file in the driver
> (about 2GB each), materializing the whole resulting RDD on the driver
> (around 12GB) and then calling repartition on that RDD, but only the
> regex part would be parallelized, and the data shuffling will probably
> ruin the performance.
>
> Any idea?
>
>
>


Re: More instances = slower Spark job

2017-09-28 Thread Jörn Franke
It looks a little bit strange to me. First, json.gz files are single-threaded, 
i.e. each file can only be processed by one thread (so it is good to have many 
files of around 128 MB to 512 MB each).

Then what you do in the code is already done by the data source. There is no 
need to read the file directory and parallelize. Just provide the directory 
containing the files to the data source and Spark automatically takes care to 
read them from different executors.

In order to improve write performance, check if you can store them in Avro (or 
Parquet or ORC) using their internal compression feature. Then you can even have 
many threads per file. 
If you need to store them as json.gz, make sure that the files are between 128 
and 512 MB.

> On 28. Sep 2017, at 20:50, Jeroen Miller  wrote:
> 
> More details on what I want to achieve. Maybe someone can suggest a
> course of action.
> 
> My processing is extremely simple: reading .json.gz text
> files, filtering each line according a regex, and saving the surviving
> lines in a similarly named .gz file.
> 
> Unfortunately changing the data format is impossible (we are dealing
> with terabytes here) so I need to process .gz files.
> 
> Ideally, I would like to process a large number of such files in
> parallel, that is using n_e executors which would each take care of a
> fraction 1/n_e of all the files. The trouble is that I don't know how
> to process files in parallel without loading them in the driver first,
> which would result in a major bottleneck.
> 
> Here was my naive approach in Scala-like pseudo-code:
> 
> //
> // This happens on the driver
> //
> val files = List("s3://bckt/file-1.json.gz", ..., "s3://bckt/file-N.json.gz")
> val files_rdd = sc.parallelize(files, num_partitions)
> //
> // Now files_rdd (which only holds file names) is distributed across
> several executors
> // and/or nodes
> //
> 
> files_rdd.foreach(
>//
>// It is my understanding that what is performed within the foreach method
>// will be parallelized on several executors / nodes
>//
>file => {
>//
>// This would happen on a given executor: a given input file
> is processed
>// entirely on a given executor
>//
>val lines = sc.read.text(file)
>val filtered_lines = lines.filter( // filter based on regex // )
>filtered_lines.write.option("compression",
> "gzip").text("a_filename_tbd")
>}
> )
> 
> Unfortunately this is not possible since the Spark context sc is
> defined in the driver and cannot be shared.
> 
> My problem would be entirely solved if I could manage to read files
> not from the driver, but from a given executor.
> 
> Another possibility would be to read each .gz file in the driver
> (about 2GB each), materializing the whole resulting RDD on the driver
> (around 12GB) and then calling repartition on that RDD, but only the
> regex part would be parallelized, and the data shuffling will probably
> ruin the performance.
> 
> Any idea?
> 
> 




Re: More instances = slower Spark job

2017-09-28 Thread Jeroen Miller
More details on what I want to achieve. Maybe someone can suggest a
course of action.

My processing is extremely simple: reading .json.gz text
files, filtering each line according a regex, and saving the surviving
lines in a similarly named .gz file.

Unfortunately changing the data format is impossible (we are dealing
with terabytes here) so I need to process .gz files.

Ideally, I would like to process a large number of such files in
parallel, that is using n_e executors which would each take care of a
fraction 1/n_e of all the files. The trouble is that I don't know how
to process files in parallel without loading them in the driver first,
which would result in a major bottleneck.

Here was my naive approach in Scala-like pseudo-code:

//
// This happens on the driver
//
val files = List("s3://bckt/file-1.json.gz", ..., "s3://bckt/file-N.json.gz")
val files_rdd = sc.parallelize(files, num_partitions)
//
// Now files_rdd (which only holds file names) is distributed across
several executors
// and/or nodes
//

files_rdd.foreach(
//
// It is my understanding that what is performed within the foreach method
// will be parallelized on several executors / nodes
//
file => {
//
// This would happen on a given executor: a given input file
is processed
// entirely on a given executor
//
val lines = sc.read.text(file)
val filtered_lines = lines.filter( // filter based on regex // )
filtered_lines.write.option("compression",
"gzip").text("a_filename_tbd")
}
)

Unfortunately this is not possible since the Spark context sc is
defined in the driver and cannot be shared.

My problem would be entirely solved if I could manage to read files
not from the driver, but from a given executor.

Another possibility would be to read each .gz file in the driver
(about 2GB each), materializing the whole resulting RDD on the driver
(around 12GB) and then calling repartition on that RDD, but only the
regex part would be parallelized, and the data shuffling will probably
ruin the performance.

Any idea?




Re: More instances = slower Spark job

2017-09-28 Thread Daniel Siegmann
On Thu, Sep 28, 2017 at 7:23 AM, Gourav Sengupta 
wrote:

>
> I will be very surprised if someone tells me that a 1 GB CSV text file is
> automatically split and read by multiple executors in SPARK. It does not
> matter whether it stays in HDFS, S3 or any other system.
>

I can't speak to *any* system, but I can confirm for HDFS, S3, and local
filesystems a 1 GB CSV file would be split.


>
> Now if someone tells me that in case I have a smaller CSV file of 100MB
> size and that will be split while being read, that will also be surprising.
>

I'm not sure what the default is. It may be 128 MB, in which case that file
would not be split.

Keep in mind gzipped files cannot be split. If you have very large text
files and you want to compress them, and they will be > a few hundred MB
compressed, you should probably use bzip2 instead (which can be split).
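
A quick way to see the difference, assuming a SparkContext `sc` is in scope and the paths are hypothetical; the RDD reader reports a single partition for the gzipped file and several for the bzip2 one once it spans multiple blocks:

```scala
println(sc.textFile("s3a://bucket/big.json.gz").getNumPartitions)   // always 1
println(sc.textFile("s3a://bucket/big.json.bz2").getNumPartitions)  // > 1 for large files
```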


Re: More instances = slower Spark job

2017-09-28 Thread Daniel Siegmann
> Can you kindly explain how Spark uses parallelism for bigger (say 1GB)
> text file? Does it use InputFormat to create multiple splits and create 1
> partition per split? Also, in case of S3 or NFS, how does the input split
> work? I understand for HDFS files are already pre-split so Spark can use
> dfs.blocksize to determine partitions. But how does it work other than HDFS?
>

S3 is similar to HDFS I think. I'm not sure off-hand how exactly it decides
to split for the local filesystem. But it does. Maybe someone else will be
able to explain the details.


Re: More instances = slower Spark job

2017-09-28 Thread Gourav Sengupta
Hi,

I will be very surprised if someone tells me that a 1 GB CSV text file is
automatically split and read by multiple executors in SPARK. It does not
matter whether it stays in HDFS, S3 or any other system.

Now if someone tells me that in case I have a smaller CSV file of 100MB
size and that will be split while being read, that will also be surprising.

Once SPARK has loaded it into its cache, things are ofcourse different.


Regards,
Gourav

On Thu, Sep 28, 2017 at 2:45 PM, ayan guha  wrote:

> Hi
>
> Can you kindly explain how Spark uses parallelism for bigger (say 1GB)
> text file? Does it use InputFormat to create multiple splits and create 1
> partition per split? Also, in case of S3 or NFS, how does the input split
> work? I understand for HDFS files are already pre-split so Spark can use
> dfs.blocksize to determine partitions. But how does it work other than HDFS?
>
> On Thu, Sep 28, 2017 at 11:26 PM, Daniel Siegmann <
> dsiegm...@securityscorecard.io> wrote:
>
>>
>> no matter what you do and how many nodes you start, in case you have a
>>> single text file, it will not use parallelism.
>>>
>>
>> This is not true, unless the file is small or is gzipped (gzipped files
>> cannot be split).
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: More instances = slower Spark job

2017-09-28 Thread ayan guha
Hi

Can you kindly explain how Spark uses parallelism for bigger (say 1GB) text
file? Does it use InputFormat to create multiple splits and create 1
partition per split? Also, in the case of S3 or NFS, how does the input split
work? I understand that for HDFS, files are already pre-split, so Spark can use
dfs.blocksize to determine partitions. But how does it work other than with HDFS?

On Thu, Sep 28, 2017 at 11:26 PM, Daniel Siegmann <
dsiegm...@securityscorecard.io> wrote:

>
> no matter what you do and how many nodes you start, in case you have a
>> single text file, it will not use parallelism.
>>
>
> This is not true, unless the file is small or is gzipped (gzipped files
> cannot be split).
>



-- 
Best Regards,
Ayan Guha


Re: More instances = slower Spark job

2017-09-28 Thread Daniel Siegmann
> no matter what you do and how many nodes you start, in case you have a
> single text file, it will not use parallelism.
>

This is not true, unless the file is small or is gzipped (gzipped files
cannot be split).


RE: More instances = slower Spark job

2017-09-28 Thread JG Perrin
As the others have mentioned, your loading time might kill your benchmark… I am 
in a similar process right now, but I time each operation: load, process 1, 
process 2, etc. It is not always easy with lazy operators, but you can try to force 
operations with a dummy collect and cache (for benchmarking purposes).

Also, give processing more importance (unless you really only want to have this 
light processing). Heavy computation (ML for example) should show a difference, 
but it may not be your use case.
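
A sketch of that per-stage timing, assuming `spark` and `path` are in scope; count() plays the role of the forcing collect:

```scala
def timed[T](label: String)(f: => T): T = {
  val t0 = System.nanoTime()
  val result = f
  println(f"$label took ${(System.nanoTime() - t0) / 1e9}%.1f s")
  result
}

val lines = timed("load + cache") { spark.read.textFile(path).cache() }
timed("materialize")              { lines.count() }                              // forces the load
timed("process 1 (filter)")       { lines.filter(_.contains("needle")).count() } // stand-in step
```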

From: Sonal Goyal [mailto:sonalgoy...@gmail.com]
Sent: Thursday, September 28, 2017 4:30 AM
To: Tejeshwar J1 <tejeshwar...@globallogic.com.invalid>
Cc: Jeroen Miller <bluedasya...@gmail.com>; user@spark.apache.org
Subject: Re: More instances = slower Spark job

Also check if the compression algorithm you use is splittable?

Thanks,
Sonal
Nube Technologies<http://www.nubetech.co>




On Thu, Sep 28, 2017 at 2:17 PM, Tejeshwar J1 
<tejeshwar...@globallogic.com.invalid> wrote:
Hi Miller,

Try using
1. coalesce(numberOfPartitions) to reduce the number of partitions in order to 
avoid idle cores.
2.Try reducing executor memory as you increase the number of executors.
3. Try performing GC or changing naïve java serialization to kryo serialization.


Thanks,
Tejeshwar


From: Jeroen Miller 
[mailto:bluedasya...@gmail.com]
Sent: Thursday, September 28, 2017 2:11 PM
To: user@spark.apache.org
Subject: More instances = slower Spark job

Hello,

I am experiencing a disappointing performance issue with my Spark jobs
as I scale up the number of instances.

The task is trivial: I am loading large (compressed) text files from S3,
filtering out lines that do not match a regex, counting the numbers
of remaining lines and saving the resulting datasets as (compressed)
text files on S3. Nothing that a simple grep couldn't do, except that
the files are too large to be downloaded and processed locally.

On a single instance, I can process X GBs per hour. When scaling up
to 10 instances, I noticed that processing the /same/ amount of data
actually takes /longer/.

This is quite surprising as the task is really simple: I was expecting
a significant speed-up. My naive idea was that each executors would
process a fraction of the input file, count the remaining lines /locally/,
and save their part of the processed file /independently/, thus no data
shuffling would occur.

Obviously, this is not what is happening.

Can anyone shed some light on this or provide pointers to relevant
information?

Regards,

Jeroen




Re: More instances = slower Spark job

2017-09-28 Thread Gourav Sengupta
Hi,

no matter what you do and how many nodes you start, in case you have a
single text file, it will not use parallelism.

Therefore there are options of converting the text file to Parquet and
other formats, or just splitting the text file itself into several
individual files.

Please do let me know how things are progressing.

Regards,
Gourav Sengupta

On Thu, Sep 28, 2017 at 9:41 AM, Jeroen Miller 
wrote:

> Hello,
>
> I am experiencing a disappointing performance issue with my Spark jobs
> as I scale up the number of instances.
>
> The task is trivial: I am loading large (compressed) text files from S3,
> filtering out lines that do not match a regex, counting the numbers
> of remaining lines and saving the resulting datasets as (compressed)
> text files on S3. Nothing that a simple grep couldn't do, except that
> the files are too large to be downloaded and processed locally.
>
> On a single instance, I can process X GBs per hour. When scaling up
> to 10 instances, I noticed that processing the /same/ amount of data
> actually takes /longer/.
>
> This is quite surprising as the task is really simple: I was expecting
> a significant speed-up. My naive idea was that each executors would
> process a fraction of the input file, count the remaining lines /locally/,
> and save their part of the processed file /independently/, thus no data
> shuffling would occur.
>
> Obviously, this is not what is happening.
>
> Can anyone shed some light on this or provide pointers to relevant
> information?
>
> Regards,
>
> Jeroen
>
>


Re: More instances = slower Spark job

2017-09-28 Thread Sonal Goyal
Also check if the compression algorithm you use is splittable?

Thanks,
Sonal
Nube Technologies 





On Thu, Sep 28, 2017 at 2:17 PM, Tejeshwar J1 <
tejeshwar...@globallogic.com.invalid> wrote:

> Hi Miller,
>
>
>
> Try using
>
> 1. coalesce(numberOfPartitions) to reduce the number of partitions in
> order to avoid idle cores.
>
> 2.Try reducing executor memory as you increase the number of executors.
>
> 3. Try performing GC or changing naïve Java serialization to Kryo
> serialization.
>
>
>
>
>
> Thanks,
>
> Tejeshwar
>
>
>
>
>
> *From:* Jeroen Miller [mailto:bluedasya...@gmail.com]
> *Sent:* Thursday, September 28, 2017 2:11 PM
> *To:* user@spark.apache.org
> *Subject:* More instances = slower Spark job
>
>
>
> Hello,
>
>
>
> I am experiencing a disappointing performance issue with my Spark jobs
>
> as I scale up the number of instances.
>
>
>
> The task is trivial: I am loading large (compressed) text files from S3,
>
> filtering out lines that do not match a regex, counting the numbers
>
> of remaining lines and saving the resulting datasets as (compressed)
>
> text files on S3. Nothing that a simple grep couldn't do, except that
>
> the files are too large to be downloaded and processed locally.
>
>
>
> On a single instance, I can process X GBs per hour. When scaling up
>
> to 10 instances, I noticed that processing the /same/ amount of data
>
> actually takes /longer/.
>
>
>
> This is quite surprising as the task is really simple: I was expecting
>
> a significant speed-up. My naive idea was that each executors would
>
> process a fraction of the input file, count the remaining lines /locally/,
>
> and save their part of the processed file /independently/, thus no data
>
> shuffling would occur.
>
>
>
> Obviously, this is not what is happening.
>
>
>
> Can anyone shed some light on this or provide pointers to relevant
>
> information?
>
>
>
> Regards,
>
>
>
> Jeroen
>
>
>


Re: More instances = slower Spark job

2017-09-28 Thread Steve Loughran

On 28 Sep 2017, at 09:41, Jeroen Miller wrote:

Hello,

I am experiencing a disappointing performance issue with my Spark jobs
as I scale up the number of instances.

The task is trivial: I am loading large (compressed) text files from S3,
filtering out lines that do not match a regex, counting the numbers
of remaining lines and saving the resulting datasets as (compressed)
text files on S3. Nothing that a simple grep couldn't do, except that
the files are too large to be downloaded and processed locally.

On a single instance, I can process X GBs per hour. When scaling up
to 10 instances, I noticed that processing the /same/ amount of data
actually takes /longer/.

This is quite surprising as the task is really simple: I was expecting
a significant speed-up. My naive idea was that each executors would
process a fraction of the input file, count the remaining lines /locally/,
and save their part of the processed file /independently/, thus no data
shuffling would occur.

Obviously, this is not what is happening.

Can anyone shed some light on this or provide pointers to relevant
information?


Expect the bandwidth of file input to be shared across all workers trying to 
read different parts of the same file. Two workers reading a file: half the B/W 
each. HDFS avoids this by splitting files into blocks and replicating each one 
three times: the max bandwidth of a file is 3 * blocks(file). For S3, if one reader 
has bandwidth B, two readers get bandwidth B/2.


There are some subtleties related to multipart upload - know that if you can do 
multipart upload/copy with an upload size M, and set the emulated block size on 
the clients to the same value, then you should get better parallelism, as the 
read bandwidth is really per uploaded block.


fs.s3a.block.size 64M
fs.s3a.multipart.size 64M
fs.s3a.multipart.threshold 64M
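
If you set those through Spark rather than in core-site.xml, the usual route is the spark.hadoop. prefix in the Spark defaults; the same 64 MB spelled out in bytes:

spark.hadoop.fs.s3a.block.size 67108864
spark.hadoop.fs.s3a.multipart.size 67108864
spark.hadoop.fs.s3a.multipart.threshold 67108864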


The other scale issue is that AWS throttles S3 IO per shard of the data, sends 
503 responses back to the clients, which then sleep a bit to back off. Add more 
clients, more throttling, more sleep, same in-shard bandwidth.

a single bucket can have data on >1 shard, where different parts of the tree 
are on different shards

http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html

Sadly, the normal layout of directory trees in big data sources 
(year/month/day) biases code to all try and work with the same shard.

The currently shipping s3a: client doesn't handle throttling, so that won't be 
the cause of your problem. If you are using EMR and its s3: connector, it may 
be.



RE: More instances = slower Spark job

2017-09-28 Thread Tejeshwar J1
Hi Miller,



Try using

1. coalesce(numberOfPartitions) to reduce the number of partitions in
order to avoid idle cores.

2. Try reducing executor memory as you increase the number of executors.

3. Try performing GC or changing naïve Java serialization to Kryo
serialization.
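
A sketch of what suggestions 1 and 3 look like in code, with illustrative values and a hypothetical input path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo instead of Java serialization
  .getOrCreate()

val ds = spark.read.textFile("s3a://bucket/in/")
  .coalesce(spark.sparkContext.defaultParallelism)  // match partitions to the available cores
```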





Thanks,

Tejeshwar





*From:* Jeroen Miller [mailto:bluedasya...@gmail.com]
*Sent:* Thursday, September 28, 2017 2:11 PM
*To:* user@spark.apache.org
*Subject:* More instances = slower Spark job



Hello,



I am experiencing a disappointing performance issue with my Spark jobs

as I scale up the number of instances.



The task is trivial: I am loading large (compressed) text files from S3,

filtering out lines that do not match a regex, counting the numbers

of remaining lines and saving the resulting datasets as (compressed)

text files on S3. Nothing that a simple grep couldn't do, except that

the files are too large to be downloaded and processed locally.



On a single instance, I can process X GBs per hour. When scaling up

to 10 instances, I noticed that processing the /same/ amount of data

actually takes /longer/.



This is quite surprising as the task is really simple: I was expecting

a significant speed-up. My naive idea was that each executors would

process a fraction of the input file, count the remaining lines /locally/,

and save their part of the processed file /independently/, thus no data

shuffling would occur.



Obviously, this is not what is happening.



Can anyone shed some light on this or provide pointers to relevant

information?



Regards,



Jeroen