Re: Announcing Delta Lake 0.2.0

2019-06-21 Thread Michael Armbrust
> > Thanks for confirmation. We are using the workaround to create a separate > Hive external table STORED AS PARQUET with the exact location of Delta > table. Our use case is batch-driven and we are running VACUUM with 0 > retention after every batch is completed. Do you see any potential problem

Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-11 Thread Michael Armbrust
Hmm yeah that does look wrong. Would be great if someone opened a PR to correct the docs :) On Thu, May 10, 2018 at 5:13 PM Yuta Morisawa wrote: > The problem is solved. > The actual schema of Kafka message is different from documentation. > > >

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Michael Armbrust
You can calculate argmax using a struct. df.groupBy($"id").agg(max(struct($"my_timestamp", struct($"*").as("data"))).getField("data")).select($"data.*") You could transcode this to SQL, it'll just be complicated nested queries. On Wed, Apr 18, 2018 at 3:40 PM, kant kodali wrote: >
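
A minimal sketch of the argmax-via-struct pattern above, assuming a SparkSession named spark and a hypothetical DataFrame df with id and my_timestamp columns:

    import spark.implicits._
    import org.apache.spark.sql.functions._

    // max() on a struct compares fields left to right, so putting the
    // timestamp first picks the whole row with the latest timestamp per id.
    val latestPerId = df
      .groupBy($"id")
      .agg(max(struct($"my_timestamp", struct($"*").as("data"))).getField("data").as("data"))
      .select($"data.*")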

Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

2018-03-20 Thread Michael Armbrust
Those options will not affect structured streaming. You are looking for .option("maxOffsetsPerTrigger", "1000") We are working on improving this by building a generic mechanism into the Streaming DataSource V2 so that the engine can do admission control on the amount of data returned in a
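
A hedged sketch of the option mentioned above, with placeholder broker and topic names:

    val limited = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      // Cap how many Kafka offsets are consumed in each micro-batch.
      .option("maxOffsetsPerTrigger", "1000")
      .load()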

Re: [Structured Streaming] Deserializing avro messages from kafka source using schema registry

2018-02-09 Thread Michael Armbrust
This isn't supported yet, but there is ongoing work at spark-avro to enable this use case. Stay tuned. On Fri, Feb 9, 2018 at 3:07 PM, Bram wrote: > Hi, > > I couldn't find any documentation about avro message

Re: [Structured Streaming] Commit protocol to move temp files to dest path only when complete, with code

2018-02-09 Thread Michael Armbrust
We didn't go this way initially because it doesn't work on storage systems that have weaker guarantees than HDFS with respect to rename. That said, I'm happy to look at other options if we want to make this configurable. On Fri, Feb 9, 2018 at 2:53 PM, Dave Cameron

Re: Prefer Structured Streaming over Spark Streaming (DStreams)?

2018-01-31 Thread Michael Armbrust
At this point I recommend that new applications are built using structured streaming. The engine was GA-ed as of Spark 2.2 and I know of several very large (trillions of records) production jobs that are running in Structured Streaming. All of our production pipelines at databricks are written

Re: Max number of streams supported ?

2018-01-31 Thread Michael Armbrust
-dev +user > Similarly for structured streaming, Would there be any limit on number of > of streaming sources I can have ? > There is no fundamental limit, but each stream will have a thread on the driver that is doing coordination of execution. We comfortably run 20+ streams on a single
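
To illustrate, a sketch of starting several independent queries from one application; topic and path names are placeholders, and each start() call adds one coordination thread on the driver:

    val queries = (0 until 20).map { i =>
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", s"topic-$i")
        .load()
        .writeStream
        .format("parquet")
        .option("path", s"/data/out/topic-$i")
        .option("checkpointLocation", s"/checkpoints/topic-$i")
        .start()
    }
    // Block until any of the running queries terminates.
    spark.streams.awaitAnyTermination()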

Re: Dataset API inconsistencies

2018-01-10 Thread Michael Armbrust
I wrote Datasets, and I'll say I only use them when I really need to (i.e. when it would be very cumbersome to express what I am trying to do relationally). Dataset operations are almost always going to be slower than their DataFrame equivalents since they usually require materializing objects

Re: Spark error while trying to spark.read.json()

2017-12-19 Thread Michael Armbrust
- dev java.lang.AbstractMethodError almost always means that you have different libraries on the classpath than at compilation time. In this case I would check to make sure you have the correct version of Scala (and only have one version of scala) on the classpath. On Tue, Dec 19, 2017 at 5:42

Re: Kafka version support

2017-11-30 Thread Michael Armbrust
Oh good question. I was saying that the stock structured streaming connector should be able to talk to 0.11 or 1.0 brokers. On Thu, Nov 30, 2017 at 1:12 PM, Cody Koeninger wrote: > Are you talking about the broker version, or the kafka-clients artifact > version? > > On

Re: Kafka version support

2017-11-30 Thread Michael Armbrust
I would expect that to work. On Wed, Nov 29, 2017 at 10:17 PM, Raghavendra Pandey < raghavendra.pan...@gmail.com> wrote: > Just wondering if anyone has tried spark structured streaming kafka > connector (2.2) with Kafka 0.11 or Kafka 1.0 version > > Thanks > Raghav >

Re: Generate windows on processing time in Spark Structured Streaming

2017-11-10 Thread Michael Armbrust
Hmmm, we should allow that. current_timestamp() is actually deterministic within any given batch. Could you open a JIRA ticket? On Fri, Nov 10, 2017 at 1:52 AM, wangsan wrote: > Hi all, > > How can I use current processing time to generate windows in streaming > processing? >

Re: [Spark Structured Streaming] Changing partitions of (flat)MapGroupsWithState

2017-11-08 Thread Michael Armbrust
The relevant config is spark.sql.shuffle.partitions. Note that once you start a query, this number is fixed. The config will only affect queries starting from an empty checkpoint. On Wed, Nov 8, 2017 at 7:34 AM, Teemu Heikkilä wrote: > I have spark structured streaming job
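
A small sketch of that behavior, assuming a hypothetical streaming DataFrame named events: the value of spark.sql.shuffle.partitions in effect when the query first starts from an empty checkpoint is the one that sticks.

    // Must be set before the query is started against a fresh checkpoint;
    // restarting from an existing checkpoint keeps the original value.
    spark.conf.set("spark.sql.shuffle.partitions", "64")

    val query = events
      .groupBy("key")
      .count()
      .writeStream
      .outputMode("update")
      .option("checkpointLocation", "/checkpoints/key-counts")
      .format("console")
      .start()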

Re: Structured Stream equivalent of reduceByKey

2017-11-06 Thread Michael Armbrust
ate > store interactions. > > Also anyone aware of any design doc or some example about how we can add > new operation on dataSet and corresponding physical plan. > > > > On Thu, Oct 26, 2017 at 5:54 PM, Michael Armbrust <mich...@databricks.com> > wrote: >

Re: Structured Stream equivalent of reduceByKey

2017-10-26 Thread Michael Armbrust
- dev I think you should be able to write an Aggregator. You probably want to run in update mode if you are looking for it to output any group that has changed in the batch. On Wed, Oct 25,
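
A sketch of what that could look like, assuming a hypothetical streaming Dataset[Event] named events and a SparkSession named spark; the Aggregator plays the role of the reduce function in reduceByKey:

    import spark.implicits._
    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    case class Event(key: String, value: Long)

    // Sums values per key, analogous to reduceByKey(_ + _).
    object SumValue extends Aggregator[Event, Long, Long] {
      def zero: Long = 0L
      def reduce(buffer: Long, e: Event): Long = buffer + e.value
      def merge(b1: Long, b2: Long): Long = b1 + b2
      def finish(reduction: Long): Long = reduction
      def bufferEncoder: Encoder[Long] = Encoders.scalaLong
      def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }

    events
      .groupByKey(_.key)
      .agg(SumValue.toColumn.name("sum"))
      .writeStream
      .outputMode("update")   // only emit groups that changed in the batch
      .format("console")
      .start()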

Re: Implement Dataset reader from SEQ file with protobuf to Dataset

2017-10-08 Thread Michael Armbrust
spark-avro would be a good example to start with. On Sun, Oct 8, 2017 at 3:00 AM, Serega Sheypak wrote: > Hi, did anyone try to implement Spark SQL dataset reader from SEQ file > with protobuf inside to Dataset? > > Imagine I

Re: Chaining Spark Streaming Jobs

2017-09-18 Thread Michael Armbrust
uery(StreamingQueryManager.scala:278) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222) > > While running on the EMR cluster all paths poin

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread Michael Armbrust
rt() > > query.awaitTermination() > > *and I use play json to parse input logs from kafka ,the parse function is > like* > > def parseFunction(str: String): (Long, String) = { > val json = Json.parse(str) > val timestamp = (json \ "time").get.toString(

Re: [SS]How to add a column with custom system time?

2017-09-14 Thread Michael Armbrust
.groupBy($"date") >>> .count() >>> .withColumn("window", window(current_timestamp(), "15 minutes")) >>> >>> /** >>> * output >>> */ >>> val query = results >>> .writeSt

Re: [Structured Streaming] Multiple sources best practice/recommendation

2017-09-14 Thread Michael Armbrust
I would probably suggest that you partition by format (though you can get the file name from the built-in function input_file_name()). You can load multiple streams from different directories and union them together as long as the schema is the same after parsing. Otherwise you can just run
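
A sketch of the union approach, assuming a hypothetical common eventSchema that both inputs parse to:

    import org.apache.spark.sql.functions.input_file_name

    // Each format gets its own stream; once parsed to the same schema they
    // can be unioned into a single streaming DataFrame.
    val jsonEvents = spark.readStream.schema(eventSchema).json("/data/in/json")
    val csvEvents  = spark.readStream.schema(eventSchema)
      .option("header", "true")
      .csv("/data/in/csv")

    val allEvents = jsonEvents.union(csvEvents)
      .withColumn("source_file", input_file_name())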

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread Michael Armbrust
Can you show all the code? This works for me. On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote: > The spark version is 2.2.0 > > Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 12:32 PM: > >> Which version of spark? >> >

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread Michael Armbrust
Can you show the full query you are running? On Tue, Sep 12, 2017 at 10:11 AM, 张万新 wrote: > Hi, > > I'm using structured streaming to count unique visits of our website. I > use spark on yarn mode with 4 executor instances and from 2 cores * 5g > memory to 4 cores * 10g

Re: [SS]How to add a column with custom system time?

2017-09-11 Thread Michael Armbrust
Nondeterministic expressions are only allowed in > > Project, Filter, Aggregate or Window" > > Can you give more advice? > > Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 4:48 AM: > >> import org.apache.spark.sql.functions._ >> >> df.withColumn("w

Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-11 Thread Michael Armbrust
The following will convert the whole row to JSON. import org.apache.spark.sql.functions.* df.select(to_json(struct(col("*")))) On Sat, Sep 9, 2017 at 6:27 PM, kant kodali wrote: > Thanks Ryan! In this case, I will have Dataset so is there a way to > convert Row to Json
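
For instance, a hedged Scala sketch of serializing every column into a single JSON string, e.g. to use as the value column of a Kafka sink:

    import org.apache.spark.sql.functions.{col, struct, to_json}

    // Pack all columns into one struct, then render that struct as JSON text.
    val asJson = df.select(to_json(struct(col("*"))).as("value"))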

Re: Need some Clarification on checkpointing w.r.t Spark Structured Streaming

2017-09-11 Thread Michael Armbrust
Checkpoints record what has been processed for a specific query, and as such only need to be defined when writing (which is how you "start" a query). You can use the DataFrame created with readStream to start multiple queries, so it wouldn't really make sense to have a single checkpoint there.
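
A sketch of one readStream DataFrame feeding two queries, each with its own checkpoint directory (all names are placeholders):

    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()

    // Each query tracks its own progress, so each needs its own checkpoint.
    val rawSink = input.writeStream
      .format("parquet")
      .option("path", "/data/raw")
      .option("checkpointLocation", "/checkpoints/raw")
      .start()

    val countSink = input.groupBy("key").count().writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "/checkpoints/counts")
      .start()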

Re: [SS]How to add a column with custom system time?

2017-09-11 Thread Michael Armbrust
import org.apache.spark.sql.functions._ df.withColumn("window", window(current_timestamp(), "15 minutes")) On Mon, Sep 11, 2017 at 3:03 AM, 张万新 wrote: > Hi, > > In structured streaming how can I add a column to a dataset with current > system time aligned with 15

Re: Joining 2 dataframes, getting result as nested list/structure in dataframe

2017-08-23 Thread Michael Armbrust
You can create a nested struct that contains multiple columns using struct(). Here's a pretty complete guide on working with nested data: https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html On Wed, Aug 23, 2017 at 2:30 PM, JG Perrin

Re: Chaining Spark Streaming Jobs

2017-08-23 Thread Michael Armbrust
If you use structured streaming and the file sink, you can have a subsequent stream read using the file source. This will maintain exactly once processing even if there are hiccups or failures. On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind wrote: > Hello Spark Experts,
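
A sketch of that chaining pattern; firstStage is a hypothetical streaming DataFrame and the schema and paths are placeholders:

    // Job 1: write results atomically to a directory with the file sink.
    firstStage.writeStream
      .format("parquet")
      .option("path", "/data/stage1")
      .option("checkpointLocation", "/checkpoints/stage1")
      .start()

    // Job 2: treat that directory as a streaming source (file sources need
    // an explicit schema) and continue processing with exactly-once semantics.
    val downstream = spark.readStream
      .schema(stage1Schema)
      .parquet("/data/stage1")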

Re: Question on how to get appended data from structured streaming

2017-08-20 Thread Michael Armbrust
What is your end goal? Right now the foreach writer is the way to do arbitrary processing on the data produced by various output modes. On Sun, Aug 20, 2017 at 12:23 PM, Yanpeng Lin wrote: > Hello, > > I am new to Spark. > It would be appreciated if anyone could help me
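
A minimal ForeachWriter sketch for arbitrary per-row processing of a query's output; the println stands in for whatever external system is being targeted:

    import org.apache.spark.sql.{ForeachWriter, Row}

    val writer = new ForeachWriter[Row] {
      def open(partitionId: Long, version: Long): Boolean = true // e.g. open a connection
      def process(row: Row): Unit = println(row)                 // handle one output row
      def close(errorOrNull: Throwable): Unit = ()               // release resources
    }

    df.writeStream.outputMode("append").foreach(writer).start()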

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Michael Armbrust
See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing Though I think that this currently doesn't work with the console sink. On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep wrote: > Hi, > >> >>

Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
> > 1) Parsing data/Schema creation: The Bro IDS logs have a 8 line header > that contains the 'schema' for the data, each log http/dns/etc will have > different columns with different data types. So would I create a specific > CSV reader inherited from the general one? Also I'm assuming this

Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
Cool stuff! A pattern I have seen is to use our CSV/TSV or JSON support to read bro logs, rather than a python library. This is likely to have much better performance since we can do all of the parsing on the JVM without having to flow it through an external python process. On Tue, Aug 8, 2017 at

Re: [SS] Console sink not supporting recovering from checkpoint location? Why?

2017-08-07 Thread Michael Armbrust
I think there is really no good reason for this limitation. On Mon, Aug 7, 2017 at 2:58 AM, Jacek Laskowski wrote: > Hi, > > While exploring checkpointing with kafka source and console sink I've > got the exception: > > // today's build from the master > scala> spark.version >

Re: how to convert the binary from Kafka to string please

2017-07-24 Thread Michael Armbrust
There are end to end examples of using Kafka in in this blog: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html On Sun, Jul 23, 2017 at 7:44 PM, 萝卜丝炒饭 <1427357...@qq.com> wrote: > Hi all > > I want to change the binary from

Re: custom joins on dataframe

2017-07-23 Thread Michael Armbrust
> > left.join(right, my_fuzzy_udf (left("cola"),right("cola"))) > While this could work, the problem will be that we'll have to check every possible combination of tuples from left and right using your UDF. It would be best if you could somehow partition the problem so that we could reduce the

Re: Flatten JSON to multiple columns in Spark

2017-07-18 Thread Michael Armbrust
Here is an overview of how to work with complex JSON in Spark: https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html (works in streaming and batch) On Tue, Jul 18, 2017 at 10:29 AM, Riccardo Ferrari wrote: > What's

[ANNOUNCE] Announcing Apache Spark 2.2.0

2017-07-11 Thread Michael Armbrust
Hi all, Apache Spark 2.2.0 is the third release of the Spark 2.x line. This release removes the experimental tag from Structured Streaming. In addition, this release focuses on usability, stability, and polish, resolving over 1100 tickets. We'd like to thank our contributors and users for their

Re: Event time aggregation is possible in Spark Streaming ?

2017-07-10 Thread Michael Armbrust
Event-time aggregation is only supported in Structured Streaming. On Sat, Jul 8, 2017 at 4:18 AM, Swapnil Chougule wrote: > Hello, > > I want to know whether event time aggregation in spark streaming. I could > see it's possible in structured streaming. As I am working

Re: Union of 2 streaming data frames

2017-07-10 Thread Michael Armbrust
> going to be out soon? Do you have some sort of ETA? > > > > *From: *"Lalwani, Jayesh" <jayesh.lalw...@capitalone.com> > *Date: *Friday, July 7, 2017 at 5:46 PM > *To: *Michael Armbrust <mich...@databricks.com> > > *Cc: *"user@spark.apache.or

Re: Union of 2 streaming data frames

2017-07-07 Thread Michael Armbrust
pache.spark.sql.execution.streaming. > StreamExecution$$anonfun$org$apache$spark$sql$execution$ > streaming$StreamExecution$$runBatches$1.apply$mcZ$sp( > StreamExecution.scala:244) > > at org.apache.spark.sql.execution.streaming. > ProcessingTimeExecutor.execute(TriggerEx

Re: Union of 2 streaming data frames

2017-07-07 Thread Michael Armbrust
df.union(df2) should be supported when both DataFrames are created from a streaming source. What error are you seeing? On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh < jayesh.lalw...@capitalone.com> wrote: > In structured streaming, Is there a way to Union 2 streaming data frames? > Are there

Re: If I pass raw SQL string to dataframe do I still get the Spark SQL optimizations?

2017-07-06 Thread Michael Armbrust
It goes through the same optimization pipeline. More in this video. On Thu, Jul 6, 2017 at 5:28 PM, kant kodali wrote: > HI All, > > I am wondering If I pass a raw SQL string to dataframe do I still get the > Spark SQL optimizations? why

Re: Interesting Stateful Streaming question

2017-06-30 Thread Michael Armbrust
This does sound like a good use case for that feature. Note that Spark 2.2 adds a similar [flat]MapGroupsWithState operation to structured streaming. Stay tuned for a blog post on that! On Thu, Jun 29, 2017 at 6:11 PM, kant kodali wrote: > Is mapWithState an answer for

Re: org.apache.spark.sql.types missing from spark-sql_2.11-2.1.1.jar?

2017-06-20 Thread Michael Armbrust
It's in the spark-catalyst_2.11-2.1.1.jar since the logical query plans and optimization also need to know about types. On Tue, Jun 20, 2017 at 1:14 PM, Jean Georges Perrin wrote: > Hey all, > > i was giving a run to 2.1.1 and got an error on one of my test program: > > package

Re: how many topics spark streaming can handle

2017-06-19 Thread Michael Armbrust
I don't think that there is really a Spark specific limit here. It would be a function of the size of your spark / kafka clusters and the type of processing you are trying to do. On Mon, Jun 19, 2017 at 12:00 PM, Ashok Kumar wrote: > Hi Gurus, > > Within one Spark

Re: the scheme in stream reader

2017-06-19 Thread Michael Armbrust
The socket source can't know how to parse your data. I think the right thing would be for it to throw an exception saying that you can't set the schema here. Would you mind opening a JIRA ticket? If you are trying to parse data from something like JSON then you should use from_json on the
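
A sketch of parsing the socket source's single string column with from_json and an explicit, made-up schema (SparkSession named spark assumed):

    import spark.implicits._
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._

    val schema = new StructType()
      .add("user", StringType)
      .add("ts", TimestampType)

    // The socket source always yields one string column named "value".
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    val parsed = lines
      .select(from_json($"value", schema).as("json"))
      .select("json.*")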

Re: Nested "struct" fonction call creates a compilation error in Spark SQL

2017-06-15 Thread Michael Armbrust
> you think ? > > Regards, > > Olivier. > > > 2017-06-15 21:08 GMT+02:00 Michael Armbrust <mich...@databricks.com>: > >> Which version of Spark? If its recent I'd open a JIRA. >> >> On Thu, Jun 15, 2017 at 6:04 AM, Olivier Girardot < >>

Re: Nested "struct" fonction call creates a compilation error in Spark SQL

2017-06-15 Thread Michael Armbrust
Which version of Spark? If it's recent I'd open a JIRA. On Thu, Jun 15, 2017 at 6:04 AM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: > Hi everyone, > when we create recursive calls to "struct" (up to 5 levels) for extending > a complex datastructure we end up with the following

Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-15 Thread Michael Armbrust
anks! > > On Wed, Jun 14, 2017 at 5:32 PM, Michael Armbrust <mich...@databricks.com> > wrote: > >> This a good question. I really like using Kafka as a centralized source >> for streaming data in an organization and, with Spark 2.2, we have full >> support

Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-14 Thread Michael Armbrust
This is a good question. I really like using Kafka as a centralized source for streaming data in an organization and, with Spark 2.2, we have full support for reading and writing data to/from Kafka in both streaming and batch

Re: Running into the same problem as JIRA SPARK-20325

2017-05-31 Thread Michael Armbrust
> > So, my question is the same as stated in the following ticket which is Do > we need create a checkpoint directory for each individual query? > Yes. Checkpoints record what data has been processed. Thus two different queries need their own checkpoints.

Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread Michael Armbrust
-dev Have you tried clearing out the checkpoint directory? Can you also give the full stack trace? On Wed, May 24, 2017 at 3:45 PM, kant kodali wrote: > Even if I do simple count aggregation like below I get the same error as >

Re: Impact of coalesce operation before writing dataframe

2017-05-23 Thread Michael Armbrust
coalesce is nice because it does not shuffle, but the consequence of avoiding a shuffle is it will also reduce parallelism of the preceding computation. Have you tried using repartition instead? On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi < andrii.bilets...@yahoo.com.invalid> wrote: > Hi
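
To make the trade-off concrete, a small sketch (paths are placeholders):

    // coalesce(1): no shuffle, but the whole preceding computation collapses
    // to a single task, losing parallelism.
    df.coalesce(1).write.parquet("/out/coalesced")

    // repartition(1): adds a shuffle, but upstream stages keep their
    // parallelism and only the final write is single-threaded.
    df.repartition(1).write.parquet("/out/repartitioned")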

Re: Are there any Kafka forEachSink examples?

2017-05-23 Thread Michael Armbrust
There is an example in this post: https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html On Tue, May 23, 2017 at 11:35 AM, kant kodali wrote: > Hi All, > > Are there any Kafka forEachSink examples

Re: 2.2. release date ?

2017-05-23 Thread Michael Armbrust
Mark is right. I will cut another RC as soon as the known issues are resolved. In the meantime it would be very helpful for people to test RC2 and report issues. On Tue, May 23, 2017 at 11:10 AM, Mark Hamstra wrote: > I heard that once we reach release candidates it's

Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-22 Thread Michael Armbrust
There is an RC here. Please test! http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html On Fri, May 19, 2017 at 4:07 PM, kant kodali wrote: > Hi Patrick, > > I am using 2.1.1 and I tried the above code you sent and I get > >

Re: How to see the full contents of dataset or dataframe is structured streaming?

2017-05-18 Thread Michael Armbrust
You can write it to the memory sink. df.writeStream.format("memory").queryName("myStream").start() spark.table("myStream").show() On Wed, May 17, 2017 at 7:55 PM, kant kodali wrote: > Hi All, > > How to see the full contents of dataset or dataframe is structured >

Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Michael Armbrust
is in all spark machines under SPARK_HOME/jars. > > Still same error seems to persist. Is that the right jar or is there > anything else I need to add? > > Thanks! > > > > On Tue, May 16, 2017 at 1:40 PM, Michael Armbrust <mich...@databricks.com> > wrote: > &

Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Michael Armbrust
Looks like you are missing the kafka dependency. On Tue, May 16, 2017 at 1:04 PM, kant kodali wrote: > Looks like I am getting the following runtime exception. I am using Spark > 2.1.0 and the following jars > > *spark-sql_2.11-2.1.0.jar* > >

Re: what is the difference between json format vs kafka format?

2017-05-15 Thread Michael Armbrust
For that simple count, you don't actually have to even parse the JSON data. You can just do a count. The following code assumes you are running Spark 2.2.

Re: Spark SQL DataFrame to Kafka Topic

2017-05-15 Thread Michael Armbrust
The foreach sink from that blog post requires that you have a DataFrame with two columns in the form of a Tuple2, (String, String), whereas your dataframe has only a single column `payload`. You could change the KafkaSink to extend ForeachWriter[KafkaMessage] and then it would work. I'd also

Re: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

2017-05-12 Thread Michael Armbrust
I believe that Avro/Kafka messages have a few bytes at the beginning of the message to denote which schema is being used. Have you tried using the KafkaAvroDecoder inside of the map instead? On Fri, May 12, 2017 at 9:26 AM, Revin Chalil wrote: > Just following up on this;

Re: Convert DStream into Streaming Dataframe

2017-05-12 Thread Michael Armbrust
Are there any particular things that the DataFrame or Dataset API are missing? On Fri, May 12, 2017 at 9:49 AM, Tejinder Aulakh wrote: > Hi, > > Is there any way to convert a DStream to a streaming dataframe? I want to > use Structured streaming in a new common module

Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Michael Armbrust
s. if that's clear, I could probably annotate my > bean class properly > > On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com> > wrote: > >> I think you are supposed to set BeanProperty on a var as they do here >> <https://github.com/apache/

Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Michael Armbrust
eDataFrame( > SparkSession.scala:251) > at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:278) > ... 54 elided > > On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust <mich...@databricks.com> > wrote: > >> I think you are supposed to set Bea

Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Michael Armbrust
I think you are supposed to set BeanProperty on a var as they do here. If you are using scala though I'd consider using the case
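
A hedged Scala sketch of the bean-style approach (class and fields are made up); it assumes the usual bean conventions (getters/setters via @BeanProperty vars, plus a no-arg constructor) are enough for the bean encoder:

    import scala.beans.BeanProperty
    import org.apache.spark.sql.Encoders

    class Person(@BeanProperty var name: String,
                 @BeanProperty var age: Int) {
      def this() = this(null, 0)   // bean encoders need a no-arg constructor
    }

    val personEncoder = Encoders.bean(classOf[Person])
    val ds = spark.createDataset(Seq(new Person("a", 1), new Person("b", 2)))(personEncoder)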

Re: What are Analysis Errors With respect to Spark Sql DataFrames and DataSets?

2017-05-03 Thread Michael Armbrust
> > if I do dataset.select("nonExistentColumn") then the Analysis Error is > thrown at compile time right? > if you do df.as[MyClass].map(_.badFieldName) you will get a compile error. However, if df doesn't have the right columns for MyClass, that error will only be thrown at runtime (whether DF

Re: What are Analysis Errors With respect to Spark Sql DataFrames and DataSets?

2017-05-03 Thread Michael Armbrust
An analysis exception occurs whenever the scala/java/python program is valid, but the dataframe operations being performed are not. For example, df.select("nonExistentColumn") would throw an analysis exception. On Wed, May 3, 2017 at 1:38 PM, kant kodali wrote: > Hi All, >

[ANNOUNCE] Apache Spark 2.1.1

2017-05-02 Thread Michael Armbrust
We are happy to announce the availability of Spark 2.1.1! Apache Spark 2.1.1 is a maintenance release, based on the branch-2.1 maintenance branch of Spark. We strongly recommend all 2.1.x users to upgrade to this stable release. To download Apache Spark 2.1.1 visit

Re: Schema Evolution for nested Dataset[T]

2017-05-01 Thread Michael Armbrust
Oh, and if you want a default other than null: import org.apache.spark.sql.functions._ df.withColumn("address", coalesce($"address", lit()) On Mon, May 1, 2017 at 10:29 AM, Michael Armbrust <mich...@databricks.com> wrote: > The following should work

Re: Schema Evolution for nested Dataset[T]

2017-05-01 Thread Michael Armbrust
The following should work: val schema = implicitly[org.apache.spark.sql.Encoder[Course]].schema spark.read.schema(schema).parquet("data.parquet").as[Course] Note this will only work for nullable fields (i.e. if you add a primitive like Int you need to make it an Option[Int]) On Sun, Apr 30, 2017

Re: Arraylist is empty after JavaRDD.foreach

2017-04-24 Thread Michael Armbrust
Foreach runs on the executors and so is not able to modify an array list that is only present on the driver. You should just call collectAsList on the DataFrame. On Mon, Apr 24, 2017 at 10:36 AM, Devender Yadav < devender.ya...@impetus.co.in> wrote: > Hi All, > > > I am using Spark 1.6.2 and

Re: SPARK-20325 - Spark Structured Streaming documentation Update: checkpoint configuration

2017-04-14 Thread Michael Armbrust
> > 1) could we update documentation for Structured Streaming and describe > that checkpointing could be specified by > spark.sql.streaming.checkpointLocation > on SparkSession level and thus automatically checkpoint dirs will be > created per foreach query? > > Sure, please open a pull request.

Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Michael Armbrust
s > updates and produce results every second. I also need to reset the state > (the count) back to zero every 24 hours. > > > > > > > On Mon, Apr 10, 2017 at 11:49 AM, Michael Armbrust <mich...@databricks.com > > wrote: > >> Nope, structured streaming

Re: Cant convert Dataset to case class with Option fields

2017-04-10 Thread Michael Armbrust
Options should work. Can you give a full example that is freezing? Which version of Spark are you using? On Fri, Apr 7, 2017 at 6:59 AM, Dirceu Semighini Filho < dirceu.semigh...@gmail.com> wrote: > Hi Devs, > I've some case classes here, and it's fields are all optional > case class

Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Michael Armbrust
Nope, structured streaming eliminates the limitation that micro-batching should affect the results of your streaming query. Trigger is just an indication of how often you want to produce results (and if you leave it blank we just run as quickly as possible). To control how tuples are grouped

Re: map transform on array in spark sql

2017-04-04 Thread Michael Armbrust
If you can find the name of the struct field from the schema you can just do: df.select($"arrayField.a") Selecting a field from an array returns an array with that field selected from each element. On Mon, Apr 3, 2017 at 8:18 PM, Koert Kuipers wrote: > i have a DataFrame

Re: Convert Dataframe to Dataset in pyspark

2017-04-03 Thread Michael Armbrust
You don't need encoders in python since its all dynamically typed anyway. You can just do the following if you want the data as a string. sqlContext.read.text("/home/spark/1.6/lines").rdd.map(lambda row: row.value) 2017-04-01 5:36 GMT-07:00 Selvam Raman : > In Scala, > val ds

Re: Why VectorUDT private?

2017-03-30 Thread Michael Armbrust
I think really the right way to think about things that are marked private is, "this may disappear or change in a future minor release". If you are okay with that, working around the visibility restrictions is reasonable. On Thu, Mar 30, 2017 at 5:52 AM, Koert Kuipers wrote:

Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
t; .select("message.*") // unnest the json > .as(Encoders.bean(Tweet.class)) > > Thanks > Kaniska > > - > > On Mon, Mar 27, 2017 at 1:25 PM, Michael Armbrust <mich...@databricks.com> > wrote: > >>

Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
ified in the email) ? > > col("value").cast("string") - throwing an error 'cannot find symbol > method col(java.lang.String)' > I tried $"value" which results into similar compilation error. > > Thanks > Kaniska > > > > On Mon, Mar 27, 2017 at

Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
, kaniska Mandal <kaniska.man...@gmail.com> wrote: > Hi Michael, > > Thanks much for the suggestion. > > I was wondering - whats the best way to deserialize the 'value' field > > > On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mich...@databricks.com > >

Re: How to insert nano seconds in the TimestampType in Spark

2017-03-27 Thread Michael Armbrust
The timestamp type is only microsecond precision. You would need to store it on your own (as binary or limited range long or something) if you require nanosecond precision. On Mon, Mar 27, 2017 at 5:29 AM, Devender Yadav < devender.ya...@impetus.co.in> wrote: > Hi All, > > I am using spark

Re: unable to stream kafka messages

2017-03-24 Thread Michael Armbrust
Encoders can only map data into an object if those columns already exist. When we are reading from Kafka, we just get a binary blob and you'll need to help Spark parse that first. Assuming your data is stored in JSON it should be pretty straightforward. streams = spark .readStream()
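
A Scala sketch of that parsing step; tweetSchema is a hypothetical StructType describing the JSON payload, and spark is an assumed SparkSession:

    import spark.implicits._
    import org.apache.spark.sql.functions.from_json

    val tweets = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "tweets")
      .load()
      // Kafka's value column is binary: cast to string, then parse the JSON.
      .select(from_json($"value".cast("string"), tweetSchema).as("message"))
      .select("message.*")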

Re: how to read object field within json file

2017-03-24 Thread Michael Armbrust
I'm not sure you can parse this as an Array, but you can hint to the parser that you would like to treat source as a map instead of as a struct. This is a good strategy when you have dynamic columns in your data. Here is an example of the schema you can use to parse this JSON and also how to use
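
A sketch of hinting the parser that a field with dynamic keys is a map rather than a struct (field names are illustrative):

    import org.apache.spark.sql.types._

    val schema = new StructType()
      .add("id", StringType)
      .add("source", MapType(StringType, StringType))

    val df = spark.read.schema(schema).json("/data/events.json")
    // Map values can then be looked up by key.
    df.select(df("id"), df("source")("some_dynamic_key"))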

Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
ing to save to the cassandra DB and try to keep shuffle operations to > a strict minimum (at best none). As of now we are not entirely pleased with > our current performances, that's why I'm doing a kafka topic sharding POC > and getting the executor to handle the specificied partitio

Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
Sorry, typo. Should be a repartition not a groupBy. > spark.readStream > .format("kafka") > .option("kafka.bootstrap.servers", "...") > .option("subscribe", "t0,t1") > .load() > .repartition($"partition") > .writeStream > .foreach(... code to write to cassandra ...) >

Re: Streaming 2.1.0 - window vs. batch duration

2017-03-16 Thread Michael Armbrust
Have you considered trying event time aggregation in structured streaming instead? On Thu, Mar 16, 2017 at 12:34 PM, Dominik Safaric wrote: > Hi all, > > As I’ve implemented a streaming application pulling data from Kafka every > 1 second (batch interval), I am

Re: [Spark Streaming+Kafka][How-to]

2017-03-16 Thread Michael Armbrust
I think it should be straightforward to express this using structured streaming. You could ensure that data from a given partition ID is processed serially by performing a group by on the partition column. spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "...")

Re: Structured Streaming - Can I start using it?

2017-03-13 Thread Michael Armbrust
I think it's very, very unlikely that it will get withdrawn. The primary reason that the APIs are still marked experimental is that we like to have several releases before committing to interface stability (in particular the interfaces to write custom sources and sinks are likely to evolve). Also,

Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Michael Armbrust
'm experiencing a similar issue. Will this not be fixed in Spark > Streaming? > > Best, > Justin > > On Mar 10, 2017, at 8:34 AM, Michael Armbrust <mich...@databricks.com> > wrote: > > One option here would be to try Structured Streaming. We've added an > option

Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Michael Armbrust
One option here would be to try Structured Streaming. We've added an option "failOnDataLoss" that will cause Spark to just skip ahead when this exception is encountered (it's off by default though so you don't silently miss data). On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman <
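
A sketch of the option in context (broker and topic are placeholders):

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      // The default (true) fails the query when requested offsets have
      // expired, so data loss is never silent; set to false to skip ahead.
      .option("failOnDataLoss", "false")
      .load()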

Re: How to unit test spark streaming?

2017-03-07 Thread Michael Armbrust
> > Basically you abstract your transformations to take in a dataframe and > return one, then you assert on the returned df > +1 to this suggestion. This is why we wanted streaming and batch dataframes to share the same API.

Re: Why Spark cannot get the derived field of case class in Dataset?

2017-02-28 Thread Michael Armbrust
We only serialize things that are in the constructor. You would have access to it in the typed API (df.map(_.day)). I'd suggest making a factory method that fills these in and put them in the constructor if you need to get to it from other dataframe operations. On Tue, Feb 28, 2017 at 12:03 PM,
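
A sketch of the distinction and the suggested factory-method workaround (class and field names are made up):

    import java.sql.Timestamp

    // `day` is not a constructor parameter, so it never becomes a column;
    // it is still reachable through the typed API, e.g. ds.map(_.day).
    case class Event(ts: Timestamp) {
      val day: String = ts.toLocalDateTime.toLocalDate.toString
    }

    // To use the derived value in DataFrame operations, compute it in a
    // factory and put it in the constructor.
    case class EventWithDay(ts: Timestamp, day: String)
    object EventWithDay {
      def fromTs(ts: Timestamp): EventWithDay =
        EventWithDay(ts, ts.toLocalDateTime.toLocalDate.toString)
    }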

Re: Pretty print a dataframe...

2017-02-16 Thread Michael Armbrust
The toString method of Dataset.queryExecution includes the various plans. I usually just log that directly. On Thu, Feb 16, 2017 at 8:26 AM, Muthu Jayakumar wrote: > Hello there, > > I am trying to write to log-line a dataframe/dataset queryExecution and/or > its logical

Re: Case class with POJO - encoder issues

2017-02-13 Thread Michael Armbrust
You are right, you need that PR. I pinged the author, but otherwise it would be great if someone could carry it over the finish line. On Sat, Feb 11, 2017 at 4:19 PM, Jason White wrote: > I'd like to create a Dataset using some classes from Geotools to do some >

Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Michael Armbrust
I think the fastest way is likely to use a combination of conditionals (when / otherwise), first (ignoring nulls), while grouping by the id. This should get the answer with only a single shuffle. Here is an example
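
A sketch of that when/first combination, turning rows of (id, key, value) back into one wide row per id with a single shuffle (column and key names are illustrative, spark is an assumed SparkSession):

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val wide = df
      .groupBy($"id")
      .agg(
        // Project each key into its own column, then keep the first
        // non-null value seen for the group.
        first(when($"key" === "price", $"value"), ignoreNulls = true).as("price"),
        first(when($"key" === "qty",   $"value"), ignoreNulls = true).as("qty"))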

Re: specifing schema on dataframe

2017-02-05 Thread Michael Armbrust
ns? > > Wouldnt it be simpler to just regex replace the numbers to remove the > quotes? > > > Regards > Sam > > On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com> > wrote: > >> Specifying the schema when parsing JSON wil

Re: specifing schema on dataframe

2017-02-05 Thread Michael Armbrust
olving this? > > My current approach is to iterate over the JSON and identify which fields > are numbers and which arent then recreate the json > > But to be honest that doesnt seem like the cleanest approach, so happy for > advice on this > > Rega
