Re: Do we anything for Deep Learning in Spark?

2017-07-05 Thread hosur narahari
Hi Roope,

Does this mmlspark project use GPGPUs for processing, or just CPU cores,
since DL models are computationally very intensive?

Best Regards,
Hari

On 6 Jul 2017 9:33 a.m., "Gaurav1809"  wrote:

> Thanks Roope for the inputs.
>
> On Wed, Jul 5, 2017 at 11:41 PM, Roope [via Apache Spark User List] <[hidden
> email] > wrote:
>
>> Microsoft Machine Learning Library for Apache Spark lets you run CNTK
>> deep learning models on Spark.
>>
>> https://github.com/Azure/mmlspark
>>
>> The library APIs are focused on image processing scenarios, and are
>> compatible with SparkML Pipelines.
>>
>> Cheers,
>> Roope - Microsoft Cloud AI Team
>>


RE: SparkSession via HS2 - Error: Yarn application has already ended

2017-07-05 Thread Sudha KS
While testing like this, it does not read the cluster's hive-site.xml or
spark-env.sh (the settings had to be passed in via SparkSession.builder().config()).

Is there a way to make it read the Spark config already present on the cluster?


From: Sudha KS
Sent: Wednesday, July 5, 2017 6:45 PM
To: user@spark.apache.org
Subject: RE: SparkSession via HS2 - Error: Yarn application has already ended

For now, passing the config in SparkSession:
SparkSession spark = SparkSession
        .builder()
        .enableHiveSupport()
        .master("yarn-client")
        .appName("SampleSparkUDTF_yarnV1")
        .config("spark.yarn.jars", "hdfs:///hdp/apps/2.6.1.0-129/spark2")
        .config("spark.yarn.am.extraJavaOptions", "-Dhdp.version=2.6.1.0-129")
        .config("spark.driver.extra.JavaOptions", "-Dhdp.version=2.6.1.0-129")
        .config("spark.executor.memory", "4g")
        .getOrCreate();


While testing via HS2, this is the error:
beeline -u jdbc:hive2://localhost:1 -d org.apache.hive.jdbc.HiveDriver
0: jdbc:hive2://localhost:1>
……
Caused by: org.apache.spark.SparkException: Yarn application has already ended! 
It might have been killed or unable to launch application master.
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
at org.apache.spark.SparkContext.(SparkContext.scala:509)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2320)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
at SparkHiveUDTF.sparkJob(SparkHiveUDTF.java:102)
at SparkHiveUDTF.process(SparkHiveUDTF.java:78)
at 
org.apache.hadoop.hive.ql.exec.UDTFOperator.process(UDTFOperator.java:109)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:841)
at 
org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:88)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:841)
at 
org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:133)
at 
org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:170)
at 
org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:555)
... 18 more

Is there a way to resolve this error?



On Wed, Jul 5, 2017 at 2:01 PM, Sudha KS 
> wrote:
The property “spark.yarn.jars” is available via 
/usr/hdp/current/spark2-client/conf/spark-default.conf

spark.yarn.jars 
hdfs://ambari03.fuzzyl.com:8020/hdp/apps/2.6.1.0-129/spark2


Is there any other way to set/read/pass this property “spark.yarn.jars” ?

From: Sudha KS [mailto:sudha...@fuzzylogix.com]
Sent: Wednesday, July 5, 2017 1:51 PM
To: user@spark.apache.org
Subject: SparkSession via HS2 - Error -spark.yarn.jars not read

Why is the “spark.yarn.jars” property not read in this HDP 2.6, Spark 2.1.1
cluster:
0: jdbc:hive2://localhost:1/db> set spark.yarn.jars;
+--+--+
| set  |
+--+--+
| spark.yarn.jars=hdfs://ambari03.fuzzyl.com:8020/hdp/apps/2.6.1.0-129/spark2  |
+--+--+
1 row selected (0.101 seconds)
0: jdbc:hive2://localhost:1/db>



Error during launch of a SparkSession via HS2:
Caused by: java.lang.IllegalStateException: Library directory 
'/hadoop/yarn/local/usercache/hive/appcache/application_1499235958765_0042/container_e04_1499235958765_0042_01_05/assembly/target/scala-2.11/jars'
 does not exist; make sure Spark is built.
at 
org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:260)
at 
org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:380)
at 
org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38)
at 
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:570)
at 

Re: custom column types for JDBC datasource writer

2017-07-05 Thread Georg Heiler
Great, thanks!
But for the current release, is there any possibility of catching the exception
and handling it, i.e. not having Spark only log it to the console?

Takeshi Yamamuro  wrote on Thu, 6 Jul 2017 at 06:44:

> -dev +user
>
> You can do this in master; see
> https://github.com/apache/spark/commit/c7911807050227fcd13161ce090330d9d8daa533.
> This option will be available in the next release.
>
> // maropu
>
> On Thu, Jul 6, 2017 at 1:25 PM, Georg Heiler 
> wrote:
>
>> Hi,
>> is it possible to somehow make spark not use VARCHAR(255) but something
>> bigger i.e. CLOB for Strings?
>>
>> If not, is it at least possible to catch the exception which is thrown.
>> To me, it seems that spark is catching and logging it - so I can no longer
>> intervene and handle it:
>>
>>
>> https://stackoverflow.com/questions/44927764/spark-jdbc-oracle-long-string-fields
>>
>> Regards,
>> Georg
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: custom column types for JDBC datasource writer

2017-07-05 Thread Takeshi Yamamuro
-dev +user

You can do this in master; see
https://github.com/apache/spark/commit/c7911807050227fcd13161ce090330d9d8daa533.
This option will be available in the next release.
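
A rough usage sketch, assuming the commit above is the createTableColumnTypes
write option (the column names, types, and JDBC URL below are placeholders):

# hedged sketch: map selected DataFrame columns to explicit database types
df.write \
    .option("createTableColumnTypes", "description CLOB, name VARCHAR(1024)") \
    .jdbc("jdbc:oracle:thin:@//dbhost:1521/service", "target_table",
          mode="overwrite",
          properties={"user": "placeholder", "password": "placeholder"})

With that, the default VARCHAR(255) mapping for string columns can be
overridden per column.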

// maropu

On Thu, Jul 6, 2017 at 1:25 PM, Georg Heiler 
wrote:

> Hi,
> is it possible to somehow make spark not use VARCHAR(255) but something
> bigger i.e. CLOB for Strings?
>
> If not, is it at least possible to catch the exception which is thrown. To
> me, it seems that spark is catching and logging it - so I can no longer
> intervene and handle it:
>
> https://stackoverflow.com/questions/44927764/spark-jdbc-
> oracle-long-string-fields
>
> Regards,
> Georg
>



-- 
---
Takeshi Yamamuro


Re: Do we anything for Deep Learning in Spark?

2017-07-05 Thread Gaurav1809
Thanks Roope for the inputs.

On Wed, Jul 5, 2017 at 11:41 PM, Roope [via Apache Spark User List] <
ml+s1001560n2882...@n3.nabble.com> wrote:

> Microsoft Machine Learning Library for Apache Spark lets you run CNTK deep
> learning models on Spark.
>
> https://github.com/Azure/mmlspark
>
> The library APIs are focused on image processing scenarios, and are
> compatible with SparkML Pipelines.
>
> Cheers,
> Roope - Microsoft Cloud AI Team
>





UDAFs for sketching Dataset columns with T-Digests

2017-07-05 Thread Erik Erlandson
After my talk on T-Digests in Spark at Spark Summit East, there were some
requests for a UDAF-based interface for working with Datasets.   I'm
pleased to announce that I released a library for doing T-Digest sketching
with UDAFs:

https://github.com/isarn/isarn-sketches-spark

This initial release provides support for Scala. Future releases will
support PySpark bindings, and additional tools for leveraging T-Digests in
ML pipelines.

Cheers!
Erik


Exception: JDK-8154035 using Whole text files api

2017-07-05 Thread Reth RM
Hi,

I am using sc.wholeTextFiles to read WARC files (example file here). Spark is
reporting an error with the stack trace pasted here: https://pastebin.com/qfmM2eKk

It looks like it's the same as the bug reported here:
https://bugs.openjdk.java.net/browse/JDK-8154035
Thoughts?

Could you please suggest workaround solutions other than reading those (WARC)
files using the Java FileInputStream APIs?
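
One direction I'd be happy to get feedback on (an untested sketch;
warc_records below stands in for whatever WARC parser is available that
accepts a file-like object) is to read each file as raw bytes with
sc.binaryFiles and parse it on the executors:

import io

def parse_warc(path_and_content):
    path, content = path_and_content          # content is the whole file as bytes
    return warc_records(io.BytesIO(content))  # warc_records: hypothetical WARC parser

records = sc.binaryFiles("hdfs:///data/*.warc").flatMap(parse_warc)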


Thanks.


Re: PySpark working with Generators

2017-07-05 Thread Saatvik Shah
Hi Jörn,

I apologize for such a late response.

Yes, the data volume is very high (it won't fit in one machine's memory) and I am
getting a significant benefit from reading the files in a distributed manner.
Since the data volume is high, converting it to an alternative format would
be a worst-case scenario.
I agree on writing a custom Spark data source, but that might take a while; to
proceed with the work until then, I was hoping to use the current
implementation itself, which is fast enough to work with. The only issue is
the one I've already discussed: working with generators to allow low-memory
executor tasks.
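
Concretely, the kind of usage I have in mind is roughly the sketch below,
assuming foo(path, "generator") lazily yields one record at a time (the
"generator" mode name is hypothetical):

# Only the function is pickled and shipped; the generator itself is created
# on the executor, so records stream through flatMap instead of the whole
# file being materialised as a list.
rdd = file_paths_rdd.flatMap(lambda path: foo(path, "generator"))

# Equivalent per-partition form:
def read_paths(paths):
    for path in paths:
        for record in foo(path, "generator"):
            yield record

rdd = file_paths_rdd.mapPartitions(read_paths)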
I'm not sure I fully understand your recommendation on the core usage -
Could you explain in a little more detail? I'm currently using dynamic
allocation with YARN allowing each spark executor 8 vcores.
The data format is proprietary and not one you will have heard of.

Thanks and Regards,
Saatvik Shah


On Fri, Jun 30, 2017 at 10:16 AM, Jörn Franke  wrote:

> In this case I do not see so many benefits of using Spark. Is the data
> volume high?
> Alternatively, I recommend converting the proprietary format into a format
> Spark understands and then using this format in Spark.
> Another alternative would be to write a custom Spark data source. Even your
> proprietary format should then be able to be put on HDFS.
> That being said, I do not recommend using more cores outside Spark's
> control. The reason is that Spark thinks these cores are free and makes the
> wrong allocation of executors/tasks. This will slow down all applications
> on Spark.
>
> May I ask what the format is called?
>
> On 30. Jun 2017, at 16:05, Saatvik Shah  wrote:
>
> Hi Mahesh and Ayan,
>
> The files I'm working with are in a very complex proprietary format, for which
> I only have access to a reader function, as described earlier, that only
> accepts a path on a local file system.
> This rules out sc.wholeTextFiles, since I cannot pass the contents of
> wholeTextFiles to a function (API call) expecting a local file path.
> For similar reasons, I cannot use HDFS and am currently bound to using a highly
> available Network File System arrangement.
> Any suggestions, given these constraints? Or any incorrect assumptions you
> think I've made?
>
> Thanks and Regards,
> Saatvik Shah
>
>
>
> On Fri, Jun 30, 2017 at 12:50 AM, Mahesh Sawaiker <
> mahesh_sawai...@persistent.com> wrote:
>
>> Wouldn’t this work if you load the files in hdfs and let the partitions
>> be equal to the amount of parallelism you want?
>>
>>
>>
>> *From:* Saatvik Shah [mailto:saatvikshah1...@gmail.com]
>> *Sent:* Friday, June 30, 2017 8:55 AM
>> *To:* ayan guha
>> *Cc:* user
>> *Subject:* Re: PySpark working with Generators
>>
>>
>>
>> Hey Ayan,
>>
>>
>>
>> This isn't a typical text file - it's a proprietary data format for which a
>> native Spark reader is not available.
>>
>>
>>
>> Thanks and Regards,
>>
>> Saatvik Shah
>>
>>
>>
>> On Thu, Jun 29, 2017 at 6:48 PM, ayan guha  wrote:
>>
>> If your files are in the same location you can use sc.wholeTextFile. If not,
>> sc.textFile accepts a list of filepaths.
>>
>>
>>
>> On Fri, 30 Jun 2017 at 5:59 am, saatvikshah1994 <
>> saatvikshah1...@gmail.com> wrote:
>>
>> Hi,
>>
>> I have this file reading function called foo which reads contents into
>> a list of lists, or into a generator of lists of lists, representing the
>> same file.
>>
>> When reading as a complete chunk (1 record array) I do something like:
>> rdd = file_paths_rdd.map(lambda x: foo(x, "wholeFile")).flatMap(lambda x: x)
>>
>> I'd like to now do something similar but with the generator, so that I can
>> work with more cores and lower memory. I'm not sure how to tackle this,
>> since generators cannot be pickled and thus I'm not sure how to distribute
>> the work of reading each file_path on the RDD.
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/PySpark-working-with-Generators-tp28810.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
>>
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>


-- 
*Saatvik Shah,*
*1st  Year,*
*Masters in the School of Computer Science,*
*Carnegie Mellon University*


Re: Spark | Window Function |

2017-07-05 Thread Radhwane Chebaane
Hi Julien,


Although this is a strange bug in Spark, it's rare to need a window range
larger than the Integer max value.

Nevertheless, most of the window functions can be expressed with
self-joins. Hence, your problem may be solved with this example:

If input data as follow:

+---+-+-+
| id|timestamp|value|
+---+-+-+
|  B|1|  100|
|  B|10010|   50|
|  B|10020|  200|
|  B|25000|  500|
+---+-+-+

And the window is  (-20L, 0)

Then this code will give the wanted result:

df.as("df1").join(df.as("df2"),
  $"df2.timestamp" between($"df1.timestamp" - 20L, $"df1.timestamp"))
  .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
  .agg( functions.min($"df2.value").as("min___value"))
  .orderBy($"df1.timestamp")
  .show()

+---+-+-+---+
| id|timestamp|value|min___value|
+---+-+-+---+
|  B|1|  100|100|
|  B|10010|   50| 50|
|  B|10020|  200| 50|
|  B|25000|  500|500|
+---+-+-+---+

Or by SparkSQL:

SELECT c.id as id, c.timestamp as timestamp, c.value, min(c._value) as
min___value FROM
(
  SELECT a.id as id, a.timestamp as timestamp, a.value as value,
b.timestamp as _timestamp, b.value as _value
  FROM df a CROSS JOIN df b
  ON b.timestamp >= a.timestamp - 20L and b.timestamp <= a.timestamp
) c
GROUP BY c.id, c.timestamp, c.value ORDER BY c.timestamp


This should also be possible with Spark Streaming; however, don't expect
high performance.


Cheers,
Radhwane



2017-07-05 10:41 GMT+02:00 Julien CHAMP :

> Hi there !
>
> Let me explain my problem to see if you have a good solution to help me :)
>
> Let's imagine that I have all my data in a DB or a file, that I load in a
> dataframe DF with the following columns :
> *id | timestamp(ms) | value*
> A | 100 |  100
> A | 110 |  50
> B | 100 |  100
> B | 110 |  50
> B | 120 |  200
> B | 250 |  500
> C | 100 |  200
> C | 110 |  500
>
> The timestamp is a *long value*, so as to be able to express date in ms
> from -01-01 to today !
>
> I want to compute operations such as min, max, average on the *value
> column*, for a given window function, and grouped by id ( Bonus :  if
> possible for only some timestamps... )
>
> For example if I have 3 tuples :
>
> id | timestamp(ms) | value
> B | 100 |  100
> B | 110 |  50
> B | 120 |  200
> B | 250 |  500
>
> I would like to be able to compute the min value for windows of time = 20.
> This would result in such a DF :
>
> id | timestamp(ms) | value | min___value
> B | 100 |  100 | 100
> B | 110 |  50  | 50
> B | 120 |  200 | 50
> B | 250 |  500 | 500
>
> This seems the perfect use case for window function in spark  ( cf :
> https://databricks.com/blog/2015/07/15/introducing-window-
> functions-in-spark-sql.html )
> I can use :
>
> Window.orderBy("timestamp").partitionBy("id").rangeBetween(-20,0)
> df.withColumn("min___value", min(df.col("value")).over(tw))
>
> This leads to the perfect answer !
>
> However, there is a big bug with window functions as reported here (
> https://issues.apache.org/jira/browse/SPARK-19451 ) when working with
> Long values !!! So I can't use this
>
> So my question is ( of course ) how can I resolve my problem ?
> If I use spark streaming I will face the same issue ?
>
> I'll be glad to discuss this problem with you, feel free to answer :)
>
> Regards,
>
> Julien
> --
>
>
> Julien CHAMP — Data Scientist
>
>
> Web : www.tellmeplus.com — Email : jch...@tellmeplus.com
>
> Phone : 06 89 35 01 89 — LinkedIn : here
>
> TellMePlus S.A — Predictive Objects
>
> Paris : 7 rue des Pommerots, 78400 Chatou
> Montpellier : 51 impasse des églantiers, 34980 St Clément de Rivière
>
>
>
>
> 




-- 

Radhwane Chebaane
Distributed systems engineer, Mindlytix

Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906

Skype: rad.cheb
LinkedIn



Re: Load multiple CSV from different paths

2017-07-05 Thread Didac Gil
Thanks man!

That was the key.

source = […].toSeq

sources: _*

Learnt something more with Scala.

> On 5 Jul 2017, at 16:29, Radhwane Chebaane  wrote:
> 
> Hi,
> 
> Referring to spark 2.x documentation, in org.apache.spark.sql.DataFrameReader 
>  you have this function:
> def csv(paths: String*): DataFrame 
> 
> 
> So you can unpack your Array of paths like this:
> val sources = paths.split(',').toSeq
> spark.read.option("header", "false")
> .schema(custom_schema)
> .option('delimiter', '\t')
> .option('mode', 'DROPMALFORMED')
> .csv(sources: _*)
> 
> In spark 1.6.x I think this may work with spark-csv 
>  :
> 
> spark.read.format("com.databricks.spark.csv")
>   .option("header", "false")
>   .schema(custom_schema)
>   .option("delimiter", "\t")
>   .option("mode", "DROPMALFORMED")
>   .load(sources: _*)
> 
> 
> Cheers,
> Radhwane Chebaane
> 
> 2017-07-05 16:08 GMT+02:00 Didac Gil  >:
> Hi,
> 
> Do you know any simple way to load multiple csv files (same schema) that are 
> in different paths?
> Wildcards are not a solution, as I want to load specific csv files from 
> different folders.
> 
> I came across a solution
> (https://stackoverflow.com/questions/37639956/how-to-import-multiple-csv-files-in-a-single-load)
> that suggests something like
> 
> spark.read.format("csv").option("header", "false")
> .schema(custom_schema)
> .option('delimiter', '\t')
> .option('mode', 'DROPMALFORMED')
> .load(paths.split(','))
> However, even though it mentions that this approach would work in Spark 2.x,
> I don’t find an implementation of load that accepts an Array[String] as an
> input parameter.
> 
> Thanks in advance for your help.
> 
> 
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com 
> Spain: +34 696 285 544 
> Sweden: +46 (0)730229737 
> Skype: didac.gil.de.la.iglesia
> 
> 
> 
> 
> --
> 
>   Radhwane Chebaane
> Distributed systems engineer, Mindlytix
> Mail: radhw...@mindlytix.com  
> Mobile: +33 695 588 906   
> 
> Skype: rad.cheb  
> LinkedIn   
> 

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Collecting matrix's entries raises an error only when run inside a test

2017-07-05 Thread Simone Robutti
Hello, I have this problem and Google is not helping. Instead, it looks
like an unreported bug and there are no hints at possible workarounds.

the error is the following:

Traceback (most recent call last):
  File
"/home/simone/motionlogic/trip-labeler/test/trip_labeler_test/model_test.py",
line 43, in test_make_trip_matrix
entries = trip_matrix.entries.map(lambda entry: (entry.i, entry.j,
entry.value)).collect()
  File
"/opt/spark-1.6.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py",
line 770, in collect
with SCCallSiteSync(self.context) as css:
  File
"/opt/spark-1.6.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/traceback_utils.py",
line 72, in __enter__
self._context._jsc.setCallSite(self._call_site)
AttributeError: 'NoneType' object has no attribute 'setCallSite'

and it is raised when I try to collect a
pyspark.mllib.linalg.distributed.CoordinateMatrix's entries with .collect().
It happens only when this runs in a test suite with more than one class,
so it's probably related to the creation and destruction of SparkContexts,
but I cannot understand how.

Spark version is 1.6.2

I saw multiple references to this error for other classes in the pyspark
ml library but none of them contained hints toward the solution.

It breaks when I run the tests through nosetests; running a single
TestCase in IntelliJ works fine.

Is there a known solution? Is it a known problem?

Thank you,

Simone


Re: Load multiple CSV from different paths

2017-07-05 Thread Radhwane Chebaane
Hi,

Referring to the Spark 2.x documentation, in
org.apache.spark.sql.DataFrameReader you have this function:
def csv(paths: String*): DataFrame


So you can unpack your Array of paths like this:

val sources = paths.split(',').toSeq

spark.read.option("header", "false")
.schema(custom_schema)
.option('delimiter', '\t')
.option('mode', 'DROPMALFORMED')
.csv(sources: _*)


In Spark 1.6.x I think this may work with spark-csv:

spark.read.format("com.databricks.spark.csv").option("header", "false")
.schema(custom_schema)
.option('delimiter', '\t')
.option('mode', 'DROPMALFORMED')
.load(sources: _*)



Cheers,
Radhwane Chebaane

2017-07-05 16:08 GMT+02:00 Didac Gil :

> Hi,
>
> Do you know any simple way to load multiple csv files (same schema) that
> are in different paths?
> Wildcards are not a solution, as I want to load specific csv files from
> different folders.
>
> I came across a solution
> (https://stackoverflow.com/questions/37639956/how-to-import-multiple-csv-files-in-a-single-load)
> that suggests something like
>
> spark.read.format("csv").option("header", "false")
> .schema(custom_schema)
> .option('delimiter', '\t')
> .option('mode', 'DROPMALFORMED')
> .load(paths.split(','))
>
> However, even though it mentions that this approach would work in Spark 2.x,
> I don’t find an implementation of load that accepts an Array[String] as an
> input parameter.
>
> Thanks in advance for your help.
>
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544 <+34%20696%2028%2055%2044>
> Sweden: +46 (0)730229737 <+46%2073%20022%2097%2037>
> Skype: didac.gil.de.la.iglesia
>
>


-- 

Radhwane Chebaane
Distributed systems engineer, Mindlytix

Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906

Skype: rad.cheb
LinkedIn



Load multiple CSV from different paths

2017-07-05 Thread Didac Gil
Hi,

Do you know any simple way to load multiple csv files (same schema) that are in 
different paths?
Wildcards are not a solution, as I want to load specific csv files from 
different folders.

I came across a solution
(https://stackoverflow.com/questions/37639956/how-to-import-multiple-csv-files-in-a-single-load)
that suggests something like

spark.read.format("csv").option("header", "false")
.schema(custom_schema)
.option('delimiter', '\t')
.option('mode', 'DROPMALFORMED')
.load(paths.split(','))
However, even though it mentions that this approach would work in Spark 2.x, I don’t
find an implementation of load that accepts an Array[String] as an input
parameter.

Thanks in advance for your help.


Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-05 Thread Vadim Semenov
Are you sure that you use S3A?
Because EMR says that they do not support S3A

https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/
> Amazon EMR does not currently support use of the Apache Hadoop S3A file
system.

I think that the HEAD requests come from the `createBucketIfNotExists` in
the AWS S3 library that checks if the bucket exists every time you do a PUT
request, i.e. creates a HEAD request.

You can disable that by setting `fs.s3.buckets.create.enabled` to `false`
http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html
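
For example, as an EMR cluster configuration entry (assuming the property is
set under the emrfs-site classification; the page above has the details):

[
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.buckets.create.enabled": "false"
    }
  }
]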

On Thu, Jun 29, 2017 at 4:56 PM, Everett Anderson 
wrote:

> Hi,
>
> We're using Spark 2.0.2 + Hadoop 2.7.3 on AWS EMR with S3A for direct I/O
> from/to S3 from our Spark jobs. We set
> mapreduce.fileoutputcommitter.algorithm.version=2 and are using encrypted S3
> buckets.
>
> This has been working fine for us, but perhaps as we've been running more
> jobs in parallel, we've started getting errors like
>
> Status Code: 503, AWS Service: Amazon S3, AWS Request ID: ..., AWS Error
> Code: SlowDown, AWS Error Message: Please reduce your request rate., S3
> Extended Request ID: ...
>
> We enabled CloudWatch S3 request metrics for one of our buckets and I was
> a little alarmed to see spikes of over 800k S3 requests over a minute or
> so, with the bulk of them HEAD requests.
>
> We read and write Parquet files, and most tables have around 50
> shards/parts, though some have up to 200. I imagine there's additional
> parallelism when reading a shard in Parquet, though.
>
> Has anyone else encountered this? How did you solve it?
>
> I'd sure prefer to avoid copying all our data in and out of HDFS for each
> job, if possible.
>
> Thanks!
>
>


RE: SparkSession via HS2 - Error: Yarn application has already ended

2017-07-05 Thread Sudha KS
For now, passing the config in SparkSession:
SparkSession spark = SparkSession
        .builder()
        .enableHiveSupport()
        .master("yarn-client")
        .appName("SampleSparkUDTF_yarnV1")
        .config("spark.yarn.jars", "hdfs:///hdp/apps/2.6.1.0-129/spark2")
        .config("spark.yarn.am.extraJavaOptions", "-Dhdp.version=2.6.1.0-129")
        .config("spark.driver.extra.JavaOptions", "-Dhdp.version=2.6.1.0-129")
        .config("spark.executor.memory", "4g")
        .getOrCreate();


While testing via HS2, this is the error:
beeline -u jdbc:hive2://localhost:1 -d org.apache.hive.jdbc.HiveDriver
0: jdbc:hive2://localhost:1>
……
Caused by: org.apache.spark.SparkException: Yarn application has already ended! 
It might have been killed or unable to launch application master.
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
at org.apache.spark.SparkContext.(SparkContext.scala:509)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2320)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
at SparkHiveUDTF.sparkJob(SparkHiveUDTF.java:102)
at SparkHiveUDTF.process(SparkHiveUDTF.java:78)
at 
org.apache.hadoop.hive.ql.exec.UDTFOperator.process(UDTFOperator.java:109)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:841)
at 
org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:88)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:841)
at 
org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:133)
at 
org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:170)
at 
org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:555)
... 18 more

Is there a way to resolve this error?



On Wed, Jul 5, 2017 at 2:01 PM, Sudha KS 
> wrote:
The property “spark.yarn.jars” is available via 
/usr/hdp/current/spark2-client/conf/spark-default.conf

spark.yarn.jars 
hdfs://ambari03.fuzzyl.com:8020/hdp/apps/2.6.1.0-129/spark2


Is there any other way to set/read/pass this property “spark.yarn.jars” ?

From: Sudha KS [mailto:sudha...@fuzzylogix.com]
Sent: Wednesday, July 5, 2017 1:51 PM
To: user@spark.apache.org
Subject: SparkSession via HS2 - Error -spark.yarn.jars not read

Why is the “spark.yarn.jars” property not read in this HDP 2.6, Spark 2.1.1
cluster:
0: jdbc:hive2://localhost:1/db> set spark.yarn.jars;
+--+--+
| set  |
+--+--+
| spark.yarn.jars=hdfs://ambari03.fuzzyl.com:8020/hdp/apps/2.6.1.0-129/spark2  |
+--+--+
1 row selected (0.101 seconds)
0: jdbc:hive2://localhost:1/db>



Error during launch of a SparkSession via HS2:
Caused by: java.lang.IllegalStateException: Library directory 
'/hadoop/yarn/local/usercache/hive/appcache/application_1499235958765_0042/container_e04_1499235958765_0042_01_05/assembly/target/scala-2.11/jars'
 does not exist; make sure Spark is built.
at 
org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:260)
at 
org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:380)
at 
org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38)
at 
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:570)
at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:895)
at 
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:171)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
at 

Re: Spark querying parquet data partitioned in S3

2017-07-05 Thread Steve Loughran

> On 29 Jun 2017, at 17:44, fran  wrote:
> 
> We have got data stored in S3 partitioned by several columns. Let's say
> following this hierarchy:
> s3://bucket/data/column1=X/column2=Y/parquet-files
> 
> We run a Spark job in a EMR cluster (1 master,3 slaves) and realised the
> following:
> 
> A) - When we declare the initial dataframe to be the whole dataset (val df =
> sqlContext.read.parquet("s3://bucket/data/) then the driver splits the job
> into several tasks (259) that are performed by the executors and we believe
> the driver gets back the parquet metadata.
> 
> Question: The above takes about 25 minutes for our dataset, we believe it
> should be a lazy query (as we are not performing any actions) however it
> looks like something is happening, all the executors are reading from S3. We
> have tried mergeData=false and setting the schema explicitly via
> .schema(someSchema). Is there any way to speed this up?
> 
> B) - When we declare the initial dataframe to be scoped by the first column
> (val df = sqlContext.read.parquet("s3://bucket/data/column1=X) then it seems
> that all the work (getting the parquet metadata) is done by the driver and
> there is no job submitted to Spark. 
> 
> Question: Why does (A) send the work to executors but (B) does not?
> 
> The above is for EMR 5.5.0, Hadoop 2.7.3 and Spark 2.1.0.
> 
> 

Split calculation can be very slow against object stores, especially if the 
directory structure is deep: the treewalking done here is pretty inefficient 
against the object store.

Then there's the schema merge, which looks at the tail of every file, so has to 
do a seek() against all of them. That is something which it parallelises around 
the cluster, before your job actually gets scheduled. 

Turning that off with spark.sql.parquet.mergeSchema = false should make it go 
away, but clearly not.

A call to jstack against the driver will show where it is at: you'll probably
have to start from there.


I know if you are using EMR you are stuck using Amazon's own S3 clients; if you
were on Apache's own artifacts you could move up to Hadoop 2.8 and set the
spark.hadoop.fs.s3a.experimental.fadvise=random option for high-speed random
access. You can also turn off job summary creation in Spark.
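
Spelled out as spark-defaults.conf entries, those knobs would look roughly
like this (the parquet summary property name is an assumption about what
"job summary creation" maps to):

# only meaningful with the Apache S3A client on Hadoop 2.8+, not EMR's EMRFS
spark.hadoop.fs.s3a.experimental.fadvise      random
spark.sql.parquet.mergeSchema                 false
spark.hadoop.parquet.enable.summary-metadata  false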


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SparkSession via HS2 - Error -spark.yarn.jars not read

2017-07-05 Thread Sandeep Nemuri
STS will refer to spark-thrift-sparkconf.conf. Can you check if
spark.yarn.jars exists in that file?



On Wed, Jul 5, 2017 at 2:01 PM, Sudha KS  wrote:

> The property “spark.yarn.jars” available via /usr/hdp/current/spark2-
> client/conf/spark-default.conf
>
>
>
> spark.yarn.jars hdfs://ambari03.fuzzyl.com:8020/hdp/apps/2.6.1.0-129/
> spark2
>
>
>
>
>
> Is there any other way to set/read/pass this property “spark.yarn.jars” ?
>
>
>
> *From:* Sudha KS [mailto:sudha...@fuzzylogix.com]
> *Sent:* Wednesday, July 5, 2017 1:51 PM
> *To:* user@spark.apache.org
> *Subject:* SparkSession via HS2 - Error -spark.yarn.jars not read
>
>
>
> Why does “spark.yarn.jars” property not read, in this HDP 2.6 , Spark2.1.1
> cluster:
>
> 0: jdbc:hive2://localhost:1/db> set spark.yarn.jars;
>
> +--+--+
> | set  |
> +--+--+
> | spark.yarn.jars=hdfs://ambari03.fuzzyl.com:8020/hdp/apps/2.6.1.0-129/spark2  |
> +--+--+
>
> 1 row selected (0.101 seconds)
>
> 0: jdbc:hive2://localhost:1/db>
>
>
>
>
>
>
>
> Error during launch of a SparkSession via HS2:
>
> Caused by: java.lang.IllegalStateException: Library directory
> '/hadoop/yarn/local/usercache/hive/appcache/application_
> 1499235958765_0042/container_e04_1499235958765_0042_01_
> 05/assembly/target/scala-2.11/jars' does not exist; make sure Spark
> is built.
>
> at org.apache.spark.launcher.CommandBuilderUtils.checkState(
> CommandBuilderUtils.java:260)
>
> at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(
> CommandBuilderUtils.java:380)
>
> at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(
> YarnCommandBuilderUtils.scala:38)
>
> at org.apache.spark.deploy.yarn.Client.prepareLocalResources(
> Client.scala:570)
>
> at org.apache.spark.deploy.yarn.Client.
> createContainerLaunchContext(Client.scala:895)
>
> at org.apache.spark.deploy.yarn.Client.submitApplication(
> Client.scala:171)
>
> at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.
> start(YarnClientSchedulerBackend.scala:56)
>
> at org.apache.spark.scheduler.TaskSchedulerImpl.start(
> TaskSchedulerImpl.scala:156)
>
> at org.apache.spark.SparkContext.(SparkContext.scala:509)
>
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.
> scala:2320)
>
> at org.apache.spark.sql.SparkSession$Builder$$anonfun$
> 6.apply(SparkSession.scala:868)
>
> at org.apache.spark.sql.SparkSession$Builder$$anonfun$
> 6.apply(SparkSession.scala:860)
>
> at scala.Option.getOrElse(Option.scala:121)
>
> at org.apache.spark.sql.SparkSession$Builder.
> getOrCreate(SparkSession.scala:860)
>
> at SparkHiveUDTF.sparkJob(SparkHiveUDTF.java:97)
>
> at SparkHiveUDTF.process(SparkHiveUDTF.java:78)
>
> at org.apache.hadoop.hive.ql.exec.UDTFOperator.process(
> UDTFOperator.java:109)
>
> at org.apache.hadoop.hive.ql.exec.Operator.forward(
> Operator.java:841)
>
> at org.apache.hadoop.hive.ql.exec.SelectOperator.process(
> SelectOperator.java:88)
>
> at org.apache.hadoop.hive.ql.exec.Operator.forward(
> Operator.java:841)
>
> at org.apache.hadoop.hive.ql.exec.TableScanOperator.
> process(TableScanOperator.java:133)
>
> at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.
> forward(MapOperator.java:170)
>
> at org.apache.hadoop.hive.ql.exec.MapOperator.process(
> MapOperator.java:555)
>
> ... 18 more
>
>
>
>
>
>
>
>
>



-- 
Regards,
Sandeep Nemuri


Reading csv.gz files

2017-07-05 Thread Sea aj
I need to import a set of files with the csv.gz extension into Spark. Each file
contains a table of data. I was wondering if anyone knows how to read them?





Spark | Window Function |

2017-07-05 Thread Julien CHAMP
Hi there !

Let me explain my problem to see if you have a good solution to help me :)

Let's imagine that I have all my data in a DB or a file, that I load in a
dataframe DF with the following columns :
*id | timestamp(ms) | value*
A | 100 |  100
A | 110 |  50
B | 100 |  100
B | 110 |  50
B | 120 |  200
B | 250 |  500
C | 100 |  200
C | 110 |  500

The timestamp is a *long value*, so as to be able to express date in ms
from -01-01 to today !

I want to compute operations such as min, max, average on the *value column*,
for a given window function, and grouped by id ( Bonus :  if possible for
only some timestamps... )

For example if I have 3 tuples :

id | timestamp(ms) | value
B | 100 |  100
B | 110 |  50
B | 120 |  200
B | 250 |  500

I would like to be able to compute the min value for windows of time = 20.
This would result in such a DF :

id | timestamp(ms) | value | min___value
B | 100 |  100 | 100
B | 110 |  50  | 50
B | 120 |  200 | 50
B | 250 |  500 | 500

This seems the perfect use case for window function in spark  ( cf :
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
 )
I can use :

Window.orderBy("timestamp").partitionBy("id").rangeBetween(-20,0)
df.withColumn("min___value", min(df.col("value")).over(tw))

This leads to the perfect answer !

However, there is a big bug with window functions as reported here (
https://issues.apache.org/jira/browse/SPARK-19451 ) when working with Long
values !!! So I can't use this

So my question is ( of course ) how can I resolve my problem ?
If I use spark streaming I will face the same issue ?

I'll be glad to discuss this problem with you, feel free to answer :)

Regards,

Julien
-- 


Julien CHAMP — Data Scientist


Web : www.tellmeplus.com — Email : jch...@tellmeplus.com

Phone : 06 89 35 01 89 — LinkedIn : here

TellMePlus S.A — Predictive Objects

Paris : 7 rue des Pommerots, 78400 Chatou
Montpellier : 51 impasse des églantiers, 34980 St Clément de Rivière



RE: SparkSession via HS2 - Error -spark.yarn.jars not read

2017-07-05 Thread Sudha KS
The property "spark.yarn.jars" is available via 
/usr/hdp/current/spark2-client/conf/spark-default.conf

spark.yarn.jars hdfs://ambari03.fuzzyl.com:8020/hdp/apps/2.6.1.0-129/spark2


Is there any other way to set/read/pass this property "spark.yarn.jars" ?

From: Sudha KS [mailto:sudha...@fuzzylogix.com]
Sent: Wednesday, July 5, 2017 1:51 PM
To: user@spark.apache.org
Subject: SparkSession via HS2 - Error -spark.yarn.jars not read

Why is the "spark.yarn.jars" property not read in this HDP 2.6, Spark 2.1.1
cluster:
0: jdbc:hive2://localhost:1/db> set spark.yarn.jars;
+--+--+
| set  |
+--+--+
| spark.yarn.jars=hdfs://ambari03.fuzzyl.com:8020/hdp/apps/2.6.1.0-129/spark2  |
+--+--+
1 row selected (0.101 seconds)
0: jdbc:hive2://localhost:1/db>



Error during launch of a SparkSession via HS2:
Caused by: java.lang.IllegalStateException: Library directory 
'/hadoop/yarn/local/usercache/hive/appcache/application_1499235958765_0042/container_e04_1499235958765_0042_01_05/assembly/target/scala-2.11/jars'
 does not exist; make sure Spark is built.
at 
org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:260)
at 
org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:380)
at 
org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38)
at 
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:570)
at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:895)
at 
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:171)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
at org.apache.spark.SparkContext.(SparkContext.scala:509)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2320)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
at SparkHiveUDTF.sparkJob(SparkHiveUDTF.java:97)
at SparkHiveUDTF.process(SparkHiveUDTF.java:78)
at 
org.apache.hadoop.hive.ql.exec.UDTFOperator.process(UDTFOperator.java:109)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:841)
at 
org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:88)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:841)
at 
org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:133)
at 
org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:170)
at 
org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:555)
... 18 more






SparkSession via HS2 - Error -spark.yarn.jars not read

2017-07-05 Thread Sudha KS
Why is the "spark.yarn.jars" property not read in this HDP 2.6, Spark 2.1.1
cluster:
0: jdbc:hive2://localhost:1/db> set spark.yarn.jars;
+--+--+
| set  |
+--+--+
| spark.yarn.jars=hdfs://ambari03.fuzzyl.com:8020/hdp/apps/2.6.1.0-129/spark2  |
+--+--+
1 row selected (0.101 seconds)
0: jdbc:hive2://localhost:1/db>



Error during launch of a SparkSession via HS2:
Caused by: java.lang.IllegalStateException: Library directory 
'/hadoop/yarn/local/usercache/hive/appcache/application_1499235958765_0042/container_e04_1499235958765_0042_01_05/assembly/target/scala-2.11/jars'
 does not exist; make sure Spark is built.
at 
org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:260)
at 
org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:380)
at 
org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38)
at 
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:570)
at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:895)
at 
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:171)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
at org.apache.spark.SparkContext.(SparkContext.scala:509)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2320)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
at SparkHiveUDTF.sparkJob(SparkHiveUDTF.java:97)
at SparkHiveUDTF.process(SparkHiveUDTF.java:78)
at 
org.apache.hadoop.hive.ql.exec.UDTFOperator.process(UDTFOperator.java:109)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:841)
at 
org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:88)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:841)
at 
org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:133)
at 
org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:170)
at 
org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:555)
... 18 more






Re: Kafka 0.10 with PySpark

2017-07-05 Thread Saisai Shao
Please see the reason in this thread
(https://github.com/apache/spark/pull/14340). It would be better to use
Structured Streaming instead.
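
For reference, a minimal Structured Streaming sketch for Kafka over SSL from
PySpark (assuming the spark-sql-kafka-0-10 package is on the classpath; broker
addresses, topic, and store paths are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-ssl-sketch").getOrCreate()

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9093")
          .option("subscribe", "my_topic")
          # kafka.-prefixed options are passed to the underlying Kafka consumer
          .option("kafka.security.protocol", "SSL")
          .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
          .load())

query = (stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())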

So I would like to -1 this patch. I think it's been a mistake to support
> dstream in Python -- yes it satisfies a checkbox and Spark could claim
> there's support for streaming in Python. However, the tooling and maturity
> for working with streaming data (both in Spark and the more broad
> ecosystem) is simply not there. It is a big baggage to maintain, and
> creates a the wrong impression that production streaming jobs can be
> written in Python.
>

On Tue, Jul 4, 2017 at 10:53 PM, Daniel van der Ende <
daniel.vandere...@gmail.com> wrote:

> Hi,
>
> I'm working on integrating some pyspark code with Kafka. We'd like to use
> SSL/TLS, and so want to use Kafka 0.10. Because structured streaming is
> still marked alpha, we'd like to use Spark streaming. On this page,
> however, it indicates that the Kafka 0.10 integration in Spark does not
> support Python (https://spark.apache.org/docs/latest/streaming-kafka-
> integration.html). I've been trying to figure out why, but have not been
> able to find anything. Is there any particular reason for this?
>
> Thanks,
>
> Daniel
>