[Spark Sql/ UDFs] Spark and Hive UDFs parity

2017-06-14 Thread RD
Hi Spark folks,

Is there any plan to support the richer UDF API that Hive supports for
Spark UDFs? Hive supports the GenericUDF API, which has, among other
methods, initialize() and configure() (called once on the cluster), which a
lot of our users rely on. We now have a lot of UDFs in Hive that make use
of these methods. We plan to move these UDFs to Spark UDFs but are being
limited by not having similar lifecycle methods.
   Are there plans to address these? Or do people usually adopt some sort
of workaround?
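
   For context, the kind of workaround we are considering is lazy
per-executor initialization captured in the UDF closure. A rough sketch
(the object, column and lookup names below are illustrative, not our real
code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Heavy setup that a Hive GenericUDF would normally do in initialize()/configure().
// A lazy val defers it until first use in each executor JVM, so it runs roughly once per executor.
object ExpensiveResource {
  lazy val lookup: Map[String, String] = {
    // e.g. load dictionaries, open connections, read side files ...
    Map("a" -> "A")
  }
}

object UdfLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("udf-lifecycle-sketch").getOrCreate()
    import spark.implicits._

    // The UDF body only references the object, so initialization happens lazily on the executors.
    val enrich = udf((k: String) => ExpensiveResource.lookup.getOrElse(k, "unknown"))

    Seq("a", "b").toDF("k").select(enrich($"k").as("v")).show()
    spark.stop()
  }
}

This approximates a per-node initialize(), but there is no clean equivalent
of configure(), which is part of why we are asking.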

   If we directly use the Hive UDFs in Spark we pay a performance
penalty. I think Spark does a conversion from InternalRow to Row and back
to InternalRow for native Spark UDFs, and for Hive UDFs it does InternalRow
to Hive object and back to InternalRow, but somehow the conversion for
native UDFs is more performant.

-Best,
R.


Assign Custom receiver to a scheduler pool

2017-06-14 Thread Rabin Banerjee



Re: [MLLib]: Executor OutOfMemory in BlockMatrix Multiplication

2017-06-14 Thread John Compitello
No problem. It was a big headache for my team as well. One of us already 
reimplemented it from scratch, as seen in this pending PR for our project. 
https://github.com/hail-is/hail/pull/1895

Hopefully you find that useful. We'll hopefully try to PR that into Spark at 
some point. 

Best,

John

Sent from my iPhone

> On Jun 14, 2017, at 8:28 PM, Anthony Thomas  wrote:
> 
> Interesting, thanks! That probably also explains why there seems to be a ton 
> of shuffle for this operation. So what's the best option for truly scalable 
> matrix multiplication on Spark then - implementing from scratch using the 
> coordinate matrix ((i,j), k) format?
> 
>> On Wed, Jun 14, 2017 at 4:29 PM, John Compitello  
>> wrote:
>> Hey Anthony,
>> 
>> You're the first person besides myself I've seen mention this. BlockMatrix 
>> multiply is not the best method. As far as me and my team can tell, the 
>> memory problem stems from the fact that when Spark tries to compute block 
>> (i, j) of the matrix, it tries to manifest all of row i from matrix 1 and 
>> all of column j from matrix 2 in memory at once on one executor. Then after 
>> doing that, it proceeds to combine them with a functional reduce, creating 
>> one additional block for each pair. So you end up manifesting 3n + logn 
>> matrix blocks in memory at once, which is why it sucks so much. 
>> 
>> Sent from my iPhone
>> 
>>> On Jun 14, 2017, at 7:07 PM, Anthony Thomas  wrote:
>>> 
>>> I've been experimenting with MlLib's BlockMatrix for distributed matrix 
>>> multiplication but consistently run into problems with executors being 
>>> killed due to memory constrains. The linked gist (here) has a short example 
>>> of multiplying a 25,000 x 25,000 square matrix taking approximately 5G of 
>>> disk with a vector (also stored as a BlockMatrix). I am running this on a 3 
>>> node (1 master + 2 workers) cluster on Amazon EMR using the m4.xlarge 
>>> instance type. Each instance has 16GB of RAM and 4 CPU. The gist has 
>>> detailed information about the Spark environment.
>>> 
>>> I have tried reducing the block size of the matrix, increasing the number 
>>> of partitions in the underlying RDD, increasing defaultParallelism and 
>>> increasing spark.yarn.executor.memoryOverhead (up to 3GB) - all without 
>>> success. The input matrix should fit comfortably in distributed memory and 
>>> the resulting matrix should be quite small (25,000 x 1) so I'm confused as 
>>> to why Spark seems to want so much memory for this operation, and why Spark 
>>> isn't spilling to disk here if it wants more memory. The job does 
>>> eventually complete successfully, but for larger matrices stages have to be 
>>> repeated several times which leads to long run times. I don't encounter any 
>>> issues if I reduce the matrix size down to about 3GB. Can anyone with 
>>> experience using MLLib's matrix operators provide any suggestions about 
>>> what settings to look at, or what the hard constraints on memory for 
>>> BlockMatrix multiplication are?
>>> 
>>> Thanks,
>>> 
>>> Anthony
> 
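
For what it's worth, the coordinate-format approach Anthony mentions boils
down to a join on the shared dimension. A rough sketch (the tiny example
matrices are made up, and this is not the code from our PR):

import org.apache.spark.sql.SparkSession

object CoordinateMultiplySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("coordinate-multiply-sketch").getOrCreate()
    val sc = spark.sparkContext

    // A(i, k) and B(k, j) as ((row, col), value) entries; tiny matrices just to show the shape.
    val a = sc.parallelize(Seq(((0L, 0L), 2.0), ((0L, 1L), 1.0), ((1L, 0L), 3.0)))
    val b = sc.parallelize(Seq(((0L, 0L), 4.0), ((1L, 0L), 5.0)))

    // Key both sides by the shared dimension k, join, multiply, then sum partial products per cell.
    val product = a.map { case ((i, k), v) => (k, (i, v)) }
      .join(b.map { case ((k, j), v) => (k, (j, v)) })
      .map { case (_, ((i, av), (j, bv))) => ((i, j), av * bv) }
      .reduceByKey(_ + _)

    product.collect().foreach(println)   // ((0,0),13.0), ((1,0),12.0)
    spark.stop()
  }
}

Because partial products are combined with reduceByKey, no single task has
to materialize a whole row or column of blocks, which is the blow-up
described above.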


Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-14 Thread Michael Armbrust
This is a good question. I really like using Kafka as a centralized source for
streaming data in an organization and, with Spark 2.2, we have full support
for reading and writing data to/from Kafka in both streaming and batch.
I'll focus here on what I think the advantages are of Structured Streaming
over Kafka Streams (a stream processing library that reads from Kafka).

 - *High level productive APIs* - Streaming queries in Spark can be
expressed using DataFrames, Datasets or even plain SQL.  Streaming
DataFrames/SQL are supported in Scala, Java, Python and even R.  This means
that for common operations like filtering, joining and aggregating you can
use built-in operations, and for complicated custom logic you can use UDFs
and lambda functions (see the short sketch after this list). In contrast,
Kafka Streams mostly requires you to express your transformations using
lambda functions.
 - *High Performance* - Since it is built on Spark SQL, streaming queries
take advantage of the Catalyst optimizer and the Tungsten execution engine.
This design leads to huge performance wins, which means you need less
hardware to accomplish the same job.
 - *Ecosystem* - Spark has connectors for working with all kinds of data
stored in a variety of systems.  This means you can join a stream with data
encoded in parquet and stored in S3/HDFS.  Perhaps more importantly, it
also means that if you decide that you don't want to manage a Kafka cluster
anymore and would rather use Kinesis, you can do that too.  We recently
moved a bunch of our pipelines from Kafka to Kinesis and had to only change
a few lines of code! I think its likely that in the future Spark will also
have connectors for Google's PubSub and Azure's streaming offerings.
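
To make the first point concrete, here is a minimal sketch of a streaming
DataFrame query over Kafka (the broker address and topic are placeholders,
and it assumes the spark-sql-kafka-0-10 package is on the classpath):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

object KafkaStructuredStreamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-structured-streaming-sketch").getOrCreate()
    import spark.implicits._

    // Read a Kafka topic as a streaming DataFrame.
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder broker
      .option("subscribe", "events")                      // placeholder topic
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // Ordinary DataFrame operations (filter, groupBy, built-in functions) work on the stream.
    val counts = events.filter($"value".isNotNull).groupBy($"value").agg(count("*").as("n"))

    counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()
  }
}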

Regarding latency, there has been a lot of discussion about the inherent
latencies of micro-batch.  Fortunately, we were very careful to leave
batching out of the user facing API, and as we demo'ed last week, this
makes it possible for Spark Streaming to achieve sub-millisecond
latencies. Watch SPARK-20928 for more on this effort to eliminate
micro-batch from Spark's execution model.

At the far other end of the latency spectrum...  For those with jobs that
run in the cloud on data that arrives sporadically, you can run streaming
jobs that only execute every few hours or every few days, shutting the
cluster down in between.  This architecture can result in huge cost
savings for some applications.

Michael

On Sun, Jun 11, 2017 at 1:12 AM, kant kodali  wrote:

> Hi All,
>
> I am trying hard to figure out what is the real difference between Kafka
> Streaming vs Spark Streaming other than saying one can be used as part of
> Micro services (since Kafka streaming is just a library) and the other is a
> Standalone framework by itself.
>
> If I can accomplish same job one way or other this is a sort of a puzzling
> question for me so it would be great to know what Spark streaming can do
> that Kafka Streaming cannot do efficiently or whatever ?
>
> Thanks!
>
>


Re: [MLLib]: Executor OutOfMemory in BlockMatrix Multiplication

2017-06-14 Thread Anthony Thomas
Interesting, thanks! That probably also explains why there seems to be a
ton of shuffle for this operation. So what's the best option for truly
scalable matrix multiplication on Spark then - implementing from scratch
using the coordinate matrix ((i,j), k) format?

On Wed, Jun 14, 2017 at 4:29 PM, John Compitello 
wrote:

> Hey Anthony,
>
> You're the first person besides myself I've seen mention this. BlockMatrix
> multiply is not the best method. As far as me and my team can tell, the
> memory problem stems from the fact that when Spark tries to compute block
> (i, j) of the matrix, it tries to manifest all of row i from matrix 1 and
> all of column j from matrix 2 in memory at once on one executor. Then after
> doing that, it proceeds to combine them with a functional reduce, creating
> one additional block for each pair. So you end up manifesting 3n + logn
> matrix blocks in memory at once, which is why it sucks so much.
>
> Sent from my iPhone
>
> On Jun 14, 2017, at 7:07 PM, Anthony Thomas  wrote:
>
> I've been experimenting with MlLib's BlockMatrix for distributed matrix
> multiplication but consistently run into problems with executors being
> killed due to memory constrains. The linked gist (here
> ) has
> a short example of multiplying a 25,000 x 25,000 square matrix taking
> approximately 5G of disk with a vector (also stored as a BlockMatrix). I am
> running this on a 3 node (1 master + 2 workers) cluster on Amazon EMR using
> the m4.xlarge instance type. Each instance has 16GB of RAM and 4 CPU. The
> gist has detailed information about the Spark environment.
>
> I have tried reducing the block size of the matrix, increasing the number
> of partitions in the underlying RDD, increasing defaultParallelism and
> increasing spark.yarn.executor.memoryOverhead (up to 3GB) - all without
> success. The input matrix should fit comfortably in distributed memory and
> the resulting matrix should be quite small (25,000 x 1) so I'm confused as
> to why Spark seems to want so much memory for this operation, and why Spark
> isn't spilling to disk here if it wants more memory. The job does
> eventually complete successfully, but for larger matrices stages have to be
> repeated several times which leads to long run times. I don't encounter any
> issues if I reduce the matrix size down to about 3GB. Can anyone with
> experience using MLLib's matrix operators provide any suggestions about
> what settings to look at, or what the hard constraints on memory for
> BlockMatrix multiplication are?
>
> Thanks,
>
> Anthony
>
>


Create dataset from data frame with missing columns

2017-06-14 Thread tokeman24
Is it possible to concisely create a dataset from a dataframe with missing 
columns? Specifically, suppose I create a dataframe with:
val df: DataFrame  = Seq(("v1"),("v2")).toDF("f1")
 
Then, I have a case class for a dataset defined as:
case class CC(f1: String, f2: Option[String] = None)
 
I’d like to use df.as[CC] to get an instance of the case class, but this gives 
me the following error:
org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input 
columns: [f1];
 
Is there a concise way to use the default values as defined by the case class?
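
One workaround I am aware of, sketched below, is to add the missing column as
a typed null before calling .as[CC]; this relies on f2 being an Option (null
decodes to None) rather than on the case-class default itself, so I am still
wondering whether defaults can be used directly. (The case class is repeated
so the snippet is self-contained.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

case class CC(f1: String, f2: Option[String] = None)

object MissingColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("missing-column-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("v1", "v2").toDF("f1")

    // Add the absent column as a typed null so the encoder can resolve `f2`;
    // Option[String] fields then decode null as None.
    val ds = df.withColumn("f2", lit(null).cast(StringType)).as[CC]
    ds.show()
    spark.stop()
  }
}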

Warmest Regards,
Jason Tokayer, PhD
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [MLLib]: Executor OutOfMemory in BlockMatrix Multiplication

2017-06-14 Thread John Compitello
Hey Anthony,

You're the first person besides myself I've seen mention this. BlockMatrix 
multiply is not the best method. As far as me and my team can tell, the memory 
problem stems from the fact that when Spark tries to compute block (i, j) of 
the matrix, it tries to manifest all of row i from matrix 1 and all of column j 
from matrix 2 in memory at once on one executor. Then after doing that, it 
proceeds to combine them with a functional reduce, creating one additional 
block for each pair. So you end up manifesting 3n + logn matrix blocks in 
memory at once, which is why it sucks so much. 

Sent from my iPhone

> On Jun 14, 2017, at 7:07 PM, Anthony Thomas  wrote:
> 
> I've been experimenting with MlLib's BlockMatrix for distributed matrix 
> multiplication but consistently run into problems with executors being killed 
> due to memory constrains. The linked gist (here) has a short example of 
> multiplying a 25,000 x 25,000 square matrix taking approximately 5G of disk 
> with a vector (also stored as a BlockMatrix). I am running this on a 3 node 
> (1 master + 2 workers) cluster on Amazon EMR using the m4.xlarge instance 
> type. Each instance has 16GB of RAM and 4 CPU. The gist has detailed 
> information about the Spark environment.
> 
> I have tried reducing the block size of the matrix, increasing the number of 
> partitions in the underlying RDD, increasing defaultParallelism and 
> increasing spark.yarn.executor.memoryOverhead (up to 3GB) - all without 
> success. The input matrix should fit comfortably in distributed memory and 
> the resulting matrix should be quite small (25,000 x 1) so I'm confused as to 
> why Spark seems to want so much memory for this operation, and why Spark 
> isn't spilling to disk here if it wants more memory. The job does eventually 
> complete successfully, but for larger matrices stages have to be repeated 
> several times which leads to long run times. I don't encounter any issues if 
> I reduce the matrix size down to about 3GB. Can anyone with experience using 
> MLLib's matrix operators provide any suggestions about what settings to look 
> at, or what the hard constraints on memory for BlockMatrix multiplication are?
> 
> Thanks,
> 
> Anthony


Create dataset from dataframe with missing columns

2017-06-14 Thread Tokayer, Jason M.
Is it possible to concisely create a dataset from a dataframe with missing 
columns? Specifically, suppose I create a dataframe with:
val df: DataFrame  = Seq(("v1"),("v2")).toDF("f1")

Then, I have a case class for a dataset defined as:
case class CC(f1: String, f2: Option[String] = None)

I’d like to use df.as[CC] to get an instance of the case class, but this gives 
me the following error:
org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input 
columns: [f1];

Is there a concise way to use the default values as defined by the case class?





[MLLib]: Executor OutOfMemory in BlockMatrix Multiplication

2017-06-14 Thread Anthony Thomas
I've been experimenting with MLlib's BlockMatrix for distributed matrix
multiplication but consistently run into problems with executors being
killed due to memory constraints. The linked gist (here) has a
short example of multiplying a 25,000 x 25,000 square matrix taking
approximately 5G of disk with a vector (also stored as a BlockMatrix). I am
running this on a 3 node (1 master + 2 workers) cluster on Amazon EMR using
the m4.xlarge instance type. Each instance has 16GB of RAM and 4 CPU. The
gist has detailed information about the Spark environment.

I have tried reducing the block size of the matrix, increasing the number
of partitions in the underlying RDD, increasing defaultParallelism and
increasing spark.yarn.executor.memoryOverhead (up to 3GB) - all without
success. The input matrix should fit comfortably in distributed memory and
the resulting matrix should be quite small (25,000 x 1) so I'm confused as
to why Spark seems to want so much memory for this operation, and why Spark
isn't spilling to disk here if it wants more memory. The job does
eventually complete successfully, but for larger matrices stages have to be
repeated several times which leads to long run times. I don't encounter any
issues if I reduce the matrix size down to about 3GB. Can anyone with
experience using MLLib's matrix operators provide any suggestions about
what settings to look at, or what the hard constraints on memory for
BlockMatrix multiplication are?

Thanks,

Anthony


Re: Spark Streaming Design Suggestion

2017-06-14 Thread Shashi Vishwakarma
I agree, Jörn and Satish. I think I should start grouping similar kinds of
messages into a single topic, with some kind of id attached that the Spark
streaming application can use.

I can try reducing the number of topics significantly, but in the end I can
still expect 50+ topics in the cluster. Do you think creating parallel
DStreams will help here?

Refer below link.

streaming-programming-guide.html


Thanks
Shashi


On Wed, Jun 14, 2017 at 8:12 AM, satish lalam 
wrote:

> Agree with Jörn. Dynamically creating/deleting Topics is nontrivial to
> manage.
> With the limited knowledge about your scenario - it appears that you are
> using topics as some kind of message type enum.
> If that is the case - you might be better off with one (or just a few
> topics) and have a messagetype field in kafka event itself.
> Your streaming job can then match-case incoming events on this field to
> choose the right processor for respective events.
>
> On Tue, Jun 13, 2017 at 1:47 PM, Jörn Franke  wrote:
>
>> I do not fully understand the design here.
>> Why not send all to one topic with some application id in the message and
>> you write to one topic also indicating the application id.
>>
>> Can you elaborate a little bit more on the use case?
>>
>> Especially applications deleting/creating topics dynamically can be a
>> nightmare to operate
>>
>> > On 13. Jun 2017, at 22:03, Shashi Vishwakarma 
>> wrote:
>> >
>> > Hi
>> >
>> > I have to design a spark streaming application with below use case. I
>> am looking for best possible approach for this.
>> >
>> > I have application which pushing data into 1000+ different topics each
>> has different purpose . Spark streaming will receive data from each topic
>> and after processing it will write back to corresponding another topic.
>> >
>> > Ex.
>> >
>> > Input Type 1 Topic  --> Spark Streaming --> Output Type 1 Topic
>> > Input Type 2 Topic  --> Spark Streaming --> Output Type 2 Topic
>> > Input Type 3 Topic  --> Spark Streaming --> Output Type 3 Topic
>> > .
>> > .
>> > .
>> > Input Type N Topic  --> Spark Streaming --> Output Type N Topic  and so
>> on.
>> >
>> > I need to answer following questions.
>> >
>> > 1. Is it a good idea to launch 1000+ spark streaming application per
>> topic basis ? Or I should have one streaming application for all topics as
>> processing logic going to be same ?
>> > 2. If one streaming context , then how will I determine which RDD
>> belongs to which Kafka topic , so that after processing I can write it back
>> to its corresponding OUTPUT Topic?
>> > 3. Client may add/delete topic from Kafka , how do dynamically handle
>> in Spark streaming ?
>> > 4. How do I restart job automatically on failure ?
>> >
>> > Any other issue you guys see here ?
>> >
>> > Highly appreicate your response.
>> >
>> > Thanks
>> > Shashi
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


CFP FOR SPARK SUMMIT EUROPE CLOSES FRIDAY

2017-06-14 Thread Scott walent
Share your skills and expertise with the Apache Spark community by speaking
at the upcoming Spark Summit Europe 2017 conference. The deadline to submit
session proposals is quickly approaching – submit your ideas by this
Friday, June 16 to be considered.

spark-summit.org/eu-2017-cfp

Taking place October 24-26 in Dublin, Ireland, Spark Summit Europe 2017
will bring together Apache Spark enthusiasts from around the world for
three days of in-depth training, amazing content, and valuable networking
opportunities. As we plan the agenda, we want to hear your ideas for talks
that will inspire the audience with useful tips, proven solutions,
real-world case studies and cutting-edge research that explore the future
of data science and machine learning.

Proposed tracks for the Dublin conference will include: AI & Machine
Learning, Data Science, Developer, Enterprise Applications, Spark
Ecosystem, Research and Streaming & Continuous Applications. There will
also be a Deep Dive track with 60-minute sessions for highly technical
topics.

Visit the Spark Summit Europe 2017 site to learn more and submit your
proposal. Act soon though! You only have until Friday, June 16th.


Configurable Task level time outs and task failures

2017-06-14 Thread AnilKumar B
Hi,

In some of the data science use cases, like predictions, we are using
Spark. Most of the time we faced data skewness issues; we have redistributed
the data using Murmur hashing or round-robin assignment and fixed the
skewness across partitions/tasks.

But still, some tasks take a very long time because of their logical flow,
which depends on the nature of the data for a particular key. For our use
cases we are OK with omitting a few tasks if they cannot complete within a
certain amount of time.

That's why we have implemented task-level timeouts; our job still succeeds
even if some of the tasks do not complete in the defined time, and with this
we are able to define SLAs for our Spark applications.

Is there any mechanism in the Spark framework to define task-level timeouts
and mark the job successful even when only x% of the tasks succeed
(where x can be configured)? Has anyone faced such issues?
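
For reference, the rough shape of what we implemented looks like the sketch
below: each partition's work is wrapped in a Future inside mapPartitions,
and partitions that exceed the timeout contribute no output (the placeholder
logic and the numbers are illustrative, not our real code):

import org.apache.spark.sql.SparkSession

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object TaskTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("task-timeout-sketch").getOrCreate()
    val sc = spark.sparkContext
    val data = sc.parallelize(1 to 1000, numSlices = 10)

    // Wrap each partition's work in a Future and give up after a configurable timeout.
    // Timed-out partitions contribute no output, so the job as a whole still succeeds.
    val partitionTimeout = 30.seconds
    val processed = data.mapPartitions { iter =>
      val work = Future(iter.map(_ * 2).toList)   // placeholder for the real per-key logic
      Try(Await.result(work, partitionTimeout)).getOrElse(Nil).iterator
    }

    println(processed.count())
    spark.stop()
  }
}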


Thanks & Regards,
B Anil Kumar.


Re: Use SQL Script to Write Spark SQL Jobs

2017-06-14 Thread bo yang
Hi Nihed,

Interesting to see envelope. The idea is same there! Thanks for the sharing
:)

Best,
Bo


On Wed, Jun 14, 2017 at 12:22 AM, nihed mbarek  wrote:

> Hi
>
> I already saw a project with the same idea.
> https://github.com/cloudera-labs/envelope
>
> Regards,
>
> On Wed, 14 Jun 2017 at 04:32, bo yang  wrote:
>
>> Thanks Benjamin and Ayan for the feedback! You kind of represent two
>> group of people who need such script tool or not. Personally I find the
>> script is very useful for myself to write ETL pipelines and daily jobs.
>> Let's see whether there are other people interested in such project.
>>
>> Best,
>> Bo
>>
>>
>>
>>
>>
>> On Mon, Jun 12, 2017 at 11:26 PM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> IMHO, this approach is not very useful.
>>>
>>> Firstly, 2 use cases mentioned in the project page:
>>>
>>> 1. Simplify spark development - I think the only thing can be done there
>>> is to come up with some boilerplate function, which essentially will take a
>>> sql and come back with a temp table name and a corresponding DF (Remember
>>> the project targets structured data sources only, not streaming or RDD).
>>> Building another mini-DSL on top of already fairly elaborate spark API
>>> never appealed to me.
>>>
>>> 2. Business Analysts using Spark - single word answer is Notebooks. Take
>>> your pick - Jupyter, Zeppelin, Hue.
>>>
>>> The case of "Spark is for Developers", IMHO, stemmed to the
>>> packaging/building overhead of spark apps. For Python users, this barrier
>>> is considerably lower (And maybe that is why I do not see a prominent
>>> need).
>>>
>>> But I can imagine the pain of a SQL developer coming into a scala/java
>>> world. I came from a hardcore SQL/DWH environment where I used to write SQL
>>> and SQL only. So SBT or MVN are still not my friend. Maybe someday they
>>> will. But learned them hard way, just because the value of using spark can
>>> offset the pain long long way. So, I think there is a need of spending time
>>> with the environment to get comfortable with it. And maybe, just maybe,
>>> using Nifi in case you miss drag/drop features too much :)
>>>
>>> But, these are my 2c, and sincerely humble opinion, and I wish you all
>>> the luck for your project.
>>>
>>> On Tue, Jun 13, 2017 at 3:23 PM, Benjamin Kim 
>>> wrote:
>>>
 Hi Bo,

 +1 for your project. I come from the world of data warehouses, ETL, and
 reporting analytics. There are many individuals who do not know or want to
 do any coding. They are content with ANSI SQL and stick to it. ETL
 workflows are also done without any coding using a drag-and-drop user
 interface, such as Talend, SSIS, etc. There is a small amount of scripting
 involved but not too much. I looked at what you are trying to do, and I
 welcome it. This could open up Spark to the masses and shorten development
 times.

 Cheers,
 Ben


 On Jun 12, 2017, at 10:14 PM, bo yang  wrote:

 Hi Aakash,

 Thanks for your willing to help :) It will be great if I could get more
 feedback on my project. For example, is there any other people feeling the
 need of using a script to write Spark job easily? Also, I would explore
 whether it is possible that the Spark project takes some work to build such
 a script based high level DSL.

 Best,
 Bo


 On Mon, Jun 12, 2017 at 12:14 PM, Aakash Basu <
 aakash.spark@gmail.com> wrote:

> Hey,
>
> I work on Spark SQL and would pretty much be able to help you in this.
> Let me know your requirement.
>
> Thanks,
> Aakash.
>
> On 12-Jun-2017 11:00 AM, "bo yang"  wrote:
>
>> Hi Guys,
>>
>> I am writing a small open source project
>>  to use SQL Script to write
>> Spark Jobs. Want to see if there are other people interested to use or
>> contribute to this project.
>>
>> The project is called UberScriptQuery (https://github.com/uber/
>> uberscriptquery). Sorry for the dumb name to avoid conflict with
>> many other names (Spark is registered trademark, thus I could not use 
>> Spark
>> in my project name).
>>
>> In short, it is a high level SQL-like DSL (Domain Specific Language)
>> on top of Spark. People can use that DSL to write Spark jobs without
>> worrying about Spark internal details. Please check README
>>  in the project to get more
>> details.
>>
>> It will be great if I could get any feedback or suggestions!
>>
>> Best,
>> Bo
>>
>>


>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> 
>
>


Re: Java access to internal representation of DataTypes.DateType

2017-06-14 Thread Anton Kravchenko
I switched to java.sql.Date and converted milliseconds to days:

 while (it.hasNext()) {
     Row irow = it.next();
     java.sql.Date d = irow.getAs("time_col");
     // getTime() returns milliseconds since the epoch; divide down to whole days.
     long t_long = d.getTime() / (60 * 60 * 1000) / 24;
     int t_int = Math.toIntExact(t_long);
 }

Though if there is a more efficient way to do it, I would be happy to see it.
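
One alternative I am considering, sketched below in Scala (the same
functions exist in the Java API), is to let Spark SQL compute the day count
with datediff against 1970-01-01, which matches DateType's internal
days-since-epoch representation. The column and values are made up:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{datediff, lit, to_date}

object DaysSinceEpochSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("days-since-epoch-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("2017-06-14").toDF("s").select(to_date($"s").as("time_col"))

    // datediff returns an IntegerType column of whole days, i.e. DateType's
    // internal "days since 1970-01-01" representation.
    val days = df.select(datediff($"time_col", to_date(lit("1970-01-01"))).as("days_since_epoch"))
    days.show()

    spark.stop()
  }
}
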
Anton

On Wed, Jun 14, 2017 at 12:42 AM, Kazuaki Ishizaki 
wrote:

> Does this code help you?
> https://github.com/apache/spark/blob/master/sql/core/
> src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java#L156-L194
>
> Kazuaki Ishizaki
>
>
>
> From:Anton Kravchenko 
> To:"user @spark" 
> Date:2017/06/14 01:16
> Subject:Java access to internal representation of
> DataTypes.DateType
> --
>
>
>
> How one would access to internal representation of DataTypes.DateType from
> Spark (2.0.1) Java API?
>
> From
> *https://github.com/apache/spark/blob/51b1c1551d3a7147403b9e821fcc7c8f57b4824c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala*
> 
> :
> "Internally, this is represented as the number of days from 1970-01-01."
>
>
>
>


Exception when accessing Spark Web UI in yarn-client mode

2017-06-14 Thread satishjohn
When I click on the ApplicationMaster URL I see this exception. 

500013 [qtp1921129349-1619] WARN  o.s.j.server.AbstractHttpConnection -
header full: java.lang.RuntimeException: Header>6144
500014 [qtp1921129349-1619] WARN  o.s.j.server.AbstractHttpConnection -

Re: Read Local File

2017-06-14 Thread Dirceu Semighini Filho
Hello Satish,
Thanks for your answer
*I guess you have already made sure that the paths for your file are
exactly the same on each of your nodes*
Yes
* I'd also check the perms on your path*
The files are all with the same permissions
*Believe the sample code you pasted is only for testing - and you are
already aware that a distributed count on a local file has no benefits.*
I have done this to force the file read.
spark.sqlContext.read.text("file:///pathToFile").count

* Typing the path explicitly resolved*
I'll try this, I'm not sure if I've tested the varargs path option.
*Alternately - if the file size is small, you could do spark-submit with a
--files option which will ship the file to every executor and is available
for all executors. *
Unfortunately I can't do this, because it's a continuous reading process

Thanks.



2017-06-14 4:35 GMT-03:00 satish lalam :

> I guess you have already made sure that the paths for your file are
> exactly the same on each of your nodes. I'd also check the perms on your
> path.
> Believe the sample code you pasted is only for testing - and you are
> already aware that a distributed count on a local file has no benefits.
> Once I ran into a similar issue while copy pasting file paths probably due
> to encoding issues on some text editors. I'd copied a hidden char at the
> end of the path from source file which made my file lookup fail, but the
> code looked perfectly alright. Typing the path explicitly resolved it. But
> this is a corner case.
>
> Alternately - if the file size is small, you could do spark-submit with a
> --files option which will ship the file to every executor and is available
> for all executors.
>
>
>
>
> On Tue, Jun 13, 2017 at 11:02 AM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hi all,
>> I'm trying to read a File from local filesystem, I'd 4 workstations 1
>> Master and 3 slaves, running with Ambari and Yarn with Spark version*
>> 2.1.1.2.6.1.0-129*
>>
>> The code that I'm trying to run is quite simple
>>
>> spark.sqlContext.read.text("file:///pathToFile").count
>>
>> I've copied the file in all 4 workstations and every time that I try to
>> run this I got the following exception:
>> 17/06/13 17:57:37 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID
>> 12, ip, executor 1): java.io.FileNotFoundException: File file:pathToFile
>> does not exist
>> It is possible the underlying files have been updated. You can explicitly
>> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command
>> in SQL or by recreating the Dataset/DataFrame involved.
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$
>> anon$1.nextIterator(FileScanRDD.scala:175)
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$
>> anon$1.hasNext(FileScanRDD.scala:109)
>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>> eratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>> eratedIterator.processNext(Unknown Source)
>> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(
>> BufferedRowIterator.java:43)
>> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfu
>> n$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.w
>> rite(BypassMergeSortShuffleWriter.java:126)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:96)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:53)
>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 17/06/13 17:57:37 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4
>> times; aborting job
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 6.0 (TID 15, ip, executor 1): java.io.FileNotFoundException: File
>> file:file:pathToFile does not exist
>> It is possible the underlying files have been updated. You can explicitly
>> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command
>> in SQL or by recreating the Dataset/DataFrame involved.
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$
>> anon$1.nextIterator(FileScanRDD.scala:175)
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$
>> anon$1.hasNext(FileScanRDD.scala:109)
>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>> eratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>> 

Re: UDF percentile_approx

2017-06-14 Thread Andrés Ivaldi
Hello,
Riccardo, I was able to make it run. The problem is that HiveContext doesn't
exist any more in Spark 2.0.2, as far as I can see, but the method
enableHiveSupport adds the Hive functionality to SparkSession. To enable
this, the spark-hive_2.11 dependency is needed.

In the Spark API docs this is not well explained; it only says that SQLContext
and HiveContext are now part of SparkSession:

"SparkSession is now the new entry point of Spark that replaces the old
SQLContext and HiveContext. Note that the old SQLContext and HiveContext
are kept for backward compatibility. A new catalog interface is accessible
from SparkSession - existing API on databases and tables access such as
listTables, createExternalTable, dropTempView, cacheTable are moved here."

I think it would be a good idea to document enableHiveSupport as well.
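
For reference, a minimal sketch of what ended up working for me, assuming
spark-hive_2.11 is on the classpath (newer Spark versions reportedly ship
percentile_approx natively, without Hive support):

import org.apache.spark.sql.SparkSession

object PercentileApproxSketch {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport wires in the Hive metastore and Hive UDFs/UDAFs,
    // which is how percentile_approx resolves in Spark 2.0.x.
    val spark = SparkSession.builder()
      .appName("percentile-approx-sketch")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    Seq(1.0, 8.0).toDF("a")
      .selectExpr("percentile_approx(a, 0.5)")
      .show()

    spark.stop()
  }
}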

Thanks,

On Wed, Jun 14, 2017 at 5:13 AM, Takeshi Yamamuro 
wrote:

> You can use the function w/o hive and you can try:
>
> scala> Seq(1.0, 8.0).toDF("a").selectExpr("percentile_approx(a,
> 0.5)").show
>
> +--------------------------------------------+
> |percentile_approx(a, CAST(0.5 AS DOUBLE), 1)|
> +--------------------------------------------+
> |                                         8.0|
> +--------------------------------------------+
>
>
> // maropu
>
>
>
> On Wed, Jun 14, 2017 at 5:04 PM, Riccardo Ferrari 
> wrote:
>
>> Hi Andres,
>>
>> I can't find the refrence, last time I searched for that I found that
>> 'percentile_approx' is only available via hive context. You should register
>> a temp table and use it from there.
>>
>> Best,
>>
>> On Tue, Jun 13, 2017 at 8:52 PM, Andrés Ivaldi 
>> wrote:
>>
>>> Hello, I`m trying to user percentile_approx  on my SQL query, but It's
>>> like spark context can´t find the function
>>>
>>> I'm using it like this
>>> import org.apache.spark.sql.functions._
>>> import org.apache.spark.sql.DataFrameStatFunctions
>>>
>>> val e = expr("percentile_approx(Cantidadcon0234514)")
>>> df.agg(e).show()
>>>
>>> and exception is
>>>
>>> org.apache.spark.sql.AnalysisException: Undefined function:
>>> 'percentile_approx'. This function is neither a registered temporary
>>> function nor a permanent function registered
>>>
>>> I've also tryid with callUDF
>>>
>>> Regards.
>>>
>>> --
>>> Ing. Ivaldi Andres
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
Ing. Ivaldi Andres


Re: UDF percentile_approx

2017-06-14 Thread Takeshi Yamamuro
You can use the function w/o hive and you can try:

scala> Seq(1.0, 8.0).toDF("a").selectExpr("percentile_approx(a, 0.5)").show

+--------------------------------------------+
|percentile_approx(a, CAST(0.5 AS DOUBLE), 1)|
+--------------------------------------------+
|                                         8.0|
+--------------------------------------------+


// maropu



On Wed, Jun 14, 2017 at 5:04 PM, Riccardo Ferrari 
wrote:

> Hi Andres,
>
> I can't find the refrence, last time I searched for that I found that
> 'percentile_approx' is only available via hive context. You should register
> a temp table and use it from there.
>
> Best,
>
> On Tue, Jun 13, 2017 at 8:52 PM, Andrés Ivaldi  wrote:
>
>> Hello, I`m trying to user percentile_approx  on my SQL query, but It's
>> like spark context can´t find the function
>>
>> I'm using it like this
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.DataFrameStatFunctions
>>
>> val e = expr("percentile_approx(Cantidadcon0234514)")
>> df.agg(e).show()
>>
>> and exception is
>>
>> org.apache.spark.sql.AnalysisException: Undefined function:
>> 'percentile_approx'. This function is neither a registered temporary
>> function nor a permanent function registered
>>
>> I've also tryid with callUDF
>>
>> Regards.
>>
>> --
>> Ing. Ivaldi Andres
>>
>
>


-- 
---
Takeshi Yamamuro


Re: UDF percentile_approx

2017-06-14 Thread Riccardo Ferrari
Hi Andres,

I can't find the reference; last time I searched for it, I found that
'percentile_approx' is only available via a Hive context. You should register
a temp table and use it from there.

Best,

On Tue, Jun 13, 2017 at 8:52 PM, Andrés Ivaldi  wrote:

> Hello, I`m trying to user percentile_approx  on my SQL query, but It's
> like spark context can´t find the function
>
> I'm using it like this
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.DataFrameStatFunctions
>
> val e = expr("percentile_approx(Cantidadcon0234514)")
> df.agg(e).show()
>
> and exception is
>
> org.apache.spark.sql.AnalysisException: Undefined function:
> 'percentile_approx'. This function is neither a registered temporary
> function nor a permanent function registered
>
> I've also tryid with callUDF
>
> Regards.
>
> --
> Ing. Ivaldi Andres
>


Re: Java access to internal representation of DataTypes.DateType

2017-06-14 Thread Kazuaki Ishizaki
Does this code help you?
https://github.com/apache/spark/blob/master/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java#L156-L194

Kazuaki Ishizaki



From:   Anton Kravchenko 
To: "user @spark" 
Date:   2017/06/14 01:16
Subject:Java access to internal representation of 
DataTypes.DateType



How one would access to internal representation of DataTypes.DateType from 
Spark (2.0.1) Java API?

From 
https://github.com/apache/spark/blob/51b1c1551d3a7147403b9e821fcc7c8f57b4824c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
:
"Internally, this is represented as the number of days from 1970-01-01."






Re: Read Local File

2017-06-14 Thread satish lalam
I guess you have already made sure that the paths for your file are exactly
the same on each of your nodes. I'd also check the perms on your path.
Believe the sample code you pasted is only for testing - and you are
already aware that a distributed count on a local file has no benefits.
Once I ran into a similar issue while copy pasting file paths probably due
to encoding issues on some text editors. I'd copied a hidden char at the
end of the path from source file which made my file lookup fail, but the
code looked perfectly alright. Typing the path explicitly resolved it. But
this is a corner case.

Alternately - if the file size is small, you could do spark-submit with a
--files option which will ship the file to every executor and is available
for all executors.
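
A rough sketch of that option (the file name and paths are placeholders):

// Submitted with something like:
//   spark-submit --files /local/path/lookup.txt --class ShippedFileSketch app.jar
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession

object ShippedFileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shipped-file-sketch").getOrCreate()
    val data = spark.sparkContext.parallelize(1 to 100, 4)

    val matched = data.mapPartitions { iter =>
      // --files puts a copy of lookup.txt in each executor's working directory;
      // SparkFiles.get resolves the node-local path at runtime.
      val lookup = scala.io.Source.fromFile(SparkFiles.get("lookup.txt")).getLines().toSet
      iter.filter(i => lookup.contains(i.toString))
    }

    println(matched.count())
    spark.stop()
  }
}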




On Tue, Jun 13, 2017 at 11:02 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi all,
> I'm trying to read a File from local filesystem, I'd 4 workstations 1
> Master and 3 slaves, running with Ambari and Yarn with Spark version*
> 2.1.1.2.6.1.0-129*
>
> The code that I'm trying to run is quite simple
>
> spark.sqlContext.read.text("file:///pathToFile").count
>
> I've copied the file in all 4 workstations and every time that I try to
> run this I got the following exception:
> 17/06/13 17:57:37 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 12,
> ip, executor 1): java.io.FileNotFoundException: File file:pathToFile does
> not exist
> It is possible the underlying files have been updated. You can explicitly
> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command
> in SQL or by recreating the Dataset/DataFrame involved.
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:175)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:109)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(
> BypassMergeSortShuffleWriter.java:126)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 17/06/13 17:57:37 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4
> times; aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 6.0 (TID 15, ip, executor 1): java.io.FileNotFoundException: File
> file:file:pathToFile does not exist
> It is possible the underlying files have been updated. You can explicitly
> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command
> in SQL or by recreating the Dataset/DataFrame involved.
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:175)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:109)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(
> BypassMergeSortShuffleWriter.java:126)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>   at 

Re: Use SQL Script to Write Spark SQL Jobs

2017-06-14 Thread nihed mbarek
Hi

I already saw a project with the same idea.
https://github.com/cloudera-labs/envelope

Regards,

On Wed, 14 Jun 2017 at 04:32, bo yang  wrote:

> Thanks Benjamin and Ayan for the feedback! You kind of represent two group
> of people who need such script tool or not. Personally I find the script is
> very useful for myself to write ETL pipelines and daily jobs. Let's see
> whether there are other people interested in such project.
>
> Best,
> Bo
>
>
>
>
>
> On Mon, Jun 12, 2017 at 11:26 PM, ayan guha  wrote:
>
>> Hi
>>
>> IMHO, this approach is not very useful.
>>
>> Firstly, 2 use cases mentioned in the project page:
>>
>> 1. Simplify spark development - I think the only thing can be done there
>> is to come up with some boilerplate function, which essentially will take a
>> sql and come back with a temp table name and a corresponding DF (Remember
>> the project targets structured data sources only, not streaming or RDD).
>> Building another mini-DSL on top of already fairly elaborate spark API
>> never appealed to me.
>>
>> 2. Business Analysts using Spark - single word answer is Notebooks. Take
>> your pick - Jupyter, Zeppelin, Hue.
>>
>> The case of "Spark is for Developers", IMHO, stemmed to the
>> packaging/building overhead of spark apps. For Python users, this barrier
>> is considerably lower (And maybe that is why I do not see a prominent
>> need).
>>
>> But I can imagine the pain of a SQL developer coming into a scala/java
>> world. I came from a hardcore SQL/DWH environment where I used to write SQL
>> and SQL only. So SBT or MVN are still not my friend. Maybe someday they
>> will. But learned them hard way, just because the value of using spark can
>> offset the pain long long way. So, I think there is a need of spending time
>> with the environment to get comfortable with it. And maybe, just maybe,
>> using Nifi in case you miss drag/drop features too much :)
>>
>> But, these are my 2c, and sincerely humble opinion, and I wish you all
>> the luck for your project.
>>
>> On Tue, Jun 13, 2017 at 3:23 PM, Benjamin Kim  wrote:
>>
>>> Hi Bo,
>>>
>>> +1 for your project. I come from the world of data warehouses, ETL, and
>>> reporting analytics. There are many individuals who do not know or want to
>>> do any coding. They are content with ANSI SQL and stick to it. ETL
>>> workflows are also done without any coding using a drag-and-drop user
>>> interface, such as Talend, SSIS, etc. There is a small amount of scripting
>>> involved but not too much. I looked at what you are trying to do, and I
>>> welcome it. This could open up Spark to the masses and shorten development
>>> times.
>>>
>>> Cheers,
>>> Ben
>>>
>>>
>>> On Jun 12, 2017, at 10:14 PM, bo yang  wrote:
>>>
>>> Hi Aakash,
>>>
>>> Thanks for your willing to help :) It will be great if I could get more
>>> feedback on my project. For example, is there any other people feeling the
>>> need of using a script to write Spark job easily? Also, I would explore
>>> whether it is possible that the Spark project takes some work to build such
>>> a script based high level DSL.
>>>
>>> Best,
>>> Bo
>>>
>>>
>>> On Mon, Jun 12, 2017 at 12:14 PM, Aakash Basu <
>>> aakash.spark@gmail.com> wrote:
>>>
 Hey,

 I work on Spark SQL and would pretty much be able to help you in this.
 Let me know your requirement.

 Thanks,
 Aakash.

 On 12-Jun-2017 11:00 AM, "bo yang"  wrote:

> Hi Guys,
>
> I am writing a small open source project
>  to use SQL Script to write
> Spark Jobs. Want to see if there are other people interested to use or
> contribute to this project.
>
> The project is called UberScriptQuery (
> https://github.com/uber/uberscriptquery). Sorry for the dumb name to
> avoid conflict with many other names (Spark is registered trademark, thus 
> I
> could not use Spark in my project name).
>
> In short, it is a high level SQL-like DSL (Domain Specific Language)
> on top of Spark. People can use that DSL to write Spark jobs without
> worrying about Spark internal details. Please check README
>  in the project to get more
> details.
>
> It will be great if I could get any feedback or suggestions!
>
> Best,
> Bo
>
>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
> --

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




Re: Spark Streaming Design Suggestion

2017-06-14 Thread satish lalam
Agree with Jörn. Dynamically creating/deleting Topics is nontrivial to
manage.
With the limited knowledge about your scenario - it appears that you are
using topics as some kind of message type enum.
If that is the case - you might be better off with one (or just a few
topics) and have a messagetype field in kafka event itself.
Your streaming job can then match-case incoming events on this field to
choose the right processor for respective events.
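
A rough sketch of that single-topic approach with the Kafka 0.10 direct
stream; the message encoding (a type prefix), topic name and broker below
are made up for illustration:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SingleTopicRoutingSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("routing-sketch"), Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",          // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "routing-sketch")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("events"), kafkaParams))

    // Route each record on a message-type prefix instead of on its topic.
    stream.map(record => record.value).foreachRDD { rdd =>
      rdd.foreach { value =>
        value.split(",", 2) match {
          case Array("typeA", payload) => // processTypeA(payload), then write to output topic A
          case Array("typeB", payload) => // processTypeB(payload), then write to output topic B
          case _                       => // unknown type: log or send to a dead-letter topic
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

The match-case is where you would plug in the per-type processing and the
write to the corresponding output topic.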

On Tue, Jun 13, 2017 at 1:47 PM, Jörn Franke  wrote:

> I do not fully understand the design here.
> Why not send all to one topic with some application id in the message and
> you write to one topic also indicating the application id.
>
> Can you elaborate a little bit more on the use case?
>
> Especially applications deleting/creating topics dynamically can be a
> nightmare to operate
>
> > On 13. Jun 2017, at 22:03, Shashi Vishwakarma 
> wrote:
> >
> > Hi
> >
> > I have to design a spark streaming application with below use case. I am
> looking for best possible approach for this.
> >
> > I have application which pushing data into 1000+ different topics each
> has different purpose . Spark streaming will receive data from each topic
> and after processing it will write back to corresponding another topic.
> >
> > Ex.
> >
> > Input Type 1 Topic  --> Spark Streaming --> Output Type 1 Topic
> > Input Type 2 Topic  --> Spark Streaming --> Output Type 2 Topic
> > Input Type 3 Topic  --> Spark Streaming --> Output Type 3 Topic
> > .
> > .
> > .
> > Input Type N Topic  --> Spark Streaming --> Output Type N Topic  and so
> on.
> >
> > I need to answer following questions.
> >
> > 1. Is it a good idea to launch 1000+ spark streaming application per
> topic basis ? Or I should have one streaming application for all topics as
> processing logic going to be same ?
> > 2. If one streaming context , then how will I determine which RDD
> belongs to which Kafka topic , so that after processing I can write it back
> to its corresponding OUTPUT Topic?
> > 3. Client may add/delete topic from Kafka , how do dynamically handle in
> Spark streaming ?
> > 4. How do I restart job automatically on failure ?
> >
> > Any other issue you guys see here ?
> >
> > Highly appreicate your response.
> >
> > Thanks
> > Shashi
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


having trouble using structured streaming with file sink (parquet)

2017-06-14 Thread AssafMendelson
Hi all,

I have recently started assessing structured streaming and ran into a little 
snag from the beginning.

Basically I wanted to read some data, do some basic aggregation and write the 
result to file:

import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.streaming.ProcessingTime

val rawRecords = spark.readStream.schema(myschema).parquet("/mytest")
val q = rawRecords.withColumn("g", $"id" % 100).groupBy("g").agg(avg($"id"))
val res = q.writeStream
  .outputMode("complete")
  .trigger(ProcessingTime("10 seconds"))
  .format("parquet")
  .option("path", "/test2")
  .option("checkpointLocation", "/mycheckpoint")
  .start

The problem is that it tells me that parquet does not support the complete mode
(or update, for that matter).
So how would I do a streaming aggregation to file?
In general, my goal is to have a single (slow) streaming process which would 
write some profile and then have a second streaming process which would load 
the current dataframe to be used in join (I would stop the second streaming 
process and reload the dataframe periodically).

Any help would be appreciated.

Thanks,
  Assaf.
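
The only direction I can see for the file sink is append mode with an
event-time watermark, so that finalized windows can be appended. A rough
sketch, assuming Spark 2.1+ and an event-time column called ts (my real
schema is simplified here, and I have not verified this end to end):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, window}
import org.apache.spark.sql.types.{LongType, StructType, TimestampType}

object FileSinkAggregationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("file-sink-agg-sketch").getOrCreate()
    import spark.implicits._

    // Simplified stand-in for my real schema: a numeric id plus an event-time column ts.
    val myschema = new StructType().add("id", LongType).add("ts", TimestampType)
    val rawRecords = spark.readStream.schema(myschema).parquet("/mytest")

    // With a watermark, windows that can no longer receive late data are finalized
    // and can therefore be appended to the file sink.
    val agg = rawRecords
      .withWatermark("ts", "10 minutes")
      .groupBy(window($"ts", "10 minutes"), ($"id" % 100).as("g"))
      .agg(avg($"id"))

    agg.writeStream
      .outputMode("append")
      .format("parquet")
      .option("path", "/test2")
      .option("checkpointLocation", "/mycheckpoint")
      .start()
      .awaitTermination()
  }
}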




