Re: Pyspark Structured Streaming Error

2018-07-12 Thread Arbab Khalil
Remove the kafka-clients package and add a starting offset to the options:
df = spark.readStream \
    .format("kafka") \
    .option("zookeeper.connect", "localhost:2181") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ingest") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "latest") \
    .load()

Try this out; I hope it solves your problem.
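For reference: the ClassNotFoundException for the kafka source usually means the
spark-sql-kafka package never made it onto the classpath. If I remember correctly,
spark-submit/pyspark2 keeps only the last value when --packages is given twice, so
combining everything into a single flag (kafka-clients is pulled in transitively
anyway) is also worth trying:

pyspark2 --master=yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0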

On Thu, Jul 12, 2018 at 11:53 PM, umargeek 
wrote:

> Hi All,
>
> I am trying to test structured streaming using pyspark with the spark-submit
> command and packages mentioned below:
> *
> pyspark2 --master=yarn --packages
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --packages
> org.apache.kafka:kafka-clients:0.10.0.1*
>
>  but I am getting the following error (in bold):
>
>
>
> Using Python version 3.6.5 (default, Apr 10 2018 17:08:37)
> SparkSession available as 'spark'.
> >>> df = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
> >>> "pds1:9092,pds2:9092,pds3:9092").option("subscribe", "ingest").load()
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/streaming.py", line 403, in load
> return self._df(self._jreader.load())
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/utils.py", line 63, in deco
> return f(*a, **kw)
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o64.load.
> *: java.lang.ClassNotFoundException: Failed to find data source: kafka.
> Please find packages at http://spark.apache.org/third-party-projects.html*
> at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
> at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
> at scala.util.Try.orElse(Try.scala:84)
> at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
> ... 12 more
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Regards,
Arbab Khalil
Software Design Engineer


Re: How to avoid duplicate column names after join with multiple conditions

2018-07-12 Thread Prem Sure
Yes Nirav, we could probably ask dev for a config parameter that takes care of
this automatically (internally); until then, extra care is required from users
when specifying column names and joining.
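
For anyone searching the archive, a minimal Scala sketch of the drop-after-join
workaround Nirav describes below (df1/df2 and the column names are just
illustrative):

  // join on the original expression, then drop one of the two ambiguous "a" columns
  val joined = df1.join(df2, df1("a") === df2("a") && df1("b") === df2("c"))
  val deduped = joined.drop(df2("a"))   // a later select("a") is now unambiguous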

Thanks,
Prem

On Thu, Jul 12, 2018 at 10:53 PM Nirav Patel  wrote:

> Hi Prem, dropping column, renaming column are working for me as a
> workaround. I thought it just nice to have generic api that can handle that
> for me. or some intelligence that since both columns are same it shouldn't
> complain in subsequent Select clause that it doesn't know if I mean a#12 or
> a#81. They are both same just pick one.
>
> On Thu, Jul 12, 2018 at 9:38 AM, Prem Sure  wrote:
>
>> Hi Nirav, did you try
>> .drop(df1(a) after join
>>
>> Thanks,
>> Prem
>>
>> On Thu, Jul 12, 2018 at 9:50 PM Nirav Patel 
>> wrote:
>>
>>> Hi Vamshi,
>>>
>>> That api is very restricted and not generic enough. It imposes that all
>>> conditions of joins has to have same column on both side and it also has to
>>> be equijoin. It doesn't serve my usecase where some join predicates don't
>>> have same column names.
>>>
>>> Thanks
>>>
>>> On Sun, Jul 8, 2018 at 7:39 PM, Vamshi Talla 
>>> wrote:
>>>
 Nirav,

 Spark does not create a duplicate column when you use the below join
 expression,  as an array of column(s) like below but that requires the
 column name to be same in both the data frames.

 Example: *df1.join(df2, [‘a’])*

 Thanks.
 Vamshi Talla

 On Jul 6, 2018, at 4:47 PM, Gokula Krishnan D 
 wrote:

 Nirav,

 withColumnRenamed() API might help but it does not different column and
 renames all the occurrences of the given column. either use select() API
 and rename as you want.



 Thanks & Regards,
 Gokula Krishnan* (Gokul)*

 On Mon, Jul 2, 2018 at 5:52 PM, Nirav Patel 
 wrote:

> Expr is `df1(a) === df2(a) and df1(b) === df2(c)`
>
> How to avoid duplicate column 'a' in result? I don't see any api that
> combines both. Rename manually?
>
>
>




>>>
>>>
>>>
>>
>>
>
>
>


Re: [Structured Streaming] Custom StateStoreProvider

2018-07-12 Thread Jungtaek Lim
Girish,

I think reading through the implementation of HDFSBackedStateStoreProvider, as
well as the relevant traits, should give you a good idea of how to implement a
custom one. HDFSBackedStateStoreProvider is not that complicated to read and
understand; you just need to deal with your underlying storage engine.
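
For what it's worth, once a provider is implemented it is wired in through a
config rather than any change to Spark itself; a minimal sketch (the class name
is a placeholder, and this assumes the Spark 2.3 config key
spark.sql.streaming.stateStore.providerClass):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("custom-state-store")
    // point structured streaming at your StateStoreProvider implementation
    .config("spark.sql.streaming.stateStore.providerClass",
            "com.example.state.MyStateStoreProvider")  // placeholder class name
    .getOrCreate()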

Tathagata,

Is it planned to turn StateStore and the relevant traits into a public API? We
have two annotations (InterfaceStability and Experimental) to represent an
evolving public API, and since the state store provider can be plugged in, it
seems better to make it a public API marked as evolving.

On Wed, Jul 11, 2018 at 12:40 PM, Tathagata Das wrote:

> Note that this is not public API yet. Hence this is not very documented.
> So use it at your own risk :)
>
> On Tue, Jul 10, 2018 at 11:04 AM, subramgr 
> wrote:
>
>> Hi,
>>
>> This *trait* looks very daunting. Is there a blog post or an article that
>> explains how to implement it?
>>
>> Thanks
>> Girish
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Reading multiple files in Spark / which pattern to use

2018-07-12 Thread Marco Mistroni
hi all,
I have multiple files stored in S3 with the following naming pattern:

YYYY-MM-DD-securities.txt

I want to read multiple files at the same time.
I am attempting to use a pattern like this, for example:

2016-01*securities.txt,2016-02*securities.txt,2016-03*securities.txt

But it does not seem to work.
Could anyone help out?
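
For what it's worth, Spark's file sources accept Hadoop-style glob syntax
(including brace alternation), and the Scala readers also take several paths as
varargs, so a sketch along these lines may be worth trying (the bucket name is a
placeholder and a SparkSession named spark is assumed):

  // one glob with brace alternation over the months
  val df1 = spark.read.text("s3a://my-bucket/2016-{01,02,03}-*-securities.txt")

  // or build the patterns and pass them as separate paths
  val paths = Seq("2016-01", "2016-02", "2016-03")
    .map(m => s"s3a://my-bucket/$m-*-securities.txt")
  val df2 = spark.read.text(paths: _*)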

kind regards
 marco


Re: How to validate orc vectorization is working within spark application?

2018-07-12 Thread umargeek
Hello Jorn,

I am unable to post the entire code due to some data sharing related issues.

Use case: I am performing aggregations after reading data from an HDFS file
every minute. I would like to understand how to run this with vectorization
enabled, and what the prerequisites are to successfully enable it.
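
For reference, in Spark 2.3 the vectorized ORC reader is tied to the native ORC
implementation, so a hedged sketch of the settings involved (please double-check
against the 2.3 docs) would be:

  spark.conf.set("spark.sql.orc.impl", "native")                 // use the new ORC reader
  spark.conf.set("spark.sql.orc.enableVectorizedReader", "true") // vectorized ORC scans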

Thanks,
Umar



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Upgrading spark history server, no logs showing.

2018-07-12 Thread bbarks
Hi, 

We have multiple installations of Spark on our clusters. They reside in
different directories, which the jobs point to when they run.

For a couple of years now, we've run our history server off spark 2.0.2. We
have 2.1.2, 2.2.1 and 2.3.0 installed as well. I've tried upgrading to run
the server out of the 2.3.0 install. The UI loads, but will not show logs.

For fun, I then tried 2.2.1; same deal. However, when I ran 2.1.2, it worked
(albeit with a JS error about missing data in some table cell or row).

Are there any special steps to upgrading the history server between Spark
versions? I've combed over the settings multiple times, and it all seems fine.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Pyspark Structured Streaming Error

2018-07-12 Thread umargeek
Hi All,

I am trying to test structured streaming using pyspark with the spark-submit
command and packages mentioned below:
*
pyspark2 --master=yarn --packages
org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --packages
org.apache.kafka:kafka-clients:0.10.0.1*

 but I am getting the following error (in bold):



Using Python version 3.6.5 (default, Apr 10 2018 17:08:37)
SparkSession available as 'spark'.
>>> df = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
>>> "pds1:9092,pds2:9092,pds3:9092").option("subscribe", "ingest").load()
Traceback (most recent call last):
  File "", line 1, in 
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/streaming.py", line 403, in load
return self._df(self._jreader.load())
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.load.
*: java.lang.ClassNotFoundException: Failed to find data source: kafka.
Please find packages at http://spark.apache.org/third-party-projects.html*
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
... 12 more



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark ML online serving

2018-07-12 Thread Maximiliano Felice
Hi!

I know I'm late, but I just want to point out some highlights of our use case. We
currently:


   - Use Spark as an ETL tool, followed by
   - a Python (numpy/pandas based) pipeline to preprocess information and
   - use Tensorflow for training our Neural Networks


What we'd love to, and why we don't:


   - Start using Spark for our full preprocessing pipeline. Because type
   safety. And distributed computation. And catalyst. But mainly because
   *not-python.*
   Our main issue:
  - We want to use the same code for online serving. We're not willing
  to duplicate the preprocessing operations. Spark is not
  *serving-friendly*.
  - If we want it to preprocess online, we need to copy/paste our
  custom transformations to MLeap.
  - It's an issue to communicate with a Tensorflow API to give it the
  preprocessed data to serve.
   - Use Spark to do hyperparameter tuning.
   We'd need:
  - GPU Integration with Spark, letting us achieve finer tuning.
  - Better TensorFlow integration


Would love to know about other use cases, and whether others relate to the same
issues as we do.

On Wed, Jun 6, 2018 at 9:10 PM, Holden Karau ()
wrote:

> At Spark Summit some folks were talking about model serving and we wanted
> to collect requirements from the community.
> --
> Twitter: https://twitter.com/holdenkarau
>


Re: Interest in adding ability to request GPU's to the spark client?

2018-07-12 Thread Mich Talebzadeh
I agree.

Adding GPU capability to Spark in my opinion is a must for Advanced
Analytics.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 12 Jul 2018 at 19:14, Maximiliano Felice <
maximilianofel...@gmail.com> wrote:

> Hi,
>
> I've been meaning to reply to this email for a while now, sorry for taking
> so much time.
>
> I personally think that adding GPU resource management will allow us to
> boost some ETL performance a lot. For the last year, I've worked in
> transforming some Machine Learning pipelines from Python in Numpy/Pandas to
> Spark. Adding GPU capabilities to Spark would:
>
>
>- Accelerate many matrix and batch computations we currently have in
>Tensorflow
>- Allow us to use spark for the whole pipeline (combined with possibly
>better online serving)
>- Let us trigger better Hyperparameter selection directly from Spark
>
>
> There will be many more aspects of this that we could explode. What do the
> rest of the list think?
>
> See you
>
> On Wed, May 16, 2018 at 2:58 AM, Daniel Galvez ()
> wrote:
>
>> Hi all,
>>
>> Is anyone here interested in adding the ability to request GPUs to
>> Spark's client (i.e, spark-submit)? As of now, Yarn 3.0's resource manager
>> server has the ability to schedule GPUs as resources via cgroups, but the
>> Spark client lacks an ability to request these.
>>
>> The ability to guarantee GPU resources would be practically useful for my
>> organization. Right now, the only way to do that is to request the entire
>> memory (or all CPU's) on a node, which is very kludgey and wastes
>> resources, especially if your node has more than 1 GPU and your code was
>> written such that an executor can use only one GPU at a time.
>>
>> I'm just not sure of a good way to make use of libraries like Databricks' 
>> Deep
>> Learning pipelines 
>> for GPU-heavy computation otherwise, unless you are lucky enough to be in an
>> organization which is able to virtualize computer nodes such that each node
>> will have only one GPU. Of course, I realize that many Databricks customers
>> are using Azure or AWS, which allow you to do this facilely. Is this what
>> people normally do in industry?
>>
>> This is something I am interested in working on, unless others out there
>> have advice on why this is a bad idea.
>>
>> Unfortunately, I am not familiar enough with Mesos and Kubernetes right
>> now to know how they schedule gpu resources and whether adding support for
>> requesting GPU's from them to the spark-submit client would be simple.
>>
>> Daniel
>>
>> --
>> Daniel Galvez
>> http://danielgalvez.me
>> https://github.com/galv
>>
>


Re: Interest in adding ability to request GPU's to the spark client?

2018-07-12 Thread Maximiliano Felice
Hi,

I've been meaning to reply to this email for a while now, sorry for taking
so much time.

I personally think that adding GPU resource management will allow us to
boost some ETL performance a lot. For the last year, I've worked on
transforming some Machine Learning pipelines from Python in Numpy/Pandas to
Spark. Adding GPU capabilities to Spark would:


   - Accelerate many matrix and batch computations we currently have in
   Tensorflow
   - Allow us to use spark for the whole pipeline (combined with possibly
   better online serving)
   - Let us trigger better Hyperparameter selection directly from Spark


There will be many more aspects of this that we could explore. What does the
rest of the list think?

See you

On Wed, May 16, 2018 at 2:58 AM, Daniel Galvez ()
wrote:

> Hi all,
>
> Is anyone here interested in adding the ability to request GPUs to Spark's
> client (i.e, spark-submit)? As of now, Yarn 3.0's resource manager server
> has the ability to schedule GPUs as resources via cgroups, but the Spark
> client lacks an ability to request these.
>
> The ability to guarantee GPU resources would be practically useful for my
> organization. Right now, the only way to do that is to request the entire
> memory (or all CPU's) on a node, which is very kludgey and wastes
> resources, especially if your node has more than 1 GPU and your code was
> written such that an executor can use only one GPU at a time.
>
> I'm just not sure of a good way to make use of libraries like Databricks' Deep
> Learning pipelines 
> for GPU-heavy computation otherwise, unless you are lucky enough to be in an
> organization which is able to virtualize computer nodes such that each node
> will have only one GPU. Of course, I realize that many Databricks customers
> are using Azure or AWS, which allow you to do this facilely. Is this what
> people normally do in industry?
>
> This is something I am interested in working on, unless others out there
> have advice on why this is a bad idea.
>
> Unfortunately, I am not familiar enough with Mesos and Kubernetes right
> now to know how they schedule gpu resources and whether adding support for
> requesting GPU's from them to the spark-submit client would be simple.
>
> Daniel
>
> --
> Daniel Galvez
> http://danielgalvez.me
> https://github.com/galv
>


Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-12 Thread Arun Mahadevan
What I meant was that the number of partitions cannot be varied with a
ForeachWriter, versus writing to each sink using independent queries. Maybe this
is obvious.

I am not sure about the difference you highlight on the performance part. The
commit happens once per micro batch and "close(null)" is invoked. You can batch
your writes in the process and/or in the close. I guess the writes can still be
atomic, decided by whether "close" returns successfully or throws an exception.
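
To make the trade-off concrete, here is a rough Scala sketch of the per-sink
routing ForeachWriter being discussed (the marker column and the file-based
"sinks" are placeholders, not real sink clients):

  import org.apache.spark.sql.{ForeachWriter, Row}

  class RoutingWriter extends ForeachWriter[Row] {
    @transient private var sinkA: java.io.PrintWriter = _
    @transient private var sinkB: java.io.PrintWriter = _

    override def open(partitionId: Long, version: Long): Boolean = {
      // open one connection per sink for this partition/epoch
      sinkA = new java.io.PrintWriter(s"/tmp/sink-a-$partitionId-$version.txt")
      sinkB = new java.io.PrintWriter(s"/tmp/sink-b-$partitionId-$version.txt")
      true
    }

    override def process(record: Row): Unit = {
      // "sink" is a marker column assumed to be added during transformation
      if (record.getAs[String]("sink") == "a") sinkA.println(record.mkString(","))
      else sinkB.println(record.mkString(","))
    }

    override def close(errorOrNull: Throwable): Unit = {
      sinkA.close(); sinkB.close()
    }
  }

  // usage: df.writeStream.foreach(new RoutingWriter).start()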

Thanks,
Arun

From:  chandan prakash 
Date:  Thursday, July 12, 2018 at 10:37 AM
To:  Arun Iyer 
Cc:  Tathagata Das , "ymaha...@snappydata.io" 
, "priy...@asperasoft.com" , 
"user @spark" 
Subject:  Re: [Structured Streaming] Avoiding multiple streaming queries

Thanks a lot Arun for your response. 
I got your point that existing sink plugins like kafka, etc can not be used.
However I could not get the part : " you cannot scale the partitions for the 
sinks independently "
Can you please rephrase the above part ?

Also,
I guess :
using foreachwriter for multiple sinks will affect the performance because 
write will happen to a sink per record basis (after deciding a record belongs 
to which particular sink), where as in the current implementation all data 
under a RDD partition gets committed to the sink atomically in one go. Please 
correct me if I am wrong here.



Regards,
Chandan

On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan  wrote:
Yes ForeachWriter [1] could be an option If you want to write to different 
sinks. You can put your custom logic to split the data into different sinks.

The drawback here is that you cannot plugin existing sinks like Kafka and you 
need to write the custom logic yourself and you cannot scale the partitions for 
the sinks independently.

[1] 
https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html

From: chandan prakash 
Date: Thursday, July 12, 2018 at 2:38 AM
To: Tathagata Das , "ymaha...@snappydata.io" 
, "priy...@asperasoft.com" , 
"user @spark" 
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

Hi, 
Did anyone of you thought  about writing a custom foreach sink writer which can 
decided which record should go to which sink (based on some marker in record, 
which we can possibly annotate during transformation) and then accordingly 
write to specific sink.
This will mean that:
1. every custom sink writer will have connections to as many sinks as many 
there are types of sink where records can go.
2.  every record will be read once in the single query but can be written to 
multiple sinks

Do you guys see any drawback in this approach ?
One drawback off course there is that sink is supposed to write the records as 
they are but we are inducing some intelligence here in the sink.
Apart from that any other issues do you see with this approach?

Regards,
Chandan


On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das  
wrote:
Of course, you can write to multiple Kafka topics from a single query. If your 
dataframe that you want to write has a column named "topic" (along with "key", 
and "value" columns), it will write the contents of a row to the topic in that 
row. This automatically works. So the only thing you need to figure out is how 
to generate the value of that column. 

This is documented - 
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

Or am i misunderstanding the problem?

TD




On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan  wrote:
I had a similar issue and i think that’s where the structured streaming design 
lacks.
Seems like Question#2 in your email is a viable workaround for you.

In my case, I have a custom Sink backed by an efficient in-memory column store 
suited for fast ingestion. 

I have a Kafka stream coming from one topic, and I need to classify the stream 
based on schema. 
For example, a Kafka topic can have three different types of schema messages 
and I would like to ingest into the three different column tables(having 
different schema) using my custom Sink implementation.

Right now only(?) option I have is to create three streaming queries reading 
the same topic and ingesting to respective column tables using their Sink 
implementations. 
These three streaming queries create underlying three IncrementalExecutions and 
three KafkaSources, and three queries reading the same data from the same Kafka 
topic. 
Even with CachedKafkaConsumers at partition level, this is not an efficient way 
to handle a simple streaming use case.

One workaround to overcome this limitation is to have same schema for all the 
messages in a Kafka partition, unfortunately this is not in our control and 
customers cannot change it due to their dependencies on other subsystems.

Thanks,
http://www.snappydata.io/blog

On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava  
wrote:
I have a structured streaming query which sinks to Kafka.  This query has a 
complex aggregation 

Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-12 Thread chandan prakash
Thanks a lot, Arun, for your response.
I got your point that existing sink plugins like Kafka, etc. cannot be used.
However, I could not follow this part: "you cannot scale the partitions for
the sinks independently".
Can you please rephrase it?

Also, I guess using a ForeachWriter for multiple sinks will affect performance,
because writes will happen to a sink on a per-record basis (after deciding which
particular sink a record belongs to), whereas in the current implementation all
data under an RDD partition gets committed to the sink atomically in one go.
Please correct me if I am wrong here.



Regards,
Chandan

On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan  wrote:

> Yes ForeachWriter [1] could be an option If you want to write to different
> sinks. You can put your custom logic to split the data into different sinks.
>
> The drawback here is that you cannot plugin existing sinks like Kafka and
> you need to write the custom logic yourself and you cannot scale the
> partitions for the sinks independently.
>
> [1]
> https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html
>
> From: chandan prakash 
> Date: Thursday, July 12, 2018 at 2:38 AM
> To: Tathagata Das , "ymaha...@snappydata.io"
> , "priy...@asperasoft.com" ,
> "user @spark" 
> Subject: Re: [Structured Streaming] Avoiding multiple streaming queries
>
> Hi,
> Did anyone of you thought  about writing a custom foreach sink writer
> which can decided which record should go to which sink (based on some
> marker in record, which we can possibly annotate during transformation) and
> then accordingly write to specific sink.
> This will mean that:
> 1. every custom sink writer will have connections to as many sinks as many
> there are types of sink where records can go.
> 2.  every record will be read once in the single query but can be written
> to multiple sinks
>
> Do you guys see any drawback in this approach ?
> One drawback off course there is that sink is supposed to write the
> records as they are but we are inducing some intelligence here in the sink.
> Apart from that any other issues do you see with this approach?
>
> Regards,
> Chandan
>
>
> On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das 
> wrote:
>
>> Of course, you can write to multiple Kafka topics from a single query. If
>> your dataframe that you want to write has a column named "topic" (along
>> with "key", and "value" columns), it will write the contents of a row to
>> the topic in that row. This automatically works. So the only thing you need
>> to figure out is how to generate the value of that column.
>>
>> This is documented -
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
>>
>> Or am i misunderstanding the problem?
>>
>> TD
>>
>>
>>
>>
>> On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan 
>> wrote:
>>
>>> I had a similar issue and i think that’s where the structured streaming
>>> design lacks.
>>> Seems like Question#2 in your email is a viable workaround for you.
>>>
>>> In my case, I have a custom Sink backed by an efficient in-memory column
>>> store suited for fast ingestion.
>>>
>>> I have a Kafka stream coming from one topic, and I need to classify the
>>> stream based on schema.
>>> For example, a Kafka topic can have three different types of schema
>>> messages and I would like to ingest into the three different column
>>> tables(having different schema) using my custom Sink implementation.
>>>
>>> Right now only(?) option I have is to create three streaming queries
>>> reading the same topic and ingesting to respective column tables using
>>> their Sink implementations.
>>> These three streaming queries create underlying three
>>> IncrementalExecutions and three KafkaSources, and three queries reading the
>>> same data from the same Kafka topic.
>>> Even with CachedKafkaConsumers at partition level, this is not an
>>> efficient way to handle a simple streaming use case.
>>>
>>> One workaround to overcome this limitation is to have same schema for
>>> all the messages in a Kafka partition, unfortunately this is not in our
>>> control and customers cannot change it due to their dependencies on other
>>> subsystems.
>>>
>>> Thanks,
>>> http://www.snappydata.io/blog 
>>>
>>> On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava <
>>> priy...@asperasoft.com> wrote:
>>>
 I have a structured streaming query which sinks to Kafka.  This query
 has a complex aggregation logic.


 I would like to sink the output DF of this query to
 multiple Kafka topics each partitioned on a different ‘key’ column.  I
 don’t want to have multiple Kafka sinks for each of the
 different Kafka topics because that would mean running multiple streaming
 queries - one for each Kafka topic, especially since my aggregation logic
 is complex.


 Questions:

 1.  Is there a way to output the results of a structured streaming
 query 

Re: How to avoid duplicate column names after join with multiple conditions

2018-07-12 Thread Nirav Patel
Hi Prem, dropping the column and renaming the column are both working for me as
a workaround. I just thought it would be nice to have a generic API that handles
that for me, or some intelligence that, since both columns are the same, doesn't
complain in the subsequent select clause that it doesn't know whether I mean
a#12 or a#81. They are both the same; just pick one.

On Thu, Jul 12, 2018 at 9:38 AM, Prem Sure  wrote:

> Hi Nirav, did you try
> .drop(df1(a) after join
>
> Thanks,
> Prem
>
> On Thu, Jul 12, 2018 at 9:50 PM Nirav Patel  wrote:
>
>> Hi Vamshi,
>>
>> That api is very restricted and not generic enough. It imposes that all
>> conditions of joins has to have same column on both side and it also has to
>> be equijoin. It doesn't serve my usecase where some join predicates don't
>> have same column names.
>>
>> Thanks
>>
>> On Sun, Jul 8, 2018 at 7:39 PM, Vamshi Talla 
>> wrote:
>>
>>> Nirav,
>>>
>>> Spark does not create a duplicate column when you use the below join
>>> expression,  as an array of column(s) like below but that requires the
>>> column name to be same in both the data frames.
>>>
>>> Example: *df1.join(df2, [‘a’])*
>>>
>>> Thanks.
>>> Vamshi Talla
>>>
>>> On Jul 6, 2018, at 4:47 PM, Gokula Krishnan D 
>>> wrote:
>>>
>>> Nirav,
>>>
>>> withColumnRenamed() API might help but it does not different column and
>>> renames all the occurrences of the given column. either use select() API
>>> and rename as you want.
>>>
>>>
>>>
>>> Thanks & Regards,
>>> Gokula Krishnan* (Gokul)*
>>>
>>> On Mon, Jul 2, 2018 at 5:52 PM, Nirav Patel 
>>> wrote:
>>>
 Expr is `df1(a) === df2(a) and df1(b) === df2(c)`

 How to avoid duplicate column 'a' in result? I don't see any api that
 combines both. Rename manually?



>>>
>>>
>>>
>>>
>>
>>
>>
>
>



Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-12 Thread Arun Mahadevan
Yes, ForeachWriter [1] could be an option if you want to write to different
sinks. You can put your custom logic in it to split the data between sinks.

The drawback here is that you cannot plug in existing sinks like Kafka; you need
to write the custom logic yourself, and you cannot scale the partitions for the
sinks independently.

[1] 
https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html

From:  chandan prakash 
Date:  Thursday, July 12, 2018 at 2:38 AM
To:  Tathagata Das , "ymaha...@snappydata.io" 
, "priy...@asperasoft.com" , 
"user @spark" 
Subject:  Re: [Structured Streaming] Avoiding multiple streaming queries

Hi, 
Did anyone of you thought  about writing a custom foreach sink writer which can 
decided which record should go to which sink (based on some marker in record, 
which we can possibly annotate during transformation) and then accordingly 
write to specific sink.
This will mean that:
1. every custom sink writer will have connections to as many sinks as many 
there are types of sink where records can go.
2.  every record will be read once in the single query but can be written to 
multiple sinks

Do you guys see any drawback in this approach ?
One drawback off course there is that sink is supposed to write the records as 
they are but we are inducing some intelligence here in the sink.
Apart from that any other issues do you see with this approach?

Regards,
Chandan


On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das  
wrote:
Of course, you can write to multiple Kafka topics from a single query. If your 
dataframe that you want to write has a column named "topic" (along with "key", 
and "value" columns), it will write the contents of a row to the topic in that 
row. This automatically works. So the only thing you need to figure out is how 
to generate the value of that column. 

This is documented - 
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

Or am i misunderstanding the problem?

TD




On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan  wrote:
I had a similar issue and i think that’s where the structured streaming design 
lacks.
Seems like Question#2 in your email is a viable workaround for you.

In my case, I have a custom Sink backed by an efficient in-memory column store 
suited for fast ingestion. 

I have a Kafka stream coming from one topic, and I need to classify the stream 
based on schema. 
For example, a Kafka topic can have three different types of schema messages 
and I would like to ingest into the three different column tables(having 
different schema) using my custom Sink implementation.

Right now only(?) option I have is to create three streaming queries reading 
the same topic and ingesting to respective column tables using their Sink 
implementations. 
These three streaming queries create underlying three IncrementalExecutions and 
three KafkaSources, and three queries reading the same data from the same Kafka 
topic. 
Even with CachedKafkaConsumers at partition level, this is not an efficient way 
to handle a simple streaming use case.

One workaround to overcome this limitation is to have same schema for all the 
messages in a Kafka partition, unfortunately this is not in our control and 
customers cannot change it due to their dependencies on other subsystems.

Thanks,
http://www.snappydata.io/blog

On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava  
wrote:
I have a structured streaming query which sinks to Kafka.  This query has a 
complex aggregation logic.



I would like to sink the output DF of this query to multiple Kafka topics each 
partitioned on a different ‘key’ column.  I don’t want to have multiple Kafka 
sinks for each of the different Kafka topics because that would mean running 
multiple streaming queries - one for each Kafka topic, especially since my 
aggregation logic is complex.



Questions:

1.  Is there a way to output the results of a structured streaming query to 
multiple Kafka topics each with a different key column but without having to 
execute multiple streaming queries? 



2.  If not,  would it be efficient to cascade the multiple queries such that 
the first query does the complex aggregation and writes output to Kafka and 
then the other queries just read the output of the first query and write their 
topics to Kafka thus avoiding doing the complex aggregation again?



Thanks in advance for any help.



Priyank







-- 
Chandan Prakash




Re: How to avoid duplicate column names after join with multiple conditions

2018-07-12 Thread Prem Sure
Hi Nirav, did you try
.drop(df1("a")) after the join?

Thanks,
Prem

On Thu, Jul 12, 2018 at 9:50 PM Nirav Patel  wrote:

> Hi Vamshi,
>
> That api is very restricted and not generic enough. It imposes that all
> conditions of joins has to have same column on both side and it also has to
> be equijoin. It doesn't serve my usecase where some join predicates don't
> have same column names.
>
> Thanks
>
> On Sun, Jul 8, 2018 at 7:39 PM, Vamshi Talla  wrote:
>
>> Nirav,
>>
>> Spark does not create a duplicate column when you use the below join
>> expression,  as an array of column(s) like below but that requires the
>> column name to be same in both the data frames.
>>
>> Example: *df1.join(df2, [‘a’])*
>>
>> Thanks.
>> Vamshi Talla
>>
>> On Jul 6, 2018, at 4:47 PM, Gokula Krishnan D 
>> wrote:
>>
>> Nirav,
>>
>> withColumnRenamed() API might help but it does not different column and
>> renames all the occurrences of the given column. either use select() API
>> and rename as you want.
>>
>>
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>>
>> On Mon, Jul 2, 2018 at 5:52 PM, Nirav Patel 
>> wrote:
>>
>>> Expr is `df1(a) === df2(a) and df1(b) === df2(c)`
>>>
>>> How to avoid duplicate column 'a' in result? I don't see any api that
>>> combines both. Rename manually?
>>>
>>>
>>>
>>
>>
>>
>>
>
>
>


Running Spark on Kubernetes behind a HTTP proxy

2018-07-12 Thread Lalwani, Jayesh
We are trying to run a Spark job on Kubernetes cluster. The Spark job needs to 
talk to some services external to the Kubernetes cluster through a proxy 
server. We are setting the proxy by setting the extraJavaOptions like this

--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost 
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
--conf spark.driver.extraJavaOptions="-Dhttps.proxyHost=myhost 
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \

The problem is that this forces all communication to go through the proxy. This 
includes the communication between the pods, and between pod and K8s master. 
The proxy is blocking the traffic that is supposed to stay within the 
cluster, and the job fails

We should be setting noproxy so intra-k8s communication doesn’t go through the 
proxy. The problem is that we don’t know what to put in noproxy. Has anyone run 
Spark on K8s behind proxy?
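
For reference, the standard JVM property for exempting hosts is http.nonProxyHosts
(it also applies to HTTPS traffic), so a hedged sketch would be to extend the options
with something like the following, where the host patterns are guesses that would
need to match your cluster's service DNS names and pod/service CIDRs:

--conf spark.executor.extraJavaOptions="-Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttps.protocols=TLSv1.2 -Dhttp.nonProxyHosts=localhost|127.0.0.1|*.svc|*.svc.cluster.local|kubernetes.default.svc|10.*" \
--conf spark.driver.extraJavaOptions="-Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttps.protocols=TLSv1.2 -Dhttp.nonProxyHosts=localhost|127.0.0.1|*.svc|*.svc.cluster.local|kubernetes.default.svc|10.*" \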


The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: How to avoid duplicate column names after join with multiple conditions

2018-07-12 Thread Nirav Patel
Hi Vamshi,

That api is very restricted and not generic enough. It imposes that all
conditions of joins has to have same column on both side and it also has to
be equijoin. It doesn't serve my usecase where some join predicates don't
have same column names.

Thanks

On Sun, Jul 8, 2018 at 7:39 PM, Vamshi Talla  wrote:

> Nirav,
>
> Spark does not create a duplicate column when you use the below join
> expression,  as an array of column(s) like below but that requires the
> column name to be same in both the data frames.
>
> Example: *df1.join(df2, [‘a’])*
>
> Thanks.
> Vamshi Talla
>
> On Jul 6, 2018, at 4:47 PM, Gokula Krishnan D  wrote:
>
> Nirav,
>
> withColumnRenamed() API might help but it does not different column and
> renames all the occurrences of the given column. either use select() API
> and rename as you want.
>
>
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>
> On Mon, Jul 2, 2018 at 5:52 PM, Nirav Patel  wrote:
>
>> Expr is `df1(a) === df2(a) and df1(b) === df2(c)`
>>
>> How to avoid duplicate column 'a' in result? I don't see any api that
>> combines both. Rename manually?
>>
>>
>>
>
>
>
>



Re: how to specify external jars in program with SparkConf

2018-07-12 Thread Prem Sure
I think the JVM has already been initialized with the available classpath by the
time your conf code executes. I faced this earlier with Spark 1.6 and ended up
moving to spark-submit with --jars; I found it was not something that could be
changed through runtime config. May I know what advantage you are trying to get
by doing it programmatically?
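
If it really has to happen from code, one hedged option is to launch the job
through SparkLauncher so that the classpath conf is passed to spark-submit before
the driver JVM starts (paths, master and class names below are placeholders):

  import org.apache.spark.launcher.SparkLauncher

  val handle = new SparkLauncher()
    .setAppResource("/home/hadoop/my-app.jar")            // placeholder
    .setMainClass("com.example.MyJob")                    // placeholder
    .setMaster("yarn")
    .setConf("spark.driver.extraClassPath", "/usr/jars/*")
    .setConf("spark.executor.extraClassPath", "/usr/jars/*")
    .startApplication()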

On Thu, Jul 12, 2018 at 8:19 PM, mytramesh 
wrote:

> Context :- In EMR class path has old version of jar, want to refer new
> version of jar in my code.
>
> through bootstrap while spinning new nodes , copied necessary jars to local
> folder from S3.
>
> In spark-submit command by using extra class path parameter my code able
> refer new version jar which is in custom location .
>
> --conf="spark.driver.extraClassPath=/usr/jars/*"
> --conf="spark.executor.extraClassPath=/usr/jars/*"
>
> Same thing want to achieve programmatically by specifying in sparkconfig
> object, but no luck . Could anyone help me on this .
>
> sparkConf.set("spark.driver.extraClassPath", "/usr/jars/*");
> sparkConf.set("spark.executor.extraClassPath", "/usr/jars/*");
> //tried below options also
> //sparkConf.set("spark.executor.userClassPathFirst", "true");
>  //sparkConf.set("spark.driver.userClassPathFirst", "true");
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


streaming from mongo

2018-07-12 Thread Chethan
Hi Dev,

I am receiving data from MongoDB through Spark Streaming. It gives a DStream
of org.bson.Document.

How do I convert DStream[Document] to a DataFrame? All my other operations are
on DataFrames.
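
One common pattern (a hedged sketch, assuming each org.bson.Document can be
serialized with toJson and that a SparkSession is available) is to convert each
micro-batch inside foreachRDD:

  import org.apache.spark.sql.SparkSession

  dstream.foreachRDD { rdd =>
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // Document -> JSON string -> DataFrame (the schema is inferred per batch)
    val df = spark.read.json(rdd.map(_.toJson).toDS())
    // ... the usual DataFrame operations on df ...
  }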

Thanks,
Chethan.


how to specify external jars in program with SparkConf

2018-07-12 Thread mytramesh
Context: On EMR, the classpath has an old version of a jar, and I want to refer
to a new version of the jar in my code.

Through a bootstrap action while spinning up new nodes, I copied the necessary
jars from S3 to a local folder.

In the spark-submit command, by using the extra classpath parameters, my code is
able to refer to the new version of the jar in the custom location:

--conf="spark.driver.extraClassPath=/usr/jars/*"
--conf="spark.executor.extraClassPath=/usr/jars/*"

I want to achieve the same thing programmatically by specifying it on the
SparkConf object, but with no luck. Could anyone help me with this?

sparkConf.set("spark.driver.extraClassPath", "/usr/jars/*");
sparkConf.set("spark.executor.extraClassPath", "/usr/jars/*");
//tried below options also
//sparkConf.set("spark.executor.userClassPathFirst", "true");
 //sparkConf.set("spark.driver.userClassPathFirst", "true");



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-12 Thread chandan prakash
Hi,
Did anyone of you thought  about writing a custom foreach sink writer which
can decided which record should go to which sink (based on some marker in
record, which we can possibly annotate during transformation) and then
accordingly write to specific sink.
This will mean that:
1. every custom sink writer will have connections to as many sinks as many
there are types of sink where records can go.
2.  every record will be read once in the single query but can be written
to multiple sinks

Do you guys see any drawback in this approach ?
One drawback off course there is that sink is supposed to write the records
as they are but we are inducing some intelligence here in the sink.
Apart from that any other issues do you see with this approach?

Regards,
Chandan
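
For the single-query, multiple-topic case in TD's reply quoted below, a rough
sketch of the "topic column" approach (the routing condition, column names and
topic names are made up):

  import org.apache.spark.sql.functions._

  // each row carries the topic it should land in; the Kafka sink routes per row
  val routed = aggregatedDF
    .withColumn("topic", when(col("kind") === "a", lit("topic_a")).otherwise(lit("topic_b")))
    .selectExpr("topic", "CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")

  routed.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "/tmp/chk/routed")      // placeholder
    .start()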


On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das 
wrote:

> Of course, you can write to multiple Kafka topics from a single query. If
> your dataframe that you want to write has a column named "topic" (along
> with "key", and "value" columns), it will write the contents of a row to
> the topic in that row. This automatically works. So the only thing you need
> to figure out is how to generate the value of that column.
>
> This is documented -
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
>
> Or am i misunderstanding the problem?
>
> TD
>
>
>
>
> On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan 
> wrote:
>
>> I had a similar issue and i think that’s where the structured streaming
>> design lacks.
>> Seems like Question#2 in your email is a viable workaround for you.
>>
>> In my case, I have a custom Sink backed by an efficient in-memory column
>> store suited for fast ingestion.
>>
>> I have a Kafka stream coming from one topic, and I need to classify the
>> stream based on schema.
>> For example, a Kafka topic can have three different types of schema
>> messages and I would like to ingest into the three different column
>> tables(having different schema) using my custom Sink implementation.
>>
>> Right now only(?) option I have is to create three streaming queries
>> reading the same topic and ingesting to respective column tables using
>> their Sink implementations.
>> These three streaming queries create underlying three
>> IncrementalExecutions and three KafkaSources, and three queries reading the
>> same data from the same Kafka topic.
>> Even with CachedKafkaConsumers at partition level, this is not an
>> efficient way to handle a simple streaming use case.
>>
>> One workaround to overcome this limitation is to have same schema for all
>> the messages in a Kafka partition, unfortunately this is not in our control
>> and customers cannot change it due to their dependencies on other
>> subsystems.
>>
>> Thanks,
>> http://www.snappydata.io/blog 
>>
>> On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> I have a structured streaming query which sinks to Kafka.  This query
>>> has a complex aggregation logic.
>>>
>>>
>>> I would like to sink the output DF of this query to
>>> multiple Kafka topics each partitioned on a different ‘key’ column.  I
>>> don’t want to have multiple Kafka sinks for each of the
>>> different Kafka topics because that would mean running multiple streaming
>>> queries - one for each Kafka topic, especially since my aggregation logic
>>> is complex.
>>>
>>>
>>> Questions:
>>>
>>> 1.  Is there a way to output the results of a structured streaming query
>>> to multiple Kafka topics each with a different key column but without
>>> having to execute multiple streaming queries?
>>>
>>>
>>> 2.  If not,  would it be efficient to cascade the multiple queries such
>>> that the first query does the complex aggregation and writes output
>>> to Kafka and then the other queries just read the output of the first query
>>> and write their topics to Kafka thus avoiding doing the complex aggregation
>>> again?
>>>
>>>
>>> Thanks in advance for any help.
>>>
>>>
>>> Priyank
>>>
>>>
>>>
>>
>

-- 
Chandan Prakash


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-12 Thread Jayant Shekhar
Hello Chetan,

Sorry, I missed replying earlier. You can find some sample code here:

http://sparkflows.readthedocs.io/en/latest/user-guide/python/pipe-python.html

We will continue adding more there.
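
A minimal sketch of the RDD.pipe flavor, assuming a SparkSession named spark (the
script path and column names are placeholders): each row goes to the external
Python script as one line on stdin, and each line the script prints to stdout
comes back as one element of the resulting RDD.

  import spark.implicits._

  // rows out as CSV lines, piped through an external script, results back as a DataFrame
  val inputLines = df.select("colA", "colB").rdd.map(_.mkString(","))
  val piped = inputLines.pipe("python /path/to/transform.py")
  val resultDF = spark.read.csv(piped.toDS())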

Feel free to ping me directly in case of questions.

Thanks,
Jayant


On Mon, Jul 9, 2018 at 9:56 PM, Chetan Khatri 
wrote:

> Hello Jayant,
>
> Thank you so much for the suggestion. My view was to use a Python function as
> a transformation which can take a couple of column names and return an object,
> which you explained. Would it be possible to point me to a similar codebase
> example?
>
> Thanks.
>
> On Fri, Jul 6, 2018 at 2:56 AM, Jayant Shekhar 
> wrote:
>
>> Hello Chetan,
>>
>> We have currently done it with .pipe(.py) as Prem suggested.
>>
>> That passes the RDD as CSV strings to the python script. The python
>> script can either process it line by line, create the result and return it
>> back. Or create things like Pandas Dataframe for processing and finally
>> write the results back.
>>
>> In the Spark/Scala/Java code, you get an RDD of string, which we convert
>> back to a Dataframe.
>>
>> Feel free to ping me directly in case of questions.
>>
>> Thanks,
>> Jayant
>>
>>
>> On Thu, Jul 5, 2018 at 3:39 AM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Prem sure, Thanks for suggestion.
>>>
>>> On Wed, Jul 4, 2018 at 8:38 PM, Prem Sure 
>>> wrote:
>>>
 try .pipe(.py) on RDD

 Thanks,
 Prem

 On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri <
 chetan.opensou...@gmail.com> wrote:

> Can someone please suggest me , thanks
>
> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, <
> chetan.opensou...@gmail.com> wrote:
>
>> Hello Dear Spark User / Dev,
>>
>> I would like to pass Python user defined function to Spark Job
>> developed using Scala and return value of that function would be returned
>> to DF / Dataset API.
>>
>> Can someone please guide me, which would be best approach to do this.
>> Python function would be mostly transformation function. Also would like 
>> to
>> pass Java Function as a String to Spark / Scala job and it applies to 
>> RDD /
>> Data Frame and should return RDD / Data Frame.
>>
>> Thank you.
>>
>>
>>
>>

>>>
>>
>


How to register custom structured streaming source

2018-07-12 Thread Farshid Zavareh
Hello.

I need to create a custom streaming source by extending *FileStreamSource*.
The idea is to override *commit*, so that processed files (S3 objects in my
case) are renamed to have a certain prefix. However, I don't know how to
use this custom source. Obviously I don't want to compile Spark -- the
application will be running on Amazon EMR clusters.
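
For what it's worth, a rebuild of Spark shouldn't be needed just to use a custom
source: if the provider class is packaged into the application jar submitted to
EMR, it can be referenced by its fully qualified class name (a hedged sketch; the
class name and path are placeholders, and a short name can also be registered via
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister):

  val stream = spark.readStream
    .format("com.example.streaming.RenamingFileSourceProvider")  // placeholder provider
    .option("path", "s3a://my-bucket/incoming/")
    .load()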

Thanks,
Farshid