Re: Announcing Delta Lake 0.2.0

2019-06-21 Thread Michael Armbrust
>
> Thanks for confirmation. We are using the workaround to create a separate
> Hive external table STORED AS PARQUET with the exact location of Delta
> table. Our use case is batch-driven and we are running VACUUM with 0
> retention after every batch is completed. Do you see any potential problem
> with this workaround, other than during the time when the batch is running
> the table can provide some wrong information?
>

This is a reasonable workaround to allow other systems to read Delta
tables. Another consideration is that if you are running on S3, eventual
consistency my increase the amount of time before external readers see a
consistent view. Also note, that this prevents you from using time travel.

In the near future, I think we should also support generating manifest
files that list the data files in the most recent version of the Delta
table (see #76  for details).
This will give support for Presto, though Hive would require some
additional modifications on the Hive side (if there are any Hive
contributors / committers on this list let me know!).

In the longer term, we are talking with authors of other engines to build
native support for reading the Delta transaction log (e.g. this
announcement from Starburst
).
Please contact me if you are interested in contributing here!


Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-11 Thread Michael Armbrust
Hmm yeah that does look wrong.  Would be great if someone opened a PR to
correct the docs :)

On Thu, May 10, 2018 at 5:13 PM Yuta Morisawa 
wrote:

> The problem is solved.
> The actual schema of Kafka message is different from documentation.
>
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> The documentation says the format of "timestamp" column is Long type,
> but the actual format is timestamp.
>
>
> The followings are my code and result to check schema.
>
> -code
> val df = spark
>.read
>.format("kafka")
>.option("kafka.bootstrap.servers", bootstrapServers)
>.option(subscribeType, topics)
>.load()
>.printSchema()
>
> -result
> root
>   |-- key: binary (nullable = true)
>   |-- value: binary (nullable = true)
>   |-- topic: string (nullable = true)
>   |-- partition: integer (nullable = true)
>   |-- offset: long (nullable = true)
>   |-- timestamp: timestamp (nullable = true)
>   |-- timestampType: integer (nullable = true)
>
>
> Regards,
> Yuta
>
> On 2018/05/09 16:14, Yuta Morisawa wrote:
> > Hi All
> >
> > I'm trying to extract Kafka-timestamp from Kafka topics.
> >
> > The timestamp does not contain milli-seconds information,
> > but it should contain because ConsumerRecord class of Kafka 0.10
> > supports milli-second timestamp.
> >
> > How can I get milli-second timestamp from Kafka topics?
> >
> >
> > These are websites I refer to.
> >
> >
> https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
> >
> >
> >
> https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html
> >
> >
> >
> > And this is my code.
> > 
> > val df = spark
> >.readStream
> >.format("kafka")
> >.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
> >.option("subscribe", "topic1,topic2")
> >.load()
> >.selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
> >.as[(Long, String)]
> > 
> >
> > Regards,
> > Yuta
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Michael Armbrust
You can calculate argmax using a struct.

df.groupBy($"id").agg(max($"my_timestamp",
struct($"*").as("data")).getField("data").select($"data.*")

You could transcode this to SQL, it'll just be complicated nested queries.


On Wed, Apr 18, 2018 at 3:40 PM, kant kodali  wrote:

> Hi Arun,
>
> I want to select the entire row with the max timestamp for each group. I
> have modified my data set below to avoid any confusion.
>
> *Input:*
>
> id | amount | my_timestamp
> ---
> 1  |  5 |  2018-04-01T01:00:00.000Z
> 1  | 10 |  2018-04-01T01:10:00.000Z
> 1  |  6 |  2018-04-01T01:20:00.000Z
> 2  | 30 |  2018-04-01T01:25:00.000Z
> 2  | 40 |  2018-04-01T01:30:00.000Z
>
> *Expected Output:*
>
> id | amount | my_timestamp
> ---
> 1  | 10 |  2018-04-01T01:10:00.000Z
> 2  | 40 |  2018-04-01T01:30:00.000Z
>
> Looking for a streaming solution using either raw sql like 
> sparkSession.sql("sql
> query") or similar to raw sql but not something like mapGroupWithState
>
> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan  wrote:
>
>> Cant the “max” function used here ? Something like..
>>
>> stream.groupBy($"id").max("amount").writeStream.outputMode(“
>> complete”/“update")….
>>
>> Unless the “stream” is already a grouped stream, in which case the above
>> would not work since the support for multiple aggregate operations is not
>> there yet.
>>
>> Thanks,
>> Arun
>>
>> From: kant kodali 
>> Date: Tuesday, April 17, 2018 at 11:41 AM
>> To: Tathagata Das 
>> Cc: "user @spark" 
>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>
>> Hi TD,
>>
>> Thanks for that. The only reason I ask is I don't see any alternative
>> solution to solve the problem below using raw sql.
>>
>>
>> How to select the max row for every group in spark structured streaming
>> 2.3.0 without using order by since it requires complete mode or
>> mapGroupWithState?
>>
>> *Input:*
>>
>> id | amount | my_timestamp
>> ---
>> 1  |  5 |  2018-04-01T01:00:00.000Z
>> 1  | 10 |  2018-04-01T01:10:00.000Z
>> 2  | 20 |  2018-04-01T01:20:00.000Z
>> 2  | 30 |  2018-04-01T01:25:00.000Z
>> 2  | 40 |  2018-04-01T01:30:00.000Z
>>
>> *Expected Output:*
>>
>> id | amount | my_timestamp
>> ---
>> 1  | 10 |  2018-04-01T01:10:00.000Z
>> 2  | 40 |  2018-04-01T01:30:00.000Z
>>
>> Looking for a streaming solution using either raw sql like 
>> sparkSession.sql("sql
>> query") or similar to raw sql but not something like mapGroupWithState
>>
>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Unfortunately no. Honestly it does not make sense as for type-aware
>>> operations like map, mapGroups, etc., you have to provide an actual JVM
>>> function. That does not fit in with the SQL language structure.
>>>
>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali  wrote:
>>>
 Hi All,

 can we use mapGroupsWithState in raw SQL? or is it in the roadmap?

 Thanks!



>>>
>>
>


Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

2018-03-20 Thread Michael Armbrust
Those options will not affect structured streaming.  You are looking for

.option("maxOffsetsPerTrigger", "1000")

We are working on improving this by building a generic mechanism into the
Streaming DataSource V2 so that the engine can do admission control on the
amount of data returned in a source independent way.

On Tue, Mar 20, 2018 at 2:58 PM, kant kodali  wrote:

> I am using spark 2.3.0 and Kafka 0.10.2.0 so I assume structured streaming
> using Direct API's although I am not sure? If it is direct API's the only
> parameters that are relevant are below according to this
> 
> article
>
>- spark.conf("spark.streaming.backpressure.enabled", "true")
>- spark.conf("spark.streaming.kafka.maxRatePerPartition", "1")
>
> I set both of these and I run select count * on my 10M records I still
> don't see any output until it finishes the initial batch of 10M and this
> takes a while. so I am wondering if I miss something here?
>
> On Tue, Mar 20, 2018 at 6:09 AM, Geoff Von Allmen 
> wrote:
>
>> The following
>>  
>> settings
>> may be what you’re looking for:
>>
>>- spark.streaming.backpressure.enabled
>>- spark.streaming.backpressure.initialRate
>>- spark.streaming.receiver.maxRate
>>- spark.streaming.kafka.maxRatePerPartition
>>
>> ​
>>
>> On Mon, Mar 19, 2018 at 5:27 PM, kant kodali  wrote:
>>
>>> Yes it indeed makes sense! Is there a way to get incremental counts when
>>> I start from 0 and go through 10M records? perhaps count for every micro
>>> batch or something?
>>>
>>> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <
>>> ge...@ibleducation.com> wrote:
>>>
 Trigger does not mean report the current solution every 'trigger
 seconds'. It means it will attempt to fetch new data and process it no
 faster than trigger seconds intervals.

 If you're reading from the beginning and you've got 10M entries in
 kafka, it's likely pulling everything down then processing it completely
 and giving you an initial output. From here on out, it will check kafka
 every 1 second for new data and process it, showing you only the updated
 rows. So the initial read will give you the entire output since there is
 nothing to be 'updating' from. If you add data to kafka now that the
 streaming job has completed it's first batch (and leave it running), it
 will then show you the new/updated rows since the last batch every 1 second
 (assuming it can fetch + process in that time span).

 If the combined fetch + processing time is > the trigger time, you will
 notice warnings that it is 'falling behind' (I forget the exact verbiage,
 but something to the effect of the calculation took XX time and is falling
 behind). In that case, it will immediately check kafka for new messages and
 begin processing the next batch (if new messages exist).

 Hope that makes sense -


 On Mon, Mar 19, 2018 at 13:36 kant kodali  wrote:

> Hi All,
>
> I have 10 million records in my Kafka and I am just trying to
> spark.sql(select count(*) from kafka_view). I am reading from kafka and
> writing to kafka.
>
> My writeStream is set to "update" mode and trigger interval of one
> second (Trigger.ProcessingTime(1000)). I expect the counts to be
> printed every second but looks like it would print after going through all
> 10M. why?
>
> Also, it seems to take forever whereas Linux wc of 10M rows would take
> 30 seconds.
>
> Thanks!
>

>>>
>>
>


Re: [Structured Streaming] Deserializing avro messages from kafka source using schema registry

2018-02-09 Thread Michael Armbrust
This isn't supported yet, but there is on going work at spark-avro
 to enable this use
case.  Stay tuned.

On Fri, Feb 9, 2018 at 3:07 PM, Bram  wrote:

> Hi,
>
> I couldn't find any documentation about avro message deserialization using
> pyspark structured streaming. My aim is using confluent schema registry to
> get per topic schema then parse the avro messages with it.
>
> I found one but it was using DirectStream approach
> https://stackoverflow.com/questions/30339636/spark-
> python-avro-kafka-deserialiser
>
> Can anyone show me how?
>
> Thanks
>
> Regards,
>
> Abraham
>


Re: [Structured Streaming] Commit protocol to move temp files to dest path only when complete, with code

2018-02-09 Thread Michael Armbrust
We didn't go this way initially because it doesn't work on storage systems
that have weaker guarantees than HDFS with respect to rename.  That said,
I'm happy to look at other options if we want to make this configurable.



On Fri, Feb 9, 2018 at 2:53 PM, Dave Cameron 
wrote:

> Hi
>
>
> I have a Spark structured streaming job that reads from Kafka and writes
> parquet files to Hive/HDFS. The files are not very large, but the Kafka
> source is noisy so each spark job takes a long time to complete. There is a
> significant window during which the parquet files are incomplete and other
> tools, including PrestoDB, encounter errors while trying to read them.
>
> I wrote this list and stackoverflow about the problem last summer:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> Structured-Streaming-truncated-Parquet-after-driver-crash-or-kill-tt29043.
> html
> https://stackoverflow.com/questions/47337030/not-a-
> parquet-file-too-small-from-presto-during-spark-structured-streaming-r/
> 47339805#47339805
>
> After hesitating for a while, I wrote a custom commit protocol to solve
> the problem. It combines HadoopMapReduceCommitProtocol's behavior of
> writing to a temp file first, with ManifestFileCommitProtocol. From what
> I can tell ManifestFileCommitProtocol is required for the normal Structured
> Streaming behavior of being able to resume streams from a known point.
>
> I think this commit protocol could be generally useful. Writing to a temp
> file and moving it to the final location is low cost on HDFS and is the
> standard behavior for non-streaming jobs, as implemented in
> HadoopMapReduceCommitProtocol. At the same time ManifestFileCommitProtocol
> is needed for structured streaming jobs. We have been running this for a
> few months in production without problems.
>
> Here is the code (at the moment not up to Spark standards, admittedly):
> https://github.com/davcamer/spark/commit/361f1c69851f0f94cfd974ce720c69
> 4407f9340b
>
> Did I miss a better approach? Does anyone else think this would be useful?
>
> Thanks for reading,
> Dave
>
>
>


Re: Prefer Structured Streaming over Spark Streaming (DStreams)?

2018-01-31 Thread Michael Armbrust
At this point I recommend that new applications are built using structured
streaming. The engine was GA-ed as of Spark 2.2 and I know of several very
large (trillions of records) production jobs that are running in Structured
Streaming.  All of our production pipelines at databricks are written using
structured streaming as well.

Regarding the comparison with RDDs: The situation here is different than
when thinking about batch DataFrames vs. RDDs.  DataFrames are "just" a
higher-level abstraction on RDDs.  Structured streaming is a completely new
implementation that does not use DStreams at all, but instead directly runs
jobs using RDDs.  The advantages over DStreams include:
 - The ability to start and stop individual queries (rather than needing to
start/stop a separate StreamingContext)
 - The ability to upgrade your stream and still start from an existing
checkpoint
 - Support for working with Spark SQL data sources (json, parquet, etc)
 - Higher level APIs (DataFrames and SQL) and lambda functions (Datasets)
 - Support for event time aggregation

At this point, with the addition of mapGroupsWithState and
flatMapGroupsWithState, I think we should be at feature parity with
DStreams (and the state store does incremental checkpoints that are more
efficient than the DStream store).  However if there are applications you
are having a hard time porting over, please let us know!

On Wed, Jan 31, 2018 at 5:42 AM, vijay.bvp  wrote:

> here is my two cents, experts please correct me if wrong
>
> its important to understand why one over other and for what kind of use
> case. There might be sometime in future where low level API's are
> abstracted
> and become legacy but for now in Spark RDD API is the core and low level
> API, all higher APIs translate to RDD ultimately,  and RDD's are immutable.
>
> https://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html#unsupported-operations
> these are things that are not supported and this list needs to be validated
> with the use case you have.
>
> From my experience Structured Streaming is still new and DStreams API is a
> matured API.
> some things that are missing or need to explore more.
>
> watermarking/windowing based on no of records in a particular window
>
> assuming you have watermark and windowing on event time of the data,  the
> resultant dataframe is grouped data set, only thing you can do is run
> aggregate functions. you can't simply use that output as another dataframe
> and manipulate. There is a custom aggregator but I feel its limited.
>
> https://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html#arbitrary-stateful-operations
> There is option to do stateful operations, using GroupState where the
> function gets iterator of events for that window. This is the closest
> access
> to StateStore a developer could get.
> This arbitrary state that programmer could keep across invocations has its
> limitations as such how much state we could keep?, is that state stored in
> driver memory? What happens if the spark job fails is this checkpointed or
> restored?
>
> thanks
> Vijay
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Max number of streams supported ?

2018-01-31 Thread Michael Armbrust
-dev +user


> Similarly for structured streaming, Would there be any limit on number of
> of streaming sources I can have ?
>

There is no fundamental limit, but each stream will have a thread on the
driver that is doing coordination of execution.  We comfortably run 20+
streams on a single cluster in production, but I have not pushed the
limits.  You'd want to test with your specific application.


Re: Dataset API inconsistencies

2018-01-10 Thread Michael Armbrust
I wrote Datasets, and I'll say I only use them when I really need to (i.e.
when it would be very cumbersome to express what I am trying to do
relationally).  Dataset operations are almost always going to be slower
than their DataFrame equivalents since they usually require materializing
objects (where as DataFrame operations usually generate code that operates
directly on binary encoded data).

We certainly could flesh out the API further (e.g. add orderBy that takes a
lambda function), but so far I have not seen a lot of demand for this, and
it would be strictly slower than the DataFrame version. I worry this
wouldn't actually be beneficial to users as it would give them a choice
that looks the same but has performance implications that are non-obvious.
If I'm in the minority though with this opinion, we should do it.

Regarding the concerns about type-safety, I haven't really found that to be
a major issue.  Even though you don't have type safety from the scala
compiler, the Spark SQL analyzer is checking your query before any
execution begins. This opinion is perhaps biased by the fact that I do a
lot of Spark SQL programming in notebooks where the difference between
"compile-time" and "runtime" is pretty minimal.

On Wed, Jan 10, 2018 at 1:45 AM, Alex Nastetsky 
wrote:

> I am finding using the Dataset API to be very cumbersome to use, which is
> unfortunate, as I was looking forward to the type-safety after coming from
> a Dataframe codebase.
>
> This link summarizes my troubles: http://loicdescotte.
> github.io/posts/spark2-datasets-type-safety/
>
> The problem is having to continuously switch back and forth between typed
> and untyped semantics, which really kills productivity. In contrast, the
> RDD API is consistently typed and the Dataframe API is consistently
> untyped. I don't have to continuously stop and think about which one to use
> for each operation.
>
> I gave the Frameless framework (mentioned in the link) a shot, but
> eventually started running into oddities and lack of enough documentation
> and community support and did not want to sink too much time into it.
>
> At this point I'm considering just sticking with Dataframes, as I don't
> really consider Datasets to be usable. Has anyone had a similar experience
> or has had better luck?
>
> Alex.
>


Re: Spark error while trying to spark.read.json()

2017-12-19 Thread Michael Armbrust
- dev

java.lang.AbstractMethodError almost always means that you have different
libraries on the classpath than at compilation time.  In this case I would
check to make sure you have the correct version of Scala (and only have one
version of scala) on the classpath.

On Tue, Dec 19, 2017 at 5:42 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> Can anyone help me with below error,
>
> Exception in thread "main" java.lang.AbstractMethodError
> at scala.collection.TraversableLike$class.filterNot(TraversableLike.
> scala:278)
> at org.apache.spark.sql.types.StructType.filterNot(StructType.scala:98)
> at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:386)
> at org.spark.jsonDF.StructStreamKafkaToDF$.getValueSchema(
> StructStreamKafkaToDF.scala:22)
> at org.spark.jsonDF.StructStreaming$.createRowDF(StructStreaming.scala:21)
> at SparkEntry$.delayedEndpoint$SparkEntry$1(SparkEntry.scala:22)
> at SparkEntry$delayedInit$body.apply(SparkEntry.scala:7)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
> at scala.runtime.AbstractFunction0.apply$mcV$
> sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.generic.TraversableForwarder$class.
> foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at SparkEntry$.main(SparkEntry.scala:7)
> at SparkEntry.main(SparkEntry.scala)
>
> This happening, when i try to pass Dataset[String] containing jsons to
> spark.read.json(Records).
>
> Regards,
> Satyajit.
>


Re: Kafka version support

2017-11-30 Thread Michael Armbrust
Oh good question.  I was saying that the stock structured streaming
connector should be able to talk to 0.11 or 1.0 brokers.

On Thu, Nov 30, 2017 at 1:12 PM, Cody Koeninger  wrote:

> Are you talking about the broker version, or the kafka-clients artifact
> version?
>
> On Thu, Nov 30, 2017 at 12:17 AM, Raghavendra Pandey
>  wrote:
> > Just wondering if anyone has tried spark structured streaming kafka
> > connector (2.2) with Kafka 0.11 or Kafka 1.0 version
> >
> > Thanks
> > Raghav
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Kafka version support

2017-11-30 Thread Michael Armbrust
I would expect that to work.

On Wed, Nov 29, 2017 at 10:17 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> Just wondering if anyone has tried spark structured streaming kafka
> connector (2.2) with Kafka 0.11 or Kafka 1.0 version
>
> Thanks
> Raghav
>


Re: Generate windows on processing time in Spark Structured Streaming

2017-11-10 Thread Michael Armbrust
Hmmm, we should allow that.  current_timestamp() is acutally deterministic
within any given batch.  Could you open a JIRA ticket?

On Fri, Nov 10, 2017 at 1:52 AM, wangsan  wrote:

> Hi all,
>
> How can I use current processing time to generate windows in streaming
> processing?
> *window* function's Scala doc says "For a streaming query, you may use
> the function current_timestamp to generate windows on processing time.”
>  But when using current_timestamp as column in window function, exceptions
> occurred.
>
> Here are my code:
>
> val socketDF = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", )
>   .load()
>
> socketDF.createOrReplaceTempView("words")
> val windowedCounts = spark.sql(
>   """
> |SELECT value as word, current_timestamp() as time, count(1) as count 
> FROM words
> |   GROUP BY window(time, "5 seconds"), word
>   """.stripMargin)
>
> windowedCounts
>   .writeStream
>   .outputMode("complete")
>   .format("console")
>   .start()
>   .awaitTermination()
>
> And here are Exception Info:
> *Caused by: org.apache.spark.sql.AnalysisException: nondeterministic
> expressions are only allowed in*
> *Project, Filter, Aggregate or Window, *found:
>
>
>
>


Re: [Spark Structured Streaming] Changing partitions of (flat)MapGroupsWithState

2017-11-08 Thread Michael Armbrust
The relevant config is spark.sql.shuffle.partitions.  Note that once you
start a query, this number is fixed.  The config will only affect queries
starting from an empty checkpoint.

On Wed, Nov 8, 2017 at 7:34 AM, Teemu Heikkilä  wrote:

> I have spark structured streaming job and I'm crunching through few
> terabytes of data.
>
> I'm using file stream reader and it works flawlessly, I can adjust the
> partitioning of that with spark.default.parallelism
>
> However I'm doing sessionization for the data after loading it and I'm
> currently locked with just 200 partitions for that stage. I've tried to
> repartition before and after the stateful map but it just adds new stage
> and so it's not very useful
>
> Changing spark.sql.shuffle.partitions doesn't affect the count either.
>
> val sessions = streamingSource // -> spark.default.parallelism defined
> amount of partitions/tasks (ie. 2000)
>  .repartition(n) // -> n partitions/tasks
>  .groupByKey(event => event.sessid) // -> stage opens / fixed 200 tasks
>  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.
> EventTimeTimeout())(SessionState.updateSessionEvents) // -> fixed 200
> tasks / stage closes
>
>
> I tried to grep through spark source code but couldn’t find that param
> anywhere.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Structured Stream equivalent of reduceByKey

2017-11-06 Thread Michael Armbrust
Hmmm, I see.  You could output the delta using flatMapGroupsWithState
<https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroupsWithState-org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction-org.apache.spark.sql.streaming.OutputMode-org.apache.spark.sql.Encoder-org.apache.spark.sql.Encoder-org.apache.spark.sql.streaming.GroupStateTimeout->
probably.

On Thu, Oct 26, 2017 at 10:11 PM, Piyush Mukati <piyush.muk...@gmail.com>
wrote:

> Thanks, Michael
> I have explored Aggregator
> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>
>  with
> update mode. The problem is it will give the overall aggregated value for
> the changed. while I only want the delta change in the group as the
> aggregation we are doing at sink level too.
>
> Below is the plan generated with count Aggregator.
>
> *HashAggregate
> StateStoreSave
> *HashAggregate,
> StateStoreRestore
> *HashAggregate,
> Exchange
> *HashAggregate,
> *Project
> StreamingRelation
>
> we are looking for some aggregation which will avoid state
> store interactions.
>
> Also anyone aware of any design doc or some example about how we can add
> new operation on dataSet and corresponding physical plan.
>
>
>
> On Thu, Oct 26, 2017 at 5:54 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> - dev
>>
>> I think you should be able to write an Aggregator
>> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>.
>> You probably want to run in update mode if you are looking for it to output
>> any group that has changed in the batch.
>>
>> On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> we are migrating some jobs from Dstream to Structured Stream.
>>>
>>> Currently to handle aggregations we call map and reducebyKey on each RDD
>>> like
>>> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>>>
>>> The final output of each RDD is merged to the sink with support for
>>> aggregation at the sink( Like co-processor at HBase ).
>>>
>>> In the new DataSet API, I am not finding any suitable API to aggregate
>>> over the micro-batch.
>>> Most of the aggregation API uses state-store and provide global
>>> aggregations. ( with append mode it does not give the change in existing
>>> buckets )
>>> Problems we are suspecting are :
>>>  1) state-store is tightly linked to the job definitions. while in our
>>> case we want may edit the job while keeping the older calculated aggregate
>>> as it is.
>>>
>>> The desired result can be achieved with below dataset APIs.
>>> dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) =>
>>> merge(valueItr))
>>> while on observing the physical plan it does not call any merge before
>>> sort.
>>>
>>>  Anyone aware of API or other workarounds to get the desired result?
>>>
>>
>>
>


