Re: spark-submit in mesos cluster mode --jars option not working

2017-05-16 Thread Timothy Chen
Hi Satya,

--jars doesn't work with local files because Mesos cluster mode doesn't
upload or stage files automatically.

For now you need to put these files in a location that the Driver can access.
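
One possible shape of the workaround, as a rough sketch only (the paths, dispatcher address and class name below are made up, and it assumes the jars have already been placed at the same path on every node, e.g. via provisioning or a Docker image):

spark-submit \
  --master mesos://dispatcher-host:7077 \
  --deploy-mode cluster \
  --class com.example.Main \
  --jars local:///opt/libs/dependency.jar \
  local:///opt/apps/app.jar

The local: scheme tells Spark the file already exists at that path on each node, so nothing needs to be uploaded.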

Tim


On Tue, May 16, 2017 at 10:17 PM, Satya Narayan1
 wrote:
> Creating a new thread for this.
>
> Is anyone able to use --jars with spark-submit in Mesos cluster mode?
> We have tried giving a local file, an HDFS file, and a file from an HTTP
> server; --jars didn't work with any of these approaches.
>
>
> Saw a couple of similar open questions with no answer:
> http://stackoverflow.com/questions/33978672/spark-mesos-cluster-mode-who-uploads-the-jar
>
>
> Mesos cluster mode with no jar upload capability is very limiting.
> Wondering if anyone has a solution to this.
>
>
>




spark-submit in mesos cluster mode --jars option not working

2017-05-16 Thread Satya Narayan1
Creating a new thread for this.

Is anyone able to use --jars with spark-submit in Mesos cluster mode?
We have tried giving a local file, an HDFS file, and a file from an HTTP
server; --jars didn't work with any of these approaches.


Saw a couple of similar open questions with no answer:
http://stackoverflow.com/questions/33978672/spark-mesos-cluster-mode-who-uploads-the-jar


Mesos cluster mode with no jar upload capability is very limiting.
Wondering if anyone has a solution to this.






Re: Spark <--> S3 flakiness

2017-05-16 Thread lucas.g...@gmail.com
Steve, thanks for the reply.  Digging through all the documentation now.

Much appreciated!



On 16 May 2017 at 10:10, Steve Loughran  wrote:

>
> On 11 May 2017, at 06:07, lucas.g...@gmail.com wrote:
>
> Hi users, we have a bunch of pyspark jobs that are using S3 for loading /
> intermediate steps and final output of parquet files.
>
>
> Please don't, not without a committer specially written to work against S3
> in the presence of failures. You are at risk of things going wrong and you
> not even noticing.
>
> The only one that I trust to do this right now is:
> https://github.com/rdblue/s3committer
>
>
> see also : https://github.com/apache/spark/blob/master/docs/cloud-
> integration.md
>
>
>
> We're running into the following issues on a semi regular basis:
> * These are intermittent errors, IE we have about 300 jobs that run
> nightly... And a fairly random but small-ish percentage of them fail with
> the following classes of errors.
>
>
> *S3 write errors *
>
>> "ERROR Utils: Aborting task
>> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS
>> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS Error
>> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>>
>
>
>> "Py4JJavaError: An error occurred while calling o43.parquet.
>> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
>> Code: 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
>> Error Message: One or more objects could not be deleted, S3 Extended
>> Request ID: null"
>
>
>
>
> *S3 Read Errors: *
>
>> [Stage 1:=>   (27 +
>> 4) / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage
>> 1.0 (TID 11)
>> java.net.SocketException: Connection reset
>> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>> at org.apache.http.impl.io.AbstractSessionInputBuffer.read(
>> AbstractSessionInputBuffer.java:198)
>> at org.apache.http.impl.io.ContentLengthInputStream.read(
>> ContentLengthInputStream.java:178)
>> at org.apache.http.impl.io.ContentLengthInputStream.read(
>> ContentLengthInputStream.java:200)
>> at org.apache.http.impl.io.ContentLengthInputStream.close(
>> ContentLengthInputStream.java:103)
>> at org.apache.http.conn.BasicManagedEntity.streamClosed(
>> BasicManagedEntity.java:168)
>> at org.apache.http.conn.EofSensorInputStream.checkClose(
>> EofSensorInputStream.java:228)
>> at org.apache.http.conn.EofSensorInputStream.close(
>> EofSensorInputStream.java:174)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
>> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:187)
>
>
>
> We have literally tons of logs we can add but it would make the email
> unwieldy big.  If it would be helpful I'll drop them in a pastebin or
> something.
>
> Our config is along the lines of:
>
>- spark-2.1.0-bin-hadoop2.7
>- '--packages 
> com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
>pyspark-shell'
>
>
> You should have the Hadoop 2.7 JARs on your CP, as s3a on 2.6 wasn't ready
> to play with. In particular, in a close() call it reads to the end of the
> stream, which is a performance killer on large files. That stack trace you
> see is from that same phase of operation, so should go away too.
>
> Hadoop 2.7.3 depends on Amazon SDK 1.7.4; trying to use a different one
> will probably cause link errors.
> http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/2.7.3
>
> Also: make sure Joda time >= 2.8.1 for Java 8
>
> If you go up to 2.8.0, and you still see the errors, file something
> against HADOOP in JIRA
>
>
> Given the stack overflow / googling I've been doing I know we're not the
> only org with these issues but I haven't found a good set of solutions in
> those spaces yet.
>
> Thanks!
>
> Gary Lucas
>
>
>


Re: Not able pass 3rd party jars to mesos executors

2017-05-16 Thread Satya Narayan1
Hi, is anyone able to use --jars with spark-submit in Mesos cluster mode?

We have tried giving a local file, an HDFS file, and a file from an HTTP
server; --jars didn't work with any of these approaches.


Saw a couple of similar open questions with no answer:
http://stackoverflow.com/questions/33978672/spark-mesos-cluster-mode-who-uploads-the-jar


Mesos cluster mode with no jar upload capability is very limiting. Wondering
if anyone has a solution to this.






Re: KTable like functionality in structured streaming

2017-05-16 Thread Tathagata Das
DataFrames have the combined functionality of both KTable and KStreams,
so I don't quite understand what you mean by querying a KTable. If you meant
interactively querying a table, then you can write an aggregation streaming
query to the memory sink with complete output mode and run interactive
queries against the resulting in-memory table. Is that what you want?

If not, could you further elaborate your use case?

On May 16, 2017 5:41 PM, "Stephen Fletcher" 
wrote:

> Are there any plans to add Kafka Streams KTable like functionality in
> structured streaming for kafka sources? Allowing querying keyed messages
> using spark sql,maybe calling KTables in the backend
>


Spark Streaming: NullPointerException when restoring Spark Streaming job from hdfs/s3 checkpoint

2017-05-16 Thread Richard Moorhead

I'm having some difficulty reliably restoring a streaming job from a checkpoint.
When restoring a streaming job constructed from the following snippet, I
receive NullPointerExceptions when `map` is called on the restored RDD.


lazy val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

private def createStreamingContext: StreamingContext = {
  val ssc = new StreamingContext(spark.sparkContext, batchInterval)
  ssc.checkpoint(checkpointDir)
  consumeStreamingContext(ssc)
  ssc
}

def consumeStreamingContext(ssc: StreamingContext) = {
  // ... create dstreams
  val dstream = KinesisUtil.createStream(...)

  dstream.checkpoint(batchInterval)

  dstream.foreachRDD(process)
}

def process(events: RDD[Event]) = {
  if (!events.isEmpty()) {
    logger.info("Transforming events for processing")
    // the rdd seems to support some operations?
    logger.info(s"RDD LENGTH: ${events.count}")
    // NullPointerException on the call to .map
    val df = events.map(e => {
      ...
    })
  }
}




. . . . . . . . . . . . . . . . . . . . . . . . . . .

Richard Moorhead
Software Engineer
richard.moorh...@c2fo.com

C2FO: The World's Market for Working Capital®






KTable like functionality in structured streaming

2017-05-16 Thread Stephen Fletcher
Are there any plans to add Kafka Streams KTable like functionality in
structured streaming for kafka sources? Allowing querying keyed messages
using spark sql,maybe calling KTables in the backend


Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Michael Armbrust
I mean the actual kafka client:


<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>
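
(If it's easier than copying the jar into SPARK_HOME/jars, the same artifact
should also be resolvable at submit time with
--packages org.apache.kafka:kafka-clients:0.10.0.1.)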



On Tue, May 16, 2017 at 4:29 PM, kant kodali  wrote:

> Hi Michael,
>
> Thanks for the catch. I assume you meant
> *spark-streaming-kafka-0-10_2.11-2.1.0.jar*
>
> I add this in all spark machines under SPARK_HOME/jars.
>
> Still same error seems to persist. Is that the right jar or is there
> anything else I need to add?
>
> Thanks!
>
>
>
> On Tue, May 16, 2017 at 1:40 PM, Michael Armbrust 
> wrote:
>
>> Looks like you are missing the kafka dependency.
>>
>> On Tue, May 16, 2017 at 1:04 PM, kant kodali  wrote:
>>
>>> Looks like I am getting the following runtime exception. I am using
>>> Spark 2.1.0 and the following jars
>>>
>>> *spark-sql_2.11-2.1.0.jar*
>>>
>>> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>>>
>>> *spark-streaming_2.11-2.1.0.jar*
>>>
>>>
>>> Exception in thread "stream execution thread for [id = 
>>> fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 
>>> 7c54940a-e453-41de-b256-049b539b59b1]"
>>>
>>> java.lang.NoClassDefFoundError: 
>>> org/apache/kafka/common/serialization/ByteArrayDeserializer
>>> at 
>>> org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
>>> at 
>>> org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
>>> at 
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>> at 
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>> at 
>>> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>>>
>>>
>>> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
 The default "startingOffsets" is "latest". If you don't push any data
 after starting the query, it won't fetch anything. You can set it to
 "earliest" like ".option("startingOffsets", "earliest")" to start the
 stream from the beginning.

 On Tue, May 16, 2017 at 12:36 AM, kant kodali 
 wrote:

> Hi All,
>
> I have the following code.
>
>  val ds = sparkSession.readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers",bootstrapServers))
> .option("subscribe", topicName)
> .option("checkpointLocation", hdfsCheckPointDir)
> .load();
>
>  val ds1 = ds.select($"value")
>  val query = 
> ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
>
> There are no errors when I execute this code however I don't see any
> data being printed out to console? When I run my standalone test Kafka
> consumer jar I can see that it is receiving messages. so I am not sure 
> what
> is going on with above code? any ideas?
>
> Thanks!
>


>>>
>>
>


Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread kant kodali
Hi Michael,

Thanks for the catch. I assume you meant
*spark-streaming-kafka-0-10_2.11-2.1.0.jar*

I added this on all Spark machines under SPARK_HOME/jars.

The same error still persists. Is that the right jar, or is there
anything else I need to add?

Thanks!



On Tue, May 16, 2017 at 1:40 PM, Michael Armbrust 
wrote:

> Looks like you are missing the kafka dependency.
>
> On Tue, May 16, 2017 at 1:04 PM, kant kodali  wrote:
>
>> Looks like I am getting the following runtime exception. I am using Spark
>> 2.1.0 and the following jars
>>
>> *spark-sql_2.11-2.1.0.jar*
>>
>> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>>
>> *spark-streaming_2.11-2.1.0.jar*
>>
>>
>> Exception in thread "stream execution thread for [id = 
>> fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 
>> 7c54940a-e453-41de-b256-049b539b59b1]"
>>
>> java.lang.NoClassDefFoundError: 
>> org/apache/kafka/common/serialization/ByteArrayDeserializer
>> at 
>> org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
>> at 
>> org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
>> at 
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>> at 
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>> at 
>> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>>
>>
>> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> The default "startingOffsets" is "latest". If you don't push any data
>>> after starting the query, it won't fetch anything. You can set it to
>>> "earliest" like ".option("startingOffsets", "earliest")" to start the
>>> stream from the beginning.
>>>
>>> On Tue, May 16, 2017 at 12:36 AM, kant kodali 
>>> wrote:
>>>
 Hi All,

 I have the following code.

  val ds = sparkSession.readStream()
 .format("kafka")
 .option("kafka.bootstrap.servers",bootstrapServers))
 .option("subscribe", topicName)
 .option("checkpointLocation", hdfsCheckPointDir)
 .load();

  val ds1 = ds.select($"value")
  val query = ds1.writeStream.outputMode("append").format("console").start()
  query.awaitTermination()

 There are no errors when I execute this code however I don't see any
 data being printed out to console? When I run my standalone test Kafka
 consumer jar I can see that it is receiving messages. so I am not sure what
 is going on with above code? any ideas?

 Thanks!

>>>
>>>
>>
>


Re: s3 bucket access/read file

2017-05-16 Thread jazzed
How did you solve the problem with V4?








Re: Restful API Spark Application

2017-05-16 Thread Debasish Das
You can run l
On May 15, 2017 3:29 PM, "Nipun Arora"  wrote:

> Thanks all for your response. I will have a look at them.
>
> Nipun
>
> On Sat, May 13, 2017 at 2:38 AM vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> It's in scala but it should be portable in java
>> https://github.com/vgkowski/akka-spark-experiments
>>
>>
>> On 12 May 2017 at 10:54 PM, "Василец Дмитрий" wrote:
>>
>> and Livy: https://hortonworks.com/blog/livy-a-rest-interface-for-apache-spark/
>>
>> On Fri, May 12, 2017 at 10:51 PM, Sam Elamin 
>> wrote:
>> > Hi Nipun
>> >
>> > Have you checked out the job server?
>> >
>> > https://github.com/spark-jobserver/spark-jobserver
>> >
>> > Regards
>> > Sam
>> > On Fri, 12 May 2017 at 21:00, Nipun Arora 
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> We have written a Java Spark application (it primarily uses Spark SQL).
>> >> We want to expand this to provide our application "as a service", so we
>> >> are trying to write a REST API. A simple REST API can easily be made,
>> >> and I can get Spark to run through the launcher, but I wonder how the
>> >> Spark context can be used by service requests to process data.
>> >>
>> >> Are there any simple Java examples to illustrate this use case? I am
>> >> sure people have faced this before.
>> >>
>> >>
>> >> Thanks
>> >> Nipun
>>
>>
>>
>>


Cannot create parquet with snappy output for hive external table

2017-05-16 Thread Dhimant
Hi Group,

I am not able to get snappy-compressed parquet output when loading data into a partitioned external Hive table.

Trace :-

1. create external table test(id int, name string) stored as parquet
location 'hdfs://testcluster/user/abc/test' tblproperties
('PARQUET.COMPRESS'='SNAPPY');

2. Spark code:

   val spark = SparkSession.builder()
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .getOrCreate()
   spark.sql("use default").show
   val rdd = sc.parallelize(Seq((1, "one"), (2, "two")))
   val df = spark.createDataFrame(rdd).toDF("id", "name")
   df.write.mode(SaveMode.Overwrite).insertInto("test")

3. I can see few snappy.parquet files.

4. create external table test(id int) partitioned by  (name string)  stored
as parquet location 'hdfs://testcluster/user/abc/test' tblproperties
('PARQUET.COMPRESS'='SNAPPY');

5. Spark code:

   val spark = SparkSession.builder()
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .getOrCreate()
   spark.sql("use default").show
   val rdd = sc.parallelize(Seq((1, "one"), (2, "two")))
   val df = spark.createDataFrame(rdd).toDF("id", "name")
   df.write.mode(SaveMode.Overwrite).insertInto("test")

6. I see uncompressed files without the snappy.parquet extension.
parquet-tools.jar also confirms that these are uncompressed parquet files.

7. I tried the following option as well, but no luck:

df.write.mode(SaveMode.Overwrite).format("parquet").option("compression",
"snappy").insertInto("test")


Thanks in advance.








Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Michael Armbrust
Looks like you are missing the kafka dependency.

On Tue, May 16, 2017 at 1:04 PM, kant kodali  wrote:

> Looks like I am getting the following runtime exception. I am using Spark
> 2.1.0 and the following jars
>
> *spark-sql_2.11-2.1.0.jar*
>
> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>
> *spark-streaming_2.11-2.1.0.jar*
>
>
> Exception in thread "stream execution thread for [id = 
> fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 
> 7c54940a-e453-41de-b256-049b539b59b1]"
>
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/serialization/ByteArrayDeserializer
> at 
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
> at 
> org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
> at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>
>
> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> The default "startingOffsets" is "latest". If you don't push any data
>> after starting the query, it won't fetch anything. You can set it to
>> "earliest" like ".option("startingOffsets", "earliest")" to start the
>> stream from the beginning.
>>
>> On Tue, May 16, 2017 at 12:36 AM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I have the following code.
>>>
>>>  val ds = sparkSession.readStream()
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers",bootstrapServers))
>>> .option("subscribe", topicName)
>>> .option("checkpointLocation", hdfsCheckPointDir)
>>> .load();
>>>
>>>  val ds1 = ds.select($"value")
>>>  val query = ds1.writeStream.outputMode("append").format("console").start()
>>>  query.awaitTermination()
>>>
>>> There are no errors when I execute this code however I don't see any
>>> data being printed out to console? When I run my standalone test Kafka
>>> consumer jar I can see that it is receiving messages. so I am not sure what
>>> is going on with above code? any ideas?
>>>
>>> Thanks!
>>>
>>
>>
>


Documentation on "Automatic file coalescing for native data sources"?

2017-05-16 Thread Daniel Siegmann
When using spark.read on a large number of small files, these are
automatically coalesced into fewer partitions. The only documentation I can
find on this is in the Spark 2.0.0 release notes, where it simply says (
http://spark.apache.org/releases/spark-release-2-0-0.html):

"Automatic file coalescing for native data sources"

Can anyone point me to documentation explaining what triggers this feature,
how it decides how many partitions to coalesce to, and what counts as a
"native data source"? I couldn't find any mention of this feature in the
SQL Programming Guide and Google was not helpful.

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread kant kodali
Looks like I am getting the following runtime exception. I am using Spark
2.1.0 and the following jars

*spark-sql_2.11-2.1.0.jar*

*spark-sql-kafka-0-10_2.11-2.1.0.jar*

*spark-streaming_2.11-2.1.0.jar*


Exception in thread "stream execution thread for [id =
fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId =
7c54940a-e453-41de-b256-049b539b59b1]"

java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer
at 
org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)


On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> The default "startingOffsets" is "latest". If you don't push any data
> after starting the query, it won't fetch anything. You can set it to
> "earliest" like ".option("startingOffsets", "earliest")" to start the
> stream from the beginning.
>
> On Tue, May 16, 2017 at 12:36 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> I have the following code.
>>
>>  val ds = sparkSession.readStream()
>> .format("kafka")
>> .option("kafka.bootstrap.servers",bootstrapServers))
>> .option("subscribe", topicName)
>> .option("checkpointLocation", hdfsCheckPointDir)
>> .load();
>>
>>  val ds1 = ds.select($"value")
>>  val query = ds1.writeStream.outputMode("append").format("console").start()
>>  query.awaitTermination()
>>
>> There are no errors when I execute this code however I don't see any data
>> being printed out to console? When I run my standalone test Kafka consumer
>> jar I can see that it is receiving messages. so I am not sure what is going
>> on with above code? any ideas?
>>
>> Thanks!
>>
>
>


Spark Streaming 2.1 recovery

2017-05-16 Thread Dominik Safaric
Hi,

currently I am exploring Spark's fault-tolerance capabilities in terms of fault
recovery. Namely, I run a Spark 2.1 standalone cluster on a master and four
worker nodes. The application pulls data using the Kafka direct stream API from
a Kafka topic over a (sliding) window of time, and writes transformed data back
to another topic. During this process, using a bash script, I randomly kill a
Worker process with the expectation of getting insight into RDD recovery from
the log4j logs written by the Driver. However, except for messages describing
that a Worker has been lost, I cannot find any traces indicating that Spark is
recovering the data lost with the killed Worker. Hence, I have the following
questions:

1. If using stateless transformations such as windowing, does Spark checkpoint
the data blocks or just the RDD metadata?
2. If not, is the state recovered from the memory of a Worker to which the data
has been replicated, or only from the HDFS checkpoints?
3. If Spark checkpoints and recovers the metadata only, how are exactly-once
processing semantics achieved? I refer to processing semantics, and not output
semantics, as the latter would require storing the data in a transactional
data store.
4. Using Write Ahead Logs, would Spark recover the data from them in parallel
instead of re-pulling the messages from Kafka?

Thanks in advance for the clarification,
Dominik

Re: How does preprocessing fit into Spark MLlib pipeline

2017-05-16 Thread Adrian Stern
Hey Liang and Yan,

Been super busy, and just now getting back to this problem.

I've been thinking a bit more about it, and it still feels like using the
group-by functionality, even in a SQL transform, is incorrect and doesn't
follow the transform pattern. It doesn't seem to fit the extractor pattern
either.

Reasoning:

Since my transforms/extractions would reduce the number of rows and remove
some of the columns, some information in the data frame is lost. The
problem is that the output data frame from a transform/extraction on the raw
data cannot be used as the input of the next transform/extractor.
Basically, it doesn't simply add a column to the data frame; it reduces the
frame and adds a column.

Ex:
data = [
    ("id1", "purchase", "ios", 1),
    ("id1", "rental", "ios", 1),
    ("id1", "purchase", "android", 2),
    ("id2", "purchase", "android", 1),
    ("id2", "rental", "android", 2),
    ("id2", "rental", "android", 3),
    ("id3", "rental", "android", 1)
]
columns = ["id", "type", "env", "total"]
raw_df = spark.createDataFrame(data, columns)
raw_df.cache()
raw_df.show()
+---+--------+-------+-----+
| id|    type|    env|total|
+---+--------+-------+-----+
|id1|purchase|    ios|    1|
|id1|  rental|    ios|    1|
|id1|purchase|android|    2|
|id2|purchase|android|    1|
|id2|  rental|android|    2|
|id2|  rental|android|    3|
|id3|  rental|android|    1|
+---+--------+-------+-----+

purchase_df = raw_df.filter(raw_df.type == "purchase")
max_transform = purchase_df.groupby("id", "env").agg(pyspark.sql.functions.max("total").alias("max_total"))
max_transform = max_transform.groupby("id").sum("max_total")
max_transform.show()
+---+--------------+
| id|sum(max_total)|
+---+--------------+
|id1|             3|
|id2|             1|
+---+--------------+

type_transform_ = raw_df.groupby("id").pivot("type").count()
type_transform_.show()
+---+--------+------+
| id|purchase|rental|
+---+--------+------+
|id3|    null|     1|
|id1|       2|     1|
|id2|       1|     2|
+---+--------+------+

env_transform = raw_df.groupby("id").pivot("env").count()
env_transform.show()
+---+-------+----+
| id|android| ios|
+---+-------+----+
|id3|      1|null|
|id1|      1|   2|
|id2|      3|null|
+---+-------+----+

Here, I can't use the max_transform data frame as input for the
type_transform. Since the output from one transform can't be used by the
next, it doesn't work in a pipeline.

It also feels unnatural that the size of the data frame from each transform
can differ depending on the data. This happens because we filter to
specific events for some of the features; e.g. max_transform ends up with 2
rows, while the others end up with 3.

Am I missing a clean way to do this in a pipeline?
Maybe there is a way to reduce the data at the beginning so that I can use
transforms, perhaps by grouping all of a user's events together in a list.
Keep in mind that in my actual use case I have more than the 4 columns shown
here; it's more like 10-50 columns in the raw df.

Currently, I'm thinking that I'm just going to create my own custom
pipeline for these data preprocessing steps.

Thanks again
Adrian


On Mon, Mar 20, 2017 at 10:33 PM, 颜发才(Yan Facai) 
wrote:

> SQLTransformer is a good solution if all operators are combined with SQL.
>
> By the way,
> if you like to get hands dirty,
> writing a Transformer in scala is not hard,
> and multiple output columns is valid in such case.
>
>
>
>
> On Fri, Mar 17, 2017 at 9:10 PM, Yanbo Liang  wrote:
>
>> Hi Adrian,
>>
>> Did you try SQLTransformer? Your preprocessing steps are SQL operations
>> and can be handled by SQLTransformer in MLlib pipeline scope.
>>
>> Thanks
>> Yanbo
>>
>> On Thu, Mar 9, 2017 at 11:02 AM, aATv  wrote:
>>
>>> I want to start using PySpark Mllib pipelines, but I don't understand
>>> how/where preprocessing fits into the pipeline.
>>>
>>> My preprocessing steps are generally in the following form:
>>>1) Load log files(from s3) and parse into a spark Dataframe with
>>> columns
>>> user_id, event_type, timestamp, etc
>>>2) Group by a column, then pivot and count another column
>>>   - e.g. df.groupby("user_id").pivot("event_type").count()
>>>   - We can think of the columns that this creates besides user_id as
>>> features, where the number of each event type is a different feature
>>>3) Join the data from step 1 with other metadata, usually stored in
>>> Cassandra. Then perform a transformation similar to one from step 2),
>>> where
>>> the column that is pivoted and counted is a column that came from the
>>> data
>>> stored in Cassandra.
>>>
>>> After this preprocessing, I would use transformers to create other
>>> features
>>> and feed it into a model, lets say Logistic Regression for example.
>>>
>>> I would like to make at lease step 2 a custom transformer and add that
>>> to a
>>> pipeline, but it doesn't fit the transformer abstraction. This is
>>> because it
>>> takes a single 

How to replay stream between 2 offsets?

2017-05-16 Thread ranjitreddy

I'm using Spark (2.1.1) Streaming as a consumer of Kafka messages.

I have a data pipeline where I continuously (hourly) save offsets to HBase,
so that I can replay the stream from a certain offset in case of errors in
data transformations.

I'm able to start the stream at a certain offset, but don't know of a way to
stop the stream after I reach the end offset. I'm using the following API:

Map<TopicPartition, Long> replayOffsets = new HashMap<>();
replayOffsets.put(new TopicPartition(topicPrefix, 0), offsets[0]);

List<TopicPartition> topicPartitionList = new ArrayList<>();
topicPartitionList.add(new TopicPartition(topicPrefix, 0));

JavaInputDStream<ConsumerRecord<String, String>> messages =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Assign(topicPartitionList,
            kafkaParams, replayOffsets)
    );

I would like to know if there is an API or a solution for this use case.

Thanks,

-Ranjit








Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Shixiong(Ryan) Zhu
The default "startingOffsets" is "latest". If you don't push any data after
starting the query, it won't fetch anything. You can set it to "earliest"
like ".option("startingOffsets", "earliest")" to start the stream from the
beginning.
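
For example, adding it to the reader from your snippet (a sketch reusing your
names):

 val ds = sparkSession.readStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", bootstrapServers)
   .option("subscribe", topicName)
   .option("startingOffsets", "earliest")   // <-- added: read the topic from the beginning
   .option("checkpointLocation", hdfsCheckPointDir)
   .load();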

On Tue, May 16, 2017 at 12:36 AM, kant kodali  wrote:

> Hi All,
>
> I have the following code.
>
>  val ds = sparkSession.readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers",bootstrapServers))
> .option("subscribe", topicName)
> .option("checkpointLocation", hdfsCheckPointDir)
> .load();
>
>  val ds1 = ds.select($"value")
>  val query = ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
>
> There are no errors when I execute this code however I don't see any data
> being printed out to console? When I run my standalone test Kafka consumer
> jar I can see that it is receiving messages. so I am not sure what is going
> on with above code? any ideas?
>
> Thanks!
>


Re: Spark <--> S3 flakiness

2017-05-16 Thread Steve Loughran

On 11 May 2017, at 06:07, lucas.g...@gmail.com wrote:

Hi users, we have a bunch of pyspark jobs that are using S3 for loading / 
intermediate steps and final output of parquet files.

Please don't, not without a committer specially written to work against S3 in
the presence of failures. You are at risk of things going wrong and you not even
noticing.

The only one that I trust to do this right now is:
https://github.com/rdblue/s3committer


see also : https://github.com/apache/spark/blob/master/docs/cloud-integration.md



We're running into the following issues on a semi regular basis:
* These are intermittent errors, IE we have about 300 jobs that run nightly... 
And a fairly random but small-ish percentage of them fail with the following 
classes of errors.

S3 write errors

"ERROR Utils: Aborting task
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS 
Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS Error 
Message: Not Found, S3 Extended Request ID: BlaBlahEtc="

"Py4JJavaError: An error occurred while calling o43.parquet.
: com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code: 0, 
AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error 
Message: One or more objects could not be deleted, S3 Extended Request ID: null"


S3 Read Errors:

[Stage 1:=>   (27 + 4) / 
31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage 1.0 (TID 
11)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at 
org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
at 
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
at 
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
at 
org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
at 
org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:168)
at 
org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
at 
org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:187)


We have literally tons of logs we can add but it would make the email unwieldy 
big.  If it would be helpful I'll drop them in a pastebin or something.

Our config is along the lines of:

  *   spark-2.1.0-bin-hadoop2.7
  *   '--packages 
com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 
pyspark-shell'

You should have the Hadoop 2.7 JARs on your CP, as s3a on 2.6 wasn't ready to 
play with. In particular, in a close() call it reads to the end of the stream, 
which is a performance killer on large files. That stack trace you see is from 
that same phase of operation, so should go away too.

Hadoop 2.7.3 depends on Amazon SDK 1.7.4; trying to use a different one will 
probably cause link errors.
http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/2.7.3
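
So the packages line would look more like this (a sketch of the matching
versions, not something tested here):

  --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell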

Also: make sure Joda time >= 2.8.1 for Java 8

If you go up to 2.8.0, and you still see the errors, file something against 
HADOOP in JIRA


Given the stack overflow / googling I've been doing I know we're not the only 
org with these issues but I haven't found a good set of solutions in those 
spaces yet.

Thanks!

Gary Lucas



RE: Spark SQL DataFrame to Kafka Topic

2017-05-16 Thread Revin Chalil
Thanks Michael, that worked, appreciate your help.

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Monday, May 15, 2017 11:45 AM
To: Revin Chalil 
Cc: User 
Subject: Re: Spark SQL DataFrame to Kafka Topic

The foreach sink from that blog post requires that you have a DataFrame with
two columns in the form of a Tuple2, (String, String), whereas your DataFrame
has only a single column `payload`.  You could change the KafkaSink to extend
ForeachWriter[KafkaMessage] and then it would work.
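
Roughly something like this (just a sketch; the producer configuration and the
assumption that `payload` is the value you want to send are mine, not from the
blog post):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

class KafkaSink(topic: String, servers: String) extends ForeachWriter[KafkaMessage] {

  @transient private var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true
  }

  // called once per row; KafkaMessage is the case class from your snippet
  override def process(msg: KafkaMessage): Unit = {
    producer.send(new ProducerRecord[String, String](topic, msg.payload))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}

Then data.writeStream.foreach(new KafkaSink("kafka-topic", KafkaBroker)).outputMode("update").start()
should type-check, since `data` is a Dataset[KafkaMessage].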

I'd also suggest you just try the native Kafka sink that is part of Spark 2.2.

On Sun, May 14, 2017 at 9:31 AM, Revin Chalil wrote:
Hi TD / Michael,


I am trying to use the foreach sink to write to Kafka and followed this post
from the Databricks blog by Sunil Sitaula. I get the below with
DF.writeStream.foreach(writer).outputMode("update").start() when using a
simple DF:

Type mismatch, expected: foreachWriter[Row], actual: KafkaSink

Cannot resolve reference foreach with such signature



Below is the snippet

val data = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("subscribe", InTopic)
  .load()
  .select($"value".as[Array[Byte]])
  .flatMap(d => {
var events = AvroHelper.readEvents(d)
events.map((event: HdfsEvent) => {
  var payload = EventPayloadParser.read(event.getPayload)
  new KafkaMessage(payload)
})
  })



case class KafkaMessage(
  payload: String)



This is where I use the foreach

val writer = new KafkaSink("kafka-topic", KafkaBroker)
val query = data.writeStream.foreach(writer).outputMode("update").start()



In this case, it shows –

Type mismatch, expected: foreachWriter[Main.KafkaMesage], actual: Main.KafkaSink

Cannot resolve reference foreach with such signature



Any help is much appreciated. Thank you.


From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, January 13, 2017 3:31 PM
To: Koert Kuipers
Cc: Peyman Mohajerian; Senthil Kumar; User; senthilec...@apache.org
Subject: Re: Spark SQL DataFrame to Kafka Topic

Structured Streaming has a foreach sink, where you can essentially do what you
want with your data. It's easy to create a Kafka producer and write the data
out to Kafka.
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

On Fri, Jan 13, 2017 at 8:28 AM, Koert Kuipers wrote:
how do you do this with structured streaming? i see no mention of writing to 
kafka

On Fri, Jan 13, 2017 at 10:30 AM, Peyman Mohajerian wrote:
Yes, it is called Structured Streaming: 
https://docs.databricks.com/_static/notebooks/structured-streaming-kafka.html
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar wrote:
Hi Team ,

 Sorry if this question already asked in this forum..

Can we ingest data to Apache Kafka Topic from Spark SQL DataFrame ??

Here is my Code which Reads Parquet File :


val sqlContext = new org.apache.spark.sql.SQLContext(sc);

val df = sqlContext.read.parquet("/temp/*.parquet")

df.registerTempTable("beacons")



I want to directly ingest df DataFrame to Kafka ! Is there any way to achieve 
this ??



Cheers,

Senthil






Spark streaming app leaking memory?

2017-05-16 Thread Srikanth
Hi,

I have a Spark streaming(Spark 2.1.0) app where I see these logs in
executor. Does this point to some memory leak?

17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size =
67108864 bytes, TID = 7752
17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size =
67108864 bytes, TID = 7740
17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size =
67108864 bytes, TID = 7764
17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size =
67108864 bytes, TID = 7728
17/05/16 15:12:02 WARN Executor: 1 block locks were not released by TID = 7783:
[rdd_1_15]
17/05/16 15:12:02 WARN Executor: 1 block locks were not released by TID = 7807:
[rdd_1_39]

I notice that "Managed memory leak" logs are not seen when I use G1GC.


Srikanth


Re: [WARN] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

2017-05-16 Thread Steve Loughran

On 10 May 2017, at 13:40, Mendelson, Assaf wrote:

Hi all,
When running spark I get the following warning: [WARN] 
org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Now I know that in general it is possible to ignore this warning, however, it 
means that utilities that catch “WARN” in the log keep flagging this.
I saw many answers to handling this (e.g. 
http://stackoverflow.com/questions/30369380/hadoop-unable-to-load-native-hadoop-library-for-your-platform-error-on-docker,
 
http://stackoverflow.com/questions/19943766/hadoop-unable-to-load-native-hadoop-library-for-your-platform-warning,http://stackoverflow.com/questions/40015416/spark-unable-to-load-native-hadoop-library-for-your-platform),
 however, I am unable to solve this on my local machine.
Specifically, I can’t find any such solution for windows (i.e. when running 
developer local builds) or on a centos 7 machine with no HDFS (basically it is 
a single node machine which uses spark standalone for testing).


Log4J is your friend. I usually have (at least)

log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN

if you are working on Windows though, you do actually need the native libraries
and winutils.exe on your path, or things won't work

Any help would be appreciated.

Thanks,
  Assaf.



Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread kant kodali
This isn't Structured Streaming, right?

On Tue, May 16, 2017 at 4:15 AM, Didac Gil  wrote:

> From what I know, you would have to iterate on each RDD. When you are
> reading from the Stream, Spark actually collects the data as a miniRDD for
> each period of time.
>
> I hope this helps.
>
> ds.foreachRDD{ rdd =>
>
>   val newNames = Seq(“Field1”,"Field2”,"Field3")
>   val mydataDF = rdd.toDF(newNames: _*)
>
>   mydataDF.createOrReplaceTempView(“myTempTable")
>   // Do word count on DataFrame using SQL and print it
>   val wordCountsDataFrame = spark.sql("select *, now() as TStamp from 
> myTempTable")
>   wordCountsDataFrame.write.mode(mode).save(output)
>   val lines = wordCountsDataFrame.count().toInt
> //  wordCountsDataFrame.show(20, false)
>   println("Total entries in this batch: "+lines)
>
> }
>
> On 16 May 2017, at 09:36, kant kodali  wrote:
>
> Hi All,
>
> I have the following code.
>
>  val ds = sparkSession.readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers",bootstrapServers))
> .option("subscribe", topicName)
> .option("checkpointLocation", hdfsCheckPointDir)
> .load();
>
>  val ds1 = ds.select($"value")
>  val query = ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
>
> There are no errors when I execute this code however I don't see any data
> being printed out to console? When I run my standalone test Kafka consumer
> jar I can see that it is receiving messages. so I am not sure what is going
> on with above code? any ideas?
>
> Thanks!
>
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544
> Sweden: +46 (0)730229737
> Skype: didac.gil.de.la.iglesia
>
>


Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Didac Gil
From what I know, you would have to iterate on each RDD. When you are reading
from the stream, Spark actually collects the data as a mini RDD for each period
of time.

I hope this helps.
ds.foreachRDD { rdd =>
  val newNames = Seq("Field1", "Field2", "Field3")
  val mydataDF = rdd.toDF(newNames: _*)
  mydataDF.createOrReplaceTempView("myTempTable")
  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = spark.sql("select *, now() as TStamp from myTempTable")
  wordCountsDataFrame.write.mode(mode).save(output)
  val lines = wordCountsDataFrame.count().toInt
  // wordCountsDataFrame.show(20, false)
  println("Total entries in this batch: " + lines)
}

> On 16 May 2017, at 09:36, kant kodali  wrote:
> 
> Hi All,
> 
> I have the following code.
> 
>  val ds = sparkSession.readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers",bootstrapServers))
> .option("subscribe", topicName)
> .option("checkpointLocation", hdfsCheckPointDir)
> .load();
>  val ds1 = ds.select($"value")
>  val query = ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
> There are no errors when I execute this code however I don't see any data 
> being printed out to console? When I run my standalone test Kafka consumer 
> jar I can see that it is receiving messages. so I am not sure what is going 
> on with above code? any ideas?
> 
> Thanks!

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread kant kodali
Hi All,

I have the following code.

 val ds = sparkSession.readStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", bootstrapServers)
   .option("subscribe", topicName)
   .option("checkpointLocation", hdfsCheckPointDir)
   .load();

 val ds1 = ds.select($"value")
 val query = ds1.writeStream.outputMode("append").format("console").start()
 query.awaitTermination()

There are no errors when I execute this code, however I don't see any data
being printed out to the console. When I run my standalone test Kafka consumer
jar I can see that it is receiving messages, so I am not sure what is going
on with the above code. Any ideas?

Thanks!


Re: Is GraphX really deprecated?

2017-05-16 Thread Sergey Zhemzhitsky
GraphFrames seems promising, but it still has a lot of algorithms that in one
way or another involve GraphX, or run on top of GraphX, according to the GitHub
repo (
https://github.com/graphframes/graphframes/tree/master/src/main/scala/org/graphframes/lib),
and in the case of RDDs and semi-structured data it's not really necessary to
include another library that just delegates to GraphX, which is still
shipped with Spark as the default graph-processing module.

Also, doesn't the Pregel-like programming abstraction of GraphX (although it is
implemented on top of RDD joins) seem more natural than a number of join steps in
GraphFrames? I believe such an abstraction wouldn't hurt GraphFrames either.
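
To make the comparison concrete, this is the kind of Pregel-style code I mean
(single-source shortest paths, adapted from the standard GraphX example; it
assumes an existing graph whose edge attributes are Double distances):

import org.apache.spark.graphx._

val sourceId: VertexId = 1L
// initialize the source vertex with distance 0 and every other vertex with infinity
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),            // vertex program
  triplet => {                                                // send messages
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty
  },
  (a, b) => math.min(a, b)                                    // merge messages
)

println(sssp.vertices.collect.mkString("\n"))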



On May 14, 2017 19:07, "Jules Damji"  wrote:

GraphFrames is not part of Spark Core as is Structured Streaming; it's
still open-source and part of Spark packages. But I anticipate that as it
becomes more at parity with all GraphX in algorithms & functionality, it's
not unreasonable to anticipate its inevitable wide adoption and preference.

To get a flavor, have a go at it: https://databricks.com/blog/2016/03/03/introducing-graphframes.html

Cheers
Jules

Sent from my iPhone
Pardon the dumb thumb typos :)

On May 13, 2017, at 2:01 PM, Jacek Laskowski  wrote:

Hi,

I'd like to hear the official statement too.

My take on GraphX and Spark Streaming is that they are long dead projects
with GraphFrames and Structured Streaming taking their place, respectively.

Jacek

On 13 May 2017 3:00 p.m., "Sergey Zhemzhitsky"  wrote:

> Hello Spark users,
>
> I just would like to know whether the GraphX component should be
> considered deprecated and no longer actively maintained
> and should not be considered when starting new graph-processing projects
> on top of Spark in favour of other
> graph-processing frameworks?
>
> I'm asking because
>
> 1. According to some discussions in GitHub pull requests, there are
> thoughts that GraphX is not under active development and
> can probably be deprecated soon.
>
> https://github.com/apache/spark/pull/15125
>
> 2. According to JIRA activity, the GraphX component seems to be not very
> active, and quite a lot of improvements are
> resolved as "Won't Fix" even with pull requests provided.
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20component%20%3D%20GraphX%20AND%20resolution%20in%20(%22Unresolved%22%2C%20%22Won%27t%20Fix%22%2C%20%22Won%27t%20Do%22%2C%20Later%2C%20%22Not%20A%20Bug%22%2C%20%22Not%20A%20Problem%22)%20ORDER%20BY%20created%20DESC
>
> So, I'm wondering what the community who uses GraphX, and the committers who
> develop it, think regarding this Spark component?
>
> Kind regards,
> Sergey
>
>
>
>
>