read parallel processing spark-cassandra

2018-02-13 Thread sujeet jog
Folks,

I have a time series table with each record having 350 columns.

The primary key is ((date, bucket), objectid, timestamp).
The objective is to read one day's worth of data, which comes to around 12k
partitions, each holding around 25MB of data.
I see only 1 task active during the read operation on a 5-node cluster (8
cores each). Does this mean not enough Spark partitions are getting
created?
I have also set input.split.size_in_mb to a lower number, like 10.
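
For reference, this is roughly how the read is set up (a minimal sketch: the keyspace, table and date are placeholders, and the split-size property name follows the DataStax connector's spark.cassandra.* convention, so treat it as an assumption and set it on the SparkConf/--conf if your connector version requires that):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.getOrCreate()
// lower the split size so more Spark partitions get created
spark.conf.set("spark.cassandra.input.split.size_in_mb", "10")

val day = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "metrics", "table" -> "timeseries"))
  .load()
  .filter(col("date") === "2018-02-13")

// check how many Spark partitions the connector actually created
println(day.rdd.getNumPartitions)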
Any pointers in this regard would be helpful.


Thanks,


not able to read git info from Scala Test Suite

2018-02-13 Thread karan alang
Hello - I'm writing a Scala unit test for my Spark project
which checks the git information, and somehow it is not working from the
unit test.

Added in pom.xml
--



<plugin>
  <groupId>pl.project13.maven</groupId>
  <artifactId>git-commit-id-plugin</artifactId>
  <version>2.2.4</version>
  <executions>
    <execution>
      <id>get-the-git-infos</id>
      <goals>
        <goal>revision</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    {g...@github.com}/test.git
    flat
    true
    true
    true
    true
    true
  </configuration>
</plugin>

Folder structures:

{project_dir}/module/pom.xml
{project_dir}/module/src/main/scala/BuildInfo.scala
  (I'm able to read the git info from this file)
{project_dir}/module_folder/test/main/scala/BuildInfoSuite.scala
  (I'm NOT able to read the git info from this file)
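
For context, a minimal sketch of the kind of check the test suite does (assuming the plugin is configured to generate a git.properties file with the default resource name and flat property keys; those are assumptions, not taken from the pom above):

import java.util.Properties
import org.scalatest.FunSuite

class BuildInfoSuite extends FunSuite {

  test("git.properties is visible from the test classpath") {
    // the generated git.properties normally lands in target/classes,
    // which should also be on the test runtime classpath
    val stream = getClass.getClassLoader.getResourceAsStream("git.properties")
    assert(stream != null, "git.properties not found on the test classpath")

    val props = new Properties()
    props.load(stream)
    assert(props.getProperty("git.commit.id") != null)
  }
}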


Any ideas on what I need to do to get this working?


[Spark GraphX pregel] default value for EdgeDirection not consistent between programming guide and API documentation

2018-02-13 Thread Ramon Bejar Torres

Hi,

I just wanted to note that on the API doc page for the pregel operator
(GraphX API for Spark 2.2.1):


http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)%E2%87%92VD,(EdgeTriplet[VD,ED])%E2%87%92Iterator[(VertexId,A)],(A,A)%E2%87%92A)(ClassTag[A]):Graph[VD,ED] 



It says that the default value for the activeDirection parameter is
EdgeDirection.Either.



However, in the GraphX programming guide:

http://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api 



the type signature shown for the pregel operator indicates that the default
value is EdgeDirection.Out. For consistency, I think the programming guide
should be changed to show the actual default value (Either).
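
For reference, the operator's signature as I read it from the 2.2.x GraphOps API docs (paraphrased here, so please double-check against the source):

def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A): Graph[VD, ED]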



Regards,

Ramon Béjar

DIEI - UdL





Inefficient state management in stream to stream join in 2.3

2018-02-13 Thread Yogesh Mahajan
In 2.3, stream-to-stream joins (both inner and outer) are implemented using
the symmetric hash join (SHJ) algorithm, which is a good choice, and I am
sure you compared it with other families of algorithms like XJoin and
non-blocking sort-based algorithms like progressive merge join (PMJ).

*From a functional point of view -*
1. It covers most of the stream-to-stream join use cases, and the
considerations around event time and watermarks as join keys are well
thought through.
2. It also adopts an effective approach towards join state management:
exploiting 'hard' constraints in the input streams to reduce state, rather
than exploiting statistical properties as 'soft' constraints (see the
sketch after this list).
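
A minimal sketch of such a 'hard' event-time constraint, i.e. a watermark plus a time-range join condition that lets the engine drop old state (the rate sources, column names and the one-hour bound are purely illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.getOrCreate()

val auctions = spark.readStream.format("rate").load()
  .selectExpr("value AS auctionId", "timestamp AS auctionTime")
  .withWatermark("auctionTime", "10 minutes")

val bids = spark.readStream.format("rate").load()
  .selectExpr("value AS bidAuctionId", "timestamp AS bidTime")
  .withWatermark("bidTime", "20 minutes")

// the watermark plus the time bound is what allows state cleanup
val joined = auctions.join(bids, expr("""
  bidAuctionId = auctionId AND
  bidTime >= auctionTime AND
  bidTime <= auctionTime + interval 1 hour"""))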

*From a performance point of view -*
SHJ assumes that the entire join state can be kept in main memory, but the
StateStore in Spark is backed by an HDFS-compatible file system.
Also, looking at the code in StreamingSymmetricHashJoinExec,
two StateStores (KeyToNumValuesStore, KeyWithIndexToValueStore) are used, and
multiple lookups to them in each
StreamExecution (MicroBatch/ContinuousExecution)
per partition per operator will carry a huge performance penalty even for a
moderate state size in queries like groupBy "SYMBOL".

Even if we implement our own efficient in-memory StateStore to overcome this
perf hit, there is no way to avoid these multiple lookups unless you have
your own StreamingSymmetricHashJoinExec implementation.

We should consider using the efficient main-memory data structures described
in this paper, which are suited for storing sliding windows, with efficient
support for removing tuples that have fallen out of the state.

Another way to reduce unnecessary state is to use punctuations (in contrast
to the existing approach, where constraints have to be known a priori). A
punctuation is a tuple of patterns specifying a predicate that must
evaluate to false for all future data tuples in the stream, and punctuations
can be inserted dynamically.

For example, consider a join of two streams, auctionStream and bidStream.
When a particular auction closes, the system inserts a punctuation into the
bidStream to signal that there will be no more bids for that particular
auction, and purges those tuples that cannot possibly join with future
arrivals. PJoin is one example of a stream join algorithm that exploits
punctuations.

Thanks,
http://www.snappydata.io/blog 


Why python cluster mode is not supported in standalone cluster?

2018-02-13 Thread Ashwin Sai Shankar
Hi Spark users!
I noticed that Spark doesn't allow Python apps to run in cluster mode on a
Spark standalone cluster. Does anyone know the reason? I checked JIRA but
couldn't find anything relevant.

Thanks,
Ashwin


Re: [Structured Streaming] Avoiding multiple streaming queries

2018-02-13 Thread Yogesh Mahajan
I had a similar issue, and I think that's where the Structured Streaming
design falls short.
It seems like question #2 in your email is a viable workaround for you.

In my case, I have a custom Sink backed by an efficient in-memory column
store suited for fast ingestion.

I have a Kafka stream coming from one topic, and I need to classify the
stream based on schema.
For example, a Kafka topic can have three different types of schema
messages, and I would like to ingest them into three different column
tables (having different schemas) using my custom Sink implementation.

Right now the only(?) option I have is to create three streaming queries
reading the same topic and ingesting into the respective column tables using
their Sink implementations.
These three streaming queries create three underlying IncrementalExecutions
and three KafkaSources: three queries reading the same data from the
same Kafka topic.
Even with CachedKafkaConsumers at the partition level, this is not an
efficient way to handle a simple streaming use case.
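
For reference, a rough sketch of that three-query workaround (the topic name, the string-contains classification, and the memory sink standing in for our custom column-store sink are all illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")

// one independent query (and hence one more KafkaSource) per schema type
def startQuery(schemaTag: String, table: String) =
  raw.filter($"json".contains(schemaTag))   // placeholder classification logic
    .writeStream
    .format("memory")                       // stand-in for the custom Sink
    .queryName(table)
    .start()

val queries = Seq("typeA" -> "table_a", "typeB" -> "table_b", "typeC" -> "table_c")
  .map { case (tag, table) => startQuery(tag, table) }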

One workaround to overcome this limitation is to have the same schema for
all the messages in a Kafka partition; unfortunately, this is not in our
control, and customers cannot change it due to their dependencies on other
subsystems.

Thanks,
http://www.snappydata.io/blog 

On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava  wrote:

> I have a structured streaming query which sinks to Kafka.  This query has
> a complex aggregation logic.
>
>
> I would like to sink the output DF of this query to multiple Kafka topics
> each partitioned on a different ‘key’ column.  I don’t want to have
> multiple Kafka sinks for each of the different Kafka topics because that
> would mean running multiple streaming queries - one for each Kafka topic,
> especially since my aggregation logic is complex.
>
>
> Questions:
>
> 1.  Is there a way to output the results of a structured streaming query
> to multiple Kafka topics each with a different key column but without
> having to execute multiple streaming queries?
>
>
> 2.  If not,  would it be efficient to cascade the multiple queries such
> that the first query does the complex aggregation and writes output
> to Kafka and then the other queries just read the output of the first query
> and write their topics to Kafka thus avoiding doing the complex aggregation
> again?
>
>
> Thanks in advance for any help.
>
>
> Priyank
>
>
>


Re: org.apache.kafka.clients.consumer.OffsetOutOfRangeException

2018-02-13 Thread dcam
Hi Mina

I believe this works differently for Structured Streaming from Kafka,
specifically. I'm assuming you are using Structured Streaming based on the
name of the dependency ("spark-streaming-kafka"). There is a note in the
docs here:
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#kafka-specific-configurations

So, instead of setting Kafka's client property, set the Spark source option
"startingOffsets" to "earliest". There are examples here:
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
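
For example, a minimal sketch of such a source (the broker address and topic name are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")   // placeholder broker
  .option("subscribe", "my-topic")                   // placeholder topic
  .option("startingOffsets", "earliest")             // instead of Kafka's auto.offset.reset
  .load()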

Setting "startingOffsets" has worked for me. I have not tried setting the
Kafka property directly.

Cheers,
Dave







Spark 2.2.1 EMR 5.11.1 Encrypted S3 bucket overwriting parquet file

2018-02-13 Thread Stephen Robinson
Hi All,


I am using the latest version of EMR to overwrite Parquet files in an S3 bucket
encrypted with a KMS key. I am seeing the attached error whenever I overwrite a
Parquet file. For example, the code below produces the attached error and
stack trace:


List(1,2,3).toDF().write.mode("Overwrite").parquet("s3://some-encrypted-bucket/some-object")
List(1,2,3,4).toDF().write.mode("Overwrite").parquet("s3://some-encrypted-bucket/some-object")

The first call succeeds but the second fails.

If I change the s3:// part to the s3a:// protocol, I do not see the error. I
believe this to be an EMR error, but I am mentioning it here just in case anyone
else has seen this or in case it might be a Spark bug.

Thanks,

Steve



Stephen Robinson

steve.robin...@aquilainsight.com
+441312902300


www.aquilainsight.com
linkedin.com/aquilainsight
twitter.com/aquilainsight


com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; 
Request ID: ???)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
  at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1700)
  at 
com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:34)
  at 
com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:9)
  at 
com.amazon.ws.emr.hadoop.fs.s3.lite.call.AbstractUploadingS3Call.perform(AbstractUploadingS3Call.java:62)
  at 
com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
  at 
com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
  at 
com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.putObject(AmazonS3LiteClient.java:104)
  at 
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.storeEmptyFile(Jets3tNativeFileSystemStore.java:199)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 

Re: [Structured Streaming] Avoiding multiple streaming queries

2018-02-13 Thread dcam
Hi Priyank

I have a similar structure, although I am reading from Kafka and sinking to
multiple MySQL tables. My input stream has multiple message types and each
is headed for a different MySQL table.

I've looked for a solution for a few months, and have only come up with two
alternatives:

1. Since I'm already using a ForeachSink (because there is no native MySQL
sink), I could write each batch to the different tables from one sink (see
the sketch after this list). But having only one Spark job doing all the
sinking seems like it will be confusing, and the sink itself will be fairly
complex.

2. The same as your second option: have one job sort through the stream and
persist the sorted stream to HDFS, then read the sorted streams in individual
jobs and sink into the appropriate tables.
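
For option 1, a rough sketch of a single ForeachWriter that routes rows by message type (the table names, JDBC URL, and the messageType/payload columns are made up for illustration):

import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}

class RoutingMySqlWriter(jdbcUrl: String) extends ForeachWriter[Row] {
  private var conn: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = DriverManager.getConnection(jdbcUrl)
    true
  }

  override def process(row: Row): Unit = {
    // route each row to a table based on its message type
    val table = row.getAs[String]("messageType") match {
      case "order" => "orders"
      case "click" => "clicks"
      case _       => "other_events"
    }
    val stmt = conn.prepareStatement(s"INSERT INTO $table (payload) VALUES (?)")
    try {
      stmt.setString(1, row.getAs[String]("payload"))
      stmt.executeUpdate()
    } finally {
      stmt.close()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (conn != null) conn.close()
  }
}

// usage: parsedStream.writeStream.foreach(new RoutingMySqlWriter(jdbcUrl)).start()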

I haven't implemented it yet, but it seems to me that the code for option 2
will be simpler, and operationally things will be clearer. If a job fails, I
have a better understanding of what state it is in.

Reading Manning's Big Data book by Nathan Marz and James Warren has been
influencing how I structure Spark jobs recently. They don't shy away from
persisting intermediate data sets, and I am embracing that right now in my
thinking.

Cheers!
Dave







Retrieve batch metadata via the spark monitoring api

2018-02-13 Thread Hendrik Dev
I use Spark 2.2.1 with streaming, and when I open the Spark Streaming
UI I can see input metadata for each of my batches. In my case I
stream from Kafka, and in the metadata section I find useful
information about my topic, partitions and offsets.

Assume the URL for this batch looks like
http://localhost:4040/streaming/batch/?id=156653011

When I now query Spark with the monitoring API, there seems to be no
way to retrieve the above-mentioned metadata.

Have I overlooked something, or is there really no way with Spark 2.2.1
to retrieve the input metadata via the REST API?

Thx
Hendrik

-- 
Hendrik Saly (salyh, hendrikdev22)
@hendrikdev22
PGP: 0x22D7F6EC




Run Multiple Spark jobs. Reduce Execution time.

2018-02-13 Thread akshay naidu
Hello,
I'm trying to run multiple Spark jobs on a cluster running on YARN.
The master is a 24GB server, with 6 slaves of 12GB each.

fairscheduler.xml settings are -

<schedulingMode>FAIR</schedulingMode>
<weight>10</weight>
<minShare>2</minShare>

I am running 8 jobs simultaneously; the jobs run in parallel, but not all of
them.
At a time, only 7 of them run simultaneously while the 8th one is in the
queue, WAITING for a job to stop.

Also, out of the 7 running jobs, 4 run comparatively much faster than the
remaining three (maybe resources are not distributed properly).

I want to run n jobs at a time and make them run faster. Right now, one job
takes more than three minutes to process at most 1GB of data.

Kindly assist me; what am I missing?

Thanks.


Re: can udaf's return complex types?

2018-02-13 Thread Matteo Cossu
Hello,

yes, sure they can return complex types. For example, the functions
collect_list and collect_set return an ArrayType.
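
For example, here is a rough sketch of a UDAF usable from raw SQL whose return type is a struct holding the sum and the collected names (this approximates the Map in your example; the class and registered function names are just illustrative, and it uses Spark 2.x's UserDefinedAggregateFunction API):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class SumWithNames extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    new StructType().add("name", StringType).add("amount", LongType)

  override def bufferSchema: StructType =
    new StructType().add("sum", LongType).add("names", ArrayType(StringType))

  // the complex return type: a struct carrying the sum and the collected names
  override def dataType: DataType =
    new StructType().add("sum", LongType).add("names", ArrayType(StringType))

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = Seq.empty[String]
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + input.getLong(1)
    buffer(1) = buffer.getSeq[String](1) :+ input.getString(0)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getSeq[String](1) ++ buffer2.getSeq[String](1)
  }

  override def evaluate(buffer: Row): Any =
    Row(buffer.getLong(0), buffer.getSeq[String](1))
}

// register it and call it from SQL:
// spark.udf.register("my_sum_along_with_names", new SumWithNames)
// spark.sql("SELECT id, my_sum_along_with_names(name, amount) FROM t GROUP BY id")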

On 10 February 2018 at 14:28, kant kodali  wrote:

> Hi All,
>
> Can UDAF's return complex types? like say a Map with key as an Integer and
> the value as an Array of strings?
>
> For Example say I have the following *input dataframe*
>
> id | name | amount
> -
> 1 |  foo | 10
> 2 |  bar | 15
> 1 |  car | 20
> 1 |  bus | 20
>
> and my *target/output data frame* is
>
> id | my_sum_along_with_names
> -
> 1  | Map(key -> 50, value -> [foo, car, bus])
> 2  | Map(key -> 15, value -> [bar])
>
> I am looking for a UDAF solution so I can use it in my raw sql query.
>
> Thanks!
>
>
>
>
>
>
>
>
>


[Spark-Listener] [How-to] Listen only to specific events

2018-02-13 Thread Naved Alam
I have a Spark application which creates multiple sessions. Each of these
sessions can run jobs in parallel. I want to log some details about the
execution of these jobs, but I want to tag them with the session they were
called from.

I tried creating a listener from within each session (instantiations of the
same class, but with session-specific information), but whenever an event is
emitted, it is intercepted by all the listeners (which makes sense, since the
listeners are attached to the shared context). Is there a way I can ensure that
the events are only intercepted by the listener from the same session? If not,
is it possible to add any identifiers to the events which can be used to
figure out which session triggered a job?
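
For reference, a rough sketch of the tagging idea (the listener still receives all events from the shared context and only filters them; the "sessionId" local property name is my assumption, not a Spark built-in, and this assumes each session submits its jobs from its own thread):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

class SessionFilteringListener(sessionId: String) extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // local properties set on the submitting thread travel with the job-start event
    val tag = Option(jobStart.properties).map(_.getProperty("sessionId")).orNull
    if (tag == sessionId) {
      println(s"[$sessionId] job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
    }
  }
}

// in each session, before submitting work on that session's thread:
// session.sparkContext.setLocalProperty("sessionId", mySessionId)
// session.sparkContext.addSparkListener(new SessionFilteringListener(mySessionId))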

Thanks
Naved