Re: Structured Stream equivalent of reduceByKey

2017-10-26 Thread Michael Armbrust
- dev

I think you should be able to write an Aggregator
.
You probably want to run in update mode if you are looking for it to output
any group that has changed in the batch.

On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati 
wrote:

> Hi,
> we are migrating some jobs from Dstream to Structured Stream.
>
> Currently to handle aggregations we call map and reducebyKey on each RDD
> like
> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>
> The final output of each RDD is merged to the sink with support for
> aggregation at the sink( Like co-processor at HBase ).
>
> In the new DataSet API, I am not finding any suitable API to aggregate
> over the micro-batch.
> Most of the aggregation API uses state-store and provide global
> aggregations. ( with append mode it does not give the change in existing
> buckets )
> Problems we are suspecting are :
>  1) state-store is tightly linked to the job definitions. while in our
> case we want may edit the job while keeping the older calculated aggregate
> as it is.
>
> The desired result can be achieved with below dataset APIs.
> dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) => merge(valueItr))
> while on observing the physical plan it does not call any merge before
> sort.
>
>  Anyone aware of API or other workarounds to get the desired result?
>


Re: Implement Dataset reader from SEQ file with protobuf to Dataset

2017-10-08 Thread Michael Armbrust
spark-avro  would be a good
example to start with.

On Sun, Oct 8, 2017 at 3:00 AM, Serega Sheypak 
wrote:

> Hi, did anyone try to implement Spark SQL dataset reader from SEQ file
> with protobuf inside to Dataset?
>
> Imagine I have protobuf def
> Person
>  - name: String
>  - lastName: String
> - phones: List[String]
>
> and generated scala case class:
> case class Person(name:String, lastName: String, phones: List[String])
>
> I want to write some component that gives me Dataset with types schema.
>
> val personsDataset = spark.read
>   .option("inferSchema", "true")[Person]
>
> Where can I take a look at references?
>


Re: Chaining Spark Streaming Jobs

2017-09-18 Thread Michael Armbrust
You specify the schema when loading a dataframe by calling
spark.read.schema(...)...

