Interest in adding the ability to request GPUs to the Spark client?

2018-05-15 Thread Daniel Galvez
Hi all,

Is anyone here interested in adding the ability to request GPUs to Spark's
client (i.e., spark-submit)? As of now, YARN 3.0's resource manager has the
ability to schedule GPUs as resources via cgroups, but the Spark client lacks
the ability to request them.

The ability to guarantee GPU resources would be practically useful for my
organization. Right now, the only way to do that is to request all of the
memory (or all of the CPUs) on a node, which is very kludgy and wastes
resources, especially if your node has more than one GPU and your code was
written such that an executor can use only one GPU at a time.

I'm just not sure of a good way to make use of libraries like Databricks' Deep
Learning Pipelines for GPU-heavy computation otherwise, unless you are lucky
enough to be in an organization that can virtualize compute nodes such that
each node has only one GPU. Of course, I realize that many Databricks customers
are using Azure or AWS, which let you do this easily. Is this what people
normally do in industry?

This is something I am interested in working on, unless others out there
have advice on why this is a bad idea.

Unfortunately, I am not familiar enough with Mesos and Kubernetes right now
to know how they schedule GPU resources and whether adding support for
requesting GPUs from them to the spark-submit client would be simple.

Daniel

-- 
Daniel Galvez
http://danielgalvez.me
https://github.com/galv


[structured-streaming][kafka] Will the Kafka readStream time out after connections.max.idle.ms (540000 ms)?

2018-05-15 Thread karthikjay
Hi all,

We are running into a scenario where the structured streaming job exits
after a while, specifically when the Kafka topic is not getting any data.
From the job logs, I see connections.max.idle.ms = 540000. Does that
mean the Spark readStream will close when it does not get data for 540000
milliseconds? If yes, how do I override this?
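A minimal sketch of how such an override might look, assuming the Spark Kafka source's convention of passing through consumer settings prefixed with "kafka." (the broker, topic and timeout value below are placeholders, not from the original post):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaIdleSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("kafka-idle-sketch").getOrCreate();

    // Options prefixed with "kafka." are handed to the underlying Kafka consumer,
    // which is one way the idle timeout could be raised (values are placeholders).
    Dataset<Row> stream = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092")
        .option("subscribe", "mytopic")
        .option("kafka.connections.max.idle.ms", "3600000")
        .load();

    stream.writeStream().format("console").start().awaitTermination();
  }
}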



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Continuous Processing mode behaves differently from Batch mode

2018-05-15 Thread Yuta Morisawa

Hi all

I am using Structured Streaming in Continuous Processing mode and I
ran into an odd problem.


My code is very simple and closely follows the sample code in the
documentation:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing


When I send the same text data ten times (for example, a 10-line text), in
Batch mode the result has 100 lines.


But in Continuous Processing mode the result has only 10 lines.
It appears duplicated lines are removed.

The only difference between the two versions of the code is whether the
trigger method is set.

Why do these two versions behave differently?
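For reference, a minimal sketch of the comparison being described, where the only difference between the two runs is the trigger (Java; the socket source, host/port and console sink are placeholders in the style of the programming guide, not the actual code):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;

public class ContinuousVsMicroBatchSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("continuous-sketch").getOrCreate();

    Dataset<Row> lines = spark.readStream()
        .format("socket")
        .option("host", "localhost")   // placeholder
        .option("port", 9999)          // placeholder
        .load();

    // Default (micro-batch) variant:
    // lines.writeStream().format("console").start();

    // Continuous Processing variant: the only difference is the trigger.
    lines.writeStream()
        .format("console")
        .trigger(Trigger.Continuous("1 second"))
        .start()
        .awaitTermination();
  }
}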


--
Regards,
Yuta





Re: Structured Streaming, Reading and Updating a variable

2018-05-15 Thread Lalwani, Jayesh
Do you have a code sample, and detailed error message/exception to show?

From: Martin Engen 
Date: Tuesday, May 15, 2018 at 9:24 AM
To: "user@spark.apache.org" 
Subject: Structured Streaming, Reading and Updating a variable

Hello,

I'm working with Structured Streaming, and I need a method of keeping a running
average based on the last 24 hours of data.
To help with this, I can use exponential smoothing, which means I really only
need to store one value from a previous calculation into the new one, and update
this variable as calculations carry on.

Implementing this is a much bigger challenge than I ever imagined.


I've tried using accumulators and querying/storing data in Cassandra after every
calculation. Both methods worked somewhat locally, but I don't seem to be able
to use these on the Spark worker nodes, as I get the
"java.lang.NoClassDefFoundError: Could not initialize class" error both for the
accumulator and the Cassandra connection library.

How can you read/update a variable while doing calculations using Structured
Streaming?

Thank you






Re: [Arrow][Dremio]

2018-05-15 Thread Xavier Mehaut
thanks bryan for the answer

Sent from my iPhone

> On 15 May 2018 at 19:06, Bryan Cutler  wrote:
> 
> Hi Xavier,
> 
> Regarding Arrow usage in Spark, using Arrow format to transfer data between 
> Python and Java has been the focus so far because this area stood to benefit 
> the most.  It's possible that the scope of Arrow could broaden in the future, 
> but there still needs to be discussions about this.
> 
> Bryan
> 
>> On Mon, May 14, 2018 at 9:55 AM, Pierce Lamb  
>> wrote:
>> Hi Xavier,
>> 
>> Along the lines of connecting to multiple sources of data and replacing ETL 
>> tools you may want to check out Confluent's blog on building a real-time 
>> streaming ETL pipeline on Kafka as well as SnappyData's blog on Real-Time 
>> Streaming ETL with SnappyData where Spark is central to connecting to 
>> multiple data sources, executing SQL on streams etc. These should provide 
>> nice comparisons to your ideas about Dremio + Spark as ETL tools.
>> 
>> Disclaimer: I am a SnappyData employee
>> 
>> Hope this helps,
>> 
>> Pierce
>> 
>>> On Mon, May 14, 2018 at 2:24 AM, xmehaut  wrote:
>>> Hi Michaël,
>>> 
>>> I'm not an expert on Dremio; I'm just trying to evaluate the potential of this
>>> technology, what impact it could have on Spark, how the two can work
>>> together, and how Spark could use Arrow even further internally in its
>>> existing algorithms.
>>> 
>>> Dremio already has a quite rich API set that gives access, for instance, to
>>> metadata and SQL queries, and even lets you create virtual datasets programmatically.
>>> It also has a lot of predefined functions, and I imagine there will be
>>> more and more functions in the future, e.g. machine learning functions like the
>>> ones we find in Azure SQL Server, which lets you mix SQL and ML
>>> functions. Access to Dremio is through JDBC, and we can imagine accessing
>>> virtual datasets through Spark and dynamically creating new datasets
>>> from the API, connected to Parquet files stored dynamically by Spark on
>>> HDFS, Azure Data Lake or S3... Of course a tighter integration between
>>> the two would be better, with a Spark read/write connector to Dremio :)
>>> 
>>> regards
>>> xavier
>>> 
>>> 
>>> 
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 
>> 
> 


java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes

2018-05-15 Thread Mina Aslani
Hi,

I am trying to test my Spark app, implemented in Java. In my Spark app I
load the LogisticRegressionModel that I have already created, trained and
tested using a portion of the training data.

Now, when I test my Spark app with another set of data and try to predict,
I get the error below when reading the prediction result to see how it
looks after applying the transform to the new test data:

org.apache.spark.SparkException: Failed to execute user defined
function($anonfun$4: (vector) => double)

java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x:
Vector, y:Vector) was given Vectors with non-matching sizes


I know the cause: the new test data does not have the same vector size as
the trained model expects. However, I would like to know how I can resolve
it. What is the suggested workaround?
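One common way to avoid the mismatch is to persist the fitted feature pipeline together with the model and reuse it on new data, rather than refitting the feature transformers on the test set. A minimal sketch, assuming the feature stages and the LogisticRegression were fitted as one Pipeline (paths and column names are placeholders):

import org.apache.spark.ml.PipelineModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PredictWithSavedPipelineSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("predict-sketch").getOrCreate();

    // Load new, raw test data (placeholder path).
    Dataset<Row> newData = spark.read().json("/path/to/new-data");

    // Load the PipelineModel that was fitted on the training data. Because the
    // fitted feature stages (indexers, assembler, etc.) are reused as-is, the
    // feature vectors they produce have the size the LogisticRegressionModel expects.
    PipelineModel model = PipelineModel.load("/path/to/saved-pipeline-model");

    Dataset<Row> predictions = model.transform(newData);
    predictions.select("prediction").show();
  }
}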

I really appreciate your quick response.

Best regards,
Mina


Re: [Arrow][Dremio]

2018-05-15 Thread Bryan Cutler
Hi Xavier,

Regarding Arrow usage in Spark, using Arrow format to transfer data between
Python and Java has been the focus so far because this area stood to
benefit the most.  It's possible that the scope of Arrow could broaden in
the future, but there still needs to be discussions about this.

Bryan

On Mon, May 14, 2018 at 9:55 AM, Pierce Lamb 
wrote:

> Hi Xavier,
>
> Along the lines of connecting to multiple sources of data and replacing
> ETL tools you may want to check out Confluent's blog on building a
> real-time streaming ETL pipeline on Kafka
> 
> as well as SnappyData's blog on Real-Time Streaming ETL with SnappyData
>  where
> Spark is central to connecting to multiple data sources, executing SQL on
> streams etc. These should provide nice comparisons to your ideas about
> Dremio + Spark as ETL tools.
>
> Disclaimer: I am a SnappyData employee
>
> Hope this helps,
>
> Pierce
>
> On Mon, May 14, 2018 at 2:24 AM, xmehaut  wrote:
>
>> Hi Michaël,
>>
>> I'm not an expert on Dremio; I'm just trying to evaluate the potential of this
>> technology, what impact it could have on Spark, how the two can work
>> together, and how Spark could use Arrow even further internally in its
>> existing algorithms.
>>
>> Dremio already has a quite rich API set that gives access, for instance, to
>> metadata and SQL queries, and even lets you create virtual datasets
>> programmatically.
>> It also has a lot of predefined functions, and I imagine there will be
>> more and more functions in the future, e.g. machine learning functions like
>> the
>> ones we find in Azure SQL Server, which lets you mix SQL and ML
>> functions. Access to Dremio is through JDBC, and we can imagine accessing
>> virtual datasets through Spark and dynamically creating new datasets
>> from the API, connected to Parquet files stored dynamically by Spark on
>> HDFS, Azure Data Lake or S3... Of course a tighter integration between
>> the two would be better, with a Spark read/write connector to Dremio :)
>>
>> regards
>> xavier
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Sklearn model in pyspark prediction

2018-05-15 Thread HARSH TAKKAR
Hi,

Is there a way to load a model saved using the sklearn library in PySpark /
Scala Spark for prediction?


Thanks


Re: How to use StringIndexer for multiple input /output columns in Spark Java

2018-05-15 Thread Mina Aslani
Hi,

So, what is the workaround? Should I create multiple indexers (one for each
column), and then create a pipeline and set the stages to include all the
StringIndexers?
I am using 2.2.1 as I cannot move to 2.3.0. It looks like
OneHotEncoderEstimator is broken; please see my email sent today with the
subject:
OneHotEncoderEstimator - java.lang.NoSuchMethodError: org.apache.spark.sql
.Dataset.withColumns

Regards,
Mina
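A minimal sketch of that per-column workaround (Java, against the 2.2.x API; the column names are placeholders):

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class MultiColumnIndexSketch {
  public static Dataset<Row> indexColumns(Dataset<Row> df) {
    String[] columns = {"colA", "colB", "colC"};  // placeholder column names

    // One StringIndexer per input column, all chained in a single Pipeline.
    PipelineStage[] stages = new PipelineStage[columns.length];
    for (int i = 0; i < columns.length; i++) {
      stages[i] = new StringIndexer()
          .setInputCol(columns[i])
          .setOutputCol(columns[i] + "_indexed");
    }

    PipelineModel model = new Pipeline().setStages(stages).fit(df);
    return model.transform(df);
  }
}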

On Tue, May 15, 2018 at 2:37 AM, Nick Pentreath 
wrote:

> Multi column support for StringIndexer didn’t make it into Spark 2.3.0
>
> The PR is still in progress I think - should be available in 2.4.0
>
> On Mon, 14 May 2018 at 22:32, Mina Aslani  wrote:
>
>> Please take a look at the API doc:
>> https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
>>
>> On Mon, May 14, 2018 at 4:30 PM, Mina Aslani 
>> wrote:
>>
>>> Hi,
>>>
>>> There is no SetInputCols/SetOutputCols for StringIndexer in Spark java.
>>> How multiple input/output columns can be specified then?
>>>
>>> Regards,
>>> Mina
>>>
>>
>>


Re: Scala's Seq:* equivalent in java

2018-05-15 Thread Koert Kuipers
Isn't _* varargs? So you should be able to use Java array?
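A minimal sketch of that (Java): Column.isin takes Object varargs, so a Java array or plain literals can be passed directly; the "id" column and the values are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class IsinSketch {
  // isin(Object...) is a varargs method, so a Java array can be passed directly,
  // with no conversion to a Scala Seq.
  public static Dataset<Row> filterByIds(Dataset<Row> df, Object[] ids) {
    return df.filter(df.col("id").isin(ids));   // "id" is a placeholder column name
  }

  // Literals work too, since varargs also accepts them one by one:
  public static Dataset<Row> filterByLiteralIds(Dataset<Row> df) {
    return df.filter(df.col("id").isin(1, 2));
  }
}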

On Tue, May 15, 2018, 06:29 onmstester onmstester 
wrote:

> I could not find how to pass a list to isin() filter in java, something
> like this could be done with scala:
>
> val ids = Array(1,2)
> df.filter(df("id").isin(ids:_*)).show
>
> But in java everything that converts java list to scala Seq fails with
> unsupported literal type exception:
> JavaConversions.asScalaBuffer(list).toSeq()
>
> JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq().seq()
>
> Sent using Zoho Mail 
>
>
>


Re: Spark Structured Streaming is giving error “org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”

2018-05-15 Thread रविशंकर नायर
Hi Jacek,

If we use RDDs instead of DataFrames, can we accomplish the same? I mean, is
joining between RDDs allowed in Spark Streaming?

Best,
Ravi

On Sun, May 13, 2018 at 11:18 AM Jacek Laskowski  wrote:

> Hi,
>
> The exception message should be self-explanatory and says that you cannot
> join two streaming Datasets. This feature was added in 2.3 if I'm not
> mistaken.
>
> Just to be sure that you work with two streaming Datasets, can you show
> the query plan of the join query?
>
> Jacek
>
> On Sat, 12 May 2018, 16:57 ThomasThomas,  wrote:
>
>> Hi There,
>>
>> Our use case is like this.
>>
>> We have nested (multi-level) JSON messages flowing through a Kafka queue. We read
>> the messages from Kafka using Spark Structured Streaming (SSS), explode
>> the data and flatten it all into a single record using DataFrame joins, and
>> land it in a relational database table (DB2).
>>
>> But we are getting the following error when we write into db using JDBC.
>>
>> “org.apache.spark.sql.AnalysisException: Inner join between two streaming
>> DataFrames/Datasets is not supported;”
>>
>> Any help would be greatly appreciated.
>>
>> Thanks,
>> Thomas Thomas
>> Mastermind Solutions LLC.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Structured Streaming, Reading and Updating a variable

2018-05-15 Thread Koert Kuipers
You use a windowed aggregation for this
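A minimal sketch of that suggestion (Java; the "timestamp" and "value" columns, the 24-hour window with a 1-hour slide, and the watermark interval are illustrative assumptions):

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class RunningAverageSketch {
  // "events" is assumed to be a streaming Dataset with "timestamp" and "value" columns.
  public static Dataset<Row> runningAverage(Dataset<Row> events) {
    return events
        // the watermark bounds how much state is kept for late data
        .withWatermark("timestamp", "1 hour")
        // a 24-hour window sliding every hour gives a rolling 24-hour average
        .groupBy(window(col("timestamp"), "24 hours", "1 hour"))
        .agg(avg(col("value")).alias("avg_value"));
  }
}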

On Tue, May 15, 2018, 09:23 Martin Engen  wrote:

> Hello,
>
>
>
> I'm working with Structured Streaming, and I need a method of keeping a
> running average based on the last 24 hours of data.
>
> To help with this, I can use exponential smoothing, which means I really
> only need to store one value from a previous calculation into the new one, and
> update this variable as calculations carry on.
>
>
> Implementing this is a much bigger challenge than I ever imagined.
>
>
> I've tried using accumulators and querying/storing data in Cassandra after
> every calculation. Both methods worked somewhat locally, but I don't seem
> to be able to use these on the Spark worker nodes, as I get the
> "java.lang.NoClassDefFoundError: Could not initialize class" error both
> for the accumulator and the Cassandra connection library.
>
>
>
> How can you read/update a variable while doing calculations using
> Structured Streaming?
>
>
> Thank you
>
>
>
>


OneHotEncoderEstimator - java.lang.NoSuchMethodError: org.apache.spark.sql.Dataset.withColumns

2018-05-15 Thread Mina Aslani
Hi,

I get the error below when I try to run the OneHotEncoderEstimator example:
https://github.com/apache/spark/blob/b74366481cc87490adf4e69d26389ec737548c15/examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderEstimatorExample.java#L67

It fails at this line of the code:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala#L348

Exception in thread "streaming-job-executor-0"
java.lang.NoSuchMethodError:
org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
at 
org.apache.spark.ml.feature.OneHotEncoderModel.transform(OneHotEncoderEstimator.scala:348)


Can you please let me know what the cause is? Is there any workaround?

Looking at the example in the repo, it seems that at some point it used to
run fine, but now it's not working. Also, OneHotEncoder is deprecated.

I really appreciate your quick response.

Regards,
Mina


Re: UDTF registration fails for hiveEnabled SQLContext

2018-05-15 Thread Mick Davies

I am trying to register a UDTF, not a UDF, so I don't think this applies.

Mick



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Structured Streaming, Reading and Updating a variable

2018-05-15 Thread Martin Engen
Hello,

I'm working with Structured Streaming, and I need a method of keeping a running
average based on the last 24 hours of data.
To help with this, I can use exponential smoothing, which means I really only
need to store one value from a previous calculation into the new one, and update
this variable as calculations carry on.

Implementing this is a much bigger challenge than I ever imagined.


I've tried using accumulators and querying/storing data in Cassandra after every
calculation. Both methods worked somewhat locally, but I don't seem to be able
to use these on the Spark worker nodes, as I get the
"java.lang.NoClassDefFoundError: Could not initialize class" error both for the
accumulator and the Cassandra connection library.

How can you read/update a variable while doing calculations using Structured
Streaming?

Thank you




Re: UDTF registration fails for hiveEnabled SQLContext

2018-05-15 Thread Ajay
You can also register UDFs by using the built-in udf function (import
org.apache.spark.sql.functions._), something along the lines of
val flattenUdf = udf(udfutils.flatten)
where udfutils is another object and flatten is a method in it.

On Tue, May 15, 2018 at 3:27 AM Mick Davies 
wrote:

> Hi Gourav,
>
> I don't think you can register UDTFs via sparkSession.udf.register
>
> Mick
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Thanks,
Ajay


Spark structured streaming aggregation within microbatch

2018-05-15 Thread Koert Kuipers
I have a streaming DataFrame where I insert a UUID into every row, then join
with a static DataFrame (after which the uuid column is no longer unique), then
group by uuid and do a simple aggregation.

So I know all rows with the same uuid will be in the same micro-batch,
guaranteed, correct? How do I express that in Structured Streaming? I don't
need an aggregation across batches.

Thanks!


Scala's Seq:* equivalent in java

2018-05-15 Thread onmstester onmstester
I could not find how to pass a list to the isin() filter in Java. Something
like this can be done in Scala:

val ids = Array(1,2)
df.filter(df("id").isin(ids:_*)).show

But in Java, everything that converts a Java list to a Scala Seq fails with an
unsupported literal type exception:
JavaConversions.asScalaBuffer(list).toSeq()
JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq().seq()


Sent using Zoho Mail







Re: UDTF registration fails for hiveEnabled SQLContext

2018-05-15 Thread Mick Davies
Hi Gourav,

I don't think you can register UDTFs via sparkSession.udf.register

Mick



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




What to consider when implementing a custom streaming sink?

2018-05-15 Thread kant kodali
Hi All,

I am trying to implement a custom sink and I have a few questions, mainly
about output modes.

1) How does Spark let the sink know that a new row is an update of an
existing row? Does it look at all the values of all columns of the new row
and an existing row for an equality match, or does it compute some sort of
hash?

2) What else do I need to consider when writing a custom sink?

Thanks!
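For the second question, one route worth knowing about in Spark 2.x is ForeachWriter, which pushes rows to the sink per partition and leaves update/upsert semantics entirely to your code. A minimal sketch (the output target is just stdout as a placeholder); it would be attached with df.writeStream().foreach(new SketchSink()).start().

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

// A skeletal per-row writer: Spark calls open() once per partition per epoch,
// process() for each row, and close() when the partition is done or fails.
public class SketchSink extends ForeachWriter<Row> {
  @Override
  public boolean open(long partitionId, long version) {
    // open a connection to the external system here; returning false skips the partition
    return true;
  }

  @Override
  public void process(Row row) {
    // write (or upsert) one row; update semantics are up to this method
    System.out.println(row.mkString(","));
  }

  @Override
  public void close(Throwable errorOrNull) {
    // release the connection; errorOrNull is non-null if the partition failed
  }
}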


Re: spark sql StackOverflow

2018-05-15 Thread Jörn Franke
3000 filters doesn't look like something reasonable. This is very difficult to
test and verify, as well as impossible to maintain.
Could it be that your filters are another table that you should join with?
The example is a little bit artificial, which makes the underlying business
case hard to understand. Can you provide a more realistic example?

Maybe a Bloom filter or something similar would make sense for you? Basically,
you want to know whether a key pair is in a given set of pairs?
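A minimal sketch of the join formulation (Java; the paths, the left_semi join type and the broadcast hint are illustrative assumptions, not something from the original post):

import static org.apache.spark.sql.functions.broadcast;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FilterByJoinSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("filter-by-join").getOrCreate();

    // Hypothetical inputs: "records" is the large (key1, key2, value) table,
    // "filters" is the small table of (key1, key2) pairs loaded from the database.
    Dataset<Row> records = spark.read().parquet("/path/to/records");  // placeholder path
    Dataset<Row> filters = spark.read().parquet("/path/to/filters");  // placeholder path

    // A left_semi join keeps only the records whose (key1, key2) pair appears in
    // the filter table; broadcasting the small table avoids shuffling "records".
    Dataset<Row> matched = records.alias("r")
        .join(broadcast(filters.alias("f")),
              col("r.key1").equalTo(col("f.key1"))
                  .and(col("r.key2").equalTo(col("f.key2"))),
              "left_semi");

    System.out.println(matched.count());
  }
}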

> On 15. May 2018, at 11:48, Alessandro Solimando 
>  wrote:
> 
> From the information you provided I would tackle this as a batch problem, 
> because this way you have access to more sophisticated techniques and you 
> have more flexibility (maybe HDFS and a SparkJob, but also think about a 
> datastore offering good indexes for the kind of data types and values you 
> have for your keys, and benefit from filter push-downs).
> 
> I personally use streaming only when real-time ingestion is needed.
> 
> Hth,
> Alessandro
> 
>> On 15 May 2018 at 09:11, onmstester onmstester  wrote:
>> 
>> How many distinct key1 (resp. key2) values do you have? Are these values 
>> reasonably stable over time?
>> 
>> fewer than 10 thousand, and these filters would change every 2-3 days. They
>> would be written to and loaded from a database
>> 
>> Are these records ingested in real-time or are they loaded from a datastore?
>> 
>> records would be loaded from some text files that would be copied in some 
>> directory over and over
>> 
>> Are you suggesting that I don't need to use spark-streaming?
>> Sent using Zoho Mail
>> 
>> 
>> 
>>  On Tue, 15 May 2018 11:26:42 +0430 Alessandro Solimando 
>>  wrote 
>> 
>> Hi,
>> I am not familiar with ATNConfigSet, but some thoughts that might help.
>> 
>> How many distinct key1 (resp. key2) values do you have? Are these values 
>> reasonably stable over time?
>> 
>> Are these records ingested in real-time or are they loaded from a datastore?
>> 
>> If the latter case the DB might be able to efficiently perform the 
>> filtering, especially if equipped with a proper index over key1/key2 (or a 
>> composite one).
>> 
>> In such case the filter push-down could be very effective (I didn't get if 
>> you just need to count or do something more with the matching record).
>> 
>> Alternatively, you could try to group by (key1,key2), and then filter (it 
>> again depends on the kind of output you have in mind).
>> 
>> If the datastore/stream is distributed and supports partitioning, you could 
>> partition your records by either key1 or key2 (or key1+key2), so they are 
>> already "separated" and can be consumed more efficiently (e.g., the groupby 
>> could then be local to a single partition).
>> 
>> Best regards,
>> Alessandro
>> 
>> On 15 May 2018 at 08:32, onmstester onmstester  wrote:
>> 
>> 
>> Hi, 
>> 
>> I need to run some queries on a huge amount of input records. The input rate
>> for records is 100K/second.
>> A record is like (key1,key2,value) and the application should report
>> occurrences of key1 = something && key2 == somethingElse.
>> The problem is there are too many filters in my query: more than 3 thousand
>> pairs of key1 and key2 should be filtered.
>> I was simply putting 1 million records in a temp table each time and
>> running a SQL query using spark-sql on the temp table:
>> select * from mytemptable where (key1 = something && key2 == somethingElse)
>> or (key1 = someOtherthing && key2 == someAnotherThing) or ... (3 thousand ors!!!)
>> And I encounter a StackOverflowError at ATNConfigSet.java line 178.
>>
>> So I have two options IMHO:
>> 1. Either put all key1 and key2 filter pairs in another temp table and do a
>> join between the two temp tables
>> 2. Or use spark-streaming, which I'm not familiar with, and I don't know if it
>> could handle 3K filters.
>>
>> Which way do you suggest? What is the best solution for my problem,
>> 'performance-wise'?
>> 
>> Thanks in advance
>> 
>> 
> 


Re: spark sql StackOverflow

2018-05-15 Thread Alessandro Solimando
From the information you provided I would tackle this as a batch problem,
because this way you have access to more sophisticated techniques and you
have more flexibility (maybe HDFS and a SparkJob, but also think about a
datastore offering good indexes for the kind of data types and values you
have for your keys, and benefit from filter push-downs).

I personally use streaming only when real-time ingestion is needed.

Hth,
Alessandro

On 15 May 2018 at 09:11, onmstester onmstester  wrote:

>
> How many distinct key1 (resp. key2) values do you have? Are these values
> reasonably stable over time?
>
> fewer than 10 thousand, and these filters would change every 2-3 days. They
> would be written to and loaded from a database
>
> Are these records ingested in real-time or are they loaded from a
> datastore?
>
>
> records would be loaded from some text files that would be copied in some
> directory over and over
>
> Are you suggesting that I don't need to use spark-streaming?
>
> Sent using Zoho Mail 
>
>
> On Tue, 15 May 2018 11:26:42 +0430, Alessandro Solimando wrote:
>
> Hi,
> I am not familiar with ATNConfigSet, but some thoughts that might help.
>
> How many distinct key1 (resp. key2) values do you have? Are these values
> reasonably stable over time?
>
> Are these records ingested in real-time or are they loaded from a
> datastore?
>
> If the latter case the DB might be able to efficiently perform the
> filtering, especially if equipped with a proper index over key1/key2 (or a
> composite one).
>
> In such case the filter push-down could be very effective (I didn't get if
> you just need to count or do something more with the matching record).
>
> Alternatively, you could try to group by (key1,key2), and then filter (it
> again depends on the kind of output you have in mind).
>
> If the datastore/stream is distributed and supports partitioning, you
> could partition your records by either key1 or key2 (or key1+key2), so they
> are already "separated" and can be consumed more efficiently (e.g., the
> groupby could then be local to a single partition).
>
> Best regards,
> Alessandro
>
> On 15 May 2018 at 08:32, onmstester onmstester 
> wrote:
>
>
> Hi,
>
> I need to run some queries on a huge amount of input records. The input rate
> for records is 100K/second.
> A record is like (key1,key2,value) and the application should report
> occurrences of key1 = something && key2 == somethingElse.
> The problem is there are too many filters in my query: more than 3 thousand
> pairs of key1 and key2 should be filtered.
> I was simply putting 1 million records in a temp table each time and
> running a SQL query using spark-sql on the temp table:
> select * from mytemptable where (key1 = something && key2 ==
> somethingElse) or (key1 = someOtherthing && key2 == someAnotherThing) or
> ... (3 thousand ors!!!)
> And I encounter a StackOverflowError at ATNConfigSet.java line 178.
>
> So I have two options IMHO:
> 1. Either put all key1 and key2 filter pairs in another temp table and do
> a join between the two temp tables
> 2. Or use spark-streaming, which I'm not familiar with, and I don't know if it
> could handle 3K filters.
>
> Which way do you suggest? What is the best solution for my problem,
> 'performance-wise'?
>
> Thanks in advance
>
>
>
>


Spark streaming with kafka input stuck in (Re-)joing group because of group rebalancing

2018-05-15 Thread JF Chen
When I terminate a Spark Streaming application and restart it, it always
gets stuck at this step:
>
> Revoking previously assigned partitions [] for group [mygroup]
> (Re-)joing group [mygroup]


If I use a new group id, it works fine, but I may lose the data
from the last time I read with the previous group id.

So how do I solve this?


Regards,
Junfeng Chen


Re: spark sql StackOverflow

2018-05-15 Thread Alessandro Solimando
Hi,
I am not familiar with ATNConfigSet, but some thoughts that might help.

How many distinct key1 (resp. key2) values do you have? Are these values
reasonably stable over time?

Are these records ingested in real-time or are they loaded from a datastore?

In the latter case, the DB might be able to efficiently perform the
filtering, especially if equipped with a proper index over key1/key2 (or a
composite one).

In such case the filter push-down could be very effective (I didn't get if
you just need to count or do something more with the matching record).

Alternatively, you could try to group by (key1,key2), and then filter (it
again depends on the kind of output you have in mind).

If the datastore/stream is distributed and supports partitioning, you could
partition your records by either key1 or key2 (or key1+key2), so they are
already "separated" and can be consumed more efficiently (e.g., the groupby
could then be local to a single partition).

Best regards,
Alessandro

On 15 May 2018 at 08:32, onmstester onmstester  wrote:

> Hi,
>
> I need to run some queries on a huge amount of input records. The input rate
> for records is 100K/second.
> A record is like (key1,key2,value) and the application should report
> occurrences of key1 = something && key2 == somethingElse.
> The problem is there are too many filters in my query: more than 3 thousand
> pairs of key1 and key2 should be filtered.
> I was simply putting 1 million records in a temp table each time and
> running a SQL query using spark-sql on the temp table:
> select * from mytemptable where (key1 = something && key2 ==
> somethingElse) or (key1 = someOtherthing && key2 == someAnotherThing) or
> ... (3 thousand ors!!!)
> And I encounter a StackOverflowError at ATNConfigSet.java line 178.
>
> So I have two options IMHO:
> 1. Either put all key1 and key2 filter pairs in another temp table and do
> a join between the two temp tables
> 2. Or use spark-streaming, which I'm not familiar with, and I don't know if it
> could handle 3K filters.
>
> Which way do you suggest? What is the best solution for my problem,
> 'performance-wise'?
>
> Thanks in advance
>
>


Re: How to use StringIndexer for multiple input /output columns in Spark Java

2018-05-15 Thread Nick Pentreath
Multi column support for StringIndexer didn’t make it into Spark 2.3.0

The PR is still in progress I think - should be available in 2.4.0

On Mon, 14 May 2018 at 22:32, Mina Aslani  wrote:

> Please take a look at the api doc:
> https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
>
> On Mon, May 14, 2018 at 4:30 PM, Mina Aslani  wrote:
>
>> Hi,
>>
>> There is no SetInputCols/SetOutputCols for StringIndexer in Spark java.
>> How multiple input/output columns can be specified then?
>>
>> Regards,
>> Mina
>>
>
>


spark sql StackOverflow

2018-05-15 Thread onmstester onmstester
Hi, 



I need to run some queries on a huge amount of input records. The input rate for
records is 100K/second.

A record is like (key1,key2,value) and the application should report occurrences
of key1 = something && key2 == somethingElse.

The problem is there are too many filters in my query: more than 3 thousand
pairs of key1 and key2 should be filtered.

I was simply putting 1 million records in a temp table each time and running
a SQL query using spark-sql on the temp table:

select * from mytemptable where (key1 = something && key2 ==
somethingElse) or (key1 = someOtherthing && key2 == someAnotherThing)
or ... (3 thousand ors!!!)

And I encounter a StackOverflowError at ATNConfigSet.java line 178.

So I have two options IMHO:

1. Either put all key1 and key2 filter pairs in another temp table and do a
join between the two temp tables

2. Or use spark-streaming, which I'm not familiar with, and I don't know if it
could handle 3K filters.

Which way do you suggest? What is the best solution for my problem
'performance-wise'?

Thanks in advance