Re: More instances = slower Spark job

2017-10-01 Thread Steve Loughran

On 28 Sep 2017, at 15:27, Daniel Siegmann wrote:


Can you kindly explain how Spark uses parallelism for a bigger (say 1 GB) text 
file? Does it use InputFormat to create multiple splits and create 1 partition 
per split? Also, in case of S3 or NFS, how does the input split work? I 
understand that for HDFS, files are already pre-split, so Spark can use dfs.blocksize 
to determine partitions. But how does it work for filesystems other than HDFS?

S3 is similar to HDFS I think. I'm not sure off-hand how exactly it decides to 
split for the local filesystem. But it does. Maybe someone else will be able to 
explain the details.


HDFS files are split into blocks, each with a real block size and location, 
which is set when the file is written/copied.

If you have a 100 MB file replicated on 3 machines with a block size of 64 MB, you 
will have two blocks for the file: 64 MB and 36 MB, with three replicas of each 
block. Blocks are placed across machines (normally 2 hosts on one rack, 1 on 
a different rack; this gives you better resilience to failures of rack 
switches). There's no attempt to colocate blocks of the same file, *except* 
that HDFS will attempt to write every block onto the host where the program 
generating the data is running. So, space permitting, if the 100 MB file is 
created on host 1, then host 1 will have block-1 replica-1 and 
block-2 replica-1, with the others scattered around the cluster.
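
If you want to see that layout for a real file, the HDFS client API will list 
every block together with the hosts holding its replicas; a rough sketch (the 
path is made up for illustration):

// Rough sketch: list the blocks of an HDFS file and where their replicas live.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())   // picks up core-site.xml/hdfs-site.xml from the classpath
val status = fs.getFileStatus(new Path("hdfs:///data/100mb-file"))
fs.getFileBlockLocations(status, 0, status.getLen).foreach { b =>
  println(s"offset=${b.getOffset} length=${b.getLength} hosts=${b.getHosts.mkString(",")}")
}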

The code is actually

https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L386


Because it's fixed in HDFS, you get the block size used at creation time; 
different formats may provide their own split information independent of that 
block size, though. (This also means that if you have different block sizes for 
different files in the set of files you process, there may be different splits 
for each file, as well as different locations.)

With HDFS replication, you get the bandwidth of all the hard disks serving up 
the data. With a 100 MB file split in two, if those blocks were actually saved 
onto different physical hard disks (say SAS disks at 6 Gb/s), then you have 
3 x 2 x 6 Gb/s of bandwidth, for a max of 36 Gb/s (of course, there's other work 
competing for disk IO); that's the maximum. If Spark schedules the work on those 
machines and you have the Hadoop native libraries installed (i.e. you don't get 
told off in the logs for not having them), then the HDFS client running in the 
Spark processes can talk direct to the HDFS datanode and get a native OS file 
handle to read those blocks: there isn't even a network stack to interfere. If 
you are working with remote data, then the network slows things down.
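
(A quick way to check is sketched below; it assumes you can run a spark-shell 
alongside the workers.)

// Rough sketch: report whether the Hadoop native libraries were found,
// which the short-circuit local read path described above depends on.
import org.apache.hadoop.util.NativeCodeLoader
println(s"hadoop native code loaded: ${NativeCodeLoader.isNativeCodeLoaded()}")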

The S3A client just makes things up: you can configure the settings to lie 
about the block size. If you have 100 MB files and want to split the work five 
ways, then in that job set

spark.hadoop.fs.s3a.block.size = 20971520

The other object stores have different options, but it's the same thing really. 
You get to choose, client-side, what Spark is told, which is then used by the 
driver to make its decisions about which splits to give to which workers for 
processing, the order, etc.
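
As a sketch of doing that from the Spark side (the bucket and path are made up; 
20971520 bytes = 20 MB):

// Rough sketch: have the S3A client report 20 MB "blocks", so a 100 MB object
// is planned as roughly five splits. Bucket and path are made up for illustration.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-split-demo")
  .config("spark.hadoop.fs.s3a.block.size", "20971520")   // 20 MB
  .getOrCreate()

val lines = spark.read.textFile("s3a://my-bucket/logs/100mb-file.txt")
println(s"partitions = ${lines.rdd.getNumPartitions}")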

Unlike HDFS, the bandwidth you get off S3 for a single file is fixed, 
irrespective of how many blocks you tell the client there are. Setting a lower 
block size and so allowing more workers at the data isn't going to guarantee 
more performance; you'll just be sharing the same IO rate.

...though, talking to S3, a big factor in performance when working with the data 
is actually the cost of breaking and recreating HTTP connections, which happens a 
lot if you have seek-heavy code reading large files. And the columnar formats, ORC 
and Parquet, are seek-heavy, provided they aren't gzipped. Reading these files has 
pretty awful performance until you run Hadoop 2.8+ and tell S3A that you are doing 
random IO (which kills .gz reading, so use it wisely):

spark.hadoop.fs.s3a.experimental.fadvise random
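
For example, a sketch of turning that on for a Parquet-only job (the bucket, path 
and column name are made up):

// Rough sketch: random-IO policy for seek-heavy columnar reads on S3A (Hadoop 2.8+).
// Only use it for ORC/Parquet jobs; it hurts whole-file streaming reads such as .gz.
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("s3a-random-io")
  .config("spark.hadoop.fs.s3a.experimental.fadvise", "random")
  .getOrCreate()

val events = spark.read.parquet("s3a://my-bucket/tables/events/")
events.select("event_type").show(10)   // column name made up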


All this stuff and more is in the source files; don't be afraid to look 
into them to see what's going on. I always recommend starting with the stack 
traces you get when things aren't working right. If you are using S3, that's 
all in: 
https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a

-steve




Re: More instances = slower Spark job

2017-10-01 Thread Steve Loughran

> On 28 Sep 2017, at 14:45, ayan guha  wrote:
> 
> Hi
> 
> Can you kindly explain how Spark uses parallelism for a bigger (say 1 GB) text 
> file? Does it use InputFormat to create multiple splits and create 1 
> partition per split?

Yes: input formats give you their splits, and this is usually used to decide how 
to break things up. As to how exactly that gets used, you'll have to look at the 
source, as I'd only get it wrong. Key point: it's part of the information which 
can be used to partition the work, but the number of available workers is the 
other big factor.



> Also, in case of S3 or NFS, how does the input split work? I understand that 
> for HDFS, files are already pre-split, so Spark can use dfs.blocksize to 
> determine partitions. But how does it work for filesystems other than HDFS?

There's invariably a config option to let you tell Spark what block size to 
work with, e.g. fs.s3a.block.size, which you set in the Spark defaults to 
something like

spark.hadoop.fs.s3a.block.size 67108864

to set it to 64 MB.

HDFS also provides locality information: where the data is. Other filesystems 
don't do that; they usually just say "localhost", which Spark recognises as 
"anywhere"... it schedules work on different parts of a file wherever there is 
free capacity.
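
You can see the hints the driver gets with something like this (a rough sketch 
for a spark-shell; the path is made up, and for s3a:// or file:// paths the list 
is typically empty):

// Rough sketch: print the locality hints per input split. HDFS paths give
// datanode hostnames; object stores and local files give nothing useful,
// so the scheduler is free to run those tasks anywhere.
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val rdd = spark.sparkContext.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/100mb-file")
rdd.partitions.foreach { p =>
  println(s"split ${p.index}: ${rdd.preferredLocations(p).mkString(", ")}")
}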


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: More instances = slower Spark job

2017-10-01 Thread Jeroen Miller
Vadim's "scheduling within an application" approach turned out to be
excellent, at least on a single node with the CPU usage reaching about
90%. I directly implemented the code template that Vadim kindly
provided:

parallel_collection_paths.foreach(
  path => {
    val lines = spark.read.textFile(path)
    val pattern = "my_simple_regex".r
    val filtered_lines = lines.filter(
      line => {
        val matched = pattern.findFirstMatchIn(line)
        matched match {
          case Some(m) => true
          case None => false
        }
      }
    )
    val output_dir = get_output_dir(path)
    filtered_lines.write.option("compression", "gzip").text(output_dir)
  }
)

For ForkJoinPool(parallelism), I simply used a parallelism value
equal to the number of executors. Each input JSON file was processed
on a single partition, and a unique output part- file was
generated for each input JSON file.
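
For reference, here is a sketch of that setup (counting executors through the 
status tracker is an assumption about how it was done, and input_paths stands in 
for the actual list of input paths):

// Rough sketch: a parallel collection whose ForkJoinPool matches the executor count.
// getExecutorInfos includes the driver, hence the "- 1".
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

val numExecutors = spark.sparkContext.statusTracker.getExecutorInfos.length - 1
val parallel_collection_paths = input_paths.par   // input_paths: Seq[String], assumed defined elsewhere
parallel_collection_paths.tasksupport =
  new ForkJoinTaskSupport(new ForkJoinPool(math.max(1, numExecutors)))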

For some reason that I still have to investigate, this does not scale
as well when using multiple instances, though the performance is
still acceptable. Any idea why?

Another approach was to simply repartition the DataSet before filtering:

val lines = spark.read.textFile(path).repartition(n)
val filtered_lines = lines.filter(...)

That helped a lot (at least when running on a single node) but not as
much as Vadim's approach.

A minor remark: I'm filtering the JSON lines with a simple regex.
However, I had to declare the regex inside the parallel collection's
foreach(), otherwise the Spark task would fail in the Spark shell with:

org.apache.spark.SparkException: Task not serializable

Why can't the scala.util.matching.Regex be serialized?

Jeroen

On Thu, Sep 28, 2017 at 9:16 PM, Vadim Semenov wrote:
> Instead of having one job, you can try processing each
> file in a separate job, but run multiple jobs in parallel
> within one SparkContext. Something like this should work for
> you, it'll submit N jobs from the driver, the jobs will run
> independently, but executors will dynamically work on
> different jobs, so you'll utilize the executors fully.
>
> ```
> import org.apache.spark.sql.SparkSession
> import scala.collection.parallel.ForkJoinTaskSupport
>
> val spark: SparkSession
> val files: Seq[String]
> val filesParallelCollection = files.toParArray
> val howManyFilesToProcessInParallel = math.min(50, files.length)
>
> filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
>   new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
> )
> filesParallelCollection.foreach(file => {
>   spark.read.text(file).filter(…)…
> })
> ```

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: More instances = slower Spark job

2017-10-01 Thread Gourav Sengupta
Hi Jeroen,

I do not believe that I completely agree with the idea that you will be
spending more time and memory that way.

But even if that were the case, why are you not using data frames and a UDF?


Regards,
Gourav

On Sun, Oct 1, 2017 at 6:17 PM, Jeroen Miller wrote:

> On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta
>  wrote:
> > Why are you not using JSON reader of SPARK?
>
> Since the filter I want to perform is so simple, I do not want to
> spend time and memory to deserialise the JSON lines.
>
> Jeroen
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming - Multiple Spark Contexts (SparkSQL) Performance

2017-10-01 Thread Gerard Maas
Hammad,

The recommended way to implement this logic would be to:

- Create a SparkSession.
- Create a StreamingContext using the SparkContext embedded in the SparkSession.
- Use the single SparkSession instance for the SQL operations within the
  foreachRDD.

It's important to note that Spark operations can process the complete
dataset. In this case, there's no need to do a perPartition or perElement
operation. (That would be the case if we were directly using the driver's
API and DB connections.)

Reorganizing the code in the question a bit, we should have:

SparkSession sparkSession = SparkSession
    .builder()
    .master("local[2]")
    .appName("TransformerStreamPOC")
    .config("spark.some.config.option", "some-value")
    .getOrCreate();

JavaStreamingContext jssc = new JavaStreamingContext(
    new JavaSparkContext(sparkSession.sparkContext()),
    Durations.seconds(60));

// this dataset doesn't seem to depend on the received data, so we can load it once
Dataset<Row> baselineData =
    sparkSession.read().jdbc(MYSQL_CONNECTION_URL, "table_name", connectionProperties);

// create the dstream
JavaDStream<...> dstream = ...

... operations on dstream ...

dstream.foreachRDD(rdd -> {

    Dataset<Row> incomingData = sparkSession.createDataset(rdd.rdd(), ...);   // ... = an Encoder for the element type

    ... do something with the incoming dataset, e.g. join with the baseline ...

    Dataset<Row> joined = incomingData.join(baselineData, ...);

    ... do something with joined ...

});


kr, Gerard.

On Sun, Oct 1, 2017 at 7:55 PM, Hammad  wrote:

> Hello,
>
> *Background:*
>
> I have Spark Streaming context;
>
> SparkConf conf = new 
> SparkConf().setMaster("local[2]").setAppName("TransformerStreamPOC");
> conf.set("spark.driver.allowMultipleContexts", "true");   *<== this*
> JavaStreamingContext jssc = new JavaStreamingContext(conf, 
> Durations.seconds(60));
>
>
> that subscribes to certain kafka *topics*;
>
> JavaInputDStream<ConsumerRecord<String, String>> stream =
> KafkaUtils.createDirectStream(
> jssc,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(*topics*, 
> kafkaParams)
> );
>
> when messages arrive in queue, I recursively process them as follows (below 
> code section will repeat in Question statement)
>
> stream.foreachRDD(rdd -> {
> // process here - the code for the two scenarios below is inserted here
>
> });
>
>
> *Question starts here:*
>
> Since I need to apply SparkSQL to received events in Queue - I create 
> SparkSession with two scenarios;
>
> *1) One sparkSession per partition (after "spark.driver.allowMultipleContexts" 
> is set to true), so all events under this partition are handled by the same 
> sparkSession*
>
> rdd.foreachPartition(partition -> {
> SparkSession sparkSession = SparkSession
> .builder()
> .appName("Java Spark SQL basic example")
> .config("spark.some.config.option", "some-value")
> .getOrCreate();
>
> while (partition.hasNext()) {
>   Dataset<Row> df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL, 
> "table_name", connectionProperties);
>
> }}
>
> *2) One sparkSession per event; so each event under each queue under each 
> stream has its own sparkSession*
>
> rdd.foreachPartition(partition -> {while (partition.hasNext()) {
> SparkSession sparkSession = SparkSession.builder().appName("Java Spark SQL 
> basic example").config("spark.some.config.option", 
> "some-value").getOrCreate();
>
> Dataset<Row> df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL, 
> "table_name", connectionProperties);
>
> }}
>
>
> Is it good practice to create multiple contexts (let's say 10 or 100)?
> How does the number of sparkContexts allowed relate to the number of worker
> nodes?
> What are performance considerations with respect to scenario1 and
> scenario2?
>
> I am looking for these answers as I feel there is more to what I
> understand of performance w.r.t sparkContexts created by a streaming
> application.
> Really appreciate your support in anticipation.
>
> Hammad
>
>


Re: NullPointerException error while saving Scala Dataframe to HBase

2017-10-01 Thread Marco Mistroni
Hi
The question is getting to the list.
I have no experience in HBase... though, having seen similar stuff when
saving a df somewhere else, it might have to do with the properties you
need to set to let Spark know it is dealing with HBase? Don't you need to set
some properties on the Spark context you are using?
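
Something along these lines maybe (a rough sketch only; it assumes the hbase-spark
connector and an hbase-site.xml are on the classpath, and whether this is the
actual cause of the NPE is a guess):

// Rough sketch: make the HBase configuration visible to the connector by creating
// an HBaseContext before the DataFrame write. Whether this fixes the NPE is an assumption.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext

val hbaseConf = HBaseConfiguration.create()   // picks up hbase-site.xml from the classpath
new HBaseContext(sc, hbaseConf)               // registers itself for the connector to use
// ...then run the existing df.write.options(...).format("org.apache.hadoop.hbase.spark").save()
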
Hth
 Marco


On Oct 1, 2017 4:33 AM,  wrote:

Hi guys - I am not sure whether the email is reaching the community
members. Please can somebody acknowledge?

Sent from my iPhone

> On 30-Sep-2017, at 5:02 PM, Debabrata Ghosh  wrote:
>
> Dear All,
>Greetings ! I am repeatedly hitting a NullPointerException
error while saving a Scala Dataframe to HBase. Please can you help
resolving this for me. Here is the code snippet:
>
> scala> def catalog = s"""{
>  ||"table":{"namespace":"default", "name":"table1"},
>  ||"rowkey":"key",
>  ||"columns":{
>  |  |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
>  |  |"col1":{"cf":"cf1", "col":"col1", "type":"string"}
>  ||}
>  |  |}""".stripMargin
> catalog: String
>
> scala> case class HBaseRecord(
>  |col0: String,
>  |col1: String)
> defined class HBaseRecord
>
> scala> val data = (0 to 255).map { i =>  HBaseRecord(i.toString, "extra")}
> data: scala.collection.immutable.IndexedSeq[HBaseRecord] =
> Vector(HBaseRecord(0,extra), HBaseRecord(1,extra), HBaseRecord(2,extra),
> HBaseRecord(3,extra), HBaseRecord(4,extra), HBaseRecord(5,extra),
> HBaseRecord(6,extra), HBaseRecord(7,extra), HBaseRecord(8,extra),
> HBaseRecord(9,extra), HBaseRecord(10,extra), HBaseRecord(11,extra),
> HBaseRecord(12,extra), HBaseRecord(13,extra), HBaseRecord(14,extra),
> HBaseRecord(15,extra), HBaseRecord(16,extra), HBaseRecord(17,extra),
> HBaseRecord(18,extra), HBaseRecord(19,extra), HBaseRecord(20,extra),
> HBaseRecord(21,extra), HBaseRecord(22,extra), HBaseRecord(23,extra),
> HBaseRecord(24,extra), HBaseRecord(25,extra), HBaseRecord(26,extra),
> HBaseRecord(27,extra), HBaseRecord(28,extra), HBaseRecord(29,extra),
> HBaseRecord(30,extra), HBaseRecord(31,extra), HBase...
>
> scala> import org.apache.spark.sql.datasources.hbase
> import org.apache.spark.sql.datasources.hbase
>
>
> scala> import org.apache.spark.sql.datasources.hbase.{HBaseTableCatalog}
> import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
>
> scala> sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog,
>      HBaseTableCatalog.newTable -> "5")).format("org.apache.hadoop.hbase.spark").save()
>
> java.lang.NullPointerException
>   at org.apache.hadoop.hbase.spark.HBaseRelation.<init>(DefaultSource.scala:134)
>   at org.apache.hadoop.hbase.spark.DefaultSource.createRelation(DefaultSource.scala:75)
>   at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>   ... 56 elided
>
>
> Thanks in advance !
>
> Debu
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Fwd: Spark Streaming - Multiple Spark Contexts (SparkSQL) Performance

2017-10-01 Thread Hammad
Hello,

*Background:*

I have Spark Streaming context;

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TransformerStreamPOC");
conf.set("spark.driver.allowMultipleContexts", "true");   *<== this*
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));


that subscribes to certain kafka *topics*;

JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(*topics*, kafkaParams)
        );

when messages arrive in queue, I recursively process them as follows
(below code section will repeat in Question statement)

stream.foreachRDD(rdd -> {
// process here - the code for the two scenarios below is inserted here

});


*Question starts here:*

Since I need to apply SparkSQL to received events in Queue - I create
SparkSession with two scenarios;

*1) One sparkSession per partition (after
"spark.driver.allowMultipleContexts" is set to true), so all events under
this partition are handled by the same sparkSession*

rdd.foreachPartition(partition -> {
    SparkSession sparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .config("spark.some.config.option", "some-value")
            .getOrCreate();

    while (partition.hasNext()) {
        Dataset<Row> df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL,
                "table_name", connectionProperties);
    }
});

*2) One sparkSession per event; so each event under each queue under
each stream has its own sparkSession*

rdd.foreachPartition(partition -> {
    while (partition.hasNext()) {
        SparkSession sparkSession = SparkSession.builder()
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        Dataset<Row> df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL,
                "table_name", connectionProperties);
    }
});


Is it good practice to create multiple contexts (let's say 10 or 100)?
How does the number of sparkContexts allowed relate to the number of worker
nodes?
What are the performance considerations with respect to scenario 1 and scenario 2?

I am looking for these answers as I feel there is more to the performance of
sparkContexts created by a streaming application than what I currently understand.
Really appreciate your support in anticipation.

Hammad


Re: More instances = slower Spark job

2017-10-01 Thread Jeroen Miller
On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta
 wrote:
> Why are you not using JSON reader of SPARK?

Since the filter I want to perform is so simple, I do not want to
spend time and memory to deserialise the JSON lines.

Jeroen

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to find the temporary views' DDL

2017-10-01 Thread Sun, Keith
Hello,

Is there a way to find the DDL of a “temporary” view created in the current 
session with Spark SQL?

For example:

create or replace temporary view tmp_v as
select c1 from table_x;

“Show create table” does not work for this case as it is not a table.
“Describe” can show the columns but not the DDL.


Thanks very much.
Keith



Re: Error - Spark reading from HDFS via dataframes - Java

2017-10-01 Thread Anastasios Zouzias
Hi,

Set the inferSchema option to true in spark-csv. You may also want to set
the mode option. See the readme below:

https://github.com/databricks/spark-csv/blob/master/README.md
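
For example, a rough sketch of both routes with the built-in CSV reader of Spark 2.x
(the options are the same ones the spark-csv readme documents; the spark session,
paths and column names below are assumptions made to match the question):

// Rough sketch, in Scala. Option 1: let the reader infer column types.
// Option 2: hand the reader the target schema, so no RDD round trip is needed.
import org.apache.spark.sql.types._

val inferred = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")          // skip rows that fail to parse
  .csv("hdfs:/inputpath/*")

val schema = StructType(Seq(
  StructField("COL1", StringType, true),
  StructField("COL3", LongType, true)))     // remaining columns omitted for brevity

val typed = spark.read
  .option("header", "false")
  .schema(schema)
  .csv("hdfs:/inputpath/*")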

Best,
Anastasios

On 01.10.2017 at 07:58, "Kanagha Kumar" wrote:

Hi,

I'm trying to read data from HDFS in spark as dataframes. Printing the
schema, I see all columns are being read as strings. I'm converting it to
RDDs and creating another dataframe by passing in the correct schema (how
the rows should be interpreted finally).

I'm getting the following error:

Caused by: java.lang.RuntimeException: java.lang.String is not a valid
external type for schema of bigint



Spark read API:

Dataset<Row> hdfs_dataset = new SQLContext(spark).read().option("header",
"false").csv("hdfs:/inputpath/*");

Dataset<Row> ds = new SQLContext(spark).createDataFrame(hdfs_dataset.toJavaRDD(),
conversionSchema);
This is the schema to be converted to:
StructType(StructField(COL1,StringType,true),
StructField(COL2,StringType,true),
StructField(COL3,LongType,true),
StructField(COL4,StringType,true),
StructField(COL5,StringType,true),
StructField(COL6,LongType,true))

This is the original schema obtained once read API was invoked
StructType(StructField(_c1,StringType,true),
StructField(_c2,StringType,true),
StructField(_c3,StringType,true),
StructField(_c4,StringType,true),
StructField(_c5,StringType,true),
StructField(_c6,StringType,true))

My interpretation is even when a JavaRDD is cast to dataframe by passing in
the new schema, values are not getting type casted.
This is occurring because the above read API reads data as string types
from HDFS.

How can I  convert an RDD to dataframe by passing in the correct schema
once it is read?
How can the values be type cast correctly during this RDD to dataframe
conversion?

Or how can I read data from HDFS with an input schema in java?
Any suggestions are helpful. Thanks!