On Tue, Sep 12, 2017 at 4:50 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Hi Michael,
>
> I am wondering what I am doing wrong. I get error like:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Schema
> must be specified when creating a streaming source DataFrame. If some files
> already exist in the directory, then depending on the file format you may
> be able to create a static DataFrame on that directory with
> 'spark.read.load(directory)' and infer schema from it.
> at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(
> DataSource.scala:223)
> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$
> lzycompute(DataSource.scala:87)
> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(
> DataSource.scala:87)
> at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(
> StreamingRelation.scala:30)
> at org.apache.spark.sql.streaming.DataStreamReader.
> load(DataStreamReader.scala:125)
> at org.apache.spark.sql.streaming.DataStreamReader.
> load(DataStreamReader.scala:134)
> at com.aol.persist.UplynkAggregates$.aggregator(
> UplynkAggregates.scala:23)
> at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
> at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(
> AppMain.java:144)
> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>
>
> I tried specifying the schema as well.
> Here is my code:
>
> object Aggregates {
>
>   val aggregation=
> """select sum(col1), sum(col2), id, first(name)
>   from enrichedtb
>   group by id
> """.stripMargin
>
>   def aggregator(conf:Config)={
> implicit val spark = 
> SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
> implicit val sqlctx = spark.sqlContext
> printf("Source path is" + conf.getString("source.path"))
> val schemadf = sqlctx.read.parquet(conf.getString("source.path")) // 
> Added this as it was complaining about schema.
> val df=spark.readStream.format("parquet").option("inferSchema", 
> true).schema(schemadf.schema).load(conf.getString("source.path"))
> df.createOrReplaceTempView("enrichedtb")
> val res = spark.sql(aggregation)
> 
> res.writeStream.format("parquet").outputMode("append").option("checkpointLocation",conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
>   }
>
>   def main(args: Array[String]): Unit = {
> val mainconf = ConfigFactory.load()
> val conf = mainconf.getConfig(mainconf.getString("pipeline"))
> print(conf.toString)
> aggregator(conf)
>   }
>
> }
>
>
> I tried to extract schema from static read of the input path and provided it 
> to the readStream API. With that, I get this error:
>
> at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>   at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>
> While running on the EMR cluster all paths point to S3. In my laptop, they 
> all point to local filesystem.
>
> I am using Spark2.2.0
>
> Appreciate your help.
>
> regards
>
> Sunita
>
>
> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread Michael Armbrust
How many UUIDs do you expect to have in a day?  That is likely where all
the memory is being used.  Does it work without that?

On Tue, Sep 12, 2017 at 8:42 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> *Yes, my code is shown below(I also post my code in another mail)*
> /**
> * input
> */
>   val logs = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", BROKER_SERVER)
> .option("subscribe", TOPIC)
> .option("startingOffset", "latest")
> .load()
>
>   /**
> * process
> */
>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>
>   val events = logValues
> .map(parseFunction)
> .select(
>   $"_1".alias("date").cast("timestamp"),
>   $"_2".alias("uuid").cast("string")
> )
>
>   val results = events
> .withWatermark("date", "1 day")
> .dropDuplicates("uuid", "date")
> .groupBy($"date")
> .count()
>
>   /**
> * output
> */
>   val query = results
> .writeStream
> .outputMode("update")
> .format("console")
> .option("truncate", "false")
> .trigger(Trigger.ProcessingTime("1 seconds"))
> .start()
>
>   query.awaitTermination()
>
> *and I use play json to parse input logs from kafka ,the parse function is
> like*
>
>   def parseFunction(str: String): (Long, String) = {
> val json = Json.parse(str)
> val timestamp = (json \ "time").get.toString().toLong
> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
> val uuid = (json \ "uuid").get.toString()
> (date, uuid)
>   }
>
> and the java heap space is like (I've increase the executor memory to 15g):
>
> [image: image.png]
> Michael Armbrust <mich...@databricks.com>于2017年9月13日周三 上午2:23写道:
>
>> Can you show the full query you are running?
>>
>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm using structured streaming to count unique visits of our website. I
>>> use spark on yarn mode with 4 executor instances and from 2 cores * 5g
>>> memory to 4 cores * 10g memory for each executor, but there are frequent
>>> full gc, and once the count raises to about more than 4.5 millions the
>>> application will be blocked and finally crash in OOM. It's kind of
>>> unreasonable. So is there any suggestion to optimize the memory consumption
>>> of SS? Thanks.
>>>
>>
>>


Re: [SS]How to add a column with custom system time?

2017-09-14 Thread Michael Armbrust
Can you show the explain() for the version that doesn't work?  You might
just be hitting a bug.

On Tue, Sep 12, 2017 at 9:03 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> It seems current_timestamp() cannot be used directly in window function?
> because after attempts I found that using
>
> *df.count.withColumn("pTime", current_timestamp).select(window($"pTime",
> "15 minutes"), $"count")*
>
> instead of
>
> *df.count.withColumn("window", window(current_timestamp(), "15 minutes"))*
>
> throws no exception and works fine. I don't know if this is a problem that
> needs improvement.
>
>
> 张万新 <kevinzwx1...@gmail.com>于2017年9月13日周三 上午11:43写道:
>
>> and I use .withColumn("window", window(current_timestamp(), "15
>> minutes")) after count
>>
>> 张万新 <kevinzwx1...@gmail.com>于2017年9月13日周三 上午11:32写道:
>>
>>> *Yes, my code is shown below*
>>> /**
>>> * input
>>> */
>>>   val logs = spark
>>> .readStream
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", BROKER_SERVER)
>>> .option("subscribe", TOPIC)
>>> .option("startingOffset", "latest")
>>> .load()
>>>
>>>   /**
>>> * process
>>> */
>>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>>
>>>   val events = logValues
>>> .map(parseFunction)
>>> .select(
>>>   $"_1".alias("date").cast("timestamp"),
>>>   $"_2".alias("uuid").cast("string")
>>> )
>>>
>>>   val results = events
>>> .withWatermark("date", "1 day")
>>> .dropDuplicates("uuid", "date")
>>> .groupBy($"date")
>>> .count()
>>> .withColumn("window", window(current_timestamp(), "15 minutes"))
>>>
>>>   /**
>>> * output
>>>     */
>>>   val query = results
>>> .writeStream
>>> .outputMode("update")
>>> .format("console")
>>> .option("truncate", "false")
>>> .trigger(Trigger.ProcessingTime("1 seconds"))
>>> .start()
>>>
>>>   query.awaitTermination()
>>>
>>> *and I use play json to parse input logs from kafka ,the parse function
>>> is like*
>>>
>>>   def parseFunction(str: String): (Long, String) = {
>>> val json = Json.parse(str)
>>> val timestamp = (json \ "time").get.toString().toLong
>>> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>>> val uuid = (json \ "uuid").get.toString()
>>> (date, uuid)
>>>   }
>>>
>>> Michael Armbrust <mich...@databricks.com>于2017年9月13日周三 上午2:36写道:
>>>
>>>> Can you show all the code?  This works for me.
>>>>
>>>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>
>>>>> The spark version is 2.2.0
>>>>>
>>>>> Michael Armbrust <mich...@databricks.com>于2017年9月12日周二 下午12:32写道:
>>>>>
>>>>>> Which version of spark?
>>>>>>
>>>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for reply, but using this method I got an exception:
>>>>>>>
>>>>>>> "Exception in thread "main" 
>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException:
>>>>>>> nondeterministic expressions are only allowed in
>>>>>>>
>>>>>>> Project, Filter, Aggregate or Window"
>>>>>>>
>>>>>>> Can you give more advice?
>>>>>>>
>>>>>>> Michael Armbrust <mich...@databricks.com>于2017年9月12日周二 上午4:48写道:
>>>>>>>
>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>
>>>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>>>
>>>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> In structured streaming how can I add a column to a dataset with
>>>>>>>>> current system time aligned with 15 minutes?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>


Re: [Structured Streaming] Multiple sources best practice/recommendation

2017-09-14 Thread Michael Armbrust
I would probably suggest that you partition by format (though you can get
the file name from the build in function input_file_name()).  You can load
multiple streams from different directories and union them together as long
as the schema is the same after parsing.  Otherwise you can just run
multiple streams on the same cluster.

On Wed, Sep 13, 2017 at 7:56 AM, JG Perrin  wrote:

> Hi,
>
>
>
> I have different files being dumped on S3, I want to ingest them and join
> them.
>
>
>
> What does sound better to you? Have one “ directory” for all or one per
> file format?
>
>
>
> If I have one directory for all, can you get some metadata about the file,
> like its name?
>
>
>
> If multiple directory, how can I have multiple “listeners”?
>
>
>
> Thanks
>
>
>
> jg
> --
>
> This electronic transmission and any documents accompanying this
> electronic transmission contain confidential information belonging to the
> sender. This information may contain confidential health information that
> is legally privileged. The information is intended only for the use of the
> individual or entity named above. The authorized recipient of this
> transmission is prohibited from disclosing this information to any other
> party unless required to do so by law or regulation and is required to
> delete or destroy the information after its stated need has been fulfilled.
> If you are not the intended recipient, you are hereby notified that any
> disclosure, copying, distribution or the taking of any action in reliance
> on or regarding the contents of this electronically transmitted information
> is strictly prohibited. If you have received this E-mail in error, please
> notify the sender and delete this message immediately.
>


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread Michael Armbrust
Can you show all the code?  This works for me.

On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:

> The spark version is 2.2.0
>
> Michael Armbrust <mich...@databricks.com>于2017年9月12日周二 下午12:32写道:
>
>> Which version of spark?
>>
>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> Thanks for reply, but using this method I got an exception:
>>>
>>> "Exception in thread "main" 
>>> org.apache.spark.sql.streaming.StreamingQueryException:
>>> nondeterministic expressions are only allowed in
>>>
>>> Project, Filter, Aggregate or Window"
>>>
>>> Can you give more advice?
>>>
>>> Michael Armbrust <mich...@databricks.com>于2017年9月12日周二 上午4:48写道:
>>>
>>>> import org.apache.spark.sql.functions._
>>>>
>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>
>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> In structured streaming how can I add a column to a dataset with
>>>>> current system time aligned with 15 minutes?
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread Michael Armbrust
Can you show the full query you are running?

On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:

> Hi,
>
> I'm using structured streaming to count unique visits of our website. I
> use spark on yarn mode with 4 executor instances and from 2 cores * 5g
> memory to 4 cores * 10g memory for each executor, but there are frequent
> full gc, and once the count raises to about more than 4.5 millions the
> application will be blocked and finally crash in OOM. It's kind of
> unreasonable. So is there any suggestion to optimize the memory consumption
> of SS? Thanks.
>


Re: [SS]How to add a column with custom system time?

2017-09-11 Thread Michael Armbrust
Which version of spark?

On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> Thanks for reply, but using this method I got an exception:
>
> "Exception in thread "main" 
> org.apache.spark.sql.streaming.StreamingQueryException:
> nondeterministic expressions are only allowed in
>
> Project, Filter, Aggregate or Window"
>
> Can you give more advice?
>
> Michael Armbrust <mich...@databricks.com>于2017年9月12日周二 上午4:48写道:
>
>> import org.apache.spark.sql.functions._
>>
>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>
>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In structured streaming how can I add a column to a dataset with current
>>> system time aligned with 15 minutes?
>>>
>>> Thanks.
>>>
>>
>>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-11 Thread Michael Armbrust
The following will convert the whole row to JSON.

import org.apache.spark.sql.functions.*
df.select(to_json(struct(col("*"

On Sat, Sep 9, 2017 at 6:27 PM, kant kodali  wrote:

> Thanks Ryan! In this case, I will have Dataset so is there a way to
> convert Row to Json string?
>
> Thanks
>
> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> It's because "toJSON" doesn't support Structured Streaming. The current
>> implementation will convert the Dataset to an RDD, which is not supported
>> by streaming queries.
>>
>> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:
>>
>>> yes it is a streaming dataset. so what is the problem with following
>>> code?
>>>
>>> Dataset ds = dataset.toJSON().map(()->{some function that returns a 
>>> string});
>>>  StreamingQuery query = ds.writeStream().start();
>>>  query.awaitTermination();
>>>
>>>
>>> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung 
>>> wrote:
>>>
 What is newDS?
 If it is a Streaming Dataset/DataFrame (since you have writeStream
 there) then there seems to be an issue preventing toJSON to work.

 --
 *From:* kant kodali 
 *Sent:* Saturday, September 9, 2017 4:04:33 PM
 *To:* user @spark
 *Subject:* Queries with streaming sources must be executed with
 writeStream.start()

 Hi All,

 I  have the following code and I am not sure what's wrong with it? I
 cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
 2.2.0 so I am wondering if there is any work around?

  Dataset ds = newDS.toJSON().map(()->{some function that returns a 
 string});
  StreamingQuery query = ds.writeStream().start();
  query.awaitTermination();


>>>
>>
>


Re: Need some Clarification on checkpointing w.r.t Spark Structured Streaming

2017-09-11 Thread Michael Armbrust
Checkpoints record what has been processed for a specific query, and as
such only need to be defined when writing (which is how you "start" a
query).

You can use the DataFrame created with readStream to start multiple
queries, so it wouldn't really make sense to have a single checkpoint there.

On Mon, Sep 11, 2017 at 2:36 AM, kant kodali  wrote:

> Hi All,
>
> I was wondering if we need to checkpoint both read and write streams when
> reading from Kafka and inserting into a target store?
>
> for example
>
> sparkSession.readStream().option("checkpointLocation", "hdfsPath").load()
>
> vs
>
> dataSet.writeStream().option("checkpointLocation", "hdfsPath")
>
> Thanks!
>


Re: [SS]How to add a column with custom system time?

2017-09-11 Thread Michael Armbrust
import org.apache.spark.sql.functions._

df.withColumn("window", window(current_timestamp(), "15 minutes"))

On Mon, Sep 11, 2017 at 3:03 AM, 张万新  wrote:

> Hi,
>
> In structured streaming how can I add a column to a dataset with current
> system time aligned with 15 minutes?
>
> Thanks.
>


Re: Joining 2 dataframes, getting result as nested list/structure in dataframe

2017-08-23 Thread Michael Armbrust
You can create a nested struct that contains multiple columns using
struct().

Here's a pretty complete guide on working with nested data:
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

On Wed, Aug 23, 2017 at 2:30 PM, JG Perrin  wrote:

> Hi folks,
>
>
>
> I am trying to join 2 dataframes, but I would like to have the result as a
> list of rows of the right dataframe (dDf in the example) in a column of the
> left dataframe (cDf in the example). I made it work with *one column*,
> but having issues adding more columns/creating a row(?).
>
> Seq joinColumns = new Set2<>("c1", "c2").toSeq();
>
> Dataset allDf = cDf.join(dDf, joinColumns, "inner");
>
> allDf.printSchema();
>
> allDf.show();
>
>
>
> Dataset aggDf = allDf.groupBy(cDf.col("c1"), cDf.col("c2"))
>
> .agg(collect_list(col("c50")));
>
> aggDf.show();
>
>
>
> Output:
>
> ++---+---+
>
> |c1  |c2 |collect_list(c50)  |
>
> ++---+---+
>
> |3744|1160242| [6, 5, 4, 3, 2, 1]|
>
> |3739|1150097|[1]|
>
> |3780|1159902|[5, 4, 3, 2, 1]|
>
> | 132|1200743|   [4, 3, 2, 1]|
>
> |3778|1183204|[1]|
>
> |3766|1132709|[1]|
>
> |3835|1146169|[1]|
>
> ++---+---+
>
>
>
> Thanks,
>
>
>
> jg
>
>
> --
>
> This electronic transmission and any documents accompanying this
> electronic transmission contain confidential information belonging to the
> sender. This information may contain confidential health information that
> is legally privileged. The information is intended only for the use of the
> individual or entity named above. The authorized recipient of this
> transmission is prohibited from disclosing this information to any other
> party unless required to do so by law or regulation and is required to
> delete or destroy the information after its stated need has been fulfilled.
> If you are not the intended recipient, you are hereby notified that any
> disclosure, copying, distribution or the taking of any action in reliance
> on or regarding the contents of this electronically transmitted information
> is strictly prohibited. If you have received this E-mail in error, please
> notify the sender and delete this message immediately.
>


Re: Chaining Spark Streaming Jobs

2017-08-23 Thread Michael Armbrust
If you use structured streaming and the file sink, you can have a
subsequent stream read using the file source.  This will maintain exactly
once processing even if there are hiccups or failures.

On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
wrote:

> Hello Spark Experts,
>
> I have a design question w.r.t Spark Streaming. I have a streaming job
> that consumes protocol buffer encoded real time logs from a Kafka cluster
> on premise. My spark application runs on EMR (aws) and persists data onto
> s3. Before I persist, I need to strip header and convert protobuffer to
> parquet (I use sparksql-scalapb to convert from Protobuff to
> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
> enrichment on the same dataframe after persisting the raw data, however, in
> order to modularize I am planning to have a separate job which picks up the
> raw data and performs enrichment on it. Also,  I am trying to avoid all in
> 1 job as the enrichments could get project specific while raw data
> persistence stays customer/project agnostic.The enriched data is allowed to
> have some latency (few minutes)
>
> My challenge is, after persisting the raw data, how do I chain the next
> streaming job. The only way I can think of is -  job 1 (raw data)
> partitions on current date (MMDD) and within current date, the job 2
> (enrichment job) filters for records within 60s of current time and
> performs enrichment on it in 60s batches.
> Is this a good option? It seems to be error prone. When either of the jobs
> get delayed due to bursts or any error/exception this could lead to huge
> data losses and non-deterministic behavior . What are other alternatives to
> this?
>
> Appreciate any guidance in this regard.
>
> regards
> Sunita Koppar
>


Re: Question on how to get appended data from structured streaming

2017-08-20 Thread Michael Armbrust
What is your end goal?  Right now the foreach writer is the way to do
arbitrary processing on the data produced by various output modes.

On Sun, Aug 20, 2017 at 12:23 PM, Yanpeng Lin  wrote:

> Hello,
>
> I am new to Spark.
> It would be appreciated if anyone could help me understand how to get
> appended data from structured streaming. According to the document
> ,
> data stream could be treated as new rows appended to unbounded table. I
> want to know besides writing out data to external storage to get appended
> data only at every time, is there any other way to get appended data? like
> from memory directly.
>
> Here is my case. I had a Kafka source keeping publish data to Spark with
> `test` topic.
>
> val source = spark.readStream.format("kafka")
>  .option("kafka.bootstrap.servers",
> "broker:9092")
>  .option("subscribe", "test")\
>  .load()
>
> I tried that write stream with format `memory` like the following:
>
> val query = source.writeStream.format("memory")
>   .trigger(ProcessingTime("3 seconds"))
>   .queryName("tests").outputMode
> (OutputMode.Append).start()
> spark.sql("select topic, value from tests")
> The result table `tests` contains all data from the beginning of stream.
> like
>
> Trigger Time, Topic, Value
> t1 test,   1
> t1 test,   2
> t2 test,   3
> t3 test,   4
>
> By appended data I mean only the delta data after each trigger. For
> example, after trigger time t1, rows of value 1 and 2 are newly appended.
> After trigger time t2, row of value 3 will be treated as newly appended.
> And after t3, row of value 4 could be fetched as newly appended.
> I understand each appended data could be processed using `ForeachWriter`,
> but if I want to fetch all newly appended data after any trigger time,
> is there any way to do that directly from dataframe?
>
> Thanks!
> Yanpeng
>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Michael Armbrust
See
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

Though I think that this currently doesn't work with the console sink.

On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
wrote:

> Hi,
>
>>
>> I'm trying to restart a streaming query to refresh cached data frame
>>
>> Where and how should I restart streaming query
>>
>
>
> val sparkSes = SparkSession
>
>   .builder
>
>   .config("spark.master", "local")
>
>   .appName("StreamingCahcePoc")
>
>   .getOrCreate()
>
>
>
> import sparkSes.implicits._
>
>
>
> val dataDF = sparkSes.readStream
>
>   .schema(streamSchema)
>
>   .csv("testData")
>
>
>
>
>
>val query = counts.writeStream
>
>   .outputMode("complete")
>
>   .format("console")
>
>   .start()
>
>
> query.awaittermination()
>
>
>
>>
>>
>>


Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
>
> 1) Parsing data/Schema creation: The Bro IDS logs have a 8 line header
> that contains the 'schema' for the data, each log http/dns/etc will have
> different columns with different data types. So would I create a specific
> CSV reader inherited from the general one?  Also I'm assuming this would
> need to be in Scala/Java? (I suck at both of those :)
>

This is a good question. What I have seen others do is actually run
different streams for the different log types.  This way you can customize
the schema to the specific log type.

Even without using Scala/Java you could also use the text data source
(assuming the logs are new line delimited) and then write the parser for
each line in python.  There will be a performance penalty here though.


> 2) Dynamic Tailing: Does the CSV/TSV data sources support dynamic tailing
> and handle log rotations?
>

The file based sources work by tracking which files have been processed and
then scanning (optionally using glob patterns) for new files.  There a two
assumptions here: files are immutable when they arrive and files always
have a unique name. If files are deleted, we ignore that, so you are okay
to rotate them out.

The full pipeline that I have seen often involves the logs getting uploaded
to something like S3.  This is nice because you get atomic visibility of
files that have already been rotated.  So I wouldn't really call this
dynamically tailing, but we do support looking for new files at some
location.


Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
Cool stuff! A pattern I have seen is to use our CSV/TSV or JSON support to
read bro logs, rather than a python library.  This is likely to have much
better performance since we can do all of the parsing on the JVM without
having to flow it though an external python process.

On Tue, Aug 8, 2017 at 9:35 AM, Brian Wylie  wrote:

> Hi All,
>
> I've read the new information about Structured Streaming in Spark, looks
> super great.
>
> Resources that I've looked at
> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
> - https://databricks.com/blog/2016/07/28/structured-
> streaming-in-apache-spark.html
> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/
> Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>
> + YouTube videos from Spark Summit 2016/2017
>
> So finally getting to my question:
>
> I have Python code that yields a Python generator... this is a great
> streaming approach within Python. I've used it for network packet
> processing and a bunch of other stuff. I'd love to simply hook up this
> generator (that yields python dictionaries) along with a schema definition
> to create an  'unbounded DataFrame' as discussed in
> https://databricks.com/blog/2016/07/28/structured-
> streaming-in-apache-spark.html
>
> Possible approaches:
> - Make a custom receiver in Python: https://spark.apache.
> org/docs/latest/streaming-custom-receivers.html
> - Use Kafka (this is definitely possible and good but overkill for my use
> case)
> - Send data out a socket and use socketTextStream to pull back in (seems a
> bit silly to me)
> - Other???
>
> Since Python Generators so naturally fit into streaming pipelines I'd
> think that this would be straightforward to 'couple' a python generator
> into a Spark structured streaming pipeline..
>
> I've put together a small notebook just to give a concrete example
> (streaming Bro IDS network data) https://github.com/
> Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb
>
> Any thoughts/suggestions/pointers are greatly appreciated.
>
> -Brian
>
>


Re: [SS] Console sink not supporting recovering from checkpoint location? Why?

2017-08-07 Thread Michael Armbrust
I think there is really no good reason for this limitation.

On Mon, Aug 7, 2017 at 2:58 AM, Jacek Laskowski  wrote:

> Hi,
>
> While exploring checkpointing with kafka source and console sink I've
> got the exception:
>
> // today's build from the master
> scala> spark.version
> res8: String = 2.3.0-SNAPSHOT
>
> scala> val q = records.
>  |   writeStream.
>  |   format("console").
>  |   option("truncate", false).
>  |   option("checkpointLocation", "/tmp/checkpoint"). // <--
> checkpoint directory
>  |   trigger(Trigger.ProcessingTime(10.seconds)).
>  |   outputMode(OutputMode.Update).
>  |   start
> org.apache.spark.sql.AnalysisException: This query does not support
> recovering from checkpoint location. Delete /tmp/checkpoint/offsets to
> start over.;
>   at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(
> StreamingQueryManager.scala:222)
>   at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(
> StreamingQueryManager.scala:278)
>   at org.apache.spark.sql.streaming.DataStreamWriter.
> start(DataStreamWriter.scala:284)
>   ... 61 elided
>
> The "trigger" is the change
> https://issues.apache.org/jira/browse/SPARK-16116 and this line in
> particular https://github.com/apache/spark/pull/13817/files#diff-
> d35e8fce09686073f81de598ed657de7R277.
>
> Why is this needed? I can't think of a use case where console sink
> could not recover from checkpoint location (since all the information
> is available). I'm lost on it and would appreciate some help (to
> recover :))
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: how to convert the binary from kafak to srring pleaae

2017-07-24 Thread Michael Armbrust
There are end to end examples of using Kafka in in this blog:
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

On Sun, Jul 23, 2017 at 7:44 PM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> Hi all
>
> I want to change the binary from kafka to string. Would you like help me
> please?
>
> val df = ss.readStream.format("kafka").option("kafka.bootstrap.
> server","")
> .option("subscribe","")
> .load
>
> val value = df.select("value")
>
> value.writeStream
> .outputMode("append")
> .format("console")
> .start()
> .awaitTermination()
>
>
> Above code outputs result like:
>
> ++
> |value|
> +-+
> |[61,61]|
> +-+
>
>
> 61 is character a receiced from kafka.
> I want to print [a,a] or aa.
> How should I do please?
>


Re: custom joins on dataframe

2017-07-23 Thread Michael Armbrust
>
> left.join(right, my_fuzzy_udf (left("cola"),right("cola")))
>

While this could work, the problem will be that we'll have to check every
possible combination of tuples from left and right using your UDF.  It
would be best if you could somehow partition the problem so that we could
reduce the number of comparisons.  For example, if you had a fuzzy hash
that you could do an equality check on in addition to the UDF, that would
greatly speed up the computation.


Re: Flatten JSON to multiple columns in Spark

2017-07-18 Thread Michael Armbrust
Here is an overview of how to work with complex JSON in Spark:
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
(works in streaming and batch)

On Tue, Jul 18, 2017 at 10:29 AM, Riccardo Ferrari 
wrote:

> What's against:
>
> df.rdd.map(...)
>
> or
>
> dataset.foreach()
>
> https://spark.apache.org/docs/2.0.1/api/scala/index.html#
> org.apache.spark.sql.Dataset@foreach(f:T=>Unit):Unit
>
> Best,
>
> On Tue, Jul 18, 2017 at 6:46 PM, lucas.g...@gmail.com <
> lucas.g...@gmail.com> wrote:
>
>> I've been wondering about this for awhile.
>>
>> We wanted to do something similar for generically saving thousands of
>> individual homogenous events into well formed parquet.
>>
>> Ultimately I couldn't find something I wanted to own and pushed back on
>> the requirements.
>>
>> It seems the canonical answer is that you need to 'own' the schema of the
>> json and parse it out manually and into your dataframe.  There's nothing
>> challenging about it.  Just verbose code.  If you're 'info' is a consistent
>> schema then you'll be fine.  For us it was 12 wildly diverging schemas and
>> I didn't want to own the transforms.
>>
>> I also recommend persisting anything that isn't part of your schema in an
>> 'extras field'  So when you parse out your json, if you've got anything
>> leftover drop it in there for later analysis.
>>
>> I can provide some sample code but I think it's pretty straightforward /
>> you can google it.
>>
>> What you can't seem to do efficiently is dynamically generate a dataframe
>> from random JSON.
>>
>>
>> On 18 July 2017 at 01:57, Chetan Khatri 
>> wrote:
>>
>>> Implicit tried - didn't worked!
>>>
>>> from_json - didnt support spark 2.0.1 any alternate solution would be
>>> welcome please
>>>
>>>
>>> On Tue, Jul 18, 2017 at 12:18 PM, Georg Heiler <
>>> georg.kf.hei...@gmail.com> wrote:
>>>
 You need to have spark implicits in scope
 Richard Xin  schrieb am Di. 18. Juli
 2017 um 08:45:

> I believe you could use JOLT (bazaarvoice/jolt
> ) to flatten it to a json string
> and then to dataframe or dataset.
>
> bazaarvoice/jolt
>
> jolt - JSON to JSON transformation library written in Java.
> 
>
>
>
>
> On Monday, July 17, 2017, 11:18:24 PM PDT, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>
> Explode is not working in this scenario with error - string cannot be
> used in explore either array or map in spark
> On Tue, Jul 18, 2017 at 11:39 AM, 刘虓  wrote:
>
> Hi,
> have you tried to use explode?
>
> Chetan Khatri  于2017年7月18日 周二下午2:06写道:
>
> Hello Spark Dev's,
>
> Can you please guide me, how to flatten JSON to multiple columns in
> Spark.
>
> *Example:*
>
> Sr No Title ISBN Info
> 1 Calculus Theory 1234567890
>
> [{"cert":[{
> "authSbmtr":"009415da-c8cd- 418d-869e-0a19601d79fa",
> 009415da-c8cd-418d-869e- 0a19601d79fa
> "certUUID":"03ea5a1a-5530- 4fa3-8871-9d1ebac627c4",
>
> "effDt":"2016-05-06T15:04:56. 279Z",
>
>
> "fileFmt":"rjrCsv","status":" live"}],
>
> "expdCnt":"15",
> "mfgAcctNum":"531093",
>
> "oUUID":"23d07397-4fbe-4897- 8a18-b79c9f64726c",
>
>
> "pgmRole":["RETAILER"],
> "pgmUUID":"1cb5dd63-817a-45bc- a15c-5660e4accd63",
> "regUUID":"cc1bd898-657d-40dc- af5d-4bf1569a1cc4",
> "rtlrsSbmtd":["009415da-c8cd- 418d-869e-0a19601d79fa"]}]
>
> I want to get single row with 11 columns.
>
> Thanks.
>
>
>>>
>>
>


[ANNOUNCE] Announcing Apache Spark 2.2.0

2017-07-11 Thread Michael Armbrust
Hi all,

Apache Spark 2.2.0 is the third release of the Spark 2.x line. This release
removes the experimental tag from Structured Streaming. In addition, this
release focuses on usability, stability, and polish, resolving over 1100
tickets.

We'd like to thank our contributors and users for their contributions and
early feedback to this release. This release would not have been possible
without you.

To download Spark 2.2.0, head over to the download page:
http://spark.apache.org/downloads.html

To view the release notes: https://spark.apache.
org/releases/spark-release-2-2-0.html

*(note: If you see any issues with the release notes, webpage or published
artifacts, please contact me directly off-list) *

Michael


Re: Event time aggregation is possible in Spark Streaming ?

2017-07-10 Thread Michael Armbrust
Event-time aggregation is only supported in Structured Streaming.

On Sat, Jul 8, 2017 at 4:18 AM, Swapnil Chougule 
wrote:

> Hello,
>
> I want to know whether event time aggregation in spark streaming. I could
> see it's possible in structured streaming. As I am working on conventional
> spark streaming, I need event time aggregation in it. I checked but didn't
> get any relevant documentation.
>
> Thanks in advance
>
> Regards,
> Swapnil
>


Re: Union of 2 streaming data frames

2017-07-10 Thread Michael Armbrust
As I said in the voting thread:

This vote passes! I'll followup with the release on Monday.



On Mon, Jul 10, 2017 at 10:55 AM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> Michael,
>
>
>
> I see that 2.2 RC6 has passed a vote on Friday. Does this mean 2.2 is
> going to be out soon? Do you have some sort of ETA?
>
>
>
> *From: *"Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
> *Date: *Friday, July 7, 2017 at 5:46 PM
> *To: *Michael Armbrust <mich...@databricks.com>
>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat <
> mm-heartb...@capitalone.com>
> *Subject: *Re: Union of 2 streaming data frames
>
>
>
> Great! Even, *val **dfAllEvents =
> sparkSession.table("oldEvents").union(sparkSession.table("newEvents")) 
> *doesn’t
> work. Will this be addressed in 2.2?
>
>
>
>
>
> *From: *Michael Armbrust <mich...@databricks.com>
> *Date: *Friday, July 7, 2017 at 5:42 PM
> *To: *"Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat <
> mm-heartb...@capitalone.com>
> *Subject: *Re: Union of 2 streaming data frames
>
>
>
> Ah, looks like you are hitting SPARK-20441
> <https://issues.apache.org/jira/browse/SPARK-20441>.  Should be fixed in
> 2.2.
>
>
>
> On Fri, Jul 7, 2017 at 2:37 PM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
> I created a small sample code to verify this. It looks like union using
> Spark SQL doesn’t work. Calling union on dataframe works.
> https://gist.github.com/GaalDornick/8920577ca92842f44d7bfd3a277c7545. I’m
> on 2.1.0
>
>
>
> I get the following exception. If I change val dfAllEvents =
> sparkSession.sql("select * from oldEvents union select * from newEvents")
> to val dfAllEvents = dfNewEvents.union(dfOldEvents) it works fine
>
>
>
> 17/07/07 17:33:34 ERROR StreamExecution: Query [id =
> 3bae26a1-7ee3-45ab-a98d-9346eaf03d08, runId = 
> 063af01f-9878-452e-aa30-7c21e2ef4c18]
> terminated with error
>
> org.apache.spark.sql.AnalysisException: resolved attribute(s) acctId#29
> missing from 
> eventType#2,acctId#0,eventId#37L,acctId#36,eventType#38,eventId#1L
> in operator !Join Inner, (acctId#0 = acctId#29);;
>
> Distinct
>
> +- Union
>
>:- Project [acctId#0, eventId#1L, eventType#2]
>
>:  +- SubqueryAlias oldevents, `oldEvents`
>
>: +- Project [acctId#0, eventId#1L, eventType#2]
>
>   :+- !Join Inner, (acctId#0 = acctId#29)
>
>:   :- SubqueryAlias alloldevents, `allOldEvents`
>
>:   :  +- Relation[acctId#0,eventId#1L,eventType#2] json
>
>:   +- SubqueryAlias newevents, `newEvents`
>
>:  +- Relation[acctId#36,eventId#37L,eventType#38] json
>
>+- Project [acctId#29, eventId#30L, eventType#31]
>
>   +- SubqueryAlias newevents, `newEvents`
>
>  +- Relation[acctId#29,eventId#30L,eventType#31] json
>
>
>
> at org.apache.spark.sql.catalyst.
> analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
>
> at org.apache.spark.sql.catalyst.analysis.Analyzer.
> failAnalysis(Analyzer.scala:57)
>
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
>
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
> TreeNode.scala:128)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode.scala:127)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode.scala:127)
>
> at scala.collection.immutable.List.foreach(List.scala:381)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
> TreeNode.scala:127)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode.scala:127)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode.scala:127)
>
> at scala.collection.immutable.List.foreach(List.scala:381)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
> TreeNode.scala:127)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode

Re: Union of 2 streaming data frames

2017-07-07 Thread Michael Armbrust
alysis.Analyzer.
> checkAnalysis(Analyzer.scala:57)
>
> at org.apache.spark.sql.execution.QueryExecution.
> assertAnalyzed(QueryExecution.scala:48)
>
> at org.apache.spark.sql.execution.QueryExecution.
> withCachedData$lzycompute(QueryExecution.scala:68)
>
> at org.apache.spark.sql.execution.QueryExecution.
> withCachedData(QueryExecution.scala:67)
>
> at org.apache.spark.sql.execution.streaming.
> IncrementalExecution.optimizedPlan$lzycompute(
> IncrementalExecution.scala:60)
>
> at org.apache.spark.sql.execution.streaming.
> IncrementalExecution.optimizedPlan(IncrementalExecution.scala:60)
>
> at org.apache.spark.sql.execution.QueryExecution.
> sparkPlan$lzycompute(QueryExecution.scala:79)
>
> at org.apache.spark.sql.execution.QueryExecution.
> sparkPlan(QueryExecution.scala:75)
>
> at org.apache.spark.sql.execution.QueryExecution.
> executedPlan$lzycompute(QueryExecution.scala:84)
>
> at org.apache.spark.sql.execution.QueryExecution.
> executedPlan(QueryExecution.scala:84)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:496)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:488)
>
> at org.apache.spark.sql.execution.streaming.
> ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution.org$apache$spark$sql$execution$streaming$
> StreamExecution$$runBatch(StreamExecution.scala:488)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$
> mcV$sp(StreamExecution.scala:255)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(
> StreamExecution.scala:244)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(
> StreamExecution.scala:244)
>
> at org.apache.spark.sql.execution.streaming.
> ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(
> StreamExecution.scala:244)
>
> at org.apache.spark.sql.execution.streaming.
> ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution.org$apache$spark$sql$execution$streaming$
> StreamExecution$$runBatches(StreamExecution.scala:239)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anon$1.run(StreamExecution.scala:177)
>
>
>
>
>
>
>
>
>
> *From: *Michael Armbrust <mich...@databricks.com>
> *Date: *Friday, July 7, 2017 at 2:30 PM
> *To: *"Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Re: Union of 2 streaming data frames
>
>
>
> df.union(df2) should be supported when both DataFrames are created from a
> streaming source.  What error are you seeing?
>
>
>
> On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
> In structured streaming, Is there a way to Union 2 streaming data frames?
> Are there any plans to support Union of 2 streaming dataframes soon? I can
> understand the inherent complexity in joining 2 streaming data frames. But,
> Union is  just concatenating 2 microbatches, innit?
>
>
>
> The problem that we are trying to solve is that we have a Kafka stream
> that is receiving events. Each event is assosciated with an account ID. We
> have a data store that stores historical  events for hundreds of millions
> of accounts. What we want to do is for the events coming in the input
> stre

Re: Union of 2 streaming data frames

2017-07-07 Thread Michael Armbrust
df.union(df2) should be supported when both DataFrames are created from a
streaming source.  What error are you seeing?

On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> In structured streaming, Is there a way to Union 2 streaming data frames?
> Are there any plans to support Union of 2 streaming dataframes soon? I can
> understand the inherent complexity in joining 2 streaming data frames. But,
> Union is  just concatenating 2 microbatches, innit?
>
>
>
> The problem that we are trying to solve is that we have a Kafka stream
> that is receiving events. Each event is assosciated with an account ID. We
> have a data store that stores historical  events for hundreds of millions
> of accounts. What we want to do is for the events coming in the input
> stream, we want to add in all the historical events from the data store and
> give it to a model.
>
>
>
> Initially, the way we were planning to do this is
> a) read from Kafka into a streaming dataframe. Call this inputDF.
> b) In a mapWithPartition method, get all the unique accounts in the
> partition. Look up all the historical events for those unique accounts and
> return them. Let’s call this historicalDF
>
> c) Union inputDF with historicalDF. Call this allDF
>
> d) Call mapWithPartition on allDF and give the records to the model
>
>
>
> Of course, this doesn’t work because both inputDF and historicalDF are
> streaming data frames.
>
>
>
> What we ended up doing is in step b) we output the input records with the
> historical records, which works but seems like a hacky way of doing things.
> The operation that does lookup does union too. This works for now because
> the data from the data store doesn’t require any transformation or
> aggregation. But, if it did, we would like to do that using Spark SQL,
> whereas this solution forces us to doing any transformation of historical
> data in Scala
>
>
>
> Is there a Sparky way of doing this?
>
>
> --
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>


Re: If I pass raw SQL string to dataframe do I still get the Spark SQL optimizations?

2017-07-06 Thread Michael Armbrust
It goes through the same optimization pipeline.  More in this video
.

On Thu, Jul 6, 2017 at 5:28 PM, kant kodali  wrote:

> HI All,
>
> I am wondering If I pass a raw SQL string to dataframe do I still get the
> Spark SQL optimizations? why or why not?
>
> Thanks!
>


Re: Interesting Stateful Streaming question

2017-06-30 Thread Michael Armbrust
This does sound like a good use case for that feature.  Note that Spark
2.2. adds a similar [flat]MapGroupsWithState operation to structured
streaming.  Stay tuned for a blog post on that!

On Thu, Jun 29, 2017 at 6:11 PM, kant kodali  wrote:

> Is mapWithState an answer for this ? https://databricks.com/blog/
> 2016/02/01/faster-stateful-stream-processing-in-apache-
> spark-streaming.html
>
> On Thu, Jun 29, 2017 at 11:55 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> Here is a problem and I am wondering if Spark Streaming is the right tool
>> for this ?
>>
>> I have stream of messages m1, m2, m3and each of those messages can be
>> in state s1, s2, s3,sn (you can imagine the number of states are about
>> 100) and I want to compute some metrics that visit all the states from s1
>> to sn but these state transitions can happen at indefinite amount of
>> time. A simple example of that would be count all messages that visited
>> state s1, s2, s3. Other words, the transition function should know that say
>> message m1 had visited state s1 and s2 but not s3 yet and once the message
>> m1 visits s3 increment the counter +=1 .
>>
>> If it makes anything easier I can say a message has to visit s1 before
>> visiting s2 and s2 before visiting s3 and so on but would like to know both
>> with and without order.
>>
>> Thanks!
>>
>>
>


Re: org.apache.spark.sql.types missing from spark-sql_2.11-2.1.1.jar?

2017-06-20 Thread Michael Armbrust
It's in the spark-catalyst_2.11-2.1.1.jar since the logical query plans and
optimization also need to know about types.

On Tue, Jun 20, 2017 at 1:14 PM, Jean Georges Perrin  wrote:

> Hey all,
>
> i was giving a run to 2.1.1 and got an error on one of my test program:
>
> package net.jgp.labs.spark.l000_ingestion;
>
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.IntegerType;
>
> public class ArrayToDataset {
>
> public static void main(String[] args) {
> ArrayToDataset app = new ArrayToDataset();
> app.start();
> }
>
> private void start() {
> SparkSession spark = SparkSession.builder().appName("Array to Dataset"
> ).master("local").getOrCreate();
>
> Integer[] l = new Integer[] { 1, 2, 3, 4, 5, 6, 7 };
> List data = Arrays.asList(l);
> Dataset df = spark.createDataFrame(data, IntegerType.class);
>
> df.show();
> }
> }
>
> Eclipse is complaining that it cannot find 
> org.apache.spark.sql.types.IntegerType
> and after looking in the spark-sql_2.11-2.1.1.jar jar, I could not find it
> as well:
>
> I looked at the 2.1.1 release notes as well, did not see anything. The
> package is still in Javadoc: https://spark.apache.
> org/docs/latest/api/java/org/apache/spark/sql/types/package-summary.html
>
> I must be missing something. Any hint?
>
> Thanks!
>
> jg
>
>
>
>
>
>


Re: how many topics spark streaming can handle

2017-06-19 Thread Michael Armbrust
I don't think that there is really a Spark specific limit here.  It would
be a function of the size of your spark / kafka clusters and the type of
processing you are trying to do.

On Mon, Jun 19, 2017 at 12:00 PM, Ashok Kumar 
wrote:

> Hi Gurus,
>
> Within one Spark streaming process how many topics can be handled? I have
> not tried more than one topic.
>
> Thanks
>


Re: the scheme in stream reader

2017-06-19 Thread Michael Armbrust
The socket source can't know how to parse your data.  I think the right
thing would be for it to throw an exception saying that you can't set the
schema here.  Would you mind opening a JIRA ticket?

If you are trying to parse data from something like JSON then you should
use from_json` on the value returned.

On Sun, Jun 18, 2017 at 12:27 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> Hi all,
>
> L set the scheme for  DataStreamReader but when I print the scheme.It just
> printed:
> root
> |--value:string (nullable=true)
>
> My code is
>
> val line = ss.readStream.format("socket")
> .option("ip",xxx)
> .option("port",xxx)
> .scheme(StructField("name",StringType)::(StructField("age",
> IntegerType))).load
> line.printSchema
>
> My spark version is 2.1.0.
> I want the printSchema prints the schema I set in the code.How should I do
> please?
> And my original target is the received data from socket is handled as
> schema directly.What should I do please?
>
> thanks
> Fei Shao
>
>
>
>
>
>
>


Re: Nested "struct" fonction call creates a compilation error in Spark SQL

2017-06-15 Thread Michael Armbrust
You might also try with a newer version.  Several instance of code
generation failures have been fixed since 2.0.

On Thu, Jun 15, 2017 at 1:15 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> Hi Michael,
> Spark 2.0.2 - but I have a very interesting test case actually
> The optimiser seems to be at fault in a way, I've joined to this email the
> explain when I limit myself to 2 levels of struct mutation and when it goes
> to 5.
> As you can see the optimiser seems to be doing a lot more in the later
> case.
> After further investigation, the code is not "failing" per se - spark is
> trying the whole stage codegen, the compilation is failing due to the
> compilation error and I think it's falling back to the "non codegen" way.
>
> I'll try to create a simpler test case to reproduce this if I can, what do
> you think ?
>
> Regards,
>
> Olivier.
>
>
> 2017-06-15 21:08 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
>
>> Which version of Spark?  If its recent I'd open a JIRA.
>>
>> On Thu, Jun 15, 2017 at 6:04 AM, Olivier Girardot <
>> o.girar...@lateral-thoughts.com> wrote:
>>
>>> Hi everyone,
>>> when we create recursive calls to "struct" (up to 5 levels) for
>>> extending a complex datastructure we end up with the following compilation
>>> error :
>>>
>>> org.codehaus.janino.JaninoRuntimeException: Code of method
>>> "(I[Lscala/collection/Iterator;)V" of class
>>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator"
>>> grows beyond 64 KB
>>>
>>> The CreateStruct code itself is properly using the ctx.splitExpression
>>> command but the "end result" of the df.select( struct(struct(struct()
>>> ))) ends up being too much.
>>>
>>> Should I open a JIRA or is there a workaround ?
>>>
>>> Regards,
>>>
>>> --
>>> *Olivier Girardot* | Associé
>>> o.girar...@lateral-thoughts.com
>>>
>>
>>
>
>
> --
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
> +33 6 24 09 17 94
>


Re: Nested "struct" fonction call creates a compilation error in Spark SQL

2017-06-15 Thread Michael Armbrust
Which version of Spark?  If its recent I'd open a JIRA.

On Thu, Jun 15, 2017 at 6:04 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> Hi everyone,
> when we create recursive calls to "struct" (up to 5 levels) for extending
> a complex datastructure we end up with the following compilation error :
>
> org.codehaus.janino.JaninoRuntimeException: Code of method
> "(I[Lscala/collection/Iterator;)V" of class "org.apache.spark.sql.
> catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB
>
> The CreateStruct code itself is properly using the ctx.splitExpression
> command but the "end result" of the df.select( struct(struct(struct()
> ))) ends up being too much.
>
> Should I open a JIRA or is there a workaround ?
>
> Regards,
>
> --
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
>


Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-15 Thread Michael Armbrust
Continuous processing is still a work in progress.  I would really like to
at least have a basic version in Spark 2.3.

The announcement about 2.2 is that we are planning to remove the
experimental tag from Structured Streaming.

On Thu, Jun 15, 2017 at 11:53 AM, kant kodali <kanth...@gmail.com> wrote:

> vow! you caught the 007!  Is continuous processing mode available in 2.2?
> The ticket says the target version is 2.3 but the talk in the Video says
> 2.2 and beyond so I am just curious if it is available in 2.2 or should I
> try it from the latest build?
>
> Thanks!
>
> On Wed, Jun 14, 2017 at 5:32 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> This a good question. I really like using Kafka as a centralized source
>> for streaming data in an organization and, with Spark 2.2, we have full
>> support for reading and writing data to/from Kafka in both streaming and
>> batch
>> <https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html>.
>> I'll focus here on what I think the advantages are of Structured Streaming
>> over Kafka Streams (a stream processing library that reads from Kafka).
>>
>>  - *High level productive APIs* - Streaming queries in Spark can be
>> expressed using DataFrames, Datasets or even plain SQL.  Streaming
>> DataFrames/SQL are supported in Scala, Java, Python and even R.  This means
>> that for common operations like filtering, joining, aggregating, you can
>> use built-in operations.  For complicated custom logic you can use UDFs and
>> lambda functions. In contrast, Kafka Streams mostly requires you to express
>> your transformations using lambda functions.
>>  - *High Performance* - Since it is built on Spark SQL, streaming
>> queries take advantage of the Catalyst optimizer and the Tungsten execution
>> engine. This design leads to huge performance wins
>> <https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html>,
>> which means you need less hardware to accomplish the same job.
>>  - *Ecosystem* - Spark has connectors for working with all kinds of data
>> stored in a variety of systems.  This means you can join a stream with data
>> encoded in parquet and stored in S3/HDFS.  Perhaps more importantly, it
>> also means that if you decide that you don't want to manage a Kafka cluster
>> anymore and would rather use Kinesis, you can do that too.  We recently
>> moved a bunch of our pipelines from Kafka to Kinesis and had to only change
>> a few lines of code! I think its likely that in the future Spark will also
>> have connectors for Google's PubSub and Azure's streaming offerings.
>>
>> Regarding latency, there has been a lot of discussion about the inherent
>> latencies of micro-batch.  Fortunately, we were very careful to leave
>> batching out of the user facing API, and as we demo'ed last week, this
>> makes it possible for the Spark Streaming to achieve sub-millisecond
>> latencies <https://www.youtube.com/watch?v=qAZ5XUz32yM>.  Watch
>> SPARK-20928 <https://issues.apache.org/jira/browse/SPARK-20928> for more
>> on this effort to eliminate micro-batch from Spark's execution model.
>>
>> At the far other end of the latency spectrum...  For those with jobs that
>> run in the cloud on data that arrives sporadically, you can run streaming
>> jobs that only execute every few hours or every few days, shutting the
>> cluster down in between.  This architecture can result in a huge cost
>> savings for some applications
>> <https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html>
>> .
>>
>> Michael
>>
>> On Sun, Jun 11, 2017 at 1:12 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am trying hard to figure out what is the real difference between Kafka
>>> Streaming vs Spark Streaming other than saying one can be used as part of
>>> Micro services (since Kafka streaming is just a library) and the other is a
>>> Standalone framework by itself.
>>>
>>> If I can accomplish same job one way or other this is a sort of a
>>> puzzling question for me so it would be great to know what Spark streaming
>>> can do that Kafka Streaming cannot do efficiently or whatever ?
>>>
>>> Thanks!
>>>
>>>
>>
>


Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-14 Thread Michael Armbrust
This a good question. I really like using Kafka as a centralized source for
streaming data in an organization and, with Spark 2.2, we have full support
for reading and writing data to/from Kafka in both streaming and batch
.
I'll focus here on what I think the advantages are of Structured Streaming
over Kafka Streams (a stream processing library that reads from Kafka).

 - *High level productive APIs* - Streaming queries in Spark can be
expressed using DataFrames, Datasets or even plain SQL.  Streaming
DataFrames/SQL are supported in Scala, Java, Python and even R.  This means
that for common operations like filtering, joining, aggregating, you can
use built-in operations.  For complicated custom logic you can use UDFs and
lambda functions. In contrast, Kafka Streams mostly requires you to express
your transformations using lambda functions.
 - *High Performance* - Since it is built on Spark SQL, streaming queries
take advantage of the Catalyst optimizer and the Tungsten execution engine.
This design leads to huge performance wins
,
which means you need less hardware to accomplish the same job.
 - *Ecosystem* - Spark has connectors for working with all kinds of data
stored in a variety of systems.  This means you can join a stream with data
encoded in parquet and stored in S3/HDFS.  Perhaps more importantly, it
also means that if you decide that you don't want to manage a Kafka cluster
anymore and would rather use Kinesis, you can do that too.  We recently
moved a bunch of our pipelines from Kafka to Kinesis and had to only change
a few lines of code! I think its likely that in the future Spark will also
have connectors for Google's PubSub and Azure's streaming offerings.

Regarding latency, there has been a lot of discussion about the inherent
latencies of micro-batch.  Fortunately, we were very careful to leave
batching out of the user facing API, and as we demo'ed last week, this
makes it possible for the Spark Streaming to achieve sub-millisecond
latencies .  Watch SPARK-20928
 for more on this effort
to eliminate micro-batch from Spark's execution model.

At the far other end of the latency spectrum...  For those with jobs that
run in the cloud on data that arrives sporadically, you can run streaming
jobs that only execute every few hours or every few days, shutting the
cluster down in between.  This architecture can result in a huge cost
savings for some applications

.

Michael

On Sun, Jun 11, 2017 at 1:12 AM, kant kodali  wrote:

> Hi All,
>
> I am trying hard to figure out what is the real difference between Kafka
> Streaming vs Spark Streaming other than saying one can be used as part of
> Micro services (since Kafka streaming is just a library) and the other is a
> Standalone framework by itself.
>
> If I can accomplish same job one way or other this is a sort of a puzzling
> question for me so it would be great to know what Spark streaming can do
> that Kafka Streaming cannot do efficiently or whatever ?
>
> Thanks!
>
>


Re: Running into the same problem as JIRA SPARK-20325

2017-05-31 Thread Michael Armbrust
>
> So, my question is the same as stated in the following ticket which is Do
> we need create a checkpoint directory for each individual query?
>

Yes.  Checkpoints record what data has been processed.  Thus two different
queries need their own checkpoints.


Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread Michael Armbrust
-dev

Have you tried clearing out the checkpoint directory?  Can you also give
the full stack trace?

On Wed, May 24, 2017 at 3:45 PM, kant kodali  wrote:

> Even if I do simple count aggregation like below I get the same error as
> https://issues.apache.org/jira/browse/SPARK-19268
>
> Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 
> hours", "24 hours"), df1.col("AppName")).count();
>
>
> On Wed, May 24, 2017 at 3:35 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>> Kafka
>>
>> I am running into the same problem as https://issues.apache.org/jira
>> /browse/SPARK-19268 with my app(not KafkaWordCount).
>>
>> Here is my sample code
>>
>> *Here is how I create ReadStream*
>>
>> sparkSession.readStream()
>> .format("kafka")
>> .option("kafka.bootstrap.servers", 
>> config.getString("kafka.consumer.settings.bootstrapServers"))
>> .option("subscribe", 
>> config.getString("kafka.consumer.settings.topicName"))
>> .option("startingOffsets", "earliest")
>> .option("failOnDataLoss", "false")
>> .option("checkpointLocation", hdfsCheckPointDir)
>> .load();
>>
>>
>> *The core logic*
>>
>> Dataset df = ds.select(from_json(new Column("value").cast("string"), 
>> client.getSchema()).as("payload"));
>> Dataset df1 = df.selectExpr("payload.info.*", "payload.data.*");
>> Dataset df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 
>> hours"), df1.col("AppName")).agg(sum("Amount"));
>> StreamingQuery query = df1.writeStream().foreach(new 
>> KafkaSink()).outputMode("update").start();
>> query.awaitTermination();
>>
>>
>> I can also provide any other information you may need.
>>
>> Thanks!
>>
>
>


Re: Impact of coalesce operation before writing dataframe

2017-05-23 Thread Michael Armbrust
coalesce is nice because it does not shuffle, but the consequence of
avoiding a shuffle is it will also reduce parallelism of the preceding
computation.  Have you tried using repartition instead?

On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <
andrii.bilets...@yahoo.com.invalid> wrote:

> Hi all,
>
> I'm trying to understand the impact of coalesce operation on spark job
> performance.
>
> As a side note: were are using emrfs (i.e. aws s3) as source and a target
> for the job.
>
> Omitting unnecessary details job can be explained as: join 200M records
> Dataframe stored in orc format on emrfs with another 200M records cached
> Dataframe, the result of the join put back to emrfs. First DF is a set of
> wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark
> shows 20 GB).
>
> I have enough resources in my cluster to perform the job but I don't like
> the fact that output datasource contains 200 part orc files (as
> spark.sql.shuffle.partitions defaults to 200) so before saving orc to
> emrfs I'm doing .coalesce(10). From documentation coalesce looks like a
> quite harmless operations: no repartitioning etc.
>
> But with such setup my job fails to write dataset on the last stage. Right
> now the error is OOM: GC overhead. When I change  .coalesce(10) to
> .coalesce(100) the job runs much faster and finishes without errors.
>
> So what's the impact of .coalesce in this case? And how to do in place
> concatenation of files (not involving hive) to end up with smaller amount
> of bigger files, as with .coalesce(100) job generates 100 orc snappy
> encoded files ~300MB each.
>
> Thanks,
> Andrii
>


Re: Are there any Kafka forEachSink examples?

2017-05-23 Thread Michael Armbrust
There is an example in this post:

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

On Tue, May 23, 2017 at 11:35 AM, kant kodali  wrote:

> Hi All,
>
> Are there any Kafka forEachSink examples preferably in Java but Scala is
> fine too?
>
> Thanks!
>


Re: 2.2. release date ?

2017-05-23 Thread Michael Armbrust
Mark is right.  I will cut another RC as soon as the known issues are
resolve.  In the mean time it would be very helpful for people to test RC2
and report issues.

On Tue, May 23, 2017 at 11:10 AM, Mark Hamstra 
wrote:

> I heard that once we reach release candidates it's not a question of time
> or a target date, but only whether blockers are resolved and the code is
> ready to release.
>
> On Tue, May 23, 2017 at 11:07 AM, kant kodali  wrote:
>
>> Heard its end of this month (May)
>>
>> On Tue, May 23, 2017 at 9:41 AM, mojhaha kiklasds <
>> sesca.syst...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I could see a RC2 candidate for Spark 2.2, but not sure about the
>>> expected release timeline on that.
>>> Would be great if somebody can confirm it.
>>>
>>> Thanks,
>>> Mhojaha
>>>
>>
>>
>


Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-22 Thread Michael Armbrust
There is an RC here.  Please test!

http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html

On Fri, May 19, 2017 at 4:07 PM, kant kodali  wrote:

> Hi Patrick,
>
> I am using 2.1.1 and I tried the above code you sent and I get
>
> "java.lang.UnsupportedOperationException: Data source kafka does not
> support streamed writing"
>
> so yeah this probably works only from Spark 2.2 onwards. I am not sure
> when it officially releases.
>
> Thanks!
>
> On Fri, May 19, 2017 at 8:39 AM,  wrote:
>
>> Hi!
>>
>> Is this possible possible in spark 2.1.1?
>>
>> Sent from my iPhone
>>
>> On May 19, 2017, at 5:55 AM, Patrick McGloin 
>> wrote:
>>
>> # Write key-value data from a DataFrame to a Kafka topic specified in an 
>> option
>> query = df \
>>   .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS 
>> value") \
>>   .writeStream \
>>   .format("kafka") \
>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
>>   .option("topic", "topic1") \
>>   .option("checkpointLocation", "/path/to/HDFS/dir") \
>>   .start()
>>
>> Described here:
>>
>> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>
>>
>>
>> On 19 May 2017 at 10:45,  wrote:
>>
>>> Is there a Kafka sink for Spark Structured Streaming ?
>>>
>>> Sent from my iPhone
>>>
>>
>>
>


Re: How to see the full contents of dataset or dataframe is structured streaming?

2017-05-18 Thread Michael Armbrust
You can write it to the memory sink.

df.writeStream.format("memory").queryName("myStream").start()

spark.table("myStream").show()

On Wed, May 17, 2017 at 7:55 PM, kant kodali  wrote:

> Hi All,
>
> How to see the full contents of dataset or dataframe is structured
> streaming just like we normally with *df.show(false)*? Is there any
> parameter I can pass in to the code below?
>
> val df1 = df.selectExpr("payload.data.*");
>
> df1.writeStream().outputMode("append").format("console").start()
>
>
> Thanks!
>
>


Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Michael Armbrust
I mean the actual kafka client:


  org.apache.kafka
  kafka-clients
  0.10.0.1



On Tue, May 16, 2017 at 4:29 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Michael,
>
> Thanks for the catch. I assume you meant
> *spark-streaming-kafka-0-10_2.11-2.1.0.jar*
>
> I add this in all spark machines under SPARK_HOME/jars.
>
> Still same error seems to persist. Is that the right jar or is there
> anything else I need to add?
>
> Thanks!
>
>
>
> On Tue, May 16, 2017 at 1:40 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Looks like you are missing the kafka dependency.
>>
>> On Tue, May 16, 2017 at 1:04 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Looks like I am getting the following runtime exception. I am using
>>> Spark 2.1.0 and the following jars
>>>
>>> *spark-sql_2.11-2.1.0.jar*
>>>
>>> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>>>
>>> *spark-streaming_2.11-2.1.0.jar*
>>>
>>>
>>> Exception in thread "stream execution thread for [id = 
>>> fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 
>>> 7c54940a-e453-41de-b256-049b539b59b1]"
>>>
>>> java.lang.NoClassDefFoundError: 
>>> org/apache/kafka/common/serialization/ByteArrayDeserializer
>>> at 
>>> org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
>>> at 
>>> org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
>>> at 
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>> at 
>>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>> at 
>>> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>>>
>>>
>>> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> The default "startingOffsets" is "latest". If you don't push any data
>>>> after starting the query, it won't fetch anything. You can set it to
>>>> "earliest" like ".option("startingOffsets", "earliest")" to start the
>>>> stream from the beginning.
>>>>
>>>> On Tue, May 16, 2017 at 12:36 AM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following code.
>>>>>
>>>>>  val ds = sparkSession.readStream()
>>>>> .format("kafka")
>>>>> .option("kafka.bootstrap.servers",bootstrapServers))
>>>>> .option("subscribe", topicName)
>>>>> .option("checkpointLocation", hdfsCheckPointDir)
>>>>> .load();
>>>>>
>>>>>  val ds1 = ds.select($"value")
>>>>>  val query = 
>>>>> ds1.writeStream.outputMode("append").format("console").start()
>>>>>  query.awaitTermination()
>>>>>
>>>>> There are no errors when I execute this code however I don't see any
>>>>> data being printed out to console? When I run my standalone test Kafka
>>>>> consumer jar I can see that it is receiving messages. so I am not sure 
>>>>> what
>>>>> is going on with above code? any ideas?
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>>
>>>
>>
>


Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Michael Armbrust
Looks like you are missing the kafka dependency.

On Tue, May 16, 2017 at 1:04 PM, kant kodali  wrote:

> Looks like I am getting the following runtime exception. I am using Spark
> 2.1.0 and the following jars
>
> *spark-sql_2.11-2.1.0.jar*
>
> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>
> *spark-streaming_2.11-2.1.0.jar*
>
>
> Exception in thread "stream execution thread for [id = 
> fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 
> 7c54940a-e453-41de-b256-049b539b59b1]"
>
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/serialization/ByteArrayDeserializer
> at 
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
> at 
> org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
> at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>
>
> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> The default "startingOffsets" is "latest". If you don't push any data
>> after starting the query, it won't fetch anything. You can set it to
>> "earliest" like ".option("startingOffsets", "earliest")" to start the
>> stream from the beginning.
>>
>> On Tue, May 16, 2017 at 12:36 AM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I have the following code.
>>>
>>>  val ds = sparkSession.readStream()
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers",bootstrapServers))
>>> .option("subscribe", topicName)
>>> .option("checkpointLocation", hdfsCheckPointDir)
>>> .load();
>>>
>>>  val ds1 = ds.select($"value")
>>>  val query = ds1.writeStream.outputMode("append").format("console").start()
>>>  query.awaitTermination()
>>>
>>> There are no errors when I execute this code however I don't see any
>>> data being printed out to console? When I run my standalone test Kafka
>>> consumer jar I can see that it is receiving messages. so I am not sure what
>>> is going on with above code? any ideas?
>>>
>>> Thanks!
>>>
>>
>>
>


Re: what is the difference between json format vs kafka format?

2017-05-15 Thread Michael Armbrust
For that simple count, you don't actually have to even parse the JSON
data.  You can just do a count.  The following code assumes you are
running Spark
2.2

.

df.groupBy().count().writeStream.outputMode("co
mplete").format("console").start()

If you want to do something more complicated, you will need specify the
schema at least for the columns that you want Spark to understand.  We need
to know the names and types of the column so that we know how to extract
them from the JSON.  Its okay however to omit columns that you don't care
about.

df.select(from_json($"value".cast("string"), "name STRING, age INT") as
'message).groupBy($"message.name").agg(avg($"age"))

If you are not sure what json looks like, you can ask spark to infer it
based on a sample.

spark.read.json(spark.read.format("kafka").option(...).load().limit(1000).select($"value".as[String])).printSchema()

On Sat, May 13, 2017 at 8:48 PM, kant kodali  wrote:

> Hi,
>
> Here is a little bit of background.
>
> I've been using stateless streaming API's for a while like using
> JavaDstream and so on and they worked well. It's has come to a point where
> we need to do realtime stateful streaming based on event time and other
> things but for now I am just trying to get used to structured streaming
> API's by running simple aggregations like count(). so naturally, I tend to
> think that the concepts that are behind JavaDstream would also apply in
> StructuredStreaming as well (Please correct me if I am wrong). For example,
> I can do the following with Dstreams without writing to any text files.
>
> // dstreams version
> jsonDStream.foreachRDD{rdd =>
> val jsonDF = spark.read.json(rdd)
> jsonDF.createOrReplaceTempView("dataframe")
> }
> javaStreamingContext.start()
> select count(*) from dataframe;
>
> or I can also do javaDstream.count() such that at every batch interval it
> spits out the count.
>
> I was expecting something like this with Structured Streaming as well. so
> I thought of doing something like below to mimic the above version. It
> looks very similar to me so I am not sure what you mean by
>
> "For streaming queries, you have to let it run in the background
> continuously by starting it using writeStreamstart()." Dstreams are
> also unbounded right? except at every batch interval the count() action
> gets invoked so why I can't call .count() on stream of dataframes in
> structured streaming (especially when it is possible with stream of RDD's
> like Dstreams)? I guess I probably have some misconception somewhere.
>
> //structured streaming version
> val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON
> val foo = ds.select("*").count()
> val query = foo.writeStream.outputMode("complete").format("console").sta
> rt();
> query.awaitTermination()
>
> How should I change this code to do a simple count in structured
> streaming? you can assume there is schema ahead of time if thats really a
> problem.
>
> since we want to do real time structured streaming we would like to avoid
> any extra level of indirections such as writing to text files and so on but
> if I really have to do a workaround to infer schema like writing to text
> files I rather try and figure out how I can get schemas ahead of time which
> is not ideal for our case but I can try to survive.
>
> Thanks a lot!
>
>
>
>
>
>
>
>
>
>
>
>
> On Sat, May 13, 2017 at 7:11 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> You cant do ".count()" directly on streaming DataFrames. This is because
>> "count" is an Action (remember RDD actions) that executes and returns a
>> result immediately which can be done only when the data is bounded (e.g.
>> batch/interactive queries). For streaming queries, you have to let it run
>> in the background continuously by starting it using writeStreamstart().
>>
>> And for streaming queries, you have specify schema from before so that at
>> runtime it explicitly fails when schema is incorrectly changed.
>>
>> In your case, what you can do is the following.
>> - Run a streaming query that converts the binary data from KAFka to
>> string, and saves as text files (i.e. 
>> *writeStream.format("text").start("path")
>> *)
>>
>> - Then run a batch query on the saved text files with format json (i.e.  
>> *read.format("json").load(path)
>> *)  with schema inference, and get the schema from the Dataset created
>> (i.e Dataset.schema ).
>>
>> - Then you can run the real streaming query with from_json and the learnt
>> schema.
>>
>> Make sure that the generated text file have sufficient data to infer the
>> full schema. Let me know if this works for you.
>>
>> TD
>>
>>
>> On Sat, May 13, 2017 at 6:04 PM, kant kodali  wrote:
>>
>>> Hi!
>>>
>>> Thanks for the response. Looks like from_json requires schema ahead of
>>> time. Is there any function I can use to infer schema from the json

Re: Spark SQL DataFrame to Kafka Topic

2017-05-15 Thread Michael Armbrust
The foreach sink from that blog post requires that you have a DataFrame
with two columns in the form of a Tuple2, (String, String), where as your
dataframe has only a single column `payload`.  You could change the
KafkaSink to extend ForeachWriter[KafkaMessage] and then it would work.

I'd also suggest you just try the native KafkaSink

that is part of Spark 2.2

.

On Sun, May 14, 2017 at 9:31 AM, Revin Chalil  wrote:

> Hi TD / Michael,
>
>
>
> I am trying to use the foreach sink to write to Kafka and followed this 
> 
>  from DBricks blog by Sunil Sitaula 
>  . I get the below with 
> DF.writeStream.foreach(writer).outputMode("update").start() when using a 
> simple DF
>
> Type mismatch, expected: foreachWriter[Row], actual: KafkaSink
>
> Cannot resolve reference foreach with such signature
>
>
>
> Below is the snippet
>
> *val *data = session
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", KafkaBroker)
>   .option("subscribe", InTopic)
>   .load()
>   .select($"value".as[Array[Byte]])
>   .flatMap(d => {
> *var *events = AvroHelper.*readEvents*(d)
> events.map((event: HdfsEvent) => {
>   *var *payload = EventPayloadParser.*read*(event.getPayload)
>   *new *KafkaMessage(payload)
> })
>   })
>
>
>
> *case class *KafkaMessage(
>   payload: String)
>
>
>
> This is where I use the foreach
>
> *val *writer = *new *KafkaSink("kafka-topic", KafkaBroker)
> *val *query = data.writeStream.foreach(writer).outputMode("update").start()
>
>
>
> In this case, it shows –
>
> Type mismatch, expected: foreachWriter[Main.KafkaMesage], actual: 
> Main.KafkaSink
>
> Cannot resolve reference foreach with such signature
>
>
>
> Any help is much appreciated. Thank you.
>
>
>
>
>
> *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com]
> *Sent:* Friday, January 13, 2017 3:31 PM
> *To:* Koert Kuipers 
> *Cc:* Peyman Mohajerian ; Senthil Kumar <
> senthilec...@gmail.com>; User ;
> senthilec...@apache.org
> *Subject:* Re: Spark SQL DataFrame to Kafka Topic
>
>
>
> Structured Streaming has a foreach sink, where you can essentially do what
> you want with your data. Its easy to create a Kafka producer, and write the
> data out to kafka.
>
> http://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html#using-foreach
>
>
>
> On Fri, Jan 13, 2017 at 8:28 AM, Koert Kuipers  wrote:
>
> how do you do this with structured streaming? i see no mention of writing
> to kafka
>
>
>
> On Fri, Jan 13, 2017 at 10:30 AM, Peyman Mohajerian 
> wrote:
>
> Yes, it is called Structured Streaming: https://docs.
> databricks.com/_static/notebooks/structured-streaming-kafka.html
>
> http://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html
>
>
>
> On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar 
> wrote:
>
> Hi Team ,
>
>
>
>  Sorry if this question already asked in this forum..
>
>
>
> Can we ingest data to Apache Kafka Topic from Spark SQL DataFrame ??
>
>
>
> Here is my Code which Reads Parquet File :
>
>
>
> *val sqlContext = new org.apache.spark.sql.SQLContext(sc);*
>
> *val df = sqlContext.read.parquet("/temp/*.parquet")*
>
> *df.registerTempTable("beacons")*
>
>
>
> I want to directly ingest df DataFrame to Kafka ! Is there any way to
> achieve this ??
>
>
>
> Cheers,
>
> Senthil
>
>
>
>
>
>
>


Re: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

2017-05-12 Thread Michael Armbrust
I believe that Avro/Kafka messages have a few bytes at the beginning of the
message to denote which schema is being used.  Have you tried using
the KafkaAvroDecoder inside of the map instead?

On Fri, May 12, 2017 at 9:26 AM, Revin Chalil  wrote:

> Just following up on this; would appreciate any responses on this. Thanks.
>
>
>
> *From:* Revin Chalil [mailto:rcha...@expedia.com]
> *Sent:* Wednesday, May 10, 2017 11:21 PM
> *To:* user@spark.apache.org
> *Subject:* Reading Avro messages from Kafka using Structured Streaming in
> Spark 2.1
>
>
>
> I am trying to convert avro records with field type = bytes to json string 
> using Structured Streaming in Spark 2.1. Please see below.
>
>
>
> *object *AvroConvert {
>
>   *case class *KafkaMessage(
>payload: String
>  )
>
>   *val **schemaString *="""{
> "type" : "record",
> "name" : "HdfsEvent",
> "namespace" : "com.abc.def.domain.hdfs",
> "fields" : [ {
>   "name" : "payload",
>   "type" : {
> "type" : "bytes",
> "java-class" : "[B"
>   }
> } ]
>   }"""
>   *val **messageSchema *= *new *Schema.Parser().parse(*schemaString*)
>   *val **reader *= *new *GenericDatumReader[GenericRecord](*messageSchema*)
>   // Binary decoder
>   *val **decoder *= DecoderFactory.*get*()
>   // Register implicit encoder for map operation
>   *implicit val **encoder*: Encoder[GenericRecord] = 
> org.apache.spark.sql.Encoders.*kryo*[GenericRecord]
>
>   *def *main(args: Array[String]) {
>
> *val *KafkaBroker = "**.**.**.**:9092";
> *val *InTopic = "avro";
>
> // Get Spark session
> *val *session = SparkSession
>   .
> *builder  *.master("local[*]")
>   .appName("myapp")
>   .getOrCreate()
>
> // Load streaming data
> *import *session.implicits._
>
> *val *data = session
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", KafkaBroker)
>   .option("subscribe", InTopic)
>   .load()
>   .select($"value".as[Array[Byte]])
>   .map(d => {
> *val *rec = *reader*.read(*null*, *decoder*.binaryDecoder(d, *null*))
> *val *payload = rec.get("payload").asInstanceOf[Byte].toString
> *new *KafkaMessage(payload)
>   })
>
> *val *query = data.writeStream
>   .outputMode("Append")
>   .format("console")
>   .start()
>
> query.awaitTermination()
>   }
> }
>
>
>
>
>
> I am getting the below error.
>
> org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
>
> at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>
> at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>
> at 
> org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
>
> at 
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
>
> at 
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
>
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
>
> at 
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>
> at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>
> at 
> com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
>
> at 
> com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
>
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
>
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
>
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>
> at 

Re: Convert DStream into Streaming Dataframe

2017-05-12 Thread Michael Armbrust
Are there any particular things that the DataFrame or Dataset API are
missing?

On Fri, May 12, 2017 at 9:49 AM, Tejinder Aulakh 
wrote:

> Hi,
>
> Is there any way to convert a DStream to a streaming dataframe? I want to
> use Structured streaming in a new common module that I'm developing. The
> existing code uses DStream so trying to figure out how to convert a DStream
> to a Streaming Dataframe. The documentation only describes how to read the
> data from the source into a streaming Dataframe.
>
> http://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html#creating-streaming-dataframes-
> and-streaming-datasets
>
> Thanks,
> TJ
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Michael Armbrust
Must be a bug.  This works for me
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/908554720841389/2840265927289860/latest.html>
in
Spark 2.1.

On Tue, May 9, 2017 at 12:10 PM, Yang <tedd...@gmail.com> wrote:

> somehow the schema check is here
>
> https://github.com/apache/spark/blob/master/sql/
> catalyst/src/main/scala/org/apache/spark/sql/catalyst/
> ScalaReflection.scala#L697-L750
>
> supposedly beans are to be handled, but it's not clear to me which line
> handles the type of beans. if that's clear, I could probably annotate my
> bean class properly
>
> On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> I think you are supposed to set BeanProperty on a var as they do here
>> <https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala#L71-L83>.
>> If you are using scala though I'd consider using the case class encoders.
>>
>> On Tue, May 9, 2017 at 12:21 AM, Yang <tedd...@gmail.com> wrote:
>>
>>> I'm trying to use Encoders.bean() to create an encoder for my custom
>>> class, but it fails complaining about can't find the schema:
>>>
>>>
>>> class Person4 { @scala.beans.BeanProperty def setX(x:Int): Unit = {}
>>> @scala.beans.BeanProperty def getX():Int = {1} } val personEncoder =
>>> Encoders.bean[Person4](classOf[Person4]) scala> val person_rdd =sc.
>>> parallelize(Array( (new Person4(), 1), (new Person4(), 2) )) person_rdd:
>>> org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at
>>> parallelize at :31 scala> sqlcontext.createDataFrame(per
>>> son_rdd) java.lang.UnsupportedOperationException: Schema for type
>>> Person4 is not supported at org.apache.spark.sql.catalyst.
>>> ScalaReflection$.schemaFor(ScalaReflection.scala:716) at org.apache.
>>> spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(
>>> ScalaReflection.scala:71 2) at org.apache.spark.sql.catalyst.
>>> ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:71 1)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike
>>> .scala:234) at
>>>
>>>
>>> but if u look at the encoder's schema, it does know it:
>>> but the system does seem to understand the schema for "Person4":
>>>
>>>
>>> scala> personEncoder.schema
>>> res38: org.apache.spark.sql.types.StructType = 
>>> StructType(StructField(x,IntegerType,false))
>>>
>>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Michael Armbrust
Which version of Spark?

On Tue, May 9, 2017 at 11:28 AM, Yang <tedd...@gmail.com> wrote:

> actually with var it's the same:
>
>
> scala> class Person4 {
>  |
>  | @scala.beans.BeanProperty var X:Int = 1
>  | }
> defined class Person4
>
> scala> val personEncoder = Encoders.bean[Person4](classOf[Person4])
> personEncoder: org.apache.spark.sql.Encoder[Person4] = class[x[0]: int]
>
> scala> val person_rdd =sc.parallelize(Array( (new Person4(), 1), (new
> Person4(), 2) ))
> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] =
> ParallelCollectionRDD[3] at parallelize at :39
>
> scala> sqlContext.createDataFrame(person_rdd)
> java.lang.UnsupportedOperationException: Schema for type Person4 is not
> supported
>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(
> ScalaReflection.scala:716)
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$
> schemaFor$2.apply(ScalaReflection.scala:712)
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$
> schemaFor$2.apply(ScalaReflection.scala:711)
>   at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(
> ScalaReflection.scala:711)
>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(
> ScalaReflection.scala:654)
>   at org.apache.spark.sql.SparkSession.createDataFrame(
> SparkSession.scala:251)
>   at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:278)
>   ... 54 elided
>
> On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> I think you are supposed to set BeanProperty on a var as they do here
>> <https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala#L71-L83>.
>> If you are using scala though I'd consider using the case class encoders.
>>
>> On Tue, May 9, 2017 at 12:21 AM, Yang <tedd...@gmail.com> wrote:
>>
>>> I'm trying to use Encoders.bean() to create an encoder for my custom
>>> class, but it fails complaining about can't find the schema:
>>>
>>>
>>> class Person4 { @scala.beans.BeanProperty def setX(x:Int): Unit = {}
>>> @scala.beans.BeanProperty def getX():Int = {1} } val personEncoder =
>>> Encoders.bean[Person4](classOf[Person4]) scala> val person_rdd =sc.
>>> parallelize(Array( (new Person4(), 1), (new Person4(), 2) )) person_rdd:
>>> org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at
>>> parallelize at :31 scala> sqlcontext.createDataFrame(per
>>> son_rdd) java.lang.UnsupportedOperationException: Schema for type
>>> Person4 is not supported at org.apache.spark.sql.catalyst.
>>> ScalaReflection$.schemaFor(ScalaReflection.scala:716) at org.apache.
>>> spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(
>>> ScalaReflection.scala:71 2) at org.apache.spark.sql.catalyst.
>>> ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:71 1)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike
>>> .scala:234) at
>>>
>>>
>>> but if u look at the encoder's schema, it does know it:
>>> but the system does seem to understand the schema for "Person4":
>>>
>>>
>>> scala> personEncoder.schema
>>> res38: org.apache.spark.sql.types.StructType = 
>>> StructType(StructField(x,IntegerType,false))
>>>
>>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Michael Armbrust
I think you are supposed to set BeanProperty on a var as they do here
.
If you are using scala though I'd consider using the case class encoders.

On Tue, May 9, 2017 at 12:21 AM, Yang  wrote:

> I'm trying to use Encoders.bean() to create an encoder for my custom
> class, but it fails complaining about can't find the schema:
>
>
> class Person4 { @scala.beans.BeanProperty def setX(x:Int): Unit = {}
> @scala.beans.BeanProperty def getX():Int = {1} } val personEncoder =
> Encoders.bean[Person4](classOf[Person4]) scala> val person_rdd =sc.
> parallelize(Array( (new Person4(), 1), (new Person4(), 2) )) person_rdd:
> org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at
> parallelize at :31 scala> sqlcontext.createDataFrame(person_rdd)
> java.lang.UnsupportedOperationException: Schema for type Person4 is not
> supported at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(Sca
> laReflection.scala:716) at org.apache.spark.sql.catalyst.
> ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:71 2) at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(
> ScalaReflection.scala:71 1) at scala.collection.TraversableLi
> ke$$anonfun$map$1.apply(TraversableLike.scala:234) at
>
>
> but if u look at the encoder's schema, it does know it:
> but the system does seem to understand the schema for "Person4":
>
>
> scala> personEncoder.schema
> res38: org.apache.spark.sql.types.StructType = 
> StructType(StructField(x,IntegerType,false))
>
>


Re: What are Analysis Errors With respect to Spark Sql DataFrames and DataSets?

2017-05-03 Thread Michael Armbrust
>
>  if I do dataset.select("nonExistentColumn") then the Analysis Error is
> thrown at compile time right?
>

if you do df.as[MyClass].map(_.badFieldName) you will get a compile error.
However, if df doesn't have the right columns for MyClass, that error will
only be thrown at runtime (whether DF is backed by something in memory or
some remote database).


Re: What are Analysis Errors With respect to Spark Sql DataFrames and DataSets?

2017-05-03 Thread Michael Armbrust
An analysis exception occurs whenever the scala/java/python program is
valid, but the dataframe operations being performed are not.  For example,
df.select("nonExistentColumn") would throw an analysis exception.

On Wed, May 3, 2017 at 1:38 PM, kant kodali  wrote:

> Hi All,
>
> I understand the compile time Errors this blog
> 
>  is
> talking about but I don't understand what are Analysis Errors? Any Examples?
>
> Thanks!
>


[ANNOUNCE] Apache Spark 2.1.1

2017-05-02 Thread Michael Armbrust
We are happy to announce the availability of Spark 2.1.1!

Apache Spark 2.1.1 is a maintenance release, based on the branch-2.1
maintenance branch of Spark. We strongly recommend all 2.1.x users to
upgrade to this stable release.

To download Apache Spark 2.1.1 visit http://spark.apache.org/downloads.html

We would like to acknowledge all community members for contributing patches
to this release.


Re: Schema Evolution for nested Dataset[T]

2017-05-01 Thread Michael Armbrust
Oh, and if you want a default other than null:

import org.apache.spark.sql.functions._
df.withColumn("address", coalesce($"address", lit())

On Mon, May 1, 2017 at 10:29 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> The following should work:
>
> val schema = implicitly[org.apache.spark.sql.Encoder[Course]].schema
> spark.read.schema(schema).parquet("data.parquet").as[Course]
>
> Note this will only work for nullable files (i.e. if you add a primitive
> like Int you need to make it an Option[Int])
>
> On Sun, Apr 30, 2017 at 9:12 PM, Mike Wheeler <
> rotationsymmetr...@gmail.com> wrote:
>
>> Hi Spark Users,
>>
>> Suppose I have some data (stored in parquet for example) generated as
>> below:
>>
>> package com.company.entity.old
>> case class Course(id: Int, students: List[Student])
>> case class Student(name: String)
>>
>> Then usually I can access the data by
>>
>> spark.read.parquet("data.parquet").as[Course]
>>
>> Now I want to add a new field `address` to Student:
>>
>> package com.company.entity.new
>> case class Course(id: Int, students: List[Student])
>> case class Student(name: String, address: String)
>>
>> Then obviously running `spark.read.parquet("data.parquet").as[Course]`
>> on data generated by the old entity/schema will fail because `address`
>> is missing.
>>
>> In this case, what is the best practice to read data generated with
>> the old entity/schema to the new entity/schema, with the missing field
>> set to some default value? I know I can manually write a function to
>> do the transformation from the old to the new. But it is kind of
>> tedious. Any automatic methods?
>>
>> Thanks,
>>
>> Mike
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Schema Evolution for nested Dataset[T]

2017-05-01 Thread Michael Armbrust
The following should work:

val schema = implicitly[org.apache.spark.sql.Encoder[Course]].schema
spark.read.schema(schema).parquet("data.parquet").as[Course]

Note this will only work for nullable files (i.e. if you add a primitive
like Int you need to make it an Option[Int])

On Sun, Apr 30, 2017 at 9:12 PM, Mike Wheeler 
wrote:

> Hi Spark Users,
>
> Suppose I have some data (stored in parquet for example) generated as
> below:
>
> package com.company.entity.old
> case class Course(id: Int, students: List[Student])
> case class Student(name: String)
>
> Then usually I can access the data by
>
> spark.read.parquet("data.parquet").as[Course]
>
> Now I want to add a new field `address` to Student:
>
> package com.company.entity.new
> case class Course(id: Int, students: List[Student])
> case class Student(name: String, address: String)
>
> Then obviously running `spark.read.parquet("data.parquet").as[Course]`
> on data generated by the old entity/schema will fail because `address`
> is missing.
>
> In this case, what is the best practice to read data generated with
> the old entity/schema to the new entity/schema, with the missing field
> set to some default value? I know I can manually write a function to
> do the transformation from the old to the new. But it is kind of
> tedious. Any automatic methods?
>
> Thanks,
>
> Mike
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Arraylist is empty after JavaRDD.foreach

2017-04-24 Thread Michael Armbrust
Foreach runs on the executors and so is not able to modify an array list
that is only present on the driver.  You should just call collectAsList on
the DataFrame.

On Mon, Apr 24, 2017 at 10:36 AM, Devender Yadav <
devender.ya...@impetus.co.in> wrote:

> Hi All,
>
>
> I am using Spark 1.6.2 and Java 7.
>
>
> *Sample json* (total 100 records):
>
> {"name":"dev","salary":1,"occupation":"engg","address":"noida"}
>
> {"name":"karthik","salary":2,"occupation":"engg","address":"noida"}
>
> *Useful code:*
>
>final List> jsonData = new ArrayList<>();
>
>DataFrame df =  
> sqlContext.read().json("file:///home/dev/data-json/emp.json");
>JavaRDD rdd = df.repartition(1).toJSON().toJavaRDD();
>
>rdd.foreach(new VoidFunction() {
>@Override
>public void call(String line)  {
>try {
>jsonData.add (new ObjectMapper().readValue(line, Map.class));
>System.out.println(Thread.currentThread().getName());
>System.out.println("List size: "+jsonData.size());
>} catch (IOException e) {
>e.printStackTrace();
>}
>}
>});
>
>System.out.println(Thread.currentThread().getName());
>System.out.println("List size: "+jsonData.size());
>
> jsonData List is empty in the end.
>
>
> Output:
>
> Executor task launch worker-1List size: 1Executor task launch worker-1List 
> size: 2Executor task launch worker-1List size: 3...Executor task launch 
> worker-1List size: 100
>
> mainList size: 0
>
>
>
> Regards,
> Devender
>
> --
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>


Re: SPARK-20325 - Spark Structured Streaming documentation Update: checkpoint configuration

2017-04-14 Thread Michael Armbrust
>
> 1)  could we update documentation for Structured Streaming and describe
> that checkpointing could be specified by 
> spark.sql.streaming.checkpointLocation
> on SparkSession level and thus automatically checkpoint dirs will be
> created per foreach query?
>
>
Sure, please open a pull request.


> 2) Do we really need to specify the checkpoint dir per query? what the
> reason for this? finally we will be forced to write some checkpointDir name
> generator, for example associate it with some particular named query and so
> on?
>

Every query needs to have a unique checkpoint as this is how we track what
has been processed.  If we don't have this, we can't restart the query
where it left off.  In you example, I would suggest including the metric
name in the checkpoint location path.


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Michael Armbrust
It sounds like you want a tumbling window (where the slide and duration are
the same).  This is the default if you give only one interval.  You should
set the output mode to "update" (i.e. output only the rows that have been
updated since the last trigger) and the trigger to "1 second".

Try thinking about the batch query that would produce the answer you want.
Structured streaming will figure out an efficient way to compute that
answer incrementally as new data arrives.

On Mon, Apr 10, 2017 at 12:20 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Michael,
>
> Thanks for the response. I guess I was thinking more in terms of the
> regular streaming model. so In this case I am little confused what my
> window interval and slide interval be for the following case?
>
> I need to hold a state (say a count) for 24 hours while capturing all its
> updates and produce results every second. I also need to reset the state
> (the count) back to zero every 24 hours.
>
>
>
>
>
>
> On Mon, Apr 10, 2017 at 11:49 AM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> Nope, structured streaming eliminates the limitation that micro-batching
>> should affect the results of your streaming query.  Trigger is just an
>> indication of how often you want to produce results (and if you leave it
>> blank we just run as quickly as possible).
>>
>> To control how tuples are grouped into a window, take a look at the
>> window
>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time>
>> function.
>>
>> On Thu, Apr 6, 2017 at 10:26 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Is the trigger interval mentioned in this doc
>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
>>> the same as batch interval in structured streaming? For example I have a
>>> long running receiver(not kafka) which sends me a real time stream I want
>>> to use window interval, slide interval of 24 hours to create the Tumbling
>>> window effect but I want to process updates every second.
>>>
>>> Thanks!
>>>
>>
>>
>


Re: Cant convert Dataset to case class with Option fields

2017-04-10 Thread Michael Armbrust
Options should work.  Can you give a full example that is freezing?  Which
version of Spark are you using?

On Fri, Apr 7, 2017 at 6:59 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Devs,
> I've some case classes here, and it's fields are all optional
> case class A(b:Option[B] = None, c: Option[C] = None, ...)
>
> If I read some data in a DataSet and try to connvert it to this case class
> using the as method, it doesn't give me any answer, it simple freeze.
> If I change the case class to
>
> case class A(b:B,c:C)
> id work nice and return the field values as null.
>
> Option fields aren't supported by the as method or is this an Issue?
>
> Kind Regards,
> Dirceu
>


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Michael Armbrust
Nope, structured streaming eliminates the limitation that micro-batching
should affect the results of your streaming query.  Trigger is just an
indication of how often you want to produce results (and if you leave it
blank we just run as quickly as possible).

To control how tuples are grouped into a window, take a look at the window

function.

On Thu, Apr 6, 2017 at 10:26 AM, kant kodali  wrote:

> Hi All,
>
> Is the trigger interval mentioned in this doc
> 
> the same as batch interval in structured streaming? For example I have a
> long running receiver(not kafka) which sends me a real time stream I want
> to use window interval, slide interval of 24 hours to create the Tumbling
> window effect but I want to process updates every second.
>
> Thanks!
>


Re: map transform on array in spark sql

2017-04-04 Thread Michael Armbrust
If you can find the name of the struct field from the schema you can just
do:

df.select($"arrayField.a")

Selecting a field from an array returns an array with that field selected
from each element.

On Mon, Apr 3, 2017 at 8:18 PM, Koert Kuipers  wrote:

> i have a DataFrame where one column has type:
>
> ArrayType(StructType(Seq(
>   StructField("a", typeA, nullableA),
>   StructField("b", typeB, nullableB)
> )))
>
> i would like to map over this array to pick the first element in the
> struct. so the result should be a ArrayType(typeA, nullableA). i realize i
> can do this with a scala udf if i know typeA. but what if i dont know typeA?
>
> basically i would like to do an expression like:
> map(col("x"), _(0)))
>
> any suggestions?
>
>


Re: Convert Dataframe to Dataset in pyspark

2017-04-03 Thread Michael Armbrust
You don't need encoders in python since its all dynamically typed anyway.
You can just do the following if you want the data as a string.

sqlContext.read.text("/home/spark/1.6/lines").rdd.map(lambda row: row.value)

2017-04-01 5:36 GMT-07:00 Selvam Raman :

> In Scala,
> val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
>
> what is the equivalent code in pyspark?
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Why VectorUDT private?

2017-03-30 Thread Michael Armbrust
I think really the right way to think about things that are marked private
is, "this may disappear or change in a future minor release".  If you are
okay with that, working about the visibility restrictions is reasonable.

On Thu, Mar 30, 2017 at 5:52 AM, Koert Kuipers  wrote:

> I stopped asking long time ago why things are private in spark... I
> mean... The conversion between ml and mllib vectors is private... the
> conversion between spark vector and breeze used to be (or still is?)
> private. it just goes on. Lots of useful stuff is private[SQL].
>
> Luckily there are simple ways to get around these visibility restrictions
>
> On Mar 29, 2017 22:57, "Ryan"  wrote:
>
>> I'm writing a transformer and the input column is vector type(which is
>> the output column from other transformer). But as the VectorUDT is private,
>> how could I check/transform schema for the vector column?
>>
>


Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
Ah, I understand what you are asking now.  There is no API for specifying a
kafka specific "decoder", since Spark SQL already has a rich language for
expressing transformations.  The dataframe code I gave will parse the JSON
and materialize in a class, very similar to what objectMapper.readValue(bytes,
Tweet.class) would do.

However, there are other cases where you might need to do some domain
specific transformation that Spark SQL doesn't support natively.  In this
case you can write a UDF that does the translation. There are a couple of
different ways you can specify this, depending on whether you want to
map/flatMap or just apply the function as a UDF to a single column
<http://stackoverflow.com/questions/35348058/how-do-i-call-a-udf-on-a-spark-dataframe-using-java>
.


On Mon, Mar 27, 2017 at 1:59 PM, kaniska Mandal <kaniska.man...@gmail.com>
wrote:

> yup, that solves the compilation issue :-)
>
> one quick question regarding specifying Decoder in kafka stream:
>
> please note that I am encoding the message as follows while sending data
> to kafka -
>
> 
>
> *String msg = objectMapper.writeValueAsString(tweetEvent);*
>
> *return msg.getBytes();*
>
> I have a corresponding 
>
> *return objectMapper.readValue(bytes, Tweet.class)*
>
>
> *>> how do I specify the Decoder in the following stream-processing flow ?*
> streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .withColumn("message", from_json(col("value").cast("string"),
> tweetSchema)) // cast the binary value to a string and parse it as json
>   .select("message.*") // unnest the json
>   .as(Encoders.bean(Tweet.class))
>
> Thanks
> Kaniska
>
> -
>
> On Mon, Mar 27, 2017 at 1:25 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> You need to import col from org.apache.spark.sql.functions.
>>
>> On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal <kaniska.man...@gmail.com
>> > wrote:
>>
>>> Hi Michael,
>>>
>>> Can you please check if I am using correct version of spark-streaming
>>> library as specified in my pom (specified in the email) ?
>>>
>>> col("value").cast("string") - throwing an error 'cannot find symbol
>>> method col(java.lang.String)'
>>> I tried $"value" which results into similar compilation error.
>>>
>>> Thanks
>>> Kaniska
>>>
>>>
>>>
>>> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Sorry, I don't think that I understand the question.  Value is just a
>>>> binary blob that we get from kafka and pass to you.  If its stored in JSON,
>>>> I think the code I provided is a good option, but if you are using a
>>>> different encoding you may need to write a UDF.
>>>>
>>>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <
>>>> kaniska.man...@gmail.com> wrote:
>>>>
>>>>> Hi Michael,
>>>>>
>>>>> Thanks much for the suggestion.
>>>>>
>>>>> I was wondering - whats the best way to deserialize the 'value' field
>>>>>
>>>>>
>>>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> Encoders can only map data into an object if those columns already
>>>>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>>>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>>>>> in JSON it should be pretty straight forward.
>>>>>>
>>>>>> streams = spark
>>>>>>   .readStream()
>>>>>>   .format("kafka")
>>>>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>>   .option(subscribeType, topics)
>>>>>>   .load()
>>>>>>   .withColumn("message", from_json(col("value").cast("string"),
>>>>>> tweetSchema)) // cast the binary value to a string and parse it as json
>>>>>>   .select("message.*") // unnest the json
>>>>>>   .as(Encoders.bean(Tweet.class)) // only required if you want to
>>>>>> use lambda functions on the data usin

Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
You need to import col from org.apache.spark.sql.functions.

On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal <kaniska.man...@gmail.com>
wrote:

> Hi Michael,
>
> Can you please check if I am using correct version of spark-streaming
> library as specified in my pom (specified in the email) ?
>
> col("value").cast("string") - throwing an error 'cannot find symbol
> method col(java.lang.String)'
> I tried $"value" which results into similar compilation error.
>
> Thanks
> Kaniska
>
>
>
> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> Sorry, I don't think that I understand the question.  Value is just a
>> binary blob that we get from kafka and pass to you.  If its stored in JSON,
>> I think the code I provided is a good option, but if you are using a
>> different encoding you may need to write a UDF.
>>
>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <kaniska.man...@gmail.com
>> > wrote:
>>
>>> Hi Michael,
>>>
>>> Thanks much for the suggestion.
>>>
>>> I was wondering - whats the best way to deserialize the 'value' field
>>>
>>>
>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Encoders can only map data into an object if those columns already
>>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>>> in JSON it should be pretty straight forward.
>>>>
>>>> streams = spark
>>>>   .readStream()
>>>>   .format("kafka")
>>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>>   .option(subscribeType, topics)
>>>>   .load()
>>>>   .withColumn("message", from_json(col("value").cast("string"),
>>>> tweetSchema)) // cast the binary value to a string and parse it as json
>>>>   .select("message.*") // unnest the json
>>>>   .as(Encoders.bean(Tweet.class)) // only required if you want to use
>>>> lambda functions on the data using this class
>>>>
>>>> Here is some more info on working with JSON and other semi-structured
>>>> formats
>>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>>>> .
>>>>
>>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Currently , encountering the following exception while working with
>>>>> below-mentioned code snippet :
>>>>>
>>>>> > Please suggest the correct approach for reading the stream into a sql
>>>>> > schema.
>>>>> > If I add 'tweetSchema' while reading stream, it errors out with
>>>>> message -
>>>>> > we can not change static schema for kafka.
>>>>>
>>>>> 
>>>>> ---
>>>>>
>>>>> *exception*
>>>>>
>>>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>>>> '`location`' given input columns: [topic, timestamp, key, offset,
>>>>> value,
>>>>> timestampType, partition]*;
>>>>> at
>>>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisError
>>>>> At.failAnalysis(package.scala:42)
>>>>> at
>>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>>>>> n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>>>> 
>>>>> 
>>>>>
>>>>> *structured streaming code snippet*
>>>>>
>>>>> String bootstrapServers = "localhost:9092";
>>>>> String subscribeType = "subscribe";
>>>>> String topics = "events";
>>>>>
>>>>> StructType tweetSchema = new StructType()
>>>>> .add("tweetId", "string")
>>>>>   

Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
Sorry, I don't think that I understand the question.  Value is just a
binary blob that we get from kafka and pass to you.  If its stored in JSON,
I think the code I provided is a good option, but if you are using a
different encoding you may need to write a UDF.

On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <kaniska.man...@gmail.com>
wrote:

> Hi Michael,
>
> Thanks much for the suggestion.
>
> I was wondering - whats the best way to deserialize the 'value' field
>
>
> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> Encoders can only map data into an object if those columns already
>> exist.  When we are reading from Kafka, we just get a binary blob and
>> you'll need to help Spark parse that first.  Assuming your data is stored
>> in JSON it should be pretty straight forward.
>>
>> streams = spark
>>   .readStream()
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>   .option(subscribeType, topics)
>>   .load()
>>   .withColumn("message", from_json(col("value").cast("string"),
>> tweetSchema)) // cast the binary value to a string and parse it as json
>>   .select("message.*") // unnest the json
>>   .as(Encoders.bean(Tweet.class)) // only required if you want to use
>> lambda functions on the data using this class
>>
>> Here is some more info on working with JSON and other semi-structured
>> formats
>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>> .
>>
>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Currently , encountering the following exception while working with
>>> below-mentioned code snippet :
>>>
>>> > Please suggest the correct approach for reading the stream into a sql
>>> > schema.
>>> > If I add 'tweetSchema' while reading stream, it errors out with
>>> message -
>>> > we can not change static schema for kafka.
>>>
>>> 
>>> ---
>>>
>>> *exception*
>>>
>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>>> timestampType, partition]*;
>>> at
>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisError
>>> At.failAnalysis(package.scala:42)
>>> at
>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>>> n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>> 
>>> 
>>>
>>> *structured streaming code snippet*
>>>
>>> String bootstrapServers = "localhost:9092";
>>> String subscribeType = "subscribe";
>>> String topics = "events";
>>>
>>> StructType tweetSchema = new StructType()
>>> .add("tweetId", "string")
>>> .add("tweetText", "string")
>>> .add("location", "string")
>>> .add("timestamp", "string");
>>>
>>>SparkSession spark = SparkSession
>>>   .builder()
>>>   .appName("StreamProcessor")
>>>   .config("spark.master", "local")
>>>   .getOrCreate();
>>>
>>>   Dataset streams = spark
>>>   .readStream()
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers",
>>> bootstrapServers)
>>>   .option(subscribeType, topics)
>>>   .load()
>>>   .as(Encoders.bean(Tweet.class));
>>>
>>>  streams.createOrReplaceTempView("streamsData");
>>>
>>>String sql = "SELECT location,  COUNT(*) as

Re: How to insert nano seconds in the TimestampType in Spark

2017-03-27 Thread Michael Armbrust
The timestamp type is only microsecond precision.  You would need to store
it on your own (as binary or limited range long or something) if you
require nanosecond precision.

On Mon, Mar 27, 2017 at 5:29 AM, Devender Yadav <
devender.ya...@impetus.co.in> wrote:

> Hi All,
>
> I am using spark version - 1.6.1
>
> I have a text table in hive having `timestamp` datatype with nanoseconds
> precision.
>
> Hive Table Schema:
>
> c_timestamp timestamp
>
> Hive Table data:
>
> hive> select * from tbl1;
> OK
> 00:00:00.1
> 12:12:12.123456789
> 23:59:59.9
>
> But as per the docs, from Spark 1.5
>
> *Timestamps are now stored at a precision of 1us, rather than 1ns*
>
>
> Sample code:
>
> SparkConf conf = new SparkConf(true).setMaster("
> yarn-cluster").setAppName("SAMPLE_APP");
> SparkContext sc = new SparkContext(conf);
> HiveContext hc = new HiveContext(sc);
> DataFrame df = hc.table("testdb.tbl1");
>
> Data is truncated to microseconds.
>
> 00:00:00
> 12:12:12.123456
> 23:59:59.99
>
>
> Is there any way to use nanoseconds here?
>
>
> Regards,
> Devender
>
>
> --
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>


Re: unable to stream kafka messages

2017-03-24 Thread Michael Armbrust
Encoders can only map data into an object if those columns already exist.
When we are reading from Kafka, we just get a binary blob and you'll need
to help Spark parse that first.  Assuming your data is stored in JSON it
should be pretty straight forward.

streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .withColumn("message", from_json(col("value").cast("string"),
tweetSchema)) // cast the binary value to a string and parse it as json
  .select("message.*") // unnest the json
  .as(Encoders.bean(Tweet.class)) // only required if you want to use
lambda functions on the data using this class

Here is some more info on working with JSON and other semi-structured
formats

.

On Fri, Mar 24, 2017 at 10:49 AM, kaniska  wrote:

> Hi,
>
> Currently , encountering the following exception while working with
> below-mentioned code snippet :
>
> > Please suggest the correct approach for reading the stream into a sql
> > schema.
> > If I add 'tweetSchema' while reading stream, it errors out with message -
> > we can not change static schema for kafka.
>
> 
> ---
>
> *exception*
>
> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
> '`location`' given input columns: [topic, timestamp, key, offset, value,
> timestampType, partition]*;
> at
> org.apache.spark.sql.catalyst.analysis.package$
> AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(
> CheckAnalysis.scala:77)
> 
> 
>
> *structured streaming code snippet*
>
> String bootstrapServers = "localhost:9092";
> String subscribeType = "subscribe";
> String topics = "events";
>
> StructType tweetSchema = new StructType()
> .add("tweetId", "string")
> .add("tweetText", "string")
> .add("location", "string")
> .add("timestamp", "string");
>
>SparkSession spark = SparkSession
>   .builder()
>   .appName("StreamProcessor")
>   .config("spark.master", "local")
>   .getOrCreate();
>
>   Dataset streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers",
> bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .as(Encoders.bean(Tweet.class));
>
>  streams.createOrReplaceTempView("streamsData");
>
>String sql = "SELECT location,  COUNT(*) as count FROM
> streamsData
> GROUP BY location";
>Dataset countsByLocation = spark.sql(sql);
>
> StreamingQuery query = countsByLocation.writeStream()
>   .outputMode("complete")
>   .format("console")
>   .start();
>
> query.awaitTermination();
> 
> --
>
> *Tweet *
>
> Tweet.java - has public constructor and getter / setter methods
>
> public class Tweet implements Serializable{
>
> private String tweetId;
> private String tweetText;
> private String location;
> private String timestamp;
>
> public Tweet(){
>
> }
> .
>
> 
> 
>
> *pom.xml *
>
>
> 
> org.apache.spark
> spark-core_2.10
> 2.1.0
> 
> 
> org.apache.spark
> spark-streaming_2.10
> 2.1.0
> 
> 
> org.apache.spark
> spark-streaming-
> kafka-0-8_2.10
> 2.1.0
> 
> 
> org.apache.spark
> spark-sql_2.10
> 2.1.0
> 
> 
> org.apache.spark
> spark-sql-kafka-0-10_2.10
> 2.1.0
> 
> 
> 
>
>

Re: how to read object field within json file

2017-03-24 Thread Michael Armbrust
I'm not sure you can parse this as an Array, but you can hint to the parser
that you would like to treat source as a map instead of as a struct.  This
is a good strategy when you have dynamic columns in your data.

Here is an example of the schema you can use to parse this JSON and also
how to use explode to turn it into separate rows
.
This blog post has more on working with semi-structured data in Spark

.

On Thu, Mar 23, 2017 at 2:49 PM, Yong Zhang  wrote:

> That's why your "source" should be defined as an Array[Struct] type (which
> makes sense in this case, it has an undetermined length  , so you can
> explode it and get the description easily.
>
> Now you need write your own UDF, maybe can do what you want.
>
> Yong
>
> --
> *From:* Selvam Raman 
> *Sent:* Thursday, March 23, 2017 5:03 PM
> *To:* user
> *Subject:* how to read object field within json file
>
> Hi,
>
> {
> "id": "test1",
> "source": {
> "F1": {
>   "id": "4970",
>   "eId": "F1",
>   "description": "test1",
> },
> "F2": {
>   "id": "5070",
>   "eId": "F2",
>   "description": "test2",
> },
> "F3": {
>   "id": "5170",
>   "eId": "F3",
>   "description": "test3",
> },
> "F4":{}
>   etc..
>   "F999":{}
> }
>
> I am having bzip json files like above format.
> some json row contains two objects within source(like F1 and F2), sometime
> five(F1,F2,F3,F4,F5),etc. So the final schema will contains combination of
> all objects for the source field.
>
> Now, every row will contain n number of objects but only some contains
> valid records.
> how can i retreive the value of "description" in "source" field.
>
> source.F1.description - returns the result but how can i get all
> description result for every row..(something like this
> "source.*.description").
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
Another option that would avoid a shuffle would be to use assign and
coalesce, running two separate streams.

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{t0: {"0": }, t1:{"0": x}}""")
  .load()
  .coalesce(1)
  .writeStream
  .foreach(... code to write to cassandra ...)

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{t0: {"1": }, t1:{"1": x}}""")
  .load()
  .coalesce(1)
  .writeStream
  .foreach(... code to write to cassandra ...)

On Fri, Mar 17, 2017 at 7:35 AM, OUASSAIDI, Sami <sami.ouassa...@mind7.fr>
wrote:

> @Cody : Duly noted.
> @Michael Ambrust : A repartition is out of the question for our project as
> it would be a fairly expensive operation. We tried looking into targeting a
> specific executor so as to avoid this extra cost and directly have well
> partitioned data after consuming the kafka topics. Also we are using Spark
> streaming to save to the cassandra DB and try to keep shuffle operations to
> a strict minimum (at best none). As of now we are not entirely pleased with
> our current performances, that's why I'm doing a kafka topic sharding POC
> and getting the executor to handle the specificied partitions is central.
> ᐧ
>
> 2017-03-17 9:14 GMT+01:00 Michael Armbrust <mich...@databricks.com>:
>
>> Sorry, typo.  Should be a repartition not a groupBy.
>>
>>
>>> spark.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "...")
>>>   .option("subscribe", "t0,t1")
>>>   .load()
>>>   .repartition($"partition")
>>>   .writeStream
>>>   .foreach(... code to write to cassandra ...)
>>>
>>
>
>
> --
> *Mind7 Consulting*
>
> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
> __
>
> 64 Rue Taitbout, 75009 Paris
>


Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
Sorry, typo.  Should be a repartition not a groupBy.


> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("subscribe", "t0,t1")
>   .load()
>   .repartition($"partition")
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>


Re: Streaming 2.1.0 - window vs. batch duration

2017-03-16 Thread Michael Armbrust
Have you considered trying event time aggregation in structured streaming
instead?

On Thu, Mar 16, 2017 at 12:34 PM, Dominik Safaric 
wrote:

> Hi all,
>
> As I’ve implemented a streaming application pulling data from Kafka every
> 1 second (batch interval), I am observing some quite strange behaviour
> (didn’t use Spark extensively in the past, but continuous operator based
> engines instead of).
>
> Namely the dstream.window(Seconds(60)) windowed stream when written back
> to Kafka contains more messages then they were consumed (for debugging
> purposes using a small dataset of a million Kafka byte array deserialized
> messages). In particular, in total I’ve streamed exactly 1 million
> messages, whereas upon window expiry 60 million messages are written back
> to Kafka.
>
> I’ve read on the official docs that both the window and window slide
> duration must be multiples of the batch interval. Does this mean that when
> consuming messages between two windows every batch interval the RDDs of a
> given batch interval *t* the same batch is being ingested 59 more times
> into the windowed stream?
>
> If I would like to achieve this behaviour (batch every being equal to a
> second, window duration 60 seconds) - how might one achieve this?
>
> I would appreciate if anyone could correct me if I got the internals of
> Spark’s windowed operations wrong and elaborate a bit.
>
> Thanks,
> Dominik
>


Re: [Spark Streaming+Kafka][How-to]

2017-03-16 Thread Michael Armbrust
I think it should be straightforward to express this using structured
streaming.  You could ensure that data from a given partition ID is
processed serially by performing a group by on the partition column.

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "t0,t1")
  .load()
  .groupBy($"partition")
  .writeStream
  .foreach(... code to write to cassandra ...)


On Thu, Mar 16, 2017 at 8:10 AM, Cody Koeninger  wrote:

> Spark just really isn't a good fit for trying to pin particular
> computation to a particular executor, especially if you're relying on that
> for correctness.
>
> On Thu, Mar 16, 2017 at 7:16 AM, OUASSAIDI, Sami 
> wrote:
>
>>
>> Hi all,
>>
>> So I need to specify how an executor should consume data from a kafka
>> topic.
>>
>> Let's say I have 2 topics : t0 and t1 with two partitions each, and two
>> executors e0 and e1 (both can be on the same node so assign strategy does
>> not work since in the case of a multi executor node it works based on round
>> robin scheduling, whatever first available executor consumes the topic
>> partition )
>>
>> What I would like to do is make e0 consume partition 0 from both t0 and
>> t1 while e1 consumes partition 1 from the t0 and t1. Is there no way around
>> it except messing with scheduling ? If so what's the best approach.
>>
>> The reason for doing so is that executors will write to a cassandra
>> database and since we will be in a parallelized context one executor might
>> "collide" with another and therefore data will be lost, by assigning a
>> partition I want to force the executor to process the data sequentially.
>>
>> Thanks
>> Sami
>> --
>> *Mind7 Consulting*
>>
>> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
>> __
>>
>> 64 Rue Taitbout, 75009 Paris
>> ᐧ
>>
>
>


Re: Structured Streaming - Can I start using it?

2017-03-13 Thread Michael Armbrust
I think its very very unlikely that it will get withdrawn.  The primary
reason that the APIs are still marked experimental is that we like to have
several releases before committing to interface stability (in particular
the interfaces to write custom sources and sinks are likely to evolve).
Also, there are currently quite a few limitations in the types of queries
that we can run (i.e. multiple aggregations are disallowed, we don't
support stream-stream joins yet).  In these cases though, we explicitly say
its not supported when you try to start your stream.

For the use cases that are supported in 2.1 though (streaming ETL, event
time aggregation, etc) I'll say that we have been using it in production
for several months and we have customers doing the same.

On Mon, Mar 13, 2017 at 11:21 AM, Gaurav1809 
wrote:

> I read in spark documentation that Structured Streaming is still ALPHA in
> Spark 2.1 and the APIs are still experimental. Shall I use it to re write
> my
> existing spark streaming code? Looks like it is not yet production ready.
> What happens if Structured Streaming project gets withdrawn?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Structured-Streaming-Can-I-
> start-using-it-tp28488.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Michael Armbrust
If you have a reproduction you should open a JIRA.  It would be great if
there is a fix.  I'm just saying I know a similar issue does not exist in
structured streaming.

On Fri, Mar 10, 2017 at 7:46 AM, Justin Miller <
justin.mil...@protectwise.com> wrote:

> Hi Michael,
>
> I'm experiencing a similar issue. Will this not be fixed in Spark
> Streaming?
>
> Best,
> Justin
>
> On Mar 10, 2017, at 8:34 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> One option here would be to try Structured Streaming.  We've added an
> option "failOnDataLoss" that will cause Spark to just skip a head when this
> exception is encountered (its off by default though so you don't silently
> miss data).
>
> On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman <
> ram.the.m...@gmail.com> wrote:
>
>> I am using Spark streaming and reading data from Kafka using
>> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
>> smallest.
>>
>> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeE
>> xception
>> and my spark job crashes.
>>
>> I want to understand if there is a graceful way to handle this failure and
>> not kill the job. I want to keep ignoring these exceptions, as some other
>> partitions are fine and I am okay with data loss.
>>
>> Is there any way to handle this and not have my spark job crash? I have no
>> option of increasing the kafka retention period.
>>
>> I tried to have the DStream returned by createDirectStream() wrapped in a
>> Try construct, but since the exception happens in the executor, the Try
>> construct didn't take effect. Do you have any ideas of how to handle this?
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetO
>> utOfRangeException-tp26534.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Michael Armbrust
One option here would be to try Structured Streaming.  We've added an
option "failOnDataLoss" that will cause Spark to just skip a head when this
exception is encountered (its off by default though so you don't silently
miss data).

On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman <
ram.the.m...@gmail.com> wrote:

> I am using Spark streaming and reading data from Kafka using
> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
> smallest.
>
> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
> and my spark job crashes.
>
> I want to understand if there is a graceful way to handle this failure and
> not kill the job. I want to keep ignoring these exceptions, as some other
> partitions are fine and I am okay with data loss.
>
> Is there any way to handle this and not have my spark job crash? I have no
> option of increasing the kafka retention period.
>
> I tried to have the DStream returned by createDirectStream() wrapped in a
> Try construct, but since the exception happens in the executor, the Try
> construct didn't take effect. Do you have any ideas of how to handle this?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-
> OffsetOutOfRangeException-tp26534.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to unit test spark streaming?

2017-03-07 Thread Michael Armbrust
>
> Basically you abstract your transformations to take in a dataframe and
> return one, then you assert on the returned df
>

+1 to this suggestion.  This is why we wanted streaming and batch
dataframes to share the same API.


Re: Why Spark cannot get the derived field of case class in Dataset?

2017-02-28 Thread Michael Armbrust
We only serialize things that are in the constructor.  You would have
access to it in the typed API (df.map(_.day)).  I'd suggest making a
factory method that fills these in and put them in the constructor if you
need to get to it from other dataframe operations.

On Tue, Feb 28, 2017 at 12:03 PM, Yong Zhang  wrote:

> In the following example, the "day" value is in the case class, but I
> cannot get that in the Spark dataset, which I would like to use at runtime?
> Any idea? Do I have to force it to be present in the case class
> constructor? I like to derive it out automatically and used in the dataset
> or dataframe.
>
>
> Thanks
>
>
> scala> spark.versionres12: String = 2.1.0
>
> scala> import java.text.SimpleDateFormatimport java.text.SimpleDateFormat
>
> scala> val dateFormat = new SimpleDateFormat("-MM-dd")dateFormat: 
> java.text.SimpleDateFormat = java.text.SimpleDateFormat@f67a0200
>
> scala> case class Test(time: Long) { |   val day = 
> dateFormat.format(time) | }defined class Testscala> val t = 
> Test(1487185076410L)t: Test = Test(1487185076410)
>
> scala> t.timeres13: Long = 1487185076410
>
> scala> t.dayres14: String = 2017-02-15
>
> scala> val ds = Seq(t).toDS()ds: org.apache.spark.sql.Dataset[Test] = [time: 
> bigint]
>
> scala> ds.show+-+| 
> time|+-+|1487185076410|+-+
>
>
>


Re: Pretty print a dataframe...

2017-02-16 Thread Michael Armbrust
The toString method of Dataset.queryExecution includes the various plans.
I usually just log that directly.

On Thu, Feb 16, 2017 at 8:26 AM, Muthu Jayakumar  wrote:

> Hello there,
>
> I am trying to write to log-line a dataframe/dataset queryExecution and/or
> its logical plan. The current code...
>
> def explain(extended: Boolean): Unit = {
>   val explain = ExplainCommand(queryExecution.logical, extended = extended)
>   
> sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach
>  {
> // scalastyle:off println
> r => println(r.getString(0))
> // scalastyle:on println
>   }
> }
>
> sessionState is not accessible if I were to write my own explain(log:
> LoggingAdapter).
>
> Please advice,
> Muthu
>


Re: Case class with POJO - encoder issues

2017-02-13 Thread Michael Armbrust
You are right, you need that PR.  I pinged the author, but otherwise it
would be great if someone could carry it over the finish line.

On Sat, Feb 11, 2017 at 4:19 PM, Jason White 
wrote:

> I'd like to create a Dataset using some classes from Geotools to do some
> geospatial analysis. In particular, I'm trying to use Spark to distribute
> the work based on ID and label fields that I extract from the polygon data.
>
> My simplified case class looks like this:
> implicit val geometryEncoder: Encoder[Geometry] = Encoders.kryo[Geometry]
> case class IndexedGeometry(label: String, tract: Geometry)
>
> When I try to create a dataset using this case class, it give me this error
> message:
> Exception in thread "main" java.lang.UnsupportedOperationException: No
> Encoder found for com.vividsolutions.jts.geom.Geometry
> - field (class: "com.vividsolutions.jts.geom.Geometry", name: "tract")
> - root class: "org.me.HelloWorld.IndexedGeometry"
>
> If I add another encoder for my case class...:
> implicit val indexedGeometryEncoder: Encoder[IndexedGeometry] =
> Encoders.kryo[IndexedGeometry]
>
> ...it works, but now the entire dataset has a single field, "value", and
> it's a binary blob.
>
> Is there a way to do what I'm trying to do?
> I believe this PR is related, but it's been idle since December:
> https://github.com/apache/spark/pull/15918
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Case-class-with-POJO-encoder-issues-tp28381.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Michael Armbrust
I think the fastest way is likely to use a combination of conditionals
(when / otherwise), first (ignoring nulls), while grouping by the id.  This
should get the answer with only a single shuffle.

Here is an example

.

On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski  wrote:

> Hi Everett,
>
> That's pretty much what I'd do. Can't think of a way to beat your
> solution. Why do you "feel vaguely uneasy about it"?
>
> I'd also check out the execution plan (with explain) to see how it's
> gonna work at runtime. I may have seen groupBy + join be better than
> window (there were more exchanges in play for windows I reckon).
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson 
> wrote:
> >
> >
> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski 
> wrote:
> >>
> >> Hi,
> >>
> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
> >> help here too.
> >
> >
> > This seems to work, but I do feel vaguely uneasy about it. :)
> >
> > // First add a 'rank' column which is priority order just in case
> priorities
> > aren't
> > // from 1 with no gaps.
> > val temp1 = data.withColumn("rank", functions.dense_rank()
> >.over(Window.partitionBy("id", "name").orderBy("priority")))
> >
> > +---++-+--+++
> > | id|name|extra|  data|priority|rank|
> > +---++-+--+++
> > |  1|Fred|8|value1|   1|   1|
> > |  1|Fred|8|value8|   2|   2|
> > |  1|Fred|8|value5|   3|   3|
> > |  2| Amy|9|value3|   1|   1|
> > |  2| Amy|9|value5|   2|   2|
> > +---++-+--+++
> >
> > // Now move all the columns we want to denormalize into a struct column
> to
> > keep them together.
> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
> > temp1("data"), temp1("priority")))
> >   .drop("extra", "data", "priority")
> >
> > +---++++
> > | id|name|rank| temp_struct|
> > +---++++
> > |  1|Fred|   1|[8,value1,1]|
> > |  1|Fred|   2|[8,value8,2]|
> > |  1|Fred|   3|[8,value5,3]|
> > |  2| Amy|   1|[9,value3,1]|
> > |  2| Amy|   2|[9,value5,2]|
> > +---++++
> >
> > // groupBy, again, but now pivot the rank column. We need an aggregate
> > function after pivot,
> > // so use first -- there will only ever be one element.
> > val temp3 = temp2.groupBy("id", "name")
> >   .pivot("rank", Seq("1", "2", "3"))
> >   .agg(functions.first("temp_struct"))
> >
> > +---+++++
> > | id|name|   1|   2|   3|
> > +---+++++
> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> > |  2| Amy|[9,value3,1]|[9,value5,2]|null|
> > +---+++++
> >
> > // Now just moving things out of the structs and clean up.
> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
> >  .withColumn("data1", temp3("1").getField("data"))
> >  .withColumn("priority1", temp3("1").getField("priority"))
> >  .withColumn("extra2", temp3("2").getField("extra"))
> >  .withColumn("data2", temp3("2").getField("data"))
> >  .withColumn("priority2", temp3("2").getField("priority"))
> >  .withColumn("extra3", temp3("3").getField("extra"))
> >  .withColumn("data3", temp3("3").getField("data"))
> >  .withColumn("priority3", temp3("3").getField("priority"))
> >  .drop("1", "2", "3")
> >
> > +---++--+--+-+--+--+-+--
> +--+-+
> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
> > data3|priority3|
> > +---++--+--+-+--+--+-+--
> +--+-+
> > |  1|Fred| 8|value1|1| 8|value8|2| 8|value5|
> > 3|
> > |  2| Amy| 9|value3|1| 9|value5|2|  null|  null|
> > null|
> > +---++--+--+-+--+--+-+--
> +--+-+
> >
> >
> >
> >
> >
> >
> >
> >>
> >>
> >> Jacek
> >>
> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" 
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm trying to un-explode or denormalize a table like
> >>>
> >>> +---++-+--++
> >>> |id |name|extra|data  |priority|
> >>> +---++-+--++
> >>> |1  |Fred|8|value1|1   |
> >>> |1  |Fred|8|value8|2   |
> >>> |1  |Fred|8|value5|3   |
> >>> |2  |Amy |9|value3|1   |
> >>> |2  |Amy |9|value5|2   |
> >>> +---++-+--++
> >>>
> >>> into something 

Re: specifing schema on dataframe

2017-02-05 Thread Michael Armbrust
If you already have the expected schema, and you know that all numbers will
always be formatted as strings in the input JSON, you could probably derive
this list automatically.

Wouldn't it be simpler to just regex replace the numbers to remove the
> quotes?


I think this is likely to be a slower and less robust solution.  You would
have to make sure that you got all the corner cases right (i.e. escaping
and what not).

On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> I see so for the connector I need to pass in an array/list of numerical
> columns?
>
> Wouldnt it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> Regards
> Sam
>
> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Specifying the schema when parsing JSON will only let you pick between
>> similar datatypes (i.e should this be a short, long float, double etc).  It
>> will not let you perform conversions like string <-> number.  This has to
>> be done with explicit casts after the data has been loaded.
>>
>> I think you can make a solution that uses select or withColumn generic.
>> Just load the dataframe with a "parse schema" that treats numbers as
>> strings.  Then construct a list of columns that should be numbers and apply
>> the necessary conversions.
>>
>> import org.apache.spark.sql.functions.col
>> var df = spark.read.schema(parseSchema).json("...")
>> numericColumns.foreach { columnName =>
>>   df = df.withColumn(columnName, col(columnName).cast("long"))
>> }
>>
>>
>>
>> On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>>> Thanks Micheal
>>>
>>> I've been spending the past few days researching this
>>>
>>> The problem is the generated json has double quotes on fields that are
>>> numbers because the producing datastore doesn't want to lose precision
>>>
>>> I can change the data type true but that would be on specific to a job
>>> rather than a generic streaming job. I'm writing a structured streaming
>>> connector and I have the schema the generated dataframe should match.
>>>
>>> Unfortunately using withColumn won't help me here since the solution
>>> needs to be generic
>>>
>>> To summarise assume I have the following json
>>>
>>> [{
>>> "customerid": "535137",
>>> "foo": "bar"
>>> }]
>>>
>>>
>>> and I know the schema should be:
>>> StructType(Array(StructField("customerid",LongType,true),Str
>>> uctField("foo",StringType,true)))
>>>
>>> Whats the best way of solving this?
>>>
>>> My current approach is to iterate over the JSON and identify which
>>> fields are numbers and which arent then recreate the json
>>>
>>> But to be honest that doesnt seem like the cleanest approach, so happy
>>> for advice on this
>>>
>>> Regards
>>> Sam
>>>
>>> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
>>> wrote:
>>>
>>>> -dev
>>>>
>>>> You can use withColumn to change the type after the data has been
>>>> loaded
>>>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
>>>> .
>>>>
>>>> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Direceu
>>>>
>>>> Thanks your right! that did work
>>>>
>>>>
>>>> But now im facing an even bigger problem since i dont have access to
>>>> change the underlying data, I just want to apply a schema over something
>>>> that was written via the sparkContext.newAPIHadoopRDD
>>>>
>>>> Basically I am reading in a RDD[JsonObject] and would like to convert
>>>> it into a dataframe which I pass the schema into
>>>>
>>>> Whats the best way to do this?
>>>>
>>>> I doubt removing all the quotes in the JSON is the best solution is it?
>>>>
>>>> Regards
>>>> Sam
>>>>
>>>> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
>>>> dirceu.semigh...@gmail.com> wrote:
>>>>
>>>> Hi Sam
&g

Re: specifing schema on dataframe

2017-02-05 Thread Michael Armbrust
Specifying the schema when parsing JSON will only let you pick between
similar datatypes (i.e should this be a short, long float, double etc).  It
will not let you perform conversions like string <-> number.  This has to
be done with explicit casts after the data has been loaded.

I think you can make a solution that uses select or withColumn generic.
Just load the dataframe with a "parse schema" that treats numbers as
strings.  Then construct a list of columns that should be numbers and apply
the necessary conversions.

import org.apache.spark.sql.functions.col
var df = spark.read.schema(parseSchema).json("...")
numericColumns.foreach { columnName =>
  df = df.withColumn(columnName, col(columnName).cast("long"))
}



On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Thanks Micheal
>
> I've been spending the past few days researching this
>
> The problem is the generated json has double quotes on fields that are
> numbers because the producing datastore doesn't want to lose precision
>
> I can change the data type true but that would be on specific to a job
> rather than a generic streaming job. I'm writing a structured streaming
> connector and I have the schema the generated dataframe should match.
>
> Unfortunately using withColumn won't help me here since the solution needs
> to be generic
>
> To summarise assume I have the following json
>
> [{
> "customerid": "535137",
> "foo": "bar"
> }]
>
>
> and I know the schema should be:
> StructType(Array(StructField("customerid",LongType,true),
> StructField("foo",StringType,true)))
>
> Whats the best way of solving this?
>
> My current approach is to iterate over the JSON and identify which fields
> are numbers and which arent then recreate the json
>
> But to be honest that doesnt seem like the cleanest approach, so happy for
> advice on this
>
> Regards
> Sam
>
> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> -dev
>>
>> You can use withColumn to change the type after the data has been loaded
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
>> .
>>
>> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>> Hi Direceu
>>
>> Thanks your right! that did work
>>
>>
>> But now im facing an even bigger problem since i dont have access to
>> change the underlying data, I just want to apply a schema over something
>> that was written via the sparkContext.newAPIHadoopRDD
>>
>> Basically I am reading in a RDD[JsonObject] and would like to convert it
>> into a dataframe which I pass the schema into
>>
>> Whats the best way to do this?
>>
>> I doubt removing all the quotes in the JSON is the best solution is it?
>>
>> Regards
>> Sam
>>
>> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
>> dirceu.semigh...@gmail.com> wrote:
>>
>> Hi Sam
>> Remove the " from the number that it will work
>>
>> Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
>> escreveu:
>>
>> Hi All
>>
>> I would like to specify a schema when reading from a json but when trying
>> to map a number to a Double it fails, I tried FloatType and IntType with no
>> joy!
>>
>>
>> When inferring the schema customer id is set to String, and I would like
>> to cast it as Double
>>
>> so df1 is corrupted while df2 shows
>>
>>
>> Also FYI I need this to be generic as I would like to apply it to any
>> json, I specified the below schema as an example of the issue I am facing
>>
>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>> DoubleType,FloatType, StructType, LongType,DecimalType}
>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>> val df1 = 
>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> val df2 = 
>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> df1.show(1)
>> df2.show(1)
>>
>>
>> Any help would be appreciated, I am sure I am missing something obvious
>> but for the life of me I cant tell what it is!
>>
>>
>> Kind Regards
>> Sam
>>
>>
>>
>>


  1   2   3   4   5   6   7   8   9   10   >