Re: specifying schema on dataframe

2017-02-05 Thread Michael Armbrust
-dev

You can use withColumn to change the type after the data has been loaded.
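
For example, a rough sketch (assuming the customerid column from the thread
below; load with the inferred schema, then cast):

import org.apache.spark.sql.functions.col

val df = spark.read.json(path)
  .withColumn("customerid", col("customerid").cast("double"))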

On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin  wrote:

> Hi Dirceu,
>
> Thanks, you're right! That did work.
>
>
> But now I'm facing an even bigger problem, since I don't have access to
> change the underlying data; I just want to apply a schema over something
> that was written via sparkContext.newAPIHadoopRDD.
>
> Basically I am reading in an RDD[JsonObject] and would like to convert it
> into a DataFrame to which I pass the schema.
>
> What's the best way to do this?
>
> I doubt removing all the quotes in the JSON is the best solution, is it?
>
> Regards
> Sam
>
> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hi Sam
>> Remove the " from the number and it will work.
>>
>> On 4 Feb 2017 at 11:46 AM, "Sam Elamin"  wrote:
>>
>>> Hi All
>>>
>>> I would like to specify a schema when reading from a JSON file, but when
>>> trying to map a number to a Double it fails. I tried FloatType and IntType
>>> with no joy!
>>>
>>>
>>> When inferring the schema, customerid is set to String, and I would like
>>> to cast it to Double.
>>>
>>> so df1 is corrupted while df2 shows the data.
>>>
>>>
>>> Also, FYI, I need this to be generic as I would like to apply it to any
>>> JSON; I specified the schema below as an example of the issue I am facing.
>>>
>>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>>> DoubleType,FloatType, StructType, LongType,DecimalType}
>>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>>> val df1 = 
>>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>>> val df2 = 
>>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>>> df1.show(1)
>>> df2.show(1)
>>>
>>>
>>> Any help would be appreciated. I am sure I am missing something obvious,
>>> but for the life of me I can't tell what it is!
>>>
>>>
>>> Kind Regards
>>> Sam
>>>
>>
>


Re: frustration with field names in Dataset

2017-02-02 Thread Michael Armbrust
That might be reasonable.  At least I can't think of any problems with
doing that.
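
In the meantime, a rough workaround sketch is to go through the column names
by position (this still assumes the name at that position is unambiguous,
which is exactly the case the proposal below would fix):

// pick columns by position via their names; breaks down with duplicate names
val leftCol  = dataset(dataset.columns(0))
val rightCol = otherDataset(otherDataset.columns(0))
dataset.joinWith(otherDataset, leftCol === rightCol, joinType)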

On Thu, Feb 2, 2017 at 7:39 AM, Koert Kuipers  wrote:

> Since a Dataset is a typed object, you ideally don't have to think about
> field names.
>
> However, there are operations on Dataset that require you to provide a
> Column, like for example joinWith (and joinWith returns a strongly typed
> Dataset, not a DataFrame). Once you have to provide a Column you are back to
> thinking in field names, and worrying about duplicate field names, which is
> something that can easily happen in a Dataset without you realizing it.
>
> So under the hood Dataset has unique identifiers for every column, as in
> dataset.queryExecution.logical.output, but these are expressions
> (attributes) that I cannot turn back into columns since the constructors
> for these are private in Spark.
>
> So how about having Dataset.apply(i: Int): Column to allow me to pick
> columns by position without having to worry about (duplicate) field names?
> Then I could do something like:
>
> dataset.joinWith(otherDataset, dataset(0) === otherDataset(0), joinType)
>


Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Michael Armbrust
This is the error, you are missing an import:

:13: error: not found: type Encoder
   abstract class RawTable[A : Encoder](inDir: String) {

Works for me in a REPL.
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/204687029790319/2840265927289860/latest.html>

On Wed, Feb 1, 2017 at 3:34 PM, Don Drake <dondr...@gmail.com> wrote:

> Thanks for the reply.   I did give that syntax a try [A : Encoder]
> yesterday, but I kept getting this exception in a spark-shell and Zeppelin
> browser.
>
> scala> import org.apache.spark.sql.Encoder
> import org.apache.spark.sql.Encoder
>
> scala>
>
> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
> java.sql.Timestamp, data_filename: String)
> defined class RawTemp
>
> scala>
>
> scala> import spark.implicits._
> import spark.implicits._
>
> scala>
>
> scala> abstract class RawTable[A : Encoder](inDir: String) {
>  | import spark.implicits._
>  | def load() = {
>  | import spark.implicits._
>  | spark.read
>  | .option("header", "true")
>  | .option("mode", "FAILFAST")
>  | .option("escape", "\"")
>  | .option("nullValue", "")
>  | .option("indferSchema", "true")
>  | .csv(inDir)
>  | .as[A]
>  | }
>  | }
> :13: error: not found: type Encoder
>abstract class RawTable[A : Encoder](inDir: String) {
>^
> :24: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing spark.implicits._  Support for serializing other
> types will be added in future releases.
>.as[A]
>
>
> I gave it a try today in a Scala application and it seems to work.  Is
> this a known issue in a spark-shell?
>
> In my Scala application, this is being defined in a separate file, etc.
> without direct access to a Spark session.
>
> I had to add the following code snippet so the import spark.implicits._
> would take effect:
>
> // ugly hack to get around Encoder can't be found compile time errors
>
> private object myImplicits extends SQLImplicits {
>
>   protected override def _sqlContext: SQLContext = MySparkSingleton.
> getCurrentSession().sqlContext
>
> }
>
> import myImplicits._
>
> I found that in about the hundredth SO post I searched for this problem.
> Is this the best way to let implicits do its thing?
>
> Thanks.
>
> -Don
>
>
>
> On Wed, Feb 1, 2017 at 3:16 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> You need to enforce that an Encoder is available for the type A using a 
>> context
>> bound <http://docs.scala-lang.org/tutorials/FAQ/context-bounds>.
>>
>> import org.apache.spark.sql.Encoder
>> abstract class RawTable[A : Encoder](inDir: String) {
>>   ...
>> }
>>
>> On Tue, Jan 31, 2017 at 8:12 PM, Don Drake <dondr...@gmail.com> wrote:
>>
>>> I have a set of CSV that I need to perform ETL on, with the plan to
>>> re-use a lot of code between each file in a parent abstract class.
>>>
>>> I tried creating the following simple abstract class that will have a
>>> parameterized type of a case class that represents the schema being read in.
>>>
>>> This won't compile, it just complains about not being able to find an
>>> encoder, but I'm importing the implicits and don't believe this error.
>>>
>>>
>>> scala> import spark.implicits._
>>> import spark.implicits._
>>>
>>> scala>
>>>
>>> scala> case class RawTemp(f1: String, f2: String, temp: Long,
>>> created_at: java.sql.Timestamp, data_filename: String)
>>> defined class RawTemp
>>>
>>> scala>
>>>
>>> scala> abstract class RawTable[A](inDir: String) {
>>>  | def load() = {
>>>  | spark.read
>>>  | .option("header", "true")
>>>  | .option("mode", "FAILFAST")
>>>  | .option("escape", "\"")
>>>  | .option("nullValue", "")
>>>  | .option("indferSchema", "true")
>>>  | .csv(inDir)
>>>

Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Michael Armbrust
You need to enforce that an Encoder is available for the type A using a context
bound.

import org.apache.spark.sql.Encoder
abstract class RawTable[A : Encoder](inDir: String) {
  ...
}
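
A hedged usage sketch (an Encoder[RawTemp] must be implicitly available where
the concrete subclass is defined, e.g. via spark.implicits._; names below are
from the thread):

import spark.implicits._

class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
val temps = (new TempTable).load()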

On Tue, Jan 31, 2017 at 8:12 PM, Don Drake  wrote:

> I have a set of CSV that I need to perform ETL on, with the plan to re-use
> a lot of code between each file in a parent abstract class.
>
> I tried creating the following simple abstract class that will have a
> parameterized type of a case class that represents the schema being read in.
>
> This won't compile, it just complains about not being able to find an
> encoder, but I'm importing the implicits and don't believe this error.
>
>
> scala> import spark.implicits._
> import spark.implicits._
>
> scala>
>
> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
> java.sql.Timestamp, data_filename: String)
> defined class RawTemp
>
> scala>
>
> scala> abstract class RawTable[A](inDir: String) {
>  | def load() = {
>  | spark.read
>  | .option("header", "true")
>  | .option("mode", "FAILFAST")
>  | .option("escape", "\"")
>  | .option("nullValue", "")
>  | .option("indferSchema", "true")
>  | .csv(inDir)
>  | .as[A]
>  | }
>  | }
> :27: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing spark.implicits._  Support for serializing other
> types will be added in future releases.
>.as[A]
>
> scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
> :13: error: not found: type RawTable
>class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>   ^
>
> What's odd is that this output looks okay:
>
> scala> val RTEncoder = Encoders.product[RawTemp]
> RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
> f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, data_filename[0]:
> string]
>
> scala> RTEncoder.schema
> res4: org.apache.spark.sql.types.StructType = 
> StructType(StructField(f1,StringType,true),
> StructField(f2,StringType,true), StructField(temp,LongType,false),
> StructField(created_at,TimestampType,true), StructField(data_filename,
> StringType,true))
>
> scala> RTEncoder.clsTag
> res5: scala.reflect.ClassTag[RawTemp] = RawTemp
>
> Any ideas?
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake 
> 800-733-2143 <(800)%20733-2143>
>


Re: using withWatermark on Dataset

2017-02-01 Thread Michael Armbrust
Can you give the full stack trace?  Also which version of Spark are you
running?

On Wed, Feb 1, 2017 at 10:38 AM, Jerry Lam  wrote:

> Hi everyone,
>
> Anyone knows how to use withWatermark  on Dataset?
>
> I have tried the following but hit this exception:
>
> dataset org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
> cannot be cast to "MyType"
>
> The code looks like the following:
>
> dataset
> .withWatermark("timestamp", "5 seconds")
> .groupBy("timestamp", "customer_id")
> .agg(MyAggregator)
> .writeStream
>
> Where dataset has MyType for each row.
> Where MyType is:
> case class MyType(customer_id: Long, timestamp: Timestamp, product_id: Long)
>
> MyAggregator which takes MyType as the input type did some maths on the
> product_id and outputs a set of product_ids.
>
> Best Regards,
>
> Jerry
>
>
>
>
>
>
>


Re: kafka structured streaming source refuses to read

2017-01-30 Thread Michael Armbrust
Thanks for following up!  I've linked the relevant tickets to
SPARK-18057 and I targeted it for Spark 2.2.

On Sat, Jan 28, 2017 at 10:15 AM, Koert Kuipers  wrote:

> there was also already an existing spark ticket for this:
> SPARK-18779 
>
> On Sat, Jan 28, 2017 at 1:13 PM, Koert Kuipers  wrote:
>
>> it seems the bug is:
>> https://issues.apache.org/jira/browse/KAFKA-4547
>>
>> i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or
>> 0.10.1.1
>>
>> On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers  wrote:
>>
>>> in case anyone else runs into this:
>>>
>>> the issue is that i was using kafka-clients 0.10.1.1
>>>
>>> it works when i use kafka-clients 0.10.0.1 with spark structured
>>> streaming
>>>
>>> my kafka server is 0.10.1.1
>>>
>>> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers 
>>> wrote:
>>>
 i checked my topic. it has 5 partitions but all the data is written to
 a single partition: wikipedia-2
 i turned on debug logging and i see this:

 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
 consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
 wikipedia-1]. Seeking to the end.
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-0
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-4
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-3
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-2
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-1
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-0 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-0
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-0 to earliest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-0
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-4 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-4
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-4 to earliest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-4
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-3 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
 successful heartbeat response for group spark-kafka-source-fac4f749-fd
 56-4a32-82c7-e687aadf520b-1923704552-driver-0
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-3
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-3 to earliest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-3
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-2 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=152908} for partition wikipedia-2
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-2 to earliest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-2
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-1 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-1
 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
 partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
 wikipedia-3 -> 0, wikipedia-0 -> 0)

 what is confusing to me is this:
 Resetting offset for partition wikipedia-2 to latest offset.
 Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
 Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)

 why does it find latest offset 152908 for wikipedia-2 but then sets
 latest offset to 0 for that partition? or am i misunderstanding?

 On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers 
 wrote:

> code:
>   val query = spark.readStream
> .format("kafka")
> 

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Michael Armbrust
Yeah, Kafka server/client compatibility can be pretty confusing and does
not give good errors in the case of mismatches.  This should be addressed
in the next release of Kafka (they are adding an API to query the server's
capabilities).

On Fri, Jan 27, 2017 at 12:56 PM, Koert Kuipers  wrote:

> in case anyone else runs into this:
>
> the issue is that i was using kafka-clients 0.10.1.1
>
> it works when i use kafka-clients 0.10.0.1 with spark structured streaming
>
> my kafka server is 0.10.1.1
>
> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers  wrote:
>
>> i checked my topic. it has 5 partitions but all the data is written to a
>> single partition: wikipedia-2
>> i turned on debug logging and i see this:
>>
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
>> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
>> wikipedia-1]. Seeking to the end.
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
>> successful heartbeat response for group spark-kafka-source-fac4f749-fd
>> 56-4a32-82c7-e687aadf520b-1923704552-driver-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=152908} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-1 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
>> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
>> wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> what is confusing to me is this:
>> Resetting offset for partition wikipedia-2 to latest offset.
>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
>> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> why does it find latest offset 152908 for wikipedia-2 but then sets
>> latest offset to 0 for that partition? or am i misunderstanding?
>>
>> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers  wrote:
>>
>>> code:
>>>   val query = spark.readStream
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", "somenode:9092")
>>> .option("subscribe", "wikipedia")
>>> .load
>>> .select(col("value") cast StringType)
>>> .writeStream
>>> .format("console")
>>> .outputMode(OutputMode.Append)
>>> .start()
>>>
>>>   while (true) {
>>> Thread.sleep(1)
>>> println(query.lastProgress)
>>>   }
>>> }
>>>
>>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <
>>> alons...@gmail.com> wrote:
>>>
 lets see the code...

 Alonso Isidoro Roman
 

Re: printSchema showing incorrect datatype?

2017-01-25 Thread Michael Armbrust
Encoders are just an object-based view on a Dataset.  Until you actually
materialize an object, they are not used and thus will not change the
schema of the DataFrame.

On Tue, Jan 24, 2017 at 8:28 AM, Koert Kuipers  wrote:

> scala> val x = Seq("a", "b").toDF("x")
> x: org.apache.spark.sql.DataFrame = [x: string]
>
> scala> x.as[Array[Byte]].printSchema
> root
>  |-- x: string (nullable = true)
>
> scala> x.as[Array[Byte]].map(x => x).printSchema
> root
>  |-- value: binary (nullable = true)
>
> why does the first schema show string instead of binary?
>


Re: Setting startingOffsets to earliest in structured streaming never catches up

2017-01-23 Thread Michael Armbrust
+1 to Ryan's suggestion of setting maxOffsetsPerTrigger.  This way you can
at least see how quickly it is making progress towards catching up.
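
A rough sketch of where the option goes (other source options as in your job;
the value is just an illustrative cap on records per micro-batch):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "mytopic")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "100000")
  .load()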

On Sun, Jan 22, 2017 at 7:02 PM, Timothy Chan  wrote:

> I'm using version 2.0.2.
>
> The difference I see between using latest and earliest is a series of jobs
> that take less than a second vs. one job that goes on for over 24 hours.
>
> On Sun, Jan 22, 2017 at 6:54 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Which Spark version are you using? If you are using 2.1.0, could you use
>> the monitoring APIs (http://spark.apache.org/docs/
>> latest/structured-streaming-programming-guide.html#
>> monitoring-streaming-queries) to check the input rate and the processing
>> rate? One possible issue is that the Kafka source launched a pretty large
>> batch and it took too long to finish it. If so, you can use
>> "maxOffsetsPerTrigger" option to limit the data size in a batch in order to
>> observe the progress.
>>
>> On Sun, Jan 22, 2017 at 10:22 AM, Timothy Chan 
>> wrote:
>>
>> I'm running my structured streaming jobs in EMR. We were thinking a worst
>> case scenario recovery situation would be to spin up another cluster and
>> set startingOffsets to earliest (our Kafka cluster has a retention policy
>> of 7 days).
>>
>> My observation is that the job never catches up to latest. This is not
>> acceptable. I've set the number of partitions for the topic to 6. I've
>> tried using a cluster of 4 in EMR.
>>
>> The producer rate for this topic is 4 events/second. Does anyone have any
>> suggestions on what I can do to have my consumer catch up to latest faster?
>>
>>
>>


Re: Dataset Type safety

2017-01-10 Thread Michael Armbrust
>
> As I've specified *.as[Person]* which does schema inferance then
> *"option("inferSchema","true")" *is redundant and not needed!


The resolution of fields is done by name, not by position for case
classes.  This is what allows us to support more complex things like JSON
or nested structures.  If you just want to map it by position you can
do .as[(String, Long)] to map it to a tuple instead.
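
For the header-less CSV case below, a rough workaround sketch is to assign the
names by position before asking for the case class:

val peopleDS = spark.read
  .option("inferSchema", "true")
  .option("header", "false")
  .option("delimiter", ",")
  .csv("/people.csv")
  .toDF("name", "age")   // rename _c0, _c1 so the encoder can resolve by name
  .as[Person]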

And lastly does .as[Person] check that column value matches with data type
> i.e. "age Long" would fail if it gets a non numeric value! because the
> input file could be millions of row which could be very time consuming.


No, this is a static check based on the schema.  It does not scan the data
(though schema inference does).

On Tue, Jan 10, 2017 at 11:34 AM, A Shaikh  wrote:

> I have a simple people.csv and following SimpleApp
>
>
> people.csv
> --
> name,age
> abc,22
> xyz,32
>
> 
> Working Code
> 
> Object SimpleApp {}
>   case class Person(name: String, age: Long)
>   def main(args: Array[String]): Unit = {
> val spark = SparkFactory.getSparkSession("PIPE2Dataset")
> import spark.implicits._
>
> val peopleDS = spark.read.option("inferSchema","true").option("header",
> "true").option("delimiter", ",").csv("/people.csv").as[Person]
> }
> 
>
>
> 
> Fails for data with no header
> 
> Removing header record "name,age" AND switching header option off
> =>.option("header", "false") return error => *cannot resolve '`name`'
> given input columns: [_c0, _c1]*
> val peopleDS = spark.read.option("inferSchema","true").option("header",
> "false").option("delimiter", ",").csv("/people.csv").as[Person]
>
> Shouldn't this just assign the header from the Person class?
>
>
> 
> invalid data
> 
> As I've specified *.as[Person]* which does schema inferance then 
> *"option("inferSchema","true")"
> *is redundant and not needed!
>
>
> And lastly does .as[Person] check that column value matches with data type
> i.e. "age Long" would fail if it gets a non numeric value! because the
> input file could be millions of row which could be very time consuming.
>


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2016-12-29 Thread Michael Armbrust
We don't support this yet, but I've opened this JIRA as it sounds generally
useful: https://issues.apache.org/jira/browse/SPARK-19031

In the meantime you could try implementing your own Source, but that is
pretty low level and is not yet a stable API.

On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" 
wrote:

> Hi all,
>
> Thanks a lot for your contributions to bring us new technologies.
>
> I don't want to waste your time, so before I write to you, I googled,
> checked stackoverflow and mailing list archive with keywords "streaming"
> and "jdbc". But I was not able to get any solution to my use case. I hope I
> can get some clarification from you.
>
> The use case is quite straightforward, I need to harvest a relational
> database via jdbc, do something with data, and store result into Kafka. I
> am stuck at the first step, and the difficulty is as follows:
>
> 1. The database is too large to ingest with one thread.
> 2. The database is dynamic and time series data comes in constantly.
>
> Then an ideal workflow is that multiple workers process partitions of data
> incrementally according to a time window. For example, the processing
> starts from the earliest data with each batch containing data for one hour.
> If data ingestion speed is faster than data production speed, then
> eventually the entire database will be harvested and those workers will
> start to "tail" the database for new data streams and the processing
> becomes real time.
>
> With Spark SQL I can ingest data from a JDBC source with partitions
> divided by time windows, but how can I dynamically increment the time
> windows during execution? Assume that there are two workers ingesting data
> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next task
> for 2017-01-03. But I am not able to find out how to increment those values
> during execution.
>
> Then I looked into Structured Streaming. It looks much more promising
> because window operations based on event time are considered during
> streaming, which could be the solution to my use case. However, from
> documentation and code example I did not find anything related to streaming
> data from a growing database. Is there anything I can read to achieve my
> goal?
>
> Any suggestion is highly appreciated. Thank you very much and have a nice
> day.
>
> Best regards,
> Yang
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to get recent value in spark dataframe

2016-12-16 Thread Michael Armbrust
Oh and to get the null for missing years, you'd need to do an outer join
with a table containing all of the years you are interested in.

On Fri, Dec 16, 2016 at 3:24 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Are you looking for argmax? Here is an example
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3170497669323442/2840265927289860/latest.html>
> .
>
> On Wed, Dec 14, 2016 at 8:49 PM, Milin korath <milin.kor...@impelsys.com>
> wrote:
>
>> Hi
>>
>> I have a spark data frame with following structure
>>
>>  id  flag  price  date
>>   a     0    100  2015
>>   a     0     50  2015
>>   a     1    200  2014
>>   a     1    300  2013
>>   a     0    400  2012
>>
>> I need to create a data frame with recent value of flag 1 and updated in
>> the flag 0 rows.
>>
>>   id  flag  price  date  new_column
>>   a      0    100  2015         200
>>   a      0     50  2015         200
>>   a      1    200  2014        null
>>   a      1    300  2013        null
>>   a      0    400  2012        null
>>
>> We have 2 rows having flag=0. Consider the first row(flag=0),I will have
>> 2 values(200 and 300) and I am taking the recent one 200(2014). And the
>> last row I don't have any recent value for flag 1 so it is updated with
>> null.
>>
>> Looking for a solution using scala. Any help would be appreciated.Thanks
>>
>> Thanks
>> Milin
>>
>
>


Re: How to get recent value in spark dataframe

2016-12-16 Thread Michael Armbrust
Are you looking for argmax? Here is an example.
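
The core of that example is the max-of-struct trick; a rough sketch against the
schema below (join the result back to the flag = 0 rows to fill new_column):

import org.apache.spark.sql.functions.{col, max, struct}

val latestFlag1 = df
  .where(col("flag") === 1)
  .groupBy("id")
  .agg(max(struct(col("date"), col("price"))).as("latest"))  // argmax by date
  .select(col("id"), col("latest.price").as("new_column"))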

On Wed, Dec 14, 2016 at 8:49 PM, Milin korath 
wrote:

> Hi
>
> I have a spark data frame with following structure
>
>  id  flag  price  date
>   a     0    100  2015
>   a     0     50  2015
>   a     1    200  2014
>   a     1    300  2013
>   a     0    400  2012
>
> I need to create a data frame with recent value of flag 1 and updated in
> the flag 0 rows.
>
>   id  flag  price  date  new_column
>   a      0    100  2015         200
>   a      0     50  2015         200
>   a      1    200  2014        null
>   a      1    300  2013        null
>   a      0    400  2012        null
>
> We have 2 rows having flag=0. Consider the first row(flag=0),I will have 2
> values(200 and 300) and I am taking the recent one 200(2014). And the last
> row I don't have any recent value for flag 1 so it is updated with null.
>
> Looking for a solution using scala. Any help would be appreciated.Thanks
>
> Thanks
> Milin
>


Re: Dataset encoders for further types?

2016-12-15 Thread Michael Armbrust
I would have sworn there was a ticket, but I can't find it.  So here you
go: https://issues.apache.org/jira/browse/SPARK-18891

A workaround until that is fixed would be for you to manually specify the Kryo
encoder.
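
A rough sketch of that workaround (it stores the whole object as a single
binary column, so you lose the columnar schema, but it unblocks
Dataset[MyRecord]; the case class is illustrative):

import org.apache.spark.sql.{Encoder, Encoders}

case class MyRecord(id: Long, values: List[String])

implicit val myRecordEncoder: Encoder[MyRecord] = Encoders.kryo[MyRecord]

val ds = spark.createDataset(Seq(MyRecord(1L, List("a", "b"))))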

On Thu, Dec 15, 2016 at 8:18 AM, Jakub Dubovsky <
spark.dubovsky.ja...@gmail.com> wrote:

> Hey,
>
> I want to ask whether there is any roadmap/plan for adding Encoders for
> further types in next releases of Spark. Here is a list
>  of
> currently supported types. We would like to use Datasets with our
> internally defined case classes containing scala.collection.immutable.List(s).
> This does not work now because these lists are converted to ArrayType
> (Seq). This then fails a constructor lookup because of seq-is-not-a-list
> error...
>
> This means that for now we are stuck with using RDDs.
>
> Thanks for any insights!
>
> Jakub Dubovsky
>
>


Re: When will multiple aggregations be supported in Structured Streaming?

2016-12-15 Thread Michael Armbrust
What is your use case?

On Thu, Dec 15, 2016 at 10:43 AM, ljwagerfield 
wrote:

> The current version of Spark (2.0.2) only supports one aggregation per
> structured stream (and will throw an exception if multiple aggregations are
> applied).
>
> Roughly when will Spark support multiple aggregations?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/When-will-multiple-aggregations-be-
> supported-in-Structured-Streaming-tp28219.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [DataFrames] map function - 2.0

2016-12-15 Thread Michael Armbrust
Experimental in Spark really just means that we are not promising binary
compatibility for those functions in the 2.x release line.  For Datasets in
particular, we want a few releases to make sure the APIs don't have any
major gaps before removing the experimental tag.

On Thu, Dec 15, 2016 at 1:17 PM, Ninad Shringarpure 
wrote:

> Hi Team,
>
> When going through the Dataset class for Spark 2.0, I noticed that both
> overloaded map functions, with encoder and without, are marked as
> experimental.
>
> Is there a reason, and are there issues that developers should be aware of
> when using this for production applications? Also, is there a
> "non-experimental" way of using the map function on a DataFrame in Spark 2.0?
>
> Thanks,
> Ninad
>


Re: Cached Tables SQL Performance Worse than Uncached

2016-12-15 Thread Michael Armbrust
It's hard to comment on performance without seeing query plans.  I'd suggest
posting the result of an explain.
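
For example (query names illustrative):

cachedQuery.explain(true)    // logical + physical plan for the cached variant
uncachedQuery.explain(true)  // compare against the parquet-backed variant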

On Thu, Dec 15, 2016 at 2:14 PM, Warren Kim 
wrote:

> Playing with TPC-H and comparing performance between cached (serialized
> in-memory tables) and uncached (DF from parquet) results in various
> SQL queries performing much worse, duration-wise.
>
>
> I see some physical plans have an extra layer of shuffle/sort/merge under
> cached scenario.
>
>
> I could do some filtering by key to optimize, but I'm just curious as to
> why out-of-the-box planning is more complex and slower when tables are
> cached to mem.
>
>
> Thanks!
>


Re: [Spark-SQL] collect_list() support for nested collection

2016-12-13 Thread Michael Armbrust
Yes

https://databricks-prod-cloudfront.cloud.databricks.com/public/
4027ec902e239c93eaaa8714f173bcfc/1023043053387187/4464261896877850/
2840265927289860/latest.html

On Tue, Dec 13, 2016 at 10:43 AM, Ninad Shringarpure 
wrote:

>
> Hi Team,
>
> Does Spark 2.0 support non-primitive types in collect_list for inserting
> nested collections?
> Would appreciate any references or samples.
>
> Thanks,
> Ninad
>
>
>


Re: When will Structured Streaming support stream-to-stream joins?

2016-12-08 Thread Michael Armbrust
I would guess Spark 2.3, but maybe sooner maybe later depending on demand.
I created https://issues.apache.org/jira/browse/SPARK-18791 so people can
describe their requirements / stay informed.

On Thu, Dec 8, 2016 at 11:16 AM, ljwagerfield 
wrote:

> Hi there,
>
> Structured Streaming currently only supports stream-to-batch joins.
>
> Is there an ETA for stream-to-stream joins?
>
> Kindest regards (and keep up the awesome work!),
> Lawrence
>
> (p.s. I've traversed the JIRA roadmaps but couldn't see anything)
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/When-will-Structured-Streaming-
> support-stream-to-stream-joins-tp28185.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: few basic questions on structured streaming

2016-12-08 Thread Michael Armbrust
>
> 1. what happens if an event arrives few days late? Looks like we have an
> unbound table with sorted time intervals as keys but I assume spark doesn't
> keep several days worth of data in memory but rather it would checkpoint
> parts of the unbound table to a storage at a specified interval such that
> if an event comes few days late it would update the part of the table that
> is in memory plus the parts of the table that are in storage which contains
> the interval (Again this is just my assumption, I don't know what it really
> does). is this correct so far?
>

The state we need to keep will be unbounded, unless you specify a
watermark.  This watermark tells us how long to wait for late data to
arrive and thus allows us to bound the amount of state that we keep in
memory.  Since we purge state for aggregations that are below the
watermark, we must also drop data that arrives even later than your
specified watermark (if any).  Note that the watermark is calculated based
on observed data, not on the actual time of processing.  So we should be
robust to cases where the stream is down for extended periods of time.


> 2.  Say I am running a Spark Structured streaming Job for 90 days with a
> window interval of 10 mins and a slide interval of 5 mins. Does the output
> of this Job always return the entire history in a table? other words the
> does the output on 90th day contains a table of 10 minute time intervals
> from day 1 to day 90? If so, wouldn't that be too big to return as an
> output?
>

This depends on the output mode.  In complete mode, we output the entire
result every time (thus, complete mode probably doesn't make sense for this
use case).  In update mode, we will output
continually updated estimates of the final answer as the stream progresses
(useful if you are for example updating a database).  In append mode
(supported in 2.1) we only output finalized aggregations that have fallen
beneath the watermark.

Relatedly, SPARK-16738 talks
about making the distributed state store queryable.  With this feature, you
could run your query in complete mode (given enough machines).  Even though
the results are large, you can still interact with the complete results of
the aggregation as a distributed DataFrame.


> 3. For Structured Streaming is it required to have a distributed storage
> such as HDFS? my guess would be yes (based on what I said in #1) but I
> would like to confirm.
>

Currently this is the only place that we can write the offset log (records
what data is in each batch) and the state checkpoints.  I think its likely
that we'll add support for other storage systems here in the future.


> 4. I briefly heard about watermarking. Are there any pointers where I can
> know them more in detail? Specifically how watermarks could help in
> structured streaming and so on.
>

Here's the best docs available: https://github.com/apache/spark/pull/15702

We are working on something for the programming guide / a blog post in the
next few weeks.
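
In the meantime, a rough sketch of what the 2.1 watermarking API looks like
(names are illustrative):

import org.apache.spark.sql.functions.{col, window}

val counts = events                             // streaming Dataset with an eventTime column
  .withWatermark("eventTime", "1 hour")         // how long to wait for late data
  .groupBy(window(col("eventTime"), "10 minutes", "5 minutes"), col("userId"))
  .count()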


Re: get corrupted rows using columnNameOfCorruptRecord

2016-12-06 Thread Michael Armbrust
.where("xxx IS NOT NULL") will give you the rows that couldn't be parsed.

On Tue, Dec 6, 2016 at 6:31 AM, Yehuda Finkelstein <
yeh...@veracity-group.com> wrote:

> Hi all
>
>
>
> I’m trying to parse json using existing schema and got rows with NULL’s
>
> //get schema
>
> val df_schema = spark.sqlContext.sql("select c1,c2,…cn t1  limit 1")
>
> //read json file
>
> val f = sc.textFile("/tmp/x")
>
> //load json into data frame using schema
>
> var df = spark.sqlContext.read.option("columnNameOfCorruptRecord", "xxx")
>   .option("mode", "PERMISSIVE").schema(df_schema.schema).json(f)
>
>
>
> In the documentation it says that you can query the corrupted rows via this
> column, columnNameOfCorruptRecord:
>
> "columnNameOfCorruptRecord (default is the value specified in
> spark.sql.columnNameOfCorruptRecord): allows renaming the new field
> having malformed string created by PERMISSIVE mode. This overrides
> spark.sql.columnNameOfCorruptRecord."
>
>
>
> The question is how to fetch those corrupted rows ?
>
>
>
>
>
> Thanks
>
> Yehuda
>
>
>
>
>


Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Michael Armbrust
>
> 1. In my case, I'd need to first explode my data by ~12x to assign each
> record to multiple 12-month rolling output windows. I'm not sure Spark SQL
> would be able to optimize this away, combining it with the output writing
> to do it incrementally.
>

You are right, but I wouldn't worry about the RAM use.  If implemented
properly (or if you just use the built-in window function), it should all be
pipelined.


> 2. Wouldn't each partition -- window in my case -- be shuffled to a single
> machine and then written together as one output shard? For a large amount
> of data per window, that seems less than ideal.
>

Oh sorry, I thought you wanted one file per value.  If you drop the
repartition then it won't shuffle, but will just write in parallel on each
machine.


Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Michael Armbrust
If you repartition($"column") and then do .write.partitionBy("column") you
should end up with a single file for each value of the partition column.
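
A rough sketch, assuming each record has already been tagged with a window
column (format and path are illustrative):

import org.apache.spark.sql.functions.col

windowedData
  .repartition(col("window"))      // each window value lands in one task
  .write
  .partitionBy("window")           // ... and therefore one file per window dir
  .format("parquet")
  .save("/output/path")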

On Mon, Dec 5, 2016 at 10:59 AM, Everett Anderson 
wrote:

> Hi,
>
> I have a DataFrame of records with dates, and I'd like to write all
> 12-month (with overlap) windows to separate outputs.
>
> Currently, I have a loop equivalent to:
>
> for ((windowStart, windowEnd) <- windows) {
> val windowData = allData.filter(
> getFilterCriteria(windowStart, windowEnd))
> windowData.write.format(...).save(...)
> }
>
> This works fine, but has the drawback that since Spark doesn't parallelize
> the writes, there is a fairly high cost based on the number of windows.
>
> Is there a way around this?
>
> In MapReduce, I'd probably multiply the data in a Mapper with a window ID
> and then maybe use something like MultipleOutputs.
> But I'm a bit worried of trying to do this in Spark because of the data
> explosion and RAM use. What's the best approach?
>
> Thanks!
>
> - Everett
>
>


Re: [structured streaming] How to remove outdated data when use Window Operations

2016-12-01 Thread Michael Armbrust
Yes!

On Thu, Dec 1, 2016 at 12:57 PM, ayan guha  wrote:

> Thanks TD. Will it be available in pyspark too?
> On 1 Dec 2016 19:55, "Tathagata Das"  wrote:
>
>> In the meantime, if you are interested, you can read the design doc in
>> the corresponding JIRA - https://issues.apache.org/ji
>> ra/browse/SPARK-18124
>>
>> On Thu, Dec 1, 2016 at 12:53 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> That feature is coming in 2.1.0. We have added watermarking, that will
>>> track the event time of the data and accordingly close old windows, output
>>> its corresponding aggregate and then drop its corresponding state. But in
>>> that case, you will have to use append mode, and aggregated data of a
>>> particular window will be evicted only when the windows is closed. You will
>>> be able to control the threshold on how long to wait for late, out-of-order
>>> data before closing a window.
>>>
>>> We will be updated the docs soon to explain this.
>>>
>>> On Tue, Nov 29, 2016 at 8:30 PM, Xinyu Zhang  wrote:
>>>
 Hi

 I want to use window operations. However, if i don't remove any data,
 the "complete" table will become larger and larger as time goes on. So I
 want to remove some outdated data in the complete table that I would never
 use.
 Is there any method to meet my requirement?

 Thanks!





>>>
>>>
>>


Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-28 Thread Michael Armbrust
You could open up a JIRA to add a version of from_json that supports schema
inference, but unfortunately that would not be super easy to implement.  In
particular, it would introduce a weird case where only this specific
function would block for a long time while we infer the schema (instead of
waiting for an action).  This blocking would be kind of odd for a call like
df.select(...).  If there is enough interest, though, we should still do it.

To give a little more detail, your version of the code is actually doing
two passes over the data: one to infer the schema and a second for whatever
processing you are asking it to do.  We have to know the schema at each
step of DataFrame construction, so we'd have to do this even before you
called an action.

Personally, I usually take a small sample of data and use schema inference
on that.  I then hardcode that schema into my program.  This makes your
spark jobs much faster and removes the possibility of the schema changing
underneath the covers.
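
Roughly, that flow looks like this (paths are illustrative; the notebook linked
below generates the hardcoded schema code for you):

// one-off: infer from a small sample and print the schema to embed in the job
val sampleSchema = spark.read.json("/data/sample.json").schema
println(sampleSchema.prettyJson)

// production job: supply the schema explicitly, so no inference pass runs
val df = spark.read.schema(sampleSchema).json("/data/full")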

Here's some code I use to build the static schema code automatically
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1128172975083446/2840265927289860/latest.html>
.

Would that work for you? If not, why not?

On Wed, Nov 23, 2016 at 2:48 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi Michael,
>
> Looks like all from_json functions will require me to pass schema and that
> can be little tricky for us but the code below doesn't require me to pass
> schema at all.
>
> import org.apache.spark.sql._
> val rdd = df2.rdd.map { case Row(j: String) => j }
> spark.read.json(rdd).show()
>
>
> On Tue, Nov 22, 2016 at 2:42 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> The first release candidate should be coming out this week. You can
>> subscribe to the dev list if you want to follow the release schedule.
>>
>> On Mon, Nov 21, 2016 at 9:34 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Michael,
>>>
>>> I only see spark 2.0.2 which is what I am using currently. Any idea on
>>> when 2.1 will be released?
>>>
>>> Thanks,
>>> kant
>>>
>>> On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> In Spark 2.1 we've added a from_json
>>>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L2902>
>>>> function that I think will do what you want.
>>>>
>>>> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> This seem to work
>>>>>
>>>>> import org.apache.spark.sql._
>>>>> val rdd = df2.rdd.map { case Row(j: String) => j }
>>>>> spark.read.json(rdd).show()
>>>>>
>>>>> However I wonder if this any inefficiency here ? since I have to apply
>>>>> this function for billion rows.
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Any equivalent method lateral and explore

2016-11-22 Thread Michael Armbrust
Both collect_list and explode are available in the function library.

The following is an example of using it:
df.select($"*", explode($"myArray") as 'arrayItem)

On Tue, Nov 22, 2016 at 2:42 PM, Mahender Sarangam <
mahender.bigd...@outlook.com> wrote:

> Hi,
>
> We are converting our Hive logic which is using lateral view and explode
> functions. Is there any built-in function in Scala for performing lateral
> view explode?
>
>
> Below is our query in Hive. temparray is a temp table with c0 and c1 columns
>
> SELECT id, CONCAT_WS(',', collect_list(LineID)) as LineiD
> FROM (SELECT cast(LineID as STRING) as LineiD, cast(id as STRING) as id
> FROM temparray LATERAL VIEW explode(`_c1`) adTable AS id) T
> GROUP BY id;
>
>
> Can anyone provide a pointer to the string functions available in Scala? We
> would like to perform operations like collect_list and getting the starting
> index of a matching string.
>
>
>
> Nanu
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-22 Thread Michael Armbrust
The first release candidate should be coming out this week. You can
subscribe to the dev list if you want to follow the release schedule.

On Mon, Nov 21, 2016 at 9:34 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Michael,
>
> I only see spark 2.0.2 which is what I am using currently. Any idea on
> when 2.1 will be released?
>
> Thanks,
> kant
>
> On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> In Spark 2.1 we've added a from_json
>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L2902>
>> function that I think will do what you want.
>>
>> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> This seem to work
>>>
>>> import org.apache.spark.sql._
>>> val rdd = df2.rdd.map { case Row(j: String) => j }
>>> spark.read.json(rdd).show()
>>>
>>> However I wonder if this any inefficiency here ? since I have to apply
>>> this function for billion rows.
>>>
>>>
>>
>


Re: How do I persist the data after I process the data with Structured streaming...

2016-11-22 Thread Michael Armbrust
We are looking to add a native JDBC sink in Spark 2.2.  Until then you can
write your own connector using df.writeStream.foreach.
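
A rough sketch of such a connector (the JDBC details are illustrative, not a
tested sink):

import org.apache.spark.sql.{ForeachWriter, Row}

val jdbcWriter = new ForeachWriter[Row] {
  var conn: java.sql.Connection = _

  def open(partitionId: Long, version: Long): Boolean = {
    conn = java.sql.DriverManager.getConnection("jdbc:postgresql://host/db")
    true                                  // returning false skips this partition
  }

  def process(row: Row): Unit = {
    val stmt = conn.prepareStatement("INSERT INTO events(k, v) VALUES (?, ?)")
    stmt.setString(1, row.getString(0))
    stmt.setString(2, row.getString(1))
    stmt.executeUpdate()
    stmt.close()
  }

  def close(errorOrNull: Throwable): Unit = {
    if (conn != null) conn.close()
  }
}

df.writeStream.foreach(jdbcWriter).outputMode("append").start()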

On Tue, Nov 22, 2016 at 12:55 PM, shyla deshpande 
wrote:

> Hi,
>
> Structured streaming works great with Kafka source but I need to persist
> the data after processing in some database like Cassandra or at least
> Postgres.
>
> Any suggestions, help please.
>
> Thanks
>


Re: How do I persist the data after I process the data with Structured streaming...

2016-11-22 Thread Michael Armbrust
Forgot the link:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

On Tue, Nov 22, 2016 at 2:40 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> We are looking to add a native JDBC sink in Spark 2.2.  Until then you can
> write your own connector using df.writeStream.foreach.
>
> On Tue, Nov 22, 2016 at 12:55 PM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> Hi,
>>
>> Structured streaming works great with Kafka source but I need to persist
>> the data after processing in some database like Cassandra or at least
>> Postgres.
>>
>> Any suggestions, help please.
>>
>> Thanks
>>
>
>


Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-21 Thread Michael Armbrust
In Spark 2.1 we've added a from_json function that I think will do what you want.
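
A rough usage sketch (you still supply the schema; names are illustrative):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("name", StringType)
  .add("age", LongType)

df.select(from_json(col("jsonBlob"), schema).as("parsed"))
  .select("parsed.*")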

On Fri, Nov 18, 2016 at 2:29 AM, kant kodali  wrote:

> This seem to work
>
> import org.apache.spark.sql._
> val rdd = df2.rdd.map { case Row(j: String) => j }
> spark.read.json(rdd).show()
>
> However I wonder if this any inefficiency here ? since I have to apply
> this function for billion rows.
>
>


Re: Stateful aggregations with Structured Streaming

2016-11-21 Thread Michael Armbrust
We are planning on adding mapWithState or something similar in a future
release.  In the meantime, standard DataFrame aggregations should work
(count, sum, etc).  If you are looking to do something custom, I'd suggest
looking at Aggregators.
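
A rough sketch of a custom typed Aggregator (a simple sum over one field; the
types are illustrative):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Event(user: String, amount: Long)

object SumAmount extends Aggregator[Event, Long, Long] {
  def zero: Long = 0L
  def reduce(acc: Long, e: Event): Long = acc + e.amount
  def merge(a: Long, b: Long): Long = a + b
  def finish(acc: Long): Long = acc
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// events.groupByKey(_.user).agg(SumAmount.toColumn)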

On Sat, Nov 19, 2016 at 5:46 AM, Yuval.Itzchakov  wrote:

> I've been using `DStream.mapWithState` and was looking forward to trying out
> Structured Streaming. The thing I can't understand is: does Structured
> Streaming in its current state support stateful aggregations?
>
> Looking at the StateStore design document
> (https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wf
> Vp_hDM8ZL254/edit#heading=h.2h7zw4ru3nw7),
> and then doing a bit of digging around in the Spark codebase, I've seen
> `mapPartitionsWithStateStore` as the only viable way of doing something
> with
> a store, but the API requires an `UnsafeRow` for key and value which makes
> we question if this is a real public API one should be using?
>
> Does anyone know what the state of things are currently in regards to an
> equivalent to `mapWithState` in Structured Streaming?
>
> Thanks,
> Yuval.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Stateful-aggregations-with-
> Structured-Streaming-tp28108.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

2016-11-21 Thread Michael Armbrust
You could also do this with Datasets, which will probably be a little more
efficient (since you are telling us you only care about one column)

ds1.select($"value".as[Array[Byte]]).map(Student.parseFrom)

On Thu, Nov 17, 2016 at 1:05 PM, shyla deshpande 
wrote:

> Hello everyone,
>  The following code works ...
>
> def main(args : Array[String]) {
>
>   val spark = SparkSession.builder.
> master("local")
> .appName("spark session example")
> .getOrCreate()
>
>   import spark.implicits._
>
>   val ds1 = spark.readStream.format("kafka").
> option("kafka.bootstrap.servers","localhost:9092").
> option("subscribe","student").load()
>
>   val ds2 = ds1.map(row=> 
> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
>
>   val query = ds2.writeStream
> .outputMode("append")
> .format("console")
> .start()
>
>   query.awaitTermination()
>
> }
>
>
> On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> val spark = SparkSession.builder.
>>   master("local")
>>   .appName("spark session example")
>>   .getOrCreate()
>>
>> import spark.implicits._
>>
>> val dframe1 = spark.readStream.format("kafka").
>>   option("kafka.bootstrap.servers","localhost:9092").
>>   option("subscribe","student").load()
>>
>> *How do I deserialize the value column from dataframe1 *
>>
>> *which is Array[Byte] to Student object using Student.parseFrom..???*
>>
>> *Please help.*
>>
>> *Thanks.*
>>
>>
>>
>> // Stream of votes from Kafka as bytesval votesAsBytes = 
>> KafkaUtils.createDirectStream[String, Array[Byte]](
>>   ssc, LocationStrategies.PreferConsistent,
>>   ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), 
>> kafkaParams))
>> // Parse them into Vote case class.val votes: DStream[Vote] = 
>> votesAsBytes.map {
>>   (cr: ConsumerRecord[String, Array[Byte]]) =>
>> Vote.parseFrom(cr.value())}
>>
>>
>


Re: Create a Column expression from a String

2016-11-21 Thread Michael Armbrust
You are looking for org.apache.spark.sql.functions.expr()
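
For example, a rough sketch:

import org.apache.spark.sql.functions.expr

val zipExpr = "substring(zipCode, 0, 5)"   // could come from configuration
val zip5 = df.select(expr(zipExpr).as("zipCode5"))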

On Sat, Nov 19, 2016 at 6:12 PM, Stuart White 
wrote:

> I'd like to allow for runtime-configured Column expressions in my
> Spark SQL application.  For example, if my application needs a 5-digit
> zip code, but the file I'm processing contains a 9-digit zip code, I'd
> like to be able to configure my application with the expression
> "substring('zipCode, 0, 5)" to use for the zip code.
>
> So, I think I'm looking for something like this:
>
> def parseColumnExpression(colExpr: String) : Column
>
> I see that SparkSession's sql() method exists to take a string and
> parse it into a DataFrame.  But that's not quite what I want.
>
> Does a mechanism exist that would allow me to take a string
> representation of a column expression and parse it into an actual
> column expression (something that could be use in a .select() call,
> for example)?
>
> Thanks!
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-16 Thread Michael Armbrust
On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon  wrote:

> Maybe it sounds like you are looking for from_json/to_json functions after
> en/decoding properly.
>

Which are new built-in functions that will be released with Spark 2.1.


Re: type-safe join in the new DataSet API?

2016-11-10 Thread Michael Armbrust
You can groupByKey and then cogroup.
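
A rough sketch (types are illustrative, spark.implicits._ assumed in scope);
the key extractors are checked at compile time:

case class A(col1: Long, col2: String)
case class B(col3: Long, col4: String)

// ds1: Dataset[A], ds2: Dataset[B]
val joined = ds1.groupByKey(_.col1).cogroup(ds2.groupByKey(_.col3)) {
  (key, as, bs) =>
    val right = bs.toSeq                  // iterators are single-pass
    as.flatMap(a => right.map(b => (a, b)))
}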

On Thu, Nov 10, 2016 at 10:44 AM, Yang  wrote:

> the new DataSet API is supposed to provide type safety and type checks at
> compile time https://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#join-operations
>
> It does this indeed for a lot of places, but I found it still doesn't have
> a type safe join:
>
> val ds1 = hc.sql("select col1, col2 from mytable")
>
> val ds2 = hc.sql("select col3 , col4 from mytable2")
>
> val ds3 = ds1.joinWith(ds2, ds1.col("col1") === ds2.col("col2"))
>
> Here Spark has no way to make sure (at compile time) that the two columns
> being joined together, "col1" and "col2", are of matching types. This is in
> contrast to RDD join, where it would be detected at compile time.
>
> am I missing something?
>
> thanks
>
>


Re: Aggregations on every column on dataframe causing StackOverflowError

2016-11-09 Thread Michael Armbrust
It would be great if you could try with the 2.0.2 RC.  Thanks for creating
an issue.

On Wed, Nov 9, 2016 at 1:22 PM, Raviteja Lokineni <
raviteja.lokin...@gmail.com> wrote:

> Well I've tried with 1.5.2, 1.6.2 and 2.0.1
>
> FYI, I have created https://issues.apache.org/jira/browse/SPARK-18388
>
> On Wed, Nov 9, 2016 at 3:08 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Which version of Spark?  Does seem like a bug.
>>
>> On Wed, Nov 9, 2016 at 10:06 AM, Raviteja Lokineni <
>> raviteja.lokin...@gmail.com> wrote:
>>
>>> Does this stacktrace look like a bug guys? Definitely seems like one to
>>> me.
>>>
>>> Caused by: java.lang.StackOverflowError
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>
>>>
>>> On Wed, Nov 9, 2016 at 10:48 AM, Raviteja Lokineni <
>>> raviteja.lokin...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am not sure if this is a bug or not. Basically I am generating weekly
>>>> aggregates of every column of data.
>>>>
>>>> Adding source code here (also attached):
>>>>
>>>> from pyspark.sql.window import Window
>>>> from pyspark.sql.functions import *
>>>>
>>>> timeSeries = sqlContext.read.option("header", 
>>>> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
>>>>
>>>> # Hive timestamp is interpreted as UNIX timestamp in seconds*
>>>> days = lambda i: i * 86400
>>>>
>>>> w = (Window()
>>>>  .partitionBy("id")
>>>>  .orderBy(col("dt").cast("timestamp").cast("long"))
>>>>  .rangeBetween(-days(6), 0))
>>>>
>>>> cols = ["id", "dt"]
>>>> skipCols = ["id", "dt"]
>>>>
>>>> for col in timeSeries.columns:
>>>> if col in skipCols:
>>>> continue
>>>> cols.append(mean(col).over(w).alias("mean_7_"+col))
>>>> cols.append(count(col).over(w).alias("count_7_"+col))
>>>> cols.append(sum(col).over(w).alias("sum_7_"+col))
>>>> cols.append(min(col).over(w).alias("min_7_"+col))
>>>> cols.append(max(col).over(w).alias("max_7_"+col))
>>>>
>>>> df = timeSeries.select(cols)
>>>> df.orderBy('id', 'dt').write\
>>>> 
>>>> .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")\
>>>> .save("file:///tmp/spark-bug-out.csv")
>>>>
>>>>
>>>> Thanks,
>>>> --
>>>> *Raviteja Lokineni* | Business Intelligence Developer
>>>> TD Ameritrade
>>>>
>>>> E: raviteja.lokin...@gmail.com
>>>>
>>>> [image: View Raviteja Lokineni's profile on LinkedIn]
>>>> <http://in.linkedin.com/in/ravitejalokineni>
>>>>
>>>>
>>>
>>>
>>> --
>>> *Raviteja Lokineni* | Business Intelligence Developer
>>> TD Ameritrade
>>>
>>> E: raviteja.lokin...@gmail.com
>>>
>>> [image: View Raviteja Lokineni's profile on LinkedIn]
>>> <http://in.linkedin.com/in/ravitejalokineni>
>>>
>>>
>>
>
>
> --
> *Raviteja Lokineni* | Business Intelligence Developer
> TD Ameritrade
>
> E: raviteja.lokin...@gmail.com
>
> [image: View Raviteja Lokineni's profile on LinkedIn]
> <http://in.linkedin.com/in/ravitejalokineni>
>
>


Re: Aggregations on every column on dataframe causing StackOverflowError

2016-11-09 Thread Michael Armbrust
Which version of Spark?  Does seem like a bug.

On Wed, Nov 9, 2016 at 10:06 AM, Raviteja Lokineni <
raviteja.lokin...@gmail.com> wrote:

> Does this stacktrace look like a bug guys? Definitely seems like one to me.
>
> Caused by: java.lang.StackOverflowError
>   at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>
>
> On Wed, Nov 9, 2016 at 10:48 AM, Raviteja Lokineni <
> raviteja.lokin...@gmail.com> wrote:
>
>> Hi all,
>>
>> I am not sure if this is a bug or not. Basically I am generating weekly
>> aggregates of every column of data.
>>
>> Adding source code here (also attached):
>>
>> from pyspark.sql.window import Window
>> from pyspark.sql.functions import *
>>
>> timeSeries = sqlContext.read.option("header", 
>> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
>>
>> # Hive timestamp is interpreted as UNIX timestamp in seconds*
>> days = lambda i: i * 86400
>>
>> w = (Window()
>>  .partitionBy("id")
>>  .orderBy(col("dt").cast("timestamp").cast("long"))
>>  .rangeBetween(-days(6), 0))
>>
>> cols = ["id", "dt"]
>> skipCols = ["id", "dt"]
>>
>> for col in timeSeries.columns:
>> if col in skipCols:
>> continue
>> cols.append(mean(col).over(w).alias("mean_7_"+col))
>> cols.append(count(col).over(w).alias("count_7_"+col))
>> cols.append(sum(col).over(w).alias("sum_7_"+col))
>> cols.append(min(col).over(w).alias("min_7_"+col))
>> cols.append(max(col).over(w).alias("max_7_"+col))
>>
>> df = timeSeries.select(cols)
>> df.orderBy('id', 'dt').write\
>> .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")\
>> .save("file:///tmp/spark-bug-out.csv")
>>
>>
>> Thanks,
>> --
>> *Raviteja Lokineni* | Business Intelligence Developer
>> TD Ameritrade
>>
>> E: raviteja.lokin...@gmail.com
>>
>> [image: View Raviteja Lokineni's profile on LinkedIn]
>> 
>>
>>
>
>
> --
> *Raviteja Lokineni* | Business Intelligence Developer
> TD Ameritrade
>
> E: raviteja.lokin...@gmail.com
>
> [image: View Raviteja Lokineni's profile on LinkedIn]
> 
>
>


Re: Upgrading to Spark 2.0.1 broke array in parquet DataFrame

2016-11-07 Thread Michael Armbrust
If you can reproduce the issue with Spark 2.0.2 I'd suggest opening a JIRA.

On Fri, Nov 4, 2016 at 5:11 PM, Sam Goodwin  wrote:

> I have a table with a few columns, some of which are arrays. Since
> upgrading from Spark 1.6 to Spark 2.0.1, the array fields are always null
> when reading in a DataFrame.
>
> When writing the Parquet files, the schema of the column is specified as
>
> StructField("packageIds",ArrayType(StringType))
>
> The schema of the column in the Hive Metastore is
>
> packageIds array
>
> The schema used in the writer exactly matches the schema in the Metastore
> in all ways (order, casing, types etc)
>
> The query is a simple "select *"
>
> spark.sql("select * from tablename limit 1").collect() // null columns in Row
>
> How can I begin debugging this issue? Notable things I've already
> investigated:
>
>- Files were written using Spark 1.6
>- DataFrame works in spark 1.5 and 1.6
>- I've inspected the parquet files using parquet-tools and can see the
>data.
>- I also have another table written in exactly the same way and it
>doesn't have the issue.
>
>


Re: NoSuchElementException

2016-11-07 Thread Michael Armbrust
What are you trying to do?  It looks like you are mixing multiple
SparkContexts together.

On Fri, Nov 4, 2016 at 5:15 PM, Lev Tsentsiper 
wrote:

> My code throws an exception when I am trying to create new DataSet from
> within SteamWriter sink
>
> Simplified version of the code
>
>   val df = sparkSession.readStream
> .format("json")
> .option("nullValue", " ")
> .option("headerFlag", "true")
> .option("spark.sql.shuffle.partitions", 1)
> .option("mode", "FAILFAST")
> .schema(tableSchema)
> .load(s"s3n://")
> df.writeStream
> //TODO Switch to S3 location
> //.option("checkpointLocation", s"$input/$tenant/checkpoints/")
> .option("checkpointLocation", "/tmp/checkpoins/test1")
> .foreach(new ForwachWriter() {
>
>  override def close() = {
> val sparkSession = SparkSession.builder()
>   .config(new SparkConf()
> .setAppName("zzz").set("spark.app.id", ""xxx)
> .set("spark.master", "local[1]")
>   ).getOrCreate()
>
> val data = sparkSession.createDataset(rowList).
> .createOrReplaceTempView(tempTableName)
>  val sql =   sparkSession.sql("")
> sql.repartition(1).foreachPartition(iter=> {})
>  }
>
> });
>
> This code throws an exception
>
> java.util.NoSuchElementException: key not found: 202
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
>
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:421)
>
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$
> readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
>
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
>
> at org.apache.spark.broadcast.TorrentBroadcast._value$
> lzycompute(TorrentBroadcast.scala:65)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
>
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
>
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> prepareBroadcast(BroadcastHashJoinExec.scala:101)
> at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> codegenOuter(BroadcastHashJoinExec.scala:242)
> at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> doConsume(BroadcastHashJoinExec.scala:83)
> at org.apache.spark.sql.execution.CodegenSupport$class.consume(
> WholeStageCodegenExec.scala:153)
> at 
> org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
>
> at org.apache.spark.sql.execution.RowDataSourceScanExec.
> doProduce(ExistingRDD.scala:217)
> at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
> at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
> at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.CodegenSupport$class.produce(
> WholeStageCodegenExec.scala:78)
> at 
> org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
>
> at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.
> doProduce(BroadcastHashJoinExec.scala:77)
> at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
> at org.apache.spark.sql.execution.CodegenSupport$$
> anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
> at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.CodegenSupport$class.produce(
> WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.joins.
> BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
> at org.apache.spark.sql.execution.ProjectExec.doProduce(
> basicPhysicalOperators.scala:40)
> at org.apache.spark.sql.execution.CodegenSupport$$
> 

Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-04 Thread Michael Armbrust
>
> sure, but then my values are not sorted per key, right?


It does do a partition local sort. Look at the query plan in my example
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>.
The code here will also take care of finding the boundaries and is pretty
careful to spill / avoid materializing unnecessarily.

I think you are correct though that we are not pushing any of the sort into
the shuffle.  I'm not sure how much that buys you.  If it's a lot we could
extend the planner to look for Exchange->Sort pairs and change the exchange.

On Fri, Nov 4, 2016 at 7:06 AM, Koert Kuipers <ko...@tresata.com> wrote:

> i just noticed Sort for Dataset has a global flag. and Dataset also has
> sortWithinPartitions.
>
> how about:
> repartition + sortWithinPartitions + mapPartitions?
>
> the plan looks ok, but it is not clear to me if the sort is done as part
> of the shuffle (which is the important optimization).
>
> scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
> "value")
>
> scala> df.repartition(2, col("key")).sortWithinPartitions("value").as[(String,
> String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top
> level non-flat input object)._1, true) AS _1#39, staticinvoke(class
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
> assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
> object)._2, true) AS _2#40]
> +- MapPartitions , obj#38: scala.Tuple2
>+- DeserializeToObject newInstance(class scala.Tuple2), obj#37:
> scala.Tuple2
>   +- *Sort [value#6 ASC], false, 0
>  +- Exchange hashpartitioning(key#5, 2)
> +- LocalTableScan [key#5, value#6]
>
>
>
>
> On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> sure, but then my values are not sorted per key, right?
>>
>> so a group by key with values sorted according to to some ordering is an
>> operation that can be done efficiently in a single shuffle without first
>> figuring out range boundaries. and it is needed for quite a few algos,
>> including Window and lots of timeseries stuff. but it seems there is no way
>> to express i want to do this yet (at least not in an efficient way).
>>
>> which makes me wonder, what does Window do?
>>
>>
>> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> Thinking out loud is good :)
>>>
>>> You are right in that anytime you ask for a global ordering from Spark
>>> you will pay the cost of figuring out the range boundaries for partitions.
>>> If you say orderBy, though, we aren't sure that you aren't expecting a
>>> global order.
>>>
>>> If you only want to make sure that items are colocated, it is cheaper to
>>> do a groupByKey followed by a flatMapGroups
>>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>
>>> .
>>>
>>>
>>>
>>> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
>>>> then do mapPartitions?
>>>>
>>>> sorry thinking out loud a bit here. ok i think that could work. thanks
>>>>
>>>> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> thats an interesting thought about orderBy and mapPartitions. i guess
>>>>> i could emulate a groupBy with secondary sort using those two. however
>>>>> isn't using an orderBy expensive since it is a total sort? i mean a 
>>>>> groupBy
>>>>> with secondary sort is also a total sort under the hood, but its on
>>>>> (hashCode(key), secondarySortColumn) which is easier to distribute and
>>>>> therefore can be implemented more efficiently.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> It is still u

Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Michael Armbrust
Thinking out loud is good :)

You are right in that anytime you ask for a global ordering from Spark you
will pay the cost of figuring out the range boundaries for partitions.  If
you say orderBy, though, we aren't sure that you aren't expecting a global
order.

If you only want to make sure that items are colocated, it is cheaper to do
a groupByKey followed by a flatMapGroups
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1828840559545742/2840265927289860/latest.html>.

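If it helps, a rough sketch of that pattern (Event is a made-up class standing in for your records, and ds is assumed to be a Dataset[Event]):

case class Event(key: String, value: String)

// the shuffle colocates each group; the group is then handed to you as an iterator
val out = ds.groupByKey(_.key).flatMapGroups { (k, values) =>
  // note: sorting here materializes one group in memory, so groups need to be reasonably sized
  values.toSeq.sortBy(_.value).map(v => (k, v.value))
}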


On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i guess i could sort by (hashcode(key), key, secondarySortColumn) and then
> do mapPartitions?
>
> sorry thinking out loud a bit here. ok i think that could work. thanks
>
> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> thats an interesting thought about orderBy and mapPartitions. i guess i
>> could emulate a groupBy with secondary sort using those two. however isn't
>> using an orderBy expensive since it is a total sort? i mean a groupBy with
>> secondary sort is also a total sort under the hood, but its on
>> (hashCode(key), secondarySortColumn) which is easier to distribute and
>> therefore can be implemented more efficiently.
>>
>>
>>
>>
>>
>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> It is still unclear to me why we should remember all these tricks (or
>>>> add lots of extra little functions) when this elegantly can be expressed in
>>>> a reduce operation with a simple one line lamba function.
>>>>
>>> I think you can do that too.  KeyValueGroupedDataset has a reduceGroups
>>> function.  This probably won't be as fast though because you end up
>>> creating objects whereas the version I gave will get codegened to operate
>>> on binary data the whole way through.
>>>
>>>> The same applies to these Window functions. I had to read it 3 times to
>>>> understand what it all means. Maybe it makes sense for someone who has been
>>>> forced to use such limited tools in sql for many years but that's not
>>>> necessary what we should aim for. Why can I not just have the sortBy and
>>>> then an Iterator[X] => Iterator[Y] to express what I want to do?
>>>>
>>> We also have orderBy and mapPartitions.
>>>
>>>> All these functions (rank etc.) can be trivially expressed in this,
>>>> plus I can add other operations if needed, instead of being locked in like
>>>> this Window framework.
>>>>
>>>  I agree that window functions would probably not be my first choice for
>>> many problems, but for people coming from SQL it was a very popular
>>> feature.  My real goal is to give as many paradigms as possible in a single
>>> unified framework.  Let people pick the right mode of expression for any
>>> given job :)
>>>
>>
>>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Michael Armbrust
>
> It is still unclear to me why we should remember all these tricks (or add
> lots of extra little functions) when this elegantly can be expressed in a
> reduce operation with a simple one line lamba function.
>
I think you can do that too.  KeyValueGroupedDataset has a reduceGroups
function.  This probably won't be as fast though because you end up
creating objects whereas the version I gave will get codegened to operate
on binary data the whole way through.

> The same applies to these Window functions. I had to read it 3 times to
> understand what it all means. Maybe it makes sense for someone who has been
> forced to use such limited tools in sql for many years but that's not
> necessary what we should aim for. Why can I not just have the sortBy and
> then an Iterator[X] => Iterator[Y] to express what I want to do?
>
We also have orderBy and mapPartitions.

> All these functions (rank etc.) can be trivially expressed in this, plus I
> can add other operations if needed, instead of being locked in like this
> Window framework.
>
 I agree that window functions would probably not be my first choice for
many problems, but for people coming from SQL it was a very popular
feature.  My real goal is to give as many paradigms as possible in a single
unified framework.  Let people pick the right mode of expression for any
given job :)


Re: incomplete aggregation in a GROUP BY

2016-11-03 Thread Michael Armbrust
Sounds like a bug. If you can reproduce it on 1.6.3 (currently being voted
on), please open a JIRA.

On Thu, Nov 3, 2016 at 8:05 AM, Donald Matthews  wrote:

> While upgrading a program from Spark 1.5.2 to Spark 1.6.2, I've run into a
> HiveContext GROUP BY that no longer works reliably.
> The GROUP BY results are not always fully aggregated; instead, I get lots
> of duplicate + triplicate sets of group values.
>
> I've come up with a workaround that works for me, but the behaviour in
> question seems like a Spark bug, and since I don't see anything matching
> this in the Spark Jira or on this list, I thought I should check with this
> list to see if it's a known issue or if it might be worth creating a ticket
> for.
>
> Input:  A single table with 24 fields that I want to group on, and a few
> other fields that I want to aggregate.
>
> Statement: similar to hiveContext.sql("""
> SELECT a,b,c, ..., x, count(y) as yc, sum(z1) as z1s, sum(z2) as z2s
> FROM inputTable
> GROUP BY a,b,c, ..., x
> """)
>
> Checking the data for one sample run, I see that the input table has about
> 1.1M rows, with 18157 unique combinations of those 24 grouped values.
>
> Expected output: A table of 18157 rows.
>
> Observed output: A table of 28006 rows. Looking just at unique
> combinations of those grouped fields, I see that while 10125 rows are
> unique as expected, there are 6215 duplicate rows and 1817 triplicate rows.
>
> This is not quite 100% repeatable. That is, I'll see the issue repeatedly
> one day, but the next day with the same input data the GROUP BY will work
> correctly.
>
> For now it seems that I have a workaround: if I presort the input table on
> those grouped fields, the GROUP BY works correctly. But of course I
> shouldn't have to do that.
>
> Does this sort of GROUP BY issue seem familiar to anyone?
>
> /drm
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Michael Armbrust
You are looking to perform an *argmax*, which you can do with a single
aggregation.  Here is an example.

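Roughly, using the columns from your query (untested):

import org.apache.spark.sql.functions.{max, struct}

// max over a struct compares field by field, so putting the timestamp first
// picks the whole latest row per key in one aggregation
val latest = df
  .groupBy("mobileno")
  .agg(max(struct("transaction_date", "customername", "service_type", "cust_addr")).as("latest"))
  .select("mobileno", "latest.customername", "latest.service_type", "latest.cust_addr")
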
On Thu, Nov 3, 2016 at 4:53 AM, Rabin Banerjee  wrote:

> Hi All ,
>
>   I want to do a dataframe operation to find the rows having the latest
> timestamp in each group using the below operation
>
> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
> .select("customername","service_type","mobileno","cust_addr")
>
>
> *Spark Version :: 1.6.x*
>
> My Question is *"Will Spark guarantee the Order while doing the groupBy , if 
> DF is ordered using OrderBy previously in Spark 1.6.x"??*
>
>
> *I referred a blog here :: 
> **https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>  
> *
>
> *Which claims it will work except in Spark 1.5.1 and 1.5.2 .*
>
>
> *I need a bit elaboration of how internally spark handles it ? also is it 
> more efficient than using a Window function ?*
>
>
> *Thanks in Advance ,*
>
> *Rabin Banerjee*
>
>
>
>


Re: How to return a case class in map function?

2016-11-02 Thread Michael Armbrust
That's a bug.  Which version of Spark are you running?  Have you tried 2.0.2?

On Wed, Nov 2, 2016 at 12:01 AM, 颜发才(Yan Facai)  wrote:

> Hi, all.
> When I use a case class as return value in map function, spark always
> raise a ClassCastException.
>
> I write an demo, like:
>
> scala> case class Record(key: Int, value: String)
>
> scala> case class ID(key: Int)
>
> scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF
>
> scala> df.map{x => ID(x.getInt(0))}.show
>
> 16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0
> (TID 175)
> java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast to
> $line1401.$read$$iw$$iw$ID
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
>
>
> Please tell me if I'm wrong.
> Thanks.
>
>


Re: error: Unable to find encoder for type stored in a Dataset. when trying to map through a DataFrame

2016-11-02 Thread Michael Armbrust
Spark doesn't know how to turn a Seq[Any] back into a row.  You would need
to create a case class or something where we can figure out the schema.
What are you trying to do?

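If you do want structured rows back, the case-class route looks roughly like this (field names past the first two are yours to fill in):

case class Visit(iid: String, activity: String)   // extend with the other fields you need

// Spark derives an encoder for the case class, so typed maps keep a usable schema
val typed = df.as[Visit]
val mapped = typed.map(v => v.copy(activity = v.activity.toUpperCase))
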
If you don't care about specific fields and you just want to serialize the
type you can use kryo:

implicit val anyEncoder = Encoders.kryo[Seq[Any]]

On Wed, Nov 2, 2016 at 9:57 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I have the following scenario:
>
> scala> val df = spark.sql("select * from danieltest3")
> df: org.apache.spark.sql.DataFrame = [iid: string, activity: string ... 34
> more fields]
>
> Now I'm trying to map through the rows I'm getting:
> scala> df.map(r=>r.toSeq)
> :32: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing spark.implicits._  Support for serializing other
> types will be added in future releases.
>df.map(r=>r.toSeq)
>
>
> What am I missing here ?
>
> Thank you,
> Daniel
>


Re: Does Data pipeline using kafka and structured streaming work?

2016-11-01 Thread Michael Armbrust
Yeah, those are all requests for additional features / version support.
I've been using kafka with structured streaming to do both ETL into
partitioned parquet tables as well as streaming event time windowed
aggregation for several weeks now.

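The ETL part, condensed (broker, topic and paths are made up):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .load()

// pull the payload out of the kafka value and append it to a parquet table
stream.selectExpr("CAST(value AS STRING) AS json")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/events")
  .option("path", "/warehouse/events")
  .start()
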
On Tue, Nov 1, 2016 at 6:18 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Look at the resolved subtasks attached to that ticket you linked.
> Some of them are unresolved, but basic functionality is there.
>
> On Tue, Nov 1, 2016 at 7:37 PM, shyla deshpande
> <deshpandesh...@gmail.com> wrote:
> > Hi Michael,
> >
> > Thanks for the reply.
> >
> > The following link says there is a open unresolved Jira for Structured
> > streaming support for consuming from Kafka.
> >
> > https://issues.apache.org/jira/browse/SPARK-15406
> >
> > Appreciate your help.
> >
> > -Shyla
> >
> >
> > On Tue, Nov 1, 2016 at 5:19 PM, Michael Armbrust <mich...@databricks.com
> >
> > wrote:
> >>
> >> I'm not aware of any open issues against the kafka source for structured
> >> streaming.
> >>
> >> On Tue, Nov 1, 2016 at 4:45 PM, shyla deshpande <
> deshpandesh...@gmail.com>
> >> wrote:
> >>>
> >>> I am building a data pipeline using Kafka, Spark streaming and
> Cassandra.
> >>> Wondering if the issues with  Kafka source fixed in Spark 2.0.1. If
> not,
> >>> please give me an update on when it may be fixed.
> >>>
> >>> Thanks
> >>> -Shyla
> >>
> >>
> >
>


Re: Does Data pipeline using kafka and structured streaming work?

2016-11-01 Thread Michael Armbrust
I'm not aware of any open issues against the kafka source for structured
streaming.

On Tue, Nov 1, 2016 at 4:45 PM, shyla deshpande 
wrote:

> I am building a data pipeline using Kafka, Spark streaming and Cassandra.
> Wondering if the issues with  Kafka source fixed in Spark 2.0.1. If not,
> please give me an update on when it may be fixed.
>
> Thanks
> -Shyla
>


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-11-01 Thread Michael Armbrust
registerTempTable is backed by an in-memory hash table that maps table name
(a string) to a logical query plan.  Fragments of that logical query plan
may or may not be cached (but calling register alone will not result in any
materialization of results).  In Spark 2.0 we renamed this function to
createOrReplaceTempView, since a traditional RDBMS view is a better analogy
here.

If I was trying to augment the engine to make better use of HBase's
internal ordering, I'd probably use the experimental ability to inject
extra strategies into the query planner.  Essentially, you could look for
filters on top of BaseRelations (the internal class used to map DataSources
into the query plan) where there is a range filter on some prefix of the
table's key.  When this is detected, you could return an RDD that talks
directly to HBase and contains the already-filtered result, which would
override the default execution pathway.

I wrote up a (toy) example of using this API, which might be helpful.

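In case it helps, the rough shape of such a strategy (just a skeleton - the actual matching and the HBase range scan are left as comments):

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object HBaseRangeScanStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // match a logical Filter over the HBase BaseRelation here, check that the predicate
    // is a range on a prefix of the key, and return a physical scan node that issues the
    // range scan directly against HBase
    case _ => Nil   // returning Nil falls back to the built-in planner
  }
}

// register it before running queries
spark.experimental.extraStrategies = HBaseRangeScanStrategy :: Nil
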
On Tue, Nov 1, 2016 at 4:11 AM, Mich Talebzadeh 
wrote:

> it would be great if we establish this.
>
> I know in Hive these temporary tables "CREATE TEMPRARY TABLE ..." are
> private to the session and are put in a hidden staging directory as below
>
> /user/hive/warehouse/.hive-staging_hive_2016-07-10_22-58-
> 47_319_5605745346163312826-10
>
> and removed when the session ends or table is dropped
>
> Not sure how Spark handles this.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 1 November 2016 at 10:50, Michael David Pedersen  googlemail.com> wrote:
>
>> Thanks for the link, I hadn't come across this.
>>
>> According to https://forums.databricks.com/questions/400/what-is-the-diff
>>> erence-between-registertemptable-a.html
>>>
>>> and I quote
>>>
>>> "registerTempTable()
>>>
>>> registerTempTable() creates an in-memory table that is scoped to the
>>> cluster in which it was created. The data is stored using Hive's
>>> highly-optimized, in-memory columnar format."
>>>
>> But then the last post in the thread corrects this, saying:
>> "registerTempTable does not create a 'cached' in-memory table, but rather
>> an alias or a reference to the DataFrame. It's akin to a pointer in C/C++
>> or a reference in Java".
>>
>> So - probably need to dig into the sources to get more clarity on this.
>>
>> Cheers,
>> Michael
>>
>
>


Re: importing org.apache.spark.Logging class

2016-10-27 Thread Michael Armbrust
This was made internal to Spark.  I'd suggest that you use slf4j directly.

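Something like this (slf4j is already on the classpath with Spark; MyJob is just a placeholder class):

import org.slf4j.LoggerFactory

class MyJob {
  private val log = LoggerFactory.getLogger(getClass)  // replaces the old Logging mixin

  def run(): Unit = log.info("starting job")
}
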
On Thu, Oct 27, 2016 at 2:42 PM, Reth RM  wrote:

> Updated spark to version 2.0.0 and have issue with importing
> org.apache.spark.Logging
>
> Any suggested fix for this issue?
>


Re: Reading AVRO from S3 - No parallelism

2016-10-27 Thread Michael Armbrust
How big are your avro files?  We collapse many small files into a single
partition to eliminate scheduler overhead.  If you need explicit
parallelism you can also repartition.

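For example (path and partition count made up):

// small avro files get packed into few tasks, so ask for more partitions explicitly
val avro = spark.read.format("com.databricks.spark.avro").load("s3a://bucket/path/")
val parallel = avro.repartition(64)   // roughly match your total executor cores
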
On Thu, Oct 27, 2016 at 5:19 AM, Prithish  wrote:

> I am trying to read a bunch of AVRO files from a S3 folder using Spark
> 2.0. No matter how many executors I use or what configuration changes I
> make, the cluster doesn't seem to use all the executors. I am using the
> com.databricks.spark.avro library from databricks to read the AVRO.
>
> However, if I try the same on CSV files (same S3 folder, same
> configuration and cluster), it does use all executors.
>
> Is there something that I need to do to enable parallelism when using the
> AVRO databricks library?
>
> Thanks for your help.
>
>
>


Re: Dataframe schema...

2016-10-26 Thread Michael Armbrust
On Fri, Oct 21, 2016 at 8:40 PM, Koert Kuipers  wrote:

> This rather innocent looking optimization flag nullable has caused a lot
> of bugs... Makes me wonder if we are better off without it
>

Yes... my most regretted design decision :(

Please give thoughts here: https://issues.apache.org/jira/browse/SPARK-17939


Re: [Spark 2.0.1] Error in generated code, possible regression?

2016-10-26 Thread Michael Armbrust
I think that there should be comments that show the expressions that are
getting compiled.  Maybe make a gist with the whole generated code fragment?

On Wed, Oct 26, 2016 at 3:45 PM, Efe Selcuk <efema...@gmail.com> wrote:

> I do plan to do that Michael. Do you happen to know of any guidelines for
> tracking down the context of this generated code?
>
> On Wed, Oct 26, 2016 at 3:42 PM Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> If you have a reproduction you can post for this, it would be great if
>> you could open a JIRA.
>>
>> On Mon, Oct 24, 2016 at 6:21 PM, Efe Selcuk <efema...@gmail.com> wrote:
>>
>> I have an application that works in 2.0.0 but has been dying at runtime
>> on the 2.0.1 distribution.
>>
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$
>> CodeGenerator$$doCompile(CodeGenerator.scala:893)
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
>> at org.spark_project.guava.cache.LocalCache$LoadingValueReference.
>> loadFuture(LocalCache.java:3599)
>> at org.spark_project.guava.cache.LocalCache$Segment.loadSync(
>> LocalCache.java:2379)
>> ... 30 more
>> Caused by: org.codehaus.commons.compiler.CompileException: File
>> 'generated.java', Line 74, Column 145: Unknown variable or type "value4"
>>
>> It also includes a massive 1800-line generated code output (which repeats
>> over and over, even on 1 thread, which makes this a pain), but fortunately
>> the error occurs early so I can give at least some context.
>>
>> /* 001 */ public java.lang.Object generate(Object[] references) {
>> /* 002 */   return new SpecificMutableProjection(references);
>> /* 003 */ }
>> /* 004 */
>> /* 005 */ class SpecificMutableProjection extends
>> org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
>> /* 006 */
>> /* 007 */   private Object[] references;
>> /* 008 */   private MutableRow mutableRow;
>> /* 009 */   private Object[] values;
>> ... // many lines of class variables, mostly errMsg strings and Object[]
>> /* 071 */   private void apply2_7(InternalRow i) {
>> /* 072 */
>> /* 073 */ boolean isNull215 = false;
>> /* 074 */ final com.mypackage.MyThing value215 = isNull215 ? null :
>> (com.mypackage.MyThing) value4._2();
>> /* 075 */ isNull215 = value215 == null;
>> /* 076 */
>> ...
>>
>> As you can see, on line 74 there's a reference to value4 but nothing
>> called value4 has been defined. I have no idea of where to even begin
>> looking for what caused this, or even whether it's my fault or a bug in the
>> code generation. Any help is appreciated.
>>
>> Efe
>>
>>
>>


Re: [Spark 2.0.1] Error in generated code, possible regression?

2016-10-26 Thread Michael Armbrust
If you have a reproduction you can post for this, it would be great if you
could open a JIRA.

On Mon, Oct 24, 2016 at 6:21 PM, Efe Selcuk  wrote:

> I have an application that works in 2.0.0 but has been dying at runtime on
> the 2.0.1 distribution.
>
> at org.apache.spark.sql.catalyst.expressions.codegen.
> CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$
> CodeGenerator$$doCompile(CodeGenerator.scala:893)
> at org.apache.spark.sql.catalyst.expressions.codegen.
> CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
> at org.apache.spark.sql.catalyst.expressions.codegen.
> CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
> at org.spark_project.guava.cache.LocalCache$LoadingValueReference.
> loadFuture(LocalCache.java:3599)
> at org.spark_project.guava.cache.LocalCache$Segment.loadSync(
> LocalCache.java:2379)
> ... 30 more
> Caused by: org.codehaus.commons.compiler.CompileException: File
> 'generated.java', Line 74, Column 145: Unknown variable or type "value4"
>
> It also includes a massive 1800-line generated code output (which repeats
> over and over, even on 1 thread, which makes this a pain), but fortunately
> the error occurs early so I can give at least some context.
>
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */   return new SpecificMutableProjection(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificMutableProjection extends
> org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */   private MutableRow mutableRow;
> /* 009 */   private Object[] values;
> ... // many lines of class variables, mostly errMsg strings and Object[]
> /* 071 */   private void apply2_7(InternalRow i) {
> /* 072 */
> /* 073 */ boolean isNull215 = false;
> /* 074 */ final com.mypackage.MyThing value215 = isNull215 ? null :
> (com.mypackage.MyThing) value4._2();
> /* 075 */ isNull215 = value215 == null;
> /* 076 */
> ...
>
> As you can see, on line 74 there's a reference to value4 but nothing
> called value4 has been defined. I have no idea of where to even begin
> looking for what caused this, or even whether it's my fault or a bug in the
> code generation. Any help is appreciated.
>
> Efe
>
>


Re: Resiliency with SparkStreaming - fileStream

2016-10-26 Thread Michael Armbrust
I'll answer in the context of structured streaming (the new streaming API
built on DataFrames). When reading from files, the FileSource records
which files are included in each batch inside the given
checkpointLocation.  If you fail in the middle of a batch, the streaming
engine will retry that batch next time the query is restarted.

If you are concerned about exactly-once semantics, you can get that too.
The FileSink (i.e. using writeStream) writing out to something like parquet
does this automatically.  If you are writing to something like a
transactional database yourself, you can also implement similar
functionality.  Specifically, you can record the partition and version that
are provided by the open method into
the database in the same transaction that is writing the data.  This way,
when you recover you can avoid writing the same updates more than once.

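A rough sketch of that pattern, assuming df is your streaming DataFrame (the storage calls are left as comments since they depend on your database):

import org.apache.spark.sql.{ForeachWriter, Row}

val query = df.writeStream.foreach(new ForeachWriter[Row] {
  private var part: Long = _
  private var ver: Long = _

  def open(partitionId: Long, version: Long): Boolean = {
    part = partitionId; ver = version
    // return false if (partitionId, version) is already marked committed, so the
    // replayed partition is skipped entirely
    true
  }

  def process(row: Row): Unit = {
    // write the row and the (part, ver) marker in the same transaction
  }

  def close(errorOrNull: Throwable): Unit = ()
}).start()
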
On Wed, Oct 26, 2016 at 9:20 AM, Scott W  wrote:

> Hello,
>
> I'm planning to use fileStream Spark streaming API to stream data from
> HDFS. My Spark job would essentially process these files and post the
> results to an external endpoint.
>
> *How does fileStream API handle checkpointing of the file it processed ? *In
> other words, if my Spark job failed while posting the results to an
> external endpoint, I want that same original file to be picked up again and
> get reprocessed.
>
> Thanks much!
>


Re: LIMIT issue of SparkSQL

2016-10-24 Thread Michael Armbrust
It is not about limits on specific tables.  We do support that.  The case
I'm describing involves pushing limits across system boundaries.  It is
certainly possible to do this, but the current datasource API does not provide
this information (other than the implicit limit that is pushed down to the
consumed iterator of the data source).

On Mon, Oct 24, 2016 at 9:11 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> This is an interesting point.
>
> As far as I know in any database (practically all RDBMS Oracle, SAP etc),
> the LIMIT affects the collection part of the result set.
>
> The result set is carried out fully on the query that may involve multiple
> joins on multiple underlying tables.
>
> To limit the actual query by LIMIT on each underlying table does not make
> sense and will not be industry standard AFAK.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 24 October 2016 at 06:48, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> - dev + user
>>
>> Can you give more info about the query?  Maybe a full explain()?  Are you
>> using a datasource like JDBC?  The API does not currently push down limits,
>> but the documentation talks about how you can use a query instead of a
>> table if that is what you are looking to do.
>>
>> On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai <liz...@icloud.com> wrote:
>>
>>> Hi all,
>>>
>>> Let me clarify the problem:
>>>
>>> Suppose we have a simple table `A` with 100 000 000 records
>>>
>>> Problem:
>>> When we execute sql query ‘select * from A Limit 500`,
>>> It scan through all 100 000 000 records.
>>> Normal behaviour should be that once 500 records is found, engine stop
>>> scanning.
>>>
>>> Detailed observation:
>>> We found that there are “GlobalLimit / LocalLimit” physical operators
>>> https://github.com/apache/spark/blob/branch-2.0/sql/core/src
>>> /main/scala/org/apache/spark/sql/execution/limit.scala
>>> But during query plan generation, GlobalLimit / LocalLimit is not
>>> applied to the query plan.
>>>
>>> Could you please help us to inspect LIMIT problem?
>>> Thanks.
>>>
>>> Best,
>>> Liz
>>>
>>> On 23 Oct 2016, at 10:11 PM, Xiao Li <gatorsm...@gmail.com> wrote:
>>>
>>> Hi, Liz,
>>>
>>> CollectLimit means `Take the first `limit` elements and collect them to
>>> a single partition.`
>>>
>>> Thanks,
>>>
>>> Xiao
>>>
>>> 2016-10-23 5:21 GMT-07:00 Ran Bai <liz...@icloud.com>:
>>>
>>>> Hi all,
>>>>
>>>> I found the runtime for query with or without “LIMIT” keyword is the
>>>> same. We looked into it and found actually there is “GlobalLimit /
>>>> LocalLimit” in logical plan, however no relevant physical plan there. Is
>>>> this a bug or something else? Attached are the logical and physical plans
>>>> when running "SELECT * FROM seq LIMIT 1".
>>>>
>>>>
>>>> More specifically, We expected a early stop upon getting adequate
>>>> results.
>>>> Thanks so much.
>>>>
>>>> Best,
>>>> Liz
>>>>
>>>>
>>>>
>>>>
>>>> -
>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>
>>>
>>>
>>>
>>
>


Re: LIMIT issue of SparkSQL

2016-10-23 Thread Michael Armbrust
- dev + user

Can you give more info about the query?  Maybe a full explain()?  Are you
using a datasource like JDBC?  The API does not currently push down limits,
but the documentation talks about how you can use a query instead of a
table if that is what you are looking to do.

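For JDBC that looks roughly like this (connection details made up) - the limit then runs on the database side:

val first500 = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host/db")
  .option("dbtable", "(SELECT * FROM A LIMIT 500) t")  // a query in place of a table name
  .load()
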
On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai  wrote:

> Hi all,
>
> Let me clarify the problem:
>
> Suppose we have a simple table `A` with 100 000 000 records
>
> Problem:
> When we execute sql query ‘select * from A Limit 500`,
> It scan through all 100 000 000 records.
> Normal behaviour should be that once 500 records is found, engine stop
> scanning.
>
> Detailed observation:
> We found that there are “GlobalLimit / LocalLimit” physical operators
> https://github.com/apache/spark/blob/branch-2.0/sql/
> core/src/main/scala/org/apache/spark/sql/execution/limit.scala
> But during query plan generation, GlobalLimit / LocalLimit is not applied
> to the query plan.
>
> Could you please help us to inspect LIMIT problem?
> Thanks.
>
> Best,
> Liz
>
> On 23 Oct 2016, at 10:11 PM, Xiao Li  wrote:
>
> Hi, Liz,
>
> CollectLimit means `Take the first `limit` elements and collect them to a
> single partition.`
>
> Thanks,
>
> Xiao
>
> 2016-10-23 5:21 GMT-07:00 Ran Bai :
>
>> Hi all,
>>
>> I found the runtime for query with or without “LIMIT” keyword is the
>> same. We looked into it and found actually there is “GlobalLimit /
>> LocalLimit” in logical plan, however no relevant physical plan there. Is
>> this a bug or something else? Attached are the logical and physical plans
>> when running "SELECT * FROM seq LIMIT 1".
>>
>>
>> More specifically, We expected a early stop upon getting adequate results.
>> Thanks so much.
>>
>> Best,
>> Liz
>>
>>
>>
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>
>
>


Re: Dataframe schema...

2016-10-20 Thread Michael Armbrust
What is the issue you see when unioning?

On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar <bablo...@gmail.com> wrote:

> Hello Michael,
>
> Thank you for looking into this query. In my case there seem to be an
> issue when I union a parquet file read from disk versus another dataframe
> that I construct in-memory. The only difference I see is the containsNull =
> true. In fact, I do not see any errors with union on the simple schema of
> "col1 thru col4" above. But the problem seem to exist only on that
> "some_histogram" column which contains the mixed containsNull = true/false.
> Let me know if this helps.
>
> Thanks,
> Muthu
>
>
>
> On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Nullable is just a hint to the optimizer that it's impossible for there to
>> be a null value in this column, so that it can avoid generating code for
>> null-checks.  When in doubt, we set nullable=true since it is always safer
>> to check.
>>
>> Why in particular are you trying to change the nullability of the column?
>>
>> On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar <bablo...@gmail.com>
>> wrote:
>>
>>> Hello there,
>>>
>>> I am trying to understand how and when does DataFrame (or Dataset) sets
>>> nullable = true vs false on a schema.
>>>
>>> Here is my observation from a sample code I tried...
>>>
>>>
>>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
>>> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>>> lit("bla")).printSchema()
>>> root
>>>  |-- col1: integer (nullable = false)
>>>  |-- col2: string (nullable = true)
>>>  |-- col3: double (nullable = false)
>>>  |-- col4: string (nullable = false)
>>>
>>>
>>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
>>> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>>> lit("bla")).write.parquet("/tmp/sample.parquet")
>>>
>>> scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
>>> root
>>>  |-- col1: integer (nullable = true)
>>>  |-- col2: string (nullable = true)
>>>  |-- col3: double (nullable = true)
>>>  |-- col4: string (nullable = true)
>>>
>>>
>>> The place where this seem to get me into trouble is when I try to union
>>> one data-structure from in-memory (notice that in the below schema the
>>> highlighted element is represented as 'false' for in-memory created schema)
>>> and one from file that starts out with a schema like below...
>>>
>>>  |-- some_histogram: struct (nullable = true)
>>>  ||-- values: array (nullable = true)
>>>  |||-- element: double (containsNull = true)
>>>  ||-- freq: array (nullable = true)
>>>  |||-- element: long (containsNull = true)
>>>
>>> Is there a way to convert this attribute from true to false without
>>> running any mapping / udf on that column?
>>>
>>> Please advice,
>>> Muthu
>>>
>>
>>
>


Re: How does Spark determine in-memory partition count when reading Parquet ~files?

2016-10-19 Thread Michael Armbrust
In Spark 2.0 we bin-pack small files into a single task to avoid
overloading the scheduler.  If you want a specific number of partitions you
should repartition.  If you want to disable this optimization you can set
the file open cost very high: spark.sql.files.openCostInBytes

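For example (values made up):

// option 1: just ask for the parallelism you want
val df = spark.read.parquet("/path/to/table").repartition(200)

// option 2: make each file "cost" a lot so the packing produces more partitions
spark.conf.set("spark.sql.files.openCostInBytes", (1024L * 1024 * 1024).toString)
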
On Tue, Oct 18, 2016 at 7:04 PM, shea.parkes  wrote:

> When reading a parquet ~file with >50 parts, Spark is giving me a DataFrame
> object with far fewer in-memory partitions.
>
> I'm happy to troubleshoot this further, but I don't know Scala well and
> could use some help pointing me in the right direction.  Where should I be
> looking in the code base to understand how many partitions will result from
> reading a parquet ~file?
>
> Thanks,
>
> Shea
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-does-Spark-determine-in-
> memory-partition-count-when-reading-Parquet-files-tp27921.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Dataframe schema...

2016-10-19 Thread Michael Armbrust
Nullable is just a hint to the optimizer that it's impossible for there to
be a null value in this column, so that it can avoid generating code for
null-checks.  When in doubt, we set nullable=true since it is always safer
to check.

Why in particular are you trying to change the nullability of the column?

On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar  wrote:

> Hello there,
>
> I am trying to understand how and when does DataFrame (or Dataset) sets
> nullable = true vs false on a schema.
>
> Here is my observation from a sample code I tried...
>
>
> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
> lit("bla")).printSchema()
> root
>  |-- col1: integer (nullable = false)
>  |-- col2: string (nullable = true)
>  |-- col3: double (nullable = false)
>  |-- col4: string (nullable = false)
>
>
> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
> lit("bla")).write.parquet("/tmp/sample.parquet")
>
> scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
> root
>  |-- col1: integer (nullable = true)
>  |-- col2: string (nullable = true)
>  |-- col3: double (nullable = true)
>  |-- col4: string (nullable = true)
>
>
> The place where this seem to get me into trouble is when I try to union
> one data-structure from in-memory (notice that in the below schema the
> highlighted element is represented as 'false' for in-memory created schema)
> and one from file that starts out with a schema like below...
>
>  |-- some_histogram: struct (nullable = true)
>  ||-- values: array (nullable = true)
>  |||-- element: double (containsNull = true)
>  ||-- freq: array (nullable = true)
>  |||-- element: long (containsNull = true)
>
> Is there a way to convert this attribute from true to false without
> running any mapping / udf on that column?
>
> Please advice,
> Muthu
>


Re: Questions about DataFrame's filter()

2016-09-29 Thread Michael Armbrust
-dev +user

It surprises me as `filter()` takes a Column, not a `Row => Boolean`.


There are several overloaded versions of Dataset.filter(...)

def filter(func: FilterFunction[T]): Dataset[T]
def filter(func: (T) ⇒ Boolean): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def filter(condition: Column): Dataset[T]

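So the lambda resolves to the (T) => Boolean overload, which is why it compiles.  Roughly:

// Column overload: Catalyst sees the predicate and can optimize / push it down
val fast = df.filter($"day" === 2)

// Row => Boolean overload: an opaque function, every row is deserialized and handed to it
val slow = df.filter(r => r.getAs[Int]("day") == 2)

That opacity is most likely where the 1m30 vs 1 second difference comes from.
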
... and why the error occurs.  Can someone explain please?


Anytime the compiler fails like that, it is probably a Spark code
generation bug.  It would be awesome if you could try your application on
Spark 2.0.1 (currently voting on RC3) and see if it's fixed.  If not, please
open a JIRA.

Michael

On Thu, Sep 29, 2016 at 9:16 AM, Samy Dindane  wrote:

> Hi,
>
> I noticed that the following code compiles:
>
>   val df = spark.read.format("com.databricks.spark.avro").load("/tmp/
> whatever/output")
>   val count = df.filter(x => x.getAs[Int]("day") == 2).count
>
> It surprises me as `filter()` takes a Column, not a `Row => Boolean`.
>
> Also, this code returns the right result, but takes 1m30 to run (while it
> takes less than 1 second when using `$"day" === 2`) and gives the error
> pasted in the bottom of this message.
>
> I was just wondering why it does work (implicit conversion?), why it is
> slow, and why the error occurs.
> Can someone explain please?
>
> Thank you,
>
> Samy
>
> --
>
> [error] org.codehaus.commons.compiler.CompileException: File
> 'generated.java', Line 398, Column 41: Expression "scan_isNull10" is not an
> rvalue
> [error] at org.codehaus.janino.UnitCompil
> er.compileError(UnitCompiler.java:10174)
> [error] at org.codehaus.janino.UnitCompil
> er.toRvalueOrCompileException(UnitCompiler.java:6036)
> [error] at org.codehaus.janino.UnitCompil
> er.getConstantValue2(UnitCompiler.java:4440)
> [error] at org.codehaus.janino.UnitCompil
> er.access$9900(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompil
> er$11.visitAmbiguousName(UnitCompiler.java:4417)
> [error] at org.codehaus.janino.Java$Ambig
> uousName.accept(Java.java:3138)
> [error] at org.codehaus.janino.UnitCompil
> er.getConstantValue(UnitCompiler.java:4427)
> [error] at org.codehaus.janino.UnitCompil
> er.getConstantValue2(UnitCompiler.java:4634)
> [error] at org.codehaus.janino.UnitCompil
> er.access$8900(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompil
> er$11.visitBinaryOperation(UnitCompiler.java:4394)
> [error] at org.codehaus.janino.Java$Binar
> yOperation.accept(Java.java:3768)
> [error] at org.codehaus.janino.UnitCompil
> er.getConstantValue(UnitCompiler.java:4427)
> [error] at org.codehaus.janino.UnitCompil
> er.compileGetValue(UnitCompiler.java:4360)
> [error] at org.codehaus.janino.UnitCompil
> er.compile2(UnitCompiler.java:1845)
> [error] at org.codehaus.janino.UnitCompil
> er.access$2000(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompil
> er$4.visitLocalVariableDeclarationStatement(UnitCompiler.java:945)
> [error] at org.codehaus.janino.Java$Local
> VariableDeclarationStatement.accept(Java.java:2508)
> [error] at org.codehaus.janino.UnitCompil
> er.compile(UnitCompiler.java:958)
> [error] at org.codehaus.janino.UnitCompil
> er.compileStatements(UnitCompiler.java:1007)
> [error] at org.codehaus.janino.UnitCompil
> er.compile(UnitCompiler.java:2293)
> [error] at org.codehaus.janino.UnitCompil
> er.compileDeclaredMethods(UnitCompiler.java:822)
> [error] at org.codehaus.janino.UnitCompil
> er.compileDeclaredMethods(UnitCompiler.java:794)
> [error] at org.codehaus.janino.UnitCompil
> er.compile2(UnitCompiler.java:507)
> [error] at org.codehaus.janino.UnitCompil
> er.compile2(UnitCompiler.java:658)
> [error] at org.codehaus.janino.UnitCompil
> er.compile2(UnitCompiler.java:662)
> [error] at org.codehaus.janino.UnitCompil
> er.access$600(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompil
> er$2.visitMemberClassDeclaration(UnitCompiler.java:350)
> [error] at org.codehaus.janino.Java$Membe
> rClassDeclaration.accept(Java.java:1035)
> [error] at org.codehaus.janino.UnitCompil
> er.compile(UnitCompiler.java:354)
> [error] at org.codehaus.janino.UnitCompil
> er.compileDeclaredMemberTypes(UnitCompiler.java:769)
> [error] at org.codehaus.janino.UnitCompil
> er.compile2(UnitCompiler.java:532)
> [error] at org.codehaus.janino.UnitCompil
> er.compile2(UnitCompiler.java:393)
> [error] at org.codehaus.janino.UnitCompil
> er.access$400(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompil
> er$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
> [error] at org.codehaus.janino.Java$Packa
> geMemberClassDeclaration.accept(Java.java:1139)
> [error] at org.codehaus.janino.UnitCompil
> er.compile(UnitCompiler.java:354)
> [error] 

Re: Dataset doesn't have partitioner after a repartition on one of the columns

2016-09-28 Thread Michael Armbrust
Hi Darin,

In SQL we have finer grained information about partitioning, so we don't
use the RDD Partitioner.  Here's a notebook that walks through what we do
expose and how it is used by the query planner.

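You can see what the planner knows from the physical plan, e.g.:

om.explain()
// look for Exchange hashpartitioning(docId, ...) in the output; a later aggregation
// or join on docId can reuse that distribution instead of shuffling again
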
Michael

On Tue, Sep 20, 2016 at 11:22 AM, McBeath, Darin W (ELS-STL) <
d.mcbe...@elsevier.com> wrote:

> I’m using Spark 2.0.
>
> I’ve created a dataset from a parquet file and repartition on one of the
> columns (docId) and persist the repartitioned dataset.
>
> val om = ds.repartition($"docId”).persist(StorageLevel.MEMORY_AND_DISK)
>
> When I try to confirm the partitioner, with
>
> om.rdd.partitioner
>
> I get
>
> Option[org.apache.spark.Partitioner] = None
>
> I would have thought it would be HashPartitioner.
>
> Does anyone know why this would be None and not HashPartitioner?
>
> Thanks.
>
> Darin.
>
>
>


Re: Spark 2.0 Structured Streaming: sc.parallelize in foreach sink cause Task not serializable error

2016-09-26 Thread Michael Armbrust
The code in ForeachWriter runs on the executors, which means that you are
not allowed to use the SparkContext.  This is probably why you are seeing
that exception.

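The usual pattern is to open a plain client connection in the writer and write row by row, for example (the actual Cassandra calls are left as comments):

import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = {
    // create or look up a connection/session for this partition here
    true
  }
  def process(value: Row): Unit = {
    // issue a single INSERT for `value` over that connection; no SparkContext,
    // no sc.parallelize, no saveToCassandra on the driver
  }
  def close(errorOrNull: Throwable): Unit = ()
}
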
On Sun, Sep 25, 2016 at 3:20 PM, Jianshi  wrote:

> Dear all:
>
> I am trying out the new released feature of structured streaming in Spark
> 2.0. I use the Structured Streaming to perform windowing by event time. I
> can print out the result in the console.  I would like to write the result
> to  Cassandra database through the foreach sink option. I am trying to use
> the spark-cassandra-connector to save the result. The connector saves rdd
> to
> Cassandra by calling rdd.saveToCassandra(), and this works fine if I
> execute
> the commands in spark-shell. For example:
> import com.datastax.spark.connector._
> val col = sc.parallelize(Seq(("of", 1200), ("the", "863")))
> col.saveToCassandra(keyspace, table)
>
> However, when I use the sc.parallelize inside foreach sink, it raise an
> error. The input file is Json messages with each row like the following:
> {"id": text, "time":timestamp,"hr": int}
>
> Here is my code:
>
> object StructStream {
>   def main(args: Array[String]) {
> val conf = new SparkConf(true).set("spark.cassandra.connection.host",
> "172.31.0.174")
> val sc = new SparkContext(conf)
> val sqlContext = new SQLContext(sc)
> val spark =
> SparkSession.builder.appName("StructuredAverage").getOrCreate()
> import spark.implicits._
>
> val userSchema = new StructType().add("id", "string").add("hr",
> "integer").add("time","timestamp")
> val jsonDF =
> spark.readStream.schema(userSchema).json("hdfs://ec2-
> 52-45-70-95.compute-1.amazonaws.com:9000/test3/")
> val line_count = jsonDF.groupBy(window($"time","2 minutes","1
> minutes"),
> $"id").count().orderBy("window")
>
> import org.apache.spark.sql.ForeachWriter
>
> val writer = new ForeachWriter[org.apache.spark.sql.Row] {
>   override def open(partitionId: Long, version: Long) = true
>   override def process(value: org.apache.spark.sql.Row) = {
> val toRemove = "[]".toSet
> val v_str = value.toString().filterNot(toRemove).split(",")
> val v_df =
> sc.parallelize(Seq(Stick(v_str(2),v_str(3).toInt,v_str(1),v_str(0
> v_df.saveToCassandra("playground","sstest")
> println(v_str(0),v_str(1),v_str(2),v_str(3))}
>   override def close(errorOrNull: Throwable) = ()
> }
>
> val query =
> line_count.writeStream.outputMode("complete").foreach(writer).start()
>
> query.awaitTermination()
>
>   }
>
> }
>
> case class Stick(aid: String, bct:Int, cend: String, dst: String)
>
> The error message looks like this:
>
> Error:
> org.apache.spark.SparkException: Task not serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(
> ClosureCleaner.scala:298)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$
> spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> at org.apache.spark.util.ClosureCleaner$.clean(
> ClosureCleaner.scala:108)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:882)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:881)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:881)
> at
> org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.
> apply$mcV$sp(Dataset.scala:2117)
> at
> org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.
> apply(Dataset.scala:2117)
> at
> org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.
> apply(Dataset.scala:2117)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(
> SQLExecution.scala:57)
> at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.
> scala:2532)
> at org.apache.spark.sql.Dataset.foreachPartition(Dataset.
> scala:2116)
> at
> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(
> ForeachSink.scala:69)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$
> spark$sql$execution$streaming$StreamExecution$$runBatch(
> StreamExecution.scala:375)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$
> apache$spark$sql$execution$streaming$StreamExecution$$
> runBatches$1.apply$mcZ$sp(StreamExecution.scala:194)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.
> execute(TriggerExecutor.scala:43)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$
> spark$sql$execution$streaming$StreamExecution$$runBatches(
> 

Re: udf forces usage of Row for complex types?

2016-09-26 Thread Michael Armbrust
I agree this should work.  We just haven't finished killing the old
reflection based conversion logic now that we have more powerful/efficient
encoders.  Please open a JIRA.
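
Until then, one workaround sketch for the example above is to pass the
struct's fields to the udf individually and return the case class (returning
a case class from a udf does work), rather than taking a Row:

import org.apache.spark.sql.functions.{col, udf}

val bumpAge = udf((name: String, age: Int) => Person(name, age + 1))
val df1 = df.withColumn("person", bumpAge(col("person.name"), col("person.age")))
df1.printSchema()  // person is still a struct<name:string,age:int>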

On Sun, Sep 25, 2016 at 2:41 PM, Koert Kuipers  wrote:

> after having gotten used to have case classes represent complex structures
> in Datasets, i am surprised to find out that when i work in DataFrames with
> udfs no such magic exists, and i have to fall back to manipulating Row
> objects, which is error prone and somewhat ugly.
>
> for example:
> case class Person(name: String, age: Int)
>
> val df = Seq((Person("john", 33), 5), (Person("mike", 30),
> 6)).toDF("person", "id")
> val df1 = df.withColumn("person", udf({ (p: Person) => p.copy(age = p.age
> + 1) }).apply(col("person")))
> df1.printSchema
> df1.show
>
> leads to:
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
> cannot be cast to Person
>


Re: Spark SQL - Applying transformation on a struct inside an array

2016-09-15 Thread Michael Armbrust
Is what you are looking for a withColumn that supports in-place modification
of nested columns? Or is it some other problem?

On Wed, Sep 14, 2016 at 11:07 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> I tried to use the RowEncoder but got stuck along the way :
> The main issue really is that even if it's possible (however tedious) to
> pattern match generically Row(s) and target the nested field that you need
> to modify, Rows being immutable data structure without a method like a case
> class's copy or any kind of lens to create a brand new object, I ended up
> stuck at the step "target and extract the field to update" without any way
> to update the original Row with the new value.
>
> To sum up, I tried :
>
>- using only dataframe's API itself + my udf - which works for nested
>structs as long as no arrays are along the way
>- trying to create a udf the can apply on Row and pattern match
>recursively the path I needed to explore/modify
>- trying to create a UDT - but we seem to be stuck in a strange
>middle-ground with 2.0 because some parts of the API ended up private while
>some stayed public making it impossible to use it now (I'd be glad if I'm
>mistaken)
>
> All of these failed for me and I ended up converting the rows to JSON and
> update using JSONPath, which is… something I'd like to avoid 'pretty
> please' :)
>
>
>
> On Thu, Sep 15, 2016 5:20 AM, Michael Allman mich...@videoamp.com wrote:
>
>> Hi Guys,
>>
>> Have you tried org.apache.spark.sql.catalyst.encoders.RowEncoder? It's
>> not a public API, but it is publicly accessible. I used it recently to
>> correct some bad data in a few nested columns in a dataframe. It wasn't an
>> easy job, but it made it possible. In my particular case I was not working
>> with arrays.
>>
>> Olivier, I'm interested in seeing what you come up with.
>>
>> Thanks,
>>
>> Michael
>>
>>
>> On Sep 14, 2016, at 10:44 AM, Fred Reiss <freiss@gmail.com> wrote:
>>
>> +1 to this request. I talked last week with a product group within IBM
>> that is struggling with the same issue. It's pretty common in data cleaning
> applications for data in the early stages to have nested lists or sets with
> inconsistent or incomplete schema information.
>>
>> Fred
>>
>> On Tue, Sep 13, 2016 at 8:08 AM, Olivier Girardot <
>> o.girar...@lateral-thoughts.com> wrote:
>>
>> Hi everyone,
> I'm currently trying to create a generic transformation mechanism on a
> Dataframe to modify an arbitrary column regardless of the underlying
> schema.
>>
>> It's "relatively" straightforward for complex types like
>> struct<struct<…>> to apply an arbitrary UDF on the column and replace the
>> data "inside" the struct, however I'm struggling to make it work for
>> complex types containing arrays along the way like struct<array<struct<…>>>.
>>
>> Michael Armbrust seemed to allude on the mailing list/forum to a way of
>> using Encoders to do that, I'd be interested in any pointers, especially
>> considering that it's not possible to output any Row or
>> GenericRowWithSchema from a UDF (thanks to https://github.com/apache/spar
>> k/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/
>> spark/sql/catalyst/ScalaReflection.scala#L657 it seems).
>>
>> To sum up, I'd like to find a way to apply a transformation on complex
>> nested datatypes (arrays and struct) on a Dataframe updating the value
>> itself.
>>
>> Regards,
>>
>> *Olivier Girardot*
>>
>>
>>
>>
>
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
> +33 6 24 09 17 94
>


Re: [SQL] Why does spark.read.csv.cache give me a WARN about cache but not text?!

2016-08-16 Thread Michael Armbrust
Try running explain on each of these.  My guess would be that caching is
broken in some cases.
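
Something along these lines should show whether the two plans differ (just
the obvious way to check, not verified against this exact case):

spark.read.csv("people.csv").cache.explain(true)
spark.read.text("people.csv").cache.explain(true)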

On Tue, Aug 16, 2016 at 6:05 PM, Jacek Laskowski  wrote:

> Hi,
>
> Can anyone explain why spark.read.csv("people.csv").cache.show ends up
> with a WARN while spark.read.text("people.csv").cache.show does not?
> It happens in 2.0 and today's build.
>
> scala> sc.version
> res5: String = 2.1.0-SNAPSHOT
>
> scala> spark.read.csv("people.csv").cache.show
> +---------+---------+-------+----+
> |      _c0|      _c1|    _c2| _c3|
> +---------+---------+-------+----+
> |kolumna 1|kolumna 2|kolumn3|size|
> |    Jacek| Warszawa| Polska|  40|
> +---------+---------+-------+----+
>
> scala> spark.read.csv("people.csv").cache.show
> 16/08/16 18:01:52 WARN CacheManager: Asked to cache already cached data.
> +---------+---------+-------+----+
> |      _c0|      _c1|    _c2| _c3|
> +---------+---------+-------+----+
> |kolumna 1|kolumna 2|kolumn3|size|
> |    Jacek| Warszawa| Polska|  40|
> +---------+---------+-------+----+
>
> scala> spark.read.text("people.csv").cache.show
> +--------------------+
> |               value|
> +--------------------+
> |kolumna 1,kolumna...|
> |Jacek,Warszawa,Po...|
> +--------------------+
>
> scala> spark.read.text("people.csv").cache.show
> +--------------------+
> |               value|
> +--------------------+
> |kolumna 1,kolumna...|
> |Jacek,Warszawa,Po...|
> +--------------------+
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re:

2016-08-14 Thread Michael Armbrust
You can force a broadcast, but with tables that large its probably not a
good idea.  However, filtering and then broadcasting one of the joins is
likely to get you the benefits of broadcasting (no shuffle on the larger
table that will colocate all the skewed tuples to a single overloaded
executor) without attempting to broadcast something thats too large.

On Sun, Aug 14, 2016 at 11:02 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Michael,
>
> As I understand broadcast joins, Jestin could also use broadcast
> function on a dataset to make it broadcast. Jestin could force the
> brodcast without the trick hoping it's gonna kick off brodcast.
> Correct?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Aug 14, 2016 at 9:51 AM, Michael Armbrust
> <mich...@databricks.com> wrote:
> > Have you tried doing the join in two parts (id == 0 and id != 0) and then
> > doing a union of the results?  It is possible that with this technique,
> that
> > the join which only contains skewed data would be filtered enough to
> allow
> > broadcasting of one side.
> >
> > On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.a...@gmail.com>
> > wrote:
> >>
> >> Hi, I'm currently trying to perform an outer join between two
> >> DataFrames/Sets, one is ~150GB, one is about ~50 GB on a column, id.
> >>
> >> df1.id is skewed in that there are many 0's, the rest being unique IDs.
> >>
> >> df2.id is not skewed. If I filter df1.id != 0, then the join works
> well.
> >> If I don't, then the join does not complete for a very, very long time.
> >>
> >> I have diagnosed this problem due to the hashpartitioning on IDs,
> >> resulting in one partition containing many values due to data skew. One
> >> executor ends up reading most of the shuffle data, and writing all of
> the
> >> shuffle data, as shown below.
> >>
> >>
> >>
> >>
> >>
> >> Shown above is the task in question assigned to one executor.
> >>
> >>
> >>
> >> This screenshot comes from one of the executors, showing one single
> thread
> >> spilling sort data since the executor cannot hold 90%+ of the ~200 GB
> result
> >> in memory.
> >>
> >> Moreover, looking at the event timeline, I find that the executor on
> that
> >> task spends about 20% time reading shuffle data, 70% computation, and
> 10%
> >> writing output data.
> >>
> >> I have tried the following:
> >>
> >> "Salting" the 0-value keys by monotonically_increasing_id().mod(N)
> >> - This doesn't seem to have an effect since now I have
> hundreds/thousands
> >> of keys with tens of thousands of occurrences.
> >> - Should I increase N? Is there a way to just do random.mod(N) instead
> of
> >> monotonically_increasing_id()?
> >>
> >> Repartitioning according to column I know contains unique values
> >>
> >> - This is overridden by Spark's sort-based shuffle manager which hash
> >> repartitions on the skewed column
> >>
> >> - Is it possible to change this? Or will the join column need to be
> hashed
> >> and partitioned on for joins to work
> >>
> >> Broadcasting does not work for my large tables
> >>
> >> Increasing/decreasing spark.sql.shuffle.partitions does not remedy the
> >> skewed data problem as 0-product values are still being hashed to the
> same
> >> partition.
> >>
> >>
> >> --
> >>
> >> What I am considering currently is doing the join at the RDD level, but
> is
> >> there any level of control which can solve my skewed data problem? Other
> >> than that, see the bolded question.
> >>
> >> I would appreciate any suggestions/tips/experience with this. Thank you!
> >>
> >
>


Re:

2016-08-14 Thread Michael Armbrust
Have you tried doing the join in two parts (id == 0 and id != 0) and then
doing a union of the results?  It is possible that, with this technique, the
join which only contains the skewed data would be filtered down enough to
allow broadcasting of one side.
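
A sketch of that two-part approach (Spark 2.0 syntax; it assumes the join
key has no nulls, which would need separate handling):

import org.apache.spark.sql.functions.broadcast

val df1Skew = df1.filter($"id" === 0)
val df1Rest = df1.filter($"id" =!= 0)
val df2Skew = df2.filter($"id" === 0)
val df2Rest = df2.filter($"id" =!= 0)

// normal shuffle join for the well-distributed keys
val joinedRest = df1Rest.join(df2Rest, Seq("id"), "outer")
// the skewed slice of df2 should now be small enough to broadcast
val joinedSkew = df1Skew.join(broadcast(df2Skew), Seq("id"), "outer")

val result = joinedRest.union(joinedSkew)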

On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma 
wrote:

> Hi, I'm currently trying to perform an outer join between two
> DataFrames/Sets, one is ~150GB, one is about ~50 GB on a column, id.
>
> df1.id is skewed in that there are many 0's, the rest being unique IDs.
>
> df2.id is not skewed. If I filter df1.id != 0, then the join works well.
> If I don't, then the join does not complete for a very, very long time.
>
> I have diagnosed this problem due to the hashpartitioning on IDs,
> resulting in one partition containing many values due to data skew. One
> executor ends up reading most of the shuffle data, and writing all of the
> shuffle data, as shown below.
>
>
>
>
>
> Shown above is the task in question assigned to one executor.
>
>
>
> This screenshot comes from one of the executors, showing one single thread
> spilling sort data since the executor cannot hold 90%+ of the ~200 GB
> result in memory.
>
> Moreover, looking at the event timeline, I find that the executor on that
> task spends about 20% time reading shuffle data, 70% computation, and 10%
> writing output data.
>
> I have tried the following:
>
>
>- "Salting" the 0-value keys by monotonically_increasing_id().mod(N)
>- - This doesn't seem to have an effect since now I have
>hundreds/thousands of keys with tens of thousands of occurrences.
>- - Should I increase N? Is there a way to just do random.mod(N)
>instead of monotonically_increasing_id()?
>-
>- Repartitioning according to column I know contains unique values
>-
>- - This is overridden by Spark's sort-based shuffle manager which
>hash repartitions on the skewed column
>-
>- - Is it possible to change this? Or will the join column need to be
>hashed and partitioned on for joins to work
>-
>- Broadcasting does not work for my large tables
>-
>- Increasing/decreasing spark.sql.shuffle.partitions does not remedy
>the skewed data problem as 0-product values are still being hashed to the
>same partition.
>
>
> --
>
> What I am considering currently is doing the join at the RDD level, but is
> there any level of control which can solve my skewed data problem? Other
> than that, see the bolded question.
>
> I would appreciate any suggestions/tips/experience with this. Thank you!
>
>


Re: call a mysql stored procedure from spark

2016-08-14 Thread Michael Armbrust
As described in the Spark SQL JDBC documentation, you can use the DataSource
API to connect to an external database using JDBC.  While the dbtable option
is usually just a table name, it can also be any valid SQL query that returns
a table when enclosed in parentheses.  I'm not certain, but I'd expect you
could use this feature to invoke a stored procedure and return the results as
a DataFrame.
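
For example, something like this (Spark 2.0 syntax; the connection details
and the query are placeholders, and whether a stored-procedure CALL works
this way depends on the MySQL driver, so it needs testing):

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://host:3306/mydb")
  .option("user", "user")
  .option("password", "password")
  // any query that returns a result set, wrapped in parentheses and aliased
  .option("dbtable", "(SELECT * FROM orders WHERE amount > 100) AS t")
  .load()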

On Sat, Aug 13, 2016 at 10:40 AM, sujeet jog  wrote:

> Hi,
>
> Is there a way to call a stored procedure using spark ?
>
>
> thanks,
> Sujeet
>


Re: Spark 2.0.0 JaninoRuntimeException

2016-08-14 Thread Michael Armbrust
Anytime you see JaninoRuntimeException you are seeing a bug in our code
generation.  If you can come up with a small example that causes the
problem, it would be very helpful if you could open a JIRA.

On Fri, Aug 12, 2016 at 2:30 PM, dhruve ashar  wrote:

> I see a similar issue being resolved recently: https://issues.
> apache.org/jira/browse/SPARK-15285
>
> On Fri, Aug 12, 2016 at 3:33 PM, Aris  wrote:
>
>> Hello folks,
>>
>> I'm on Spark 2.0.0 working with Datasets -- and despite the fact that
>> smaller data unit tests work on my laptop, when I'm on a cluster, I get
>> cryptic error messages:
>>
>> Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method
>>> "(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/
>>> spark/sql/catalyst/InternalRow;)I" of class
>>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering"
>>> grows beyond 64 KB
>>>
>>
>> Unfortunately I'm not clear on how to even isolate the source of this
>> problem. I didn't have this problem in Spark 1.6.1.
>>
>> Any clues?
>>
>
>
>
> --
> -Dhruve Ashar
>
>


Re: [SQL] Why does (0 to 9).toDF("num").as[String] work?

2016-08-14 Thread Michael Armbrust
There are two type systems in play here.  Spark SQL's and Scala's.

From the Scala side, this is type-safe.  After calling as[String] the
Dataset will only return Strings. It is impossible to ever get a class cast
exception unless you do your own incorrect casting after the fact.

Underneath the covers, calling as[String] will cause Spark SQL to
implicitly insert an "upcast".  An upcast will automatically perform safe
(lossless) casts (i.e., Int -> Long, Number -> String).  In the case where
there is no safe conversion, we'll throw an AnalysisException and require
you to explicitly do the conversion.  This upcasting happens when you
specify a primitive type or when you specify a more complicated class that
is mapping multiple columns to fields.
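
For example, in the shell (with spark.implicits._ in scope):

(0 to 9).toDF("num").as[String]   // Int -> String is a safe upcast, so this is allowed
Seq("1", "2").toDF("s").as[Int]   // String -> Int is not, so this fails with an AnalysisException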

On Sat, Aug 13, 2016 at 1:17 PM, Jacek Laskowski  wrote:

> Hi,
>
> Just ran into it and can't explain why it works. Please help me understand
> it.
>
> Q1: Why can I `as[String]` with Ints? Is this type safe?
>
> scala> (0 to 9).toDF("num").as[String]
> res12: org.apache.spark.sql.Dataset[String] = [num: int]
>
> Q2: Why can I map over strings even though there are really ints?
>
> scala> (0 to 9).toDF("num").as[String].map(_.toUpperCase)
> res11: org.apache.spark.sql.Dataset[String] = [value: string]
>
> Why are the two lines possible?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Does Spark SQL support indexes?

2016-08-14 Thread Michael Armbrust
Using df.write.partitionBy is similar to a coarse-grained, clustered index
in a traditional database.  You can't use it on temporary tables, but it
will let you efficiently select small parts of a much larger table.
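
A small sketch (Spark 2.0 syntax; paths and column names are placeholders):

// write clustered by the column you usually filter on
df.write.partitionBy("date").parquet("/data/events")

// a filter on that column only reads the matching directories
spark.read.parquet("/data/events").filter($"date" === "2016-08-01").count()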

On Sat, Aug 13, 2016 at 11:13 PM, Jörn Franke  wrote:

> Use a format that has built-in indexes, such as Parquet or Orc. Do not
> forget to sort the data on the columns that your filter on.
>
> On 14 Aug 2016, at 05:03, Taotao.Li  wrote:
>
>
> hi, guys, does Spark SQL support indexes?  if so, how can I create an
> index on my temp table? if not, how can I handle some specific queries on a
> very large table? it would iterate all the table even though all I want is
> just a small piece of that table.
>
> great thanks,
>
>
> *___*
> Quant | Engineer | Boy
> *___*
> *blog*:http://litaotao.github.io
> 
> *github*: www.github.com/litaotao
>
>
>


Re: How to set nullable field when create DataFrame using case class

2016-08-04 Thread Michael Armbrust
Nullable is an optimization for Spark SQL.  It is telling Spark to not even
do an if check when accessing that field.

In this case, your data *is* nullable, because Timestamp is an object in
Java and you could put null there.
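
If you really need the schema to say nullable = false, one workaround sketch
(reusing MyProduct and sqlContext from the example) is to rebuild the
DataFrame with an adjusted schema - note this only asserts non-nullability,
Spark will not verify it for you:

import org.apache.spark.sql.types.{StructField, StructType}

val df = sc.parallelize(List(MyProduct(new java.sql.Timestamp(0), 10))).toDF()
val strictSchema = StructType(df.schema.map {
  case StructField("t", dt, _, meta) => StructField("t", dt, nullable = false, meta)
  case other                         => other
})
val strictDF = sqlContext.createDataFrame(df.rdd, strictSchema)
strictDF.printSchema()   // t: timestamp (nullable = false)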

On Thu, Aug 4, 2016 at 2:56 PM, luismattor  wrote:

> Hi all,
>
> Consider the following case:
>
> import java.sql.Timestamp
> case class MyProduct(t: Timestamp, a: Float)
> val rdd = sc.parallelize(List(MyProduct(new Timestamp(0), 10))).toDF()
> rdd.printSchema()
>
> The output is:
> root
>  |-- t: timestamp (nullable = true)
>  |-- a: float (nullable = false)
>
> How can I set the timestamp column to be NOT nullable?
>
> Regards,
> Luis
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-set-nullable-field-when-
> create-DataFrame-using-case-class-tp27479.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: error while running filter on dataframe

2016-07-31 Thread Michael Armbrust
You are hitting a bug in code generation.  If you can come up with a small
reproduction for the problem, it would be very helpful if you could open a
JIRA.

On Sun, Jul 31, 2016 at 9:14 AM, Tony Lane  wrote:

> Can someone help me understand this error which occurs while running a
> filter on a dataframe
>
> 2016-07-31 21:01:57 ERROR CodeGenerator:91 - failed to compile:
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line
> 117, Column 58: Expression "mapelements_isNull" is not an rvalue
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */   return new GeneratedIterator(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ /** Codegened pipeline for:
> /* 006 */ * TungstenAggregate(key=[],
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#127L])
> /* 007 */ +- Project
> /* 008 */ +- Filter (is...
> /* 009 */   */
> /* 010 */ final class GeneratedIterator extends
> org.apache.spark.sql.execution.BufferedRowIterator {
> /* 011 */   private Object[] references;
> /* 012 */   private boolean agg_initAgg;
> /* 013 */   private boolean agg_bufIsNull;
> /* 014 */   private long agg_bufValue;
> /* 015 */   private scala.collection.Iterator inputadapter_input;
> /* 016 */   private Object[] deserializetoobject_values;
> /* 017 */   private org.apache.spark.sql.types.StructType
> deserializetoobject_schema;
> /* 018 */   private UnsafeRow deserializetoobject_result;
> /* 019 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> deserializetoobject_holder;
> /* 020 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> deserializetoobject_rowWriter;
> /* 021 */   private UnsafeRow mapelements_result;
> /* 022 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> mapelements_holder;
> /* 023 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> mapelements_rowWriter;
> /* 024 */   private Object[] serializefromobject_values;
> /* 025 */   private UnsafeRow serializefromobject_result;
> /* 026 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> serializefromobject_holder;
> /* 027 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> serializefromobject_rowWriter;
> /* 028 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> serializefromobject_rowWriter1;
> /* 029 */   private org.apache.spark.sql.execution.metric.SQLMetric
> filter_numOutputRows;
> /* 030 */   private UnsafeRow filter_result;
> /* 031 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> filter_holder;
> /* 032 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> filter_rowWriter;
> /* 033 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> filter_rowWriter1;
> /* 034 */   private org.apache.spark.sql.execution.metric.SQLMetric
> agg_numOutputRows;
> /* 035 */   private org.apache.spark.sql.execution.metric.SQLMetric
> agg_aggTime;
> /* 036 */   private UnsafeRow agg_result;
> /* 037 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
> /* 038 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> agg_rowWriter;
> /* 039 */
> /* 040 */   public GeneratedIterator(Object[] references) {
> /* 041 */ this.references = references;
> /* 042 */   }
> /* 043 */
>
>


Re: calling dataset.show on a custom object - displays toString() value as first column and blank for rest

2016-07-31 Thread Michael Armbrust
Can you share your code?  This does not happen for me.

On Sun, Jul 31, 2016 at 7:16 AM, Rohit Chaddha 
wrote:

> I have a custom object called A and corresponding Dataset
>
> when I call datasetA.show() method i get the following
>
> +++-+-+---+
> |id|da|like|values|uid|
> +++-+-+---+
> |A.toString()...|
> |A.toString()...|
> |A.toString()...|
> |A.toString()...|
> |A.toString()...|
> |A.toString()...|
>
> that is A.toString() is called and displayed as value of the first column
> and rest all columns are blank
>
> Any suggestions what should be done to fix this ?
>
> - Rohit
>


Re: spark 2.0 readStream from a REST API

2016-07-31 Thread Michael Armbrust
You have to add a file in your resources too (see the example in the Spark
source).  Either that or give the full class name.
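
Concretely - my understanding is that the short-name lookup goes through
Java's ServiceLoader, so the provider class has to be listed in a
provider-configuration file on the classpath.  Here com.example.MySourceProvider
is a placeholder for your class that implements DataSourceRegister and
returns "mysource" from shortName():

# file: src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.example.MySourceProvider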

On Sun, Jul 31, 2016 at 9:45 AM, Ayoub Benali 
wrote:

> Looks like the way to go in spark 2.0 is to implement StreamSourceProvider
> with DataSourceRegister.
> But now spark fails at loading the class when doing:
>
> spark.readStream.format("mysource").load()
>
> I get :
>
> java.lang.ClassNotFoundException: Failed to find data source: mysource.
> Please find packages at http://spark-packages.org
>
> Is there something I need to do in order to "load" the Stream source
> provider ?
>
> Thanks,
> Ayoub
>
> 2016-07-31 17:19 GMT+02:00 Jacek Laskowski :
>
>> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
>>  wrote:
>>
>> > I started playing with the Structured Streaming API in spark 2.0 and I
>> am
>> > looking for a way to create streaming Dataset/Dataframe from a rest HTTP
>> > endpoint but I am bit stuck.
>>
>> What a great idea! Why did I myself not think about this?!?!
>>
>> > What would be the easiest way to hack around it ? Do I need to
>> implement the
>> > Datasource API ?
>>
>> Yes and perhaps Hadoop API too, but not sure which one exactly since I
>> haven't even thought about it (not even once).
>>
>> > Are there examples on how to create a DataSource from a REST endpoint ?
>>
>> Never heard of one.
>>
>> I'm hosting a Spark/Scala meetup this week so I'll definitely propose
>> it as a topic. Thanks a lot!
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>
>


Re: [Spark 2.0] Why MutableInt cannot be cast to MutableLong?

2016-07-31 Thread Michael Armbrust
Are you sure you are running Spark 2.0?

In your stack trace I see SqlNewHadoopRDD, which was removed in #12354.

On Sun, Jul 31, 2016 at 2:12 AM, Chanh Le  wrote:

> Hi everyone,
> Why *MutableInt* cannot be cast to *MutableLong?*
> It’s really weird and seems Spark 2.0 has a lot of error with parquet
> about format.
>
> *org.apache.spark.sql.catalyst.expressions.MutableInt cannot be cast to
> org.apache.spark.sql.catalyst.expressions.MutableLong*
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
> value at 0 in block 0 in file
> file:/data/etl-report/parquet/AD_COOKIE_REPORT/time=2016-07-
>
> 25-16/network_id=31713/part-r-0-9adbef89-f2f4-4836-a50c-a2e7b381d558.snappy.parquet
> at
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
> at
> org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
> at
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.MutableInt cannot be cast to
> org.apache.spark.sql.catalyst.expressions.MutableLong
> at
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setLong(SpecificMutableRow.scala:295)
> at
> org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter$RowUpdater.setLong(CatalystRowConverter.scala:161)
> at
> org.apache.spark.sql.execution.datasources.parquet.CatalystPrimitiveConverter.addLong(CatalystRowConverter.scala:85)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:269)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:365)
> at
> org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
> at
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
> ... 20 more
>


Re: libraryDependencies

2016-07-26 Thread Michael Armbrust
libraryDependencies ++= Seq(
  // other dependencies here
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.6.2" % "provided",
  "org.scalanlp" %% "breeze" % "0.12",
  // native libraries are not included by default. add this if you want them (as of 0.7)
  // native libraries greatly improve performance, but increase jar sizes.
  "org.scalanlp" %% "breeze-natives" % "0.12"
)

On Tue, Jul 26, 2016 at 12:49 PM, Martin Somers <sono...@gmail.com> wrote:

> cheers - I updated
>
> libraryDependencies  ++= Seq(
>   // other dependencies here
>   "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
>   "org.apache.spark" %% "spark-mllib_2.10" % "1.6.2",
>   "org.scalanlp" %% "breeze" % "0.12",
>   // native libraries are not included by default. add this if
> you want them (as of 0.7)
>   // native libraries greatly improve performance, but
> increase jar sizes.
>   "org.scalanlp" %% "breeze-natives" % "0.12",
> )
>
> and getting similar error
>
> Compiling 1 Scala source to
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
> [error]
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
> [error] ^
> [error]
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:3:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.SingularValueDecomposition
> [error] ^
> [error]
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:5:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.{Vector, Vectors}
> [error]     ^
> [error]
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:8:
> not found: object breeze
>
> On Tue, Jul 26, 2016 at 8:36 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Also, you'll want all of the various spark versions to be the same.
>>
>> On Tue, Jul 26, 2016 at 12:34 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> If you are using %% (double) then you do not need _2.11.
>>>
>>> On Tue, Jul 26, 2016 at 12:18 PM, Martin Somers <sono...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> my build file looks like
>>>>
>>>> libraryDependencies  ++= Seq(
>>>>   // other dependencies here
>>>>   "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
>>>>   "org.apache.spark" %% "spark-mllib_2.11" % "1.6.0",
>>>>   "org.scalanlp" % "breeze_2.11" % "0.7",
>>>>   // native libraries are not included by default. add this
>>>> if you want them (as of 0.7)
>>>>   // native libraries greatly improve performance, but
>>>> increase jar sizes.
>>>>   "org.scalanlp" % "breeze-natives_2.11" % "0.7",
>>>> )
>>>>
>>>> not 100% sure on the version numbers if they are indeed correct
>>>> getting an error of
>>>>
>>>> [info] Resolving jline#jline;2.12.1 ...
>>>> [info] Done updating.
>>>> [info] Compiling 1 Scala source to
>>>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
>>>> [error]
>>>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
>>>> object mllib is not a member of package org.apache.spark
>>>> [error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
>>>> 
>>>> ...
>>>>
>>>>
>>>> Im trying to import in
>>>>
>>>> import org.apache.spark.mllib.linalg.distributed.RowMatrix
>>>> import org.apache.spark.mllib.linalg.SingularValueDecomposition
>>>>
>>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>>>
>>>>
>>>> import breeze.linalg._
>>>> import breeze.linalg.{ Matrix => B_Matrix }
>>>> import breeze.linalg.{ Vector => B_Matrix }
>>>> import breeze.linalg.DenseMatrix
>>>>
>>>> object MyApp {
>>>>   def main(args: Array[String]): Unit = {
>>>> //code here
>>>> }
>>>>
>>>>
>>>> It might not be the correct way of doing this
>>>>
>>>> Anyone got any suggestion
>>>> tks
>>>> M
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> M
>


Re: libraryDependencies

2016-07-26 Thread Michael Armbrust
Also, you'll want all of the various spark versions to be the same.

On Tue, Jul 26, 2016 at 12:34 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> If you are using %% (double) then you do not need _2.11.
>
> On Tue, Jul 26, 2016 at 12:18 PM, Martin Somers <sono...@gmail.com> wrote:
>
>>
>> my build file looks like
>>
>> libraryDependencies  ++= Seq(
>>   // other dependencies here
>>   "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
>>   "org.apache.spark" %% "spark-mllib_2.11" % "1.6.0",
>>   "org.scalanlp" % "breeze_2.11" % "0.7",
>>   // native libraries are not included by default. add this
>> if you want them (as of 0.7)
>>   // native libraries greatly improve performance, but
>> increase jar sizes.
>>   "org.scalanlp" % "breeze-natives_2.11" % "0.7",
>> )
>>
>> not 100% sure on the version numbers if they are indeed correct
>> getting an error of
>>
>> [info] Resolving jline#jline;2.12.1 ...
>> [info] Done updating.
>> [info] Compiling 1 Scala source to
>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
>> [error]
>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
>> object mllib is not a member of package org.apache.spark
>> [error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
>> 
>> ...
>>
>>
>> Im trying to import in
>>
>> import org.apache.spark.mllib.linalg.distributed.RowMatrix
>> import org.apache.spark.mllib.linalg.SingularValueDecomposition
>>
>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>
>>
>> import breeze.linalg._
>> import breeze.linalg.{ Matrix => B_Matrix }
>> import breeze.linalg.{ Vector => B_Matrix }
>> import breeze.linalg.DenseMatrix
>>
>> object MyApp {
>>   def main(args: Array[String]): Unit = {
>> //code here
>> }
>>
>>
>> It might not be the correct way of doing this
>>
>> Anyone got any suggestion
>> tks
>> M
>>
>>
>>
>>
>


Re: transtition SQLContext to SparkSession

2016-07-18 Thread Michael Armbrust
+ dev, reynold

Yeah, that's a good point.  I wonder if SparkSession.sqlContext should be
public/deprecated?

On Mon, Jul 18, 2016 at 8:37 AM, Koert Kuipers  wrote:

> in my codebase i would like to gradually transition to SparkSession, so
> while i start using SparkSession i also want a SQLContext to be available
> as before (but with a deprecated warning when i use it). this should be
> easy since SQLContext is now a wrapper for SparkSession.
>
> so basically:
> val session = SparkSession.builder.set(..., ...).getOrCreate()
> val sqlc = new SQLContext(session)
>
> however this doesnt work, the SQLContext constructor i am trying to use is
> private. SparkSession.sqlContext is also private.
>
> am i missing something?
>
> a non-gradual switch is not very realistic in any significant codebase,
> and i do not want to create SparkSession and SQLContext independendly (both
> from same SparkContext) since that can only lead to confusion and
> inconsistent settings.
>


Re: Saving Table with Special Characters in Columns

2016-07-11 Thread Michael Armbrust
This is protecting you from a limitation in Parquet.  The library will let
you write out invalid files that can't be read back, so we added this check.

You can call .format("csv") (in Spark 2.0) to switch it to CSV.
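
If renaming turns out to be acceptable after all, a sketch in Scala (the
replacement pattern simply mirrors the characters listed in the error
message; the table and output paths are placeholders):

// replace every disallowed character in every column name with an underscore
val cleaned = df.toDF(df.columns.map(_.replaceAll("[ ,;{}()\\n\\t=]", "_")): _*)
cleaned.write.saveAsTable("my_table")

// or skip parquet entirely and write CSV (Spark 2.0)
df.write.format("csv").option("header", "true").save("/path/to/out")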

On Mon, Jul 11, 2016 at 11:16 AM, Tobi Bosede  wrote:

> Hi everyone,
>
> I am trying to save a data frame with special characters in the column
> names as a table in hive. However I am getting the following error. Is the
> only solution to rename all the columns? Or is there some argument that can
> be passed to into the saveAsTable() or write.parquet() functions to ignore
> special characters?
>
> Py4JJavaError: An error occurred while calling o2956.saveAsTable.
> : org.apache.spark.sql.AnalysisException: Attribute name "apple- 
> mail_duration" contains invalid character(s) among " ,;{}()\n\t=". Please use 
> alias to rename it.
>
>
> If not how can I simply write the data frame as a csv file?
>
> Thanks,
> Tobi
>
>
>


Re: DataFrame Min By Column

2016-07-09 Thread Michael Armbrust
I would guess that using the built-in min/max/struct functions will be much
faster than a UDAF.  They should have native internal implementations that
utilize code generation.

On Sat, Jul 9, 2016 at 2:20 PM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> Thanks Michael,
>
> That seems like the analog to sorting tuples. I am curious, is there a
> significant performance penalty to the UDAF versus that? Its certainly
> nicer and more compact code at least.
>
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> <https://www.linkedin.com/in/pedrorodriguezscience>
>
> On July 9, 2016 at 2:19:11 PM, Michael Armbrust (mich...@databricks.com)
> wrote:
>
> You can do whats called an *argmax/argmin*, where you take the min/max of
> a couple of columns that have been grouped together as a struct.  We sort
> in column order, so you can put the timestamp first.
>
> Here is an example
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3170497669323442/2840265927289860/latest.html>
> .
>
> On Sat, Jul 9, 2016 at 6:10 AM, Pedro Rodriguez <ski.rodrig...@gmail.com>
> wrote:
>
>> I implemented a more generic version which I posted here:
>> https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a
>>
>> I think I could generalize this by pattern matching on DataType to use
>> different getLong/getDouble/etc functions ( not trying to use getAs[]
>> because getting T from Array[T] is hard it seems).
>>
>> Is there a way to go further and make the arguments unnecessary or
>> inferable at runtime, particularly for the valueType since it doesn’t
>> matter what it is? DataType is abstract so I can’t instantiate it, is there
>> a way to define the method so that it pulls from the user input at runtime?
>>
>> Thanks,
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>>
>> pedrorodriguez.io | 909-353-4423
>> github.com/EntilZha | LinkedIn
>> <https://www.linkedin.com/in/pedrorodriguezscience>
>>
>> On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodrig...@gmail.com)
>> wrote:
>>
>> Hi Xinh,
>>
>> A co-worker also found that solution but I thought it was possibly
>> overkill/brittle so looks into UDAFs (user defined aggregate functions). I
>> don’t have code, but Databricks has a post that has an example
>> https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html.
>> From that, I was able to write a MinLongByTimestamp function, but was
>> having a hard time writing a generic aggregate to any column by an order
>> able column.
>>
>> Anyone know how you might go about using generics in a UDAF, or something
>> that would mimic union types to express that order able spark sql types are
>> allowed?
>>
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>>
>> pedrorodriguez.io | 909-353-4423
>> github.com/EntilZha | LinkedIn
>> <https://www.linkedin.com/in/pedrorodriguezscience>
>>
>> On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote:
>>
>> Hi Pedro,
>>
>> I could not think of a way using an aggregate. It's possible with a
>> window function, partitioned on user and ordered by time:
>>
>> // Assuming "df" holds your dataframe ...
>>
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.expressions.Window
>> val wSpec = Window.partitionBy("user").orderBy("time")
>> df.select($"user", $"time", rank().over(wSpec).as("rank"))
>>   .where($"rank" === 1)
>>
>> Xinh
>>
>> On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <ski.rodrig...@gmail.com
>> > wrote:
>>
>>> Is there a way to on a GroupedData (from groupBy in DataFrame) to have
>>> an aggregate that returns column A based on a min of column B? For example,
>>> I have a list of sites visited by a given user and I would like to find the
>>> event with the minimum time (first event)
>>>
>>> Thanks,
>>> --
>>> Pedro Rodriguez
>>> PhD Student in Distributed Machine Learning | CU Boulder
>>> UC Berkeley AMPLab Alumni
>>>
>>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>>> Github: github.com/EntilZha | LinkedIn:
>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>
>>>
>>
>


Re: DataFrame Min By Column

2016-07-09 Thread Michael Armbrust
You can do what's called an *argmax/argmin*, where you take the min/max of a
couple of columns that have been grouped together as a struct.  We sort in
column order, so you can put the timestamp first.

Here is an example
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3170497669323442/2840265927289860/latest.html>.
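
A minimal version of that pattern, with column names borrowed from the
question (user, time, site):

import org.apache.spark.sql.functions.{min, struct}

df.groupBy($"user")
  .agg(min(struct($"time", $"site")).as("first"))   // time first, so the struct ordering is by time
  .select($"user", $"first.time", $"first.site")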

On Sat, Jul 9, 2016 at 6:10 AM, Pedro Rodriguez 
wrote:

> I implemented a more generic version which I posted here:
> https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a
>
> I think I could generalize this by pattern matching on DataType to use
> different getLong/getDouble/etc functions ( not trying to use getAs[]
> because getting T from Array[T] is hard it seems).
>
> Is there a way to go further and make the arguments unnecessary or
> inferable at runtime, particularly for the valueType since it doesn’t
> matter what it is? DataType is abstract so I can’t instantiate it, is there
> a way to define the method so that it pulls from the user input at runtime?
>
> Thanks,
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> 
>
> On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodrig...@gmail.com)
> wrote:
>
> Hi Xinh,
>
> A co-worker also found that solution but I thought it was possibly
> overkill/brittle so looks into UDAFs (user defined aggregate functions). I
> don’t have code, but Databricks has a post that has an example
> https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html.
> From that, I was able to write a MinLongByTimestamp function, but was
> having a hard time writing a generic aggregate to any column by an order
> able column.
>
> Anyone know how you might go about using generics in a UDAF, or something
> that would mimic union types to express that order able spark sql types are
> allowed?
>
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> 
>
> On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.hu...@gmail.com) wrote:
>
> Hi Pedro,
>
> I could not think of a way using an aggregate. It's possible with a window
> function, partitioned on user and ordered by time:
>
> // Assuming "df" holds your dataframe ...
>
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.expressions.Window
> val wSpec = Window.partitionBy("user").orderBy("time")
> df.select($"user", $"time", rank().over(wSpec).as("rank"))
>   .where($"rank" === 1)
>
> Xinh
>
> On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez 
> wrote:
>
>> Is there a way to on a GroupedData (from groupBy in DataFrame) to have an
>> aggregate that returns column A based on a min of column B? For example, I
>> have a list of sites visited by a given user and I would like to find the
>> event with the minimum time (first event)
>>
>> Thanks,
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>


Re: Multiple aggregations over streaming dataframes

2016-07-07 Thread Michael Armbrust
We are planning to address this issue in the future.

At a high level, we'll have to add a delta mode so that updates can be
communicated from one operator to the next.

On Thu, Jul 7, 2016 at 8:59 AM, Arnaud Bailly 
wrote:

> Indeed. But nested aggregation does not work with Structured Streaming,
> that's the point. I would like to know if there is workaround, or what's
> the plan regarding this feature which seems to me quite useful. If the
> implementation is not overtly complex and it is just a matter of manpower,
> I am fine with devoting some time to it.
>
>
>
> --
> Arnaud Bailly
>
> twitter: abailly
> skype: arnaud-bailly
> linkedin: http://fr.linkedin.com/in/arnaudbailly/
>
> On Thu, Jul 7, 2016 at 2:17 PM, Sivakumaran S  wrote:
>
>> Arnauld,
>>
>> You could aggregate the first table and then merge it with the second
>> table (assuming that they are similarly structured) and then carry out the
>> second aggregation. Unless the data is very large, I don’t see why you
>> should persist it to disk. IMO, nested aggregation is more elegant and
>> readable than a complex single stage.
>>
>> Regards,
>>
>> Sivakumaran
>>
>>
>>
>> On 07-Jul-2016, at 1:06 PM, Arnaud Bailly  wrote:
>>
>> It's aggregation at multiple levels in a query: first do some aggregation
>> on one tavle, then join with another table and do a second aggregation. I
>> could probably rewrite the query in such a way that it does aggregation in
>> one pass but that would obfuscate the purpose of the various stages.
>> Le 7 juil. 2016 12:55, "Sivakumaran S"  a écrit :
>>
>>> Hi Arnauld,
>>>
>>> Sorry for the doubt, but what exactly is multiple aggregation? What is
>>> the use case?
>>>
>>> Regards,
>>>
>>> Sivakumaran
>>>
>>>
>>> On 07-Jul-2016, at 11:18 AM, Arnaud Bailly 
>>> wrote:
>>>
>>> Hello,
>>>
>>> I understand multiple aggregations over streaming dataframes is not
>>> currently supported in Spark 2.0. Is there a workaround? Out of the top of
>>> my head I could think of having a two stage approach:
>>>  - first query writes output to disk/memory using "complete" mode
>>>  - second query reads from this output
>>>
>>> Does this makes sense?
>>>
>>> Furthermore, I would like to understand what are the technical hurdles
>>> that are preventing Spark SQL from implementing multiple aggregation right
>>> now?
>>>
>>> Thanks,
>>> --
>>> Arnaud Bailly
>>>
>>> twitter: abailly
>>> skype: arnaud-bailly
>>> linkedin: http://fr.linkedin.com/in/arnaudbailly/
>>>
>>>
>>>
>>
>


Re: Logging trait in Spark 2.0

2016-06-28 Thread Michael Armbrust
I'd suggest using the slf4j APIs directly.  They provide a nice stable API
that works with a variety of logging backends.  This is what Spark does
internally.
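
A minimal sketch of what that looks like in a custom receiver (class and
method bodies are placeholders):

import org.slf4j.{Logger, LoggerFactory}

class MyReceiver /* extends Receiver[String](StorageLevel.MEMORY_ONLY) */ {
  // transient + lazy so the logger is created on whatever JVM runs the code,
  // rather than being serialized with the receiver
  @transient private lazy val log: Logger = LoggerFactory.getLogger(getClass)

  def onStart(): Unit = {
    log.info("receiver starting")
  }
}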

On Sun, Jun 26, 2016 at 4:02 AM, Paolo Patierno  wrote:

> Yes ... the same here ... I'd like to know the best way for adding logging
> in a custom receiver for Spark Streaming 2.0
>
> *Paolo Patierno*
>
> *Senior Software Engineer (IoT) @ Red Hat**Microsoft MVP on **Windows
> Embedded & IoT*
> *Microsoft Azure Advisor*
>
> Twitter : @ppatierno 
> Linkedin : paolopatierno 
> Blog : DevExperience 
>
>
> --
> From: jonathaka...@gmail.com
> Date: Fri, 24 Jun 2016 20:56:40 +
> Subject: Re: Logging trait in Spark 2.0
> To: yuzhih...@gmail.com; ppatie...@live.com
> CC: user@spark.apache.org
>
>
> Ted, how is that thread related to Paolo's question?
>
> On Fri, Jun 24, 2016 at 1:50 PM Ted Yu  wrote:
>
> See this related thread:
>
>
> http://search-hadoop.com/m/q3RTtEor1vYWbsW=RE+Configuring+Log4J+Spark+1+5+on+EMR+4+1+
>
> On Fri, Jun 24, 2016 at 6:07 AM, Paolo Patierno 
> wrote:
>
> Hi,
>
> developing a Spark Streaming custom receiver I noticed that the Logging
> trait isn't accessible anymore in Spark 2.0.
>
> trait Logging in package internal cannot be accessed in package
> org.apache.spark.internal
>
> For developing a custom receiver what is the preferred way for logging ?
> Just using log4j dependency as any other Java/Scala library/application ?
>
> Thanks,
> Paolo
>
> *Paolo Patierno*
>
> *Senior Software Engineer (IoT) @ Red Hat**Microsoft MVP on **Windows
> Embedded & IoT*
> *Microsoft Azure Advisor*
>
> Twitter : @ppatierno 
> Linkedin : paolopatierno 
> Blog : DevExperience 
>
>
>


Re: cast only some columns

2016-06-21 Thread Michael Armbrust
Use `withColumn`.  It will replace a column if you give it the same name.
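
For example, something like this keeps all 300 columns and only replaces the
ones you cast (the column names are placeholders):

val toCast = Seq("columns1", "columns2", "columns3", "columns4")
val casted = toCast.foldLeft(df) { (d, c) => d.withColumn(c, d(c).cast("int")) }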

On Tue, Jun 21, 2016 at 4:16 AM, pseudo oduesp 
wrote:

> Hi,
> With fillna we can select some columns and replace some values by choosing
> the columns with a dict {column: value}. How can I do the same with cast?
> I have a data frame with 300 columns and I want to cast just 4 of them,
> but with a select query like this:
>
> df.select(columns1.cast(int),columns2.cast(int),columns3.cast(int))
>
> I lose the other columns.
> How can I perform the cast on some columns without losing the other ones?
>
> thanks
>


Re: Spark 2.0: Unify DataFrames and Datasets question

2016-06-14 Thread Michael Armbrust
>
> 1) What does this really mean to an Application developer?
>

It means there are fewer concepts to learn.


> 2) Why this unification was needed in Spark 2.0?
>

To simplify the API and reduce the number of concepts that need to be
learned.  We didn't do it in 1.6 only because we didn't want to break
binary compatibility in a minor release.


> 3) What changes can be observed in Spark 2.0 vs Spark 1.6?
>

There is no separate DataFrame class; all methods are still available, except
those that returned an RDD (now you can call df.rdd.map if that is still what
you want).
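
Concretely, in the 2.0 Scala API the alias is just:

type DataFrame = Dataset[Row]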


> 4) Compile time safety will be there for DataFrames too?
>

Slide 7


> 5) Python API is supported for Datasets in 2.0?
>

Slide 10


Re: Is there a limit on the number of tasks in one job?

2016-06-13 Thread Michael Armbrust
You might try with the Spark 2.0 preview.  We spent a bunch of time
improving the handling of many small files.

On Mon, Jun 13, 2016 at 11:19 AM, khaled.hammouda 
wrote:

> I'm trying to use Spark SQL to load json data that are split across about
> 70k
> files across 24 directories in hdfs, using
> sqlContext.read.json("hdfs:///user/hadoop/data/*/*").
>
> This doesn't seem to work for some reason, I get timeout errors like the
> following:
>
> ---
> 6/06/13 15:46:31 ERROR TransportChannelHandler: Connection to
> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 has been quiet for
> 12
> ms while there are outstanding requests. Assuming connection is dead;
> please
> adjust spark.network.timeout if this is wrong.
> 16/06/13 15:46:31 ERROR TransportResponseHandler: Still have 1 requests
> outstanding when connection from
> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 is closed
> ...
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> ...
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [120 seconds]
> --
>
> I don't want to start tinkering with increasing timeouts yet. I tried to
> load just one sub-directory, which contains around 4k files, and this seems
> to work fine. So I thought of writing a loop where I load the json files
> from each sub-dir and then unionAll the current dataframe with the previous
> dataframe. However, this also fails because apparently the json files don't
> have the exact same schema, causing this error:
>
> ---
> Traceback (most recent call last):
>   File "/home/hadoop/load_json.py", line 65, in 
> df = df.unionAll(hrdf)
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
> line 998, in unionAll
>   File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
> line 813, in __call__
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
> 51, in deco
> pyspark.sql.utils.AnalysisException: u"unresolved operator 'Union;"
> ---
>
> I'd like to know what's preventing Spark from loading 70k files the same
> way
> it's loading 4k files?
>
> To give you some idea about my setup and data:
> - ~70k files across 24 directories in HDFS
> - Each directory contains 3k files on average
> - Cluster: 200 nodes EMR cluster, each node has 53 GB memory and 8 cores
> available to YARN
> - Spark 1.6.1
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-limit-on-the-number-of-tasks-in-one-job-tp27158.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Thrift Server in CDH 5.3

2016-06-13 Thread Michael Armbrust
I'd try asking on the cloudera forums.

On Sun, Jun 12, 2016 at 9:51 PM, pooja mehta  wrote:

> Hi,
>
> How do I start Spark Thrift Server with cloudera CDH 5.3?
>
> Thanks.
>


Re: Spark 2.0: Unify DataFrames and Datasets question

2016-06-13 Thread Michael Armbrust
Here's a talk I gave on the topic:

https://www.youtube.com/watch?v=i7l3JQRx7Qw
http://www.slideshare.net/SparkSummit/structuring-spark-dataframes-datasets-and-streaming-by-michael-armbrust

On Mon, Jun 13, 2016 at 4:01 AM, Arun Patel <arunp.bigd...@gmail.com> wrote:

> In Spark 2.0, DataFrames and Datasets are unified. DataFrame is simply an
> alias for a Dataset of type row.   I have few questions.
>
> 1) What does this really mean to an Application developer?
> 2) Why this unification was needed in Spark 2.0?
> 3) What changes can be observed in Spark 2.0 vs Spark 1.6?
> 4) Compile time safety will be there for DataFrames too?
> 5) Python API is supported for Datasets in 2.0?
>
> Thanks
> Arun
>


Re: Spark 2.0 Streaming and Event Time

2016-06-09 Thread Michael Armbrust
There is no special setting for event time (though we will be adding one
for setting a watermark in 2.1 to allow us to reduce the amount of state
that needs to be kept around).  Just window/groupBy on the column that is
your event time.
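
A minimal sketch of that, assuming a Spark 2.0 spark-shell (so the implicits
are already in scope) and a streaming Dataset `events` with eventTime and word
columns; the names are placeholders, not from this thread:

import org.apache.spark.sql.functions.window

val counts = events
  .groupBy(window($"eventTime", "10 minutes"), $"word")  // tumbling window on the event-time column
  .count()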

On Wed, Jun 8, 2016 at 4:12 PM, Chang Lim  wrote:

> Hi All,
>
> Does Spark 2.0 Streaming [sqlContext.read.format(...).stream(...)] support
> Event Time?  In TD's Spark Summit talk yesterday, this is listed as a 2.0
> feature.  Of so, where is the API or how to set it?
>
> Thanks in advanced,
> Chang
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-0-Streaming-and-Event-Time-tp27120.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Seq.toDF vs sc.parallelize.toDF = no Spark job vs one - why?

2016-06-09 Thread Michael Armbrust
Look at the explain().  For a Seq we know it's just local data, so we avoid
Spark jobs for simple operations.  In contrast, an RDD is opaque to
Catalyst, so we can't perform that optimization.
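
A quick way to see the difference in a 2.0 spark-shell (the column name "n" is
arbitrary):

// Local Seq: the optimizer sees a LocalRelation, so no job is needed
Seq(1, 2, 3).toDF("n").explain(true)

// RDD-backed: the plan scans an RDD that Catalyst cannot look inside
sc.parallelize(Seq(1, 2, 3)).toDF("n").explain(true)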

On Wed, Jun 8, 2016 at 7:49 AM, Jacek Laskowski  wrote:

> Hi,
>
> I just noticed it today while toying with Spark 2.0.0 (today's build)
> that doing Seq(...).toDF does **not** submit a Spark job while
> sc.parallelize(Seq(...)).toDF does. I was nicely surprised and been
> thinking about the reason for the behaviour.
>
> My explanation was that Datasets are just a "view" layer atop data and
> when this data is local/in memory already there's no need to submit a
> job to...well...compute the data.
>
> I'd appreciate more in-depth answer, perhaps with links to the code.
> Thanks!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Dataset Outer Join vs RDD Outer Join

2016-06-06 Thread Michael Armbrust
That kind of stuff is likely fixed in 2.0.  If you can get a reproduction
working there it would be very helpful if you could open a JIRA.

On Mon, Jun 6, 2016 at 7:37 AM, Richard Marscher <rmarsc...@localytics.com>
wrote:

> A quick unit test attempt didn't get far replacing map with as[], I'm only
> working against 1.6.1 at the moment though, I was going to try 2.0 but I'm
> having a hard time building a working spark-sql jar from source, the only
> ones I've managed to make are intended for the full assembly fat jar.
>
>
> Example of the error from calling joinWith as left_outer and then
> .as[(Option[T], U]) where T and U are Int and Int.
>
> [info] newinstance(class scala.Tuple2,decodeusingserializer(input[0,
> StructType(StructField(_1,IntegerType,true),
> StructField(_2,IntegerType,true))],scala.Option,true),decodeusingserializer(input[1,
> StructType(StructField(_1,IntegerType,true),
> StructField(_2,IntegerType,true))],scala.Option,true),false,ObjectType(class
> scala.Tuple2),None)
> [info] :- decodeusingserializer(input[0,
> StructType(StructField(_1,IntegerType,true),
> StructField(_2,IntegerType,true))],scala.Option,true)
> [info] :  +- input[0, StructType(StructField(_1,IntegerType,true),
> StructField(_2,IntegerType,true))]
> [info] +- decodeusingserializer(input[1,
> StructType(StructField(_1,IntegerType,true),
> StructField(_2,IntegerType,true))],scala.Option,true)
> [info]+- input[1, StructType(StructField(_1,IntegerType,true),
> StructField(_2,IntegerType,true))]
>
> Cause: java.util.concurrent.ExecutionException: java.lang.Exception:
> failed to compile: org.codehaus.commons.compiler.CompileException: File
> 'generated.java', Line 32, Column 60: No applicable constructor/method
> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
> candidates are: "public static java.nio.ByteBuffer
> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
> java.nio.ByteBuffer.wrap(byte[], int, int)"
>
> The generated code is passing InternalRow objects into the ByteBuffer
>
> Starting from two Datasets of types Dataset[(Int, Int)] with expression
> $"left._1" === $"right._1". I'll have to spend some time getting a better
> understanding of this analysis phase, but hopefully I can come up with
> something.
>
> On Wed, Jun 1, 2016 at 3:43 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Option should place nicely with encoders, but its always possible there
>> are bugs.  I think those function signatures are slightly more expensive
>> (one extra object allocation) and its not as java friendly so we probably
>> don't want them to be the default.
>>
>> That said, I would like to enable that kind of sugar while still taking
>> advantage of all the optimizations going on under the covers.  Can you get
>> it to work if you use `as[...]` instead of `map`?
>>
>> On Wed, Jun 1, 2016 at 11:59 AM, Richard Marscher <
>> rmarsc...@localytics.com> wrote:
>>
>>> Ah thanks, I missed seeing the PR for
>>> https://issues.apache.org/jira/browse/SPARK-15441. If the rows became
>>> null objects then I can implement methods that will map those back to
>>> results that align closer to the RDD interface.
>>>
>>> As a follow on, I'm curious about thoughts regarding enriching the
>>> Dataset join interface versus a package or users sugaring for themselves. I
>>> haven't considered the implications of what the optimizations datasets,
>>> tungsten, and/or bytecode gen can do now regarding joins so I may be
>>> missing a critical benefit there around say avoiding Options in favor of
>>> nulls. If nothing else, I guess Option doesn't have a first class Encoder
>>> or DataType yet and maybe for good reasons.
>>>
>>> I did find the RDD join interface elegant, though. In the ideal world an
>>> API comparable the following would be nice:
>>> https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06
>>>
>>>
>>> On Wed, Jun 1, 2016 at 1:42 PM, Michael Armbrust <mich...@databricks.com
>>> > wrote:
>>>
>>>> Thanks for the feedback.  I think this will address at least some of
>>>> the problems you are describing:
>>>> https://github.com/apache/spark/pull/13425
>>>>
>>>> On Wed, Jun 1, 2016 at 9:58 AM, Richard Marscher <
>>>> rmarsc...@localytics.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I've been working on transitioning from RDD to Datasets in our
>>>>> codebase in anticipation of

Re: Dataset Outer Join vs RDD Outer Join

2016-06-01 Thread Michael Armbrust
Option should play nicely with encoders, but it's always possible there are
bugs.  I think those function signatures are slightly more expensive (one
extra object allocation) and not as Java-friendly, so we probably don't
want them to be the default.

That said, I would like to enable that kind of sugar while still taking
advantage of all the optimizations going on under the covers.  Can you get
it to work if you use `as[...]` instead of `map`?

On Wed, Jun 1, 2016 at 11:59 AM, Richard Marscher <rmarsc...@localytics.com>
wrote:

> Ah thanks, I missed seeing the PR for
> https://issues.apache.org/jira/browse/SPARK-15441. If the rows became
> null objects then I can implement methods that will map those back to
> results that align closer to the RDD interface.
>
> As a follow on, I'm curious about thoughts regarding enriching the Dataset
> join interface versus a package or users sugaring for themselves. I haven't
> considered the implications of what the optimizations datasets, tungsten,
> and/or bytecode gen can do now regarding joins so I may be missing a
> critical benefit there around say avoiding Options in favor of nulls. If
> nothing else, I guess Option doesn't have a first class Encoder or DataType
> yet and maybe for good reasons.
>
> I did find the RDD join interface elegant, though. In the ideal world an
> API comparable the following would be nice:
> https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06
>
>
> On Wed, Jun 1, 2016 at 1:42 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Thanks for the feedback.  I think this will address at least some of the
>> problems you are describing: https://github.com/apache/spark/pull/13425
>>
>> On Wed, Jun 1, 2016 at 9:58 AM, Richard Marscher <
>> rmarsc...@localytics.com> wrote:
>>
>>> Hi,
>>>
>>> I've been working on transitioning from RDD to Datasets in our codebase
>>> in anticipation of being able to leverage features of 2.0.
>>>
>>> I'm having a lot of difficulties with the impedance mismatches between
>>> how outer joins worked with RDD versus Dataset. The Dataset joins feel like
>>> a big step backwards IMO. With RDD, leftOuterJoin would give you Option
>>> types of the results from the right side of the join. This follows
>>> idiomatic Scala avoiding nulls and was easy to work with.
>>>
>>> Now with Dataset there is only joinWith where you specify the join type,
>>> but it lost all the semantics of identifying missing data from outer joins.
>>> I can write some enriched methods on Dataset with an implicit class to
>>> abstract messiness away if Dataset nulled out all mismatching data from an
>>> outer join, however the problem goes even further in that the values aren't
>>> always null. Integer, for example, defaults to -1 instead of null. Now it's
>>> completely ambiguous what data in the join was actually there versus
>>> populated via this atypical semantic.
>>>
>>> Are there additional options available to work around this issue? I can
>>> convert to RDD and back to Dataset but that's less than ideal.
>>>
>>> Thanks,
>>> --
>>> *Richard Marscher*
>>> Senior Software Engineer
>>> Localytics
>>> Localytics.com <http://localytics.com/> | Our Blog
>>> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics>
>>>  | Facebook <http://facebook.com/localytics> | LinkedIn
>>> <http://www.linkedin.com/company/1148792?trk=tyah>
>>>
>>
>>
>
>
> --
> *Richard Marscher*
> Senior Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>


Re: Dataset Outer Join vs RDD Outer Join

2016-06-01 Thread Michael Armbrust
Thanks for the feedback.  I think this will address at least some of the
problems you are describing: https://github.com/apache/spark/pull/13425

On Wed, Jun 1, 2016 at 9:58 AM, Richard Marscher 
wrote:

> Hi,
>
> I've been working on transitioning from RDD to Datasets in our codebase in
> anticipation of being able to leverage features of 2.0.
>
> I'm having a lot of difficulties with the impedance mismatches between how
> outer joins worked with RDD versus Dataset. The Dataset joins feel like a
> big step backwards IMO. With RDD, leftOuterJoin would give you Option types
> of the results from the right side of the join. This follows idiomatic
> Scala avoiding nulls and was easy to work with.
>
> Now with Dataset there is only joinWith where you specify the join type,
> but it lost all the semantics of identifying missing data from outer joins.
> I can write some enriched methods on Dataset with an implicit class to
> abstract messiness away if Dataset nulled out all mismatching data from an
> outer join, however the problem goes even further in that the values aren't
> always null. Integer, for example, defaults to -1 instead of null. Now it's
> completely ambiguous what data in the join was actually there versus
> populated via this atypical semantic.
>
> Are there additional options available to work around this issue? I can
> convert to RDD and back to Dataset but that's less than ideal.
>
> Thanks,
> --
> *Richard Marscher*
> Senior Software Engineer
> Localytics
> Localytics.com  | Our Blog
>  | Twitter  |
> Facebook  | LinkedIn
> 
>
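
For contrast with joinWith, the RDD shape Richard describes (leftOuterJoin
wrapping the right side in Option) is a tiny, self-contained example in any
spark-shell; the values here are made up:

val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
val right = sc.parallelize(Seq(2 -> "x", 3 -> "y"))
left.leftOuterJoin(right).collect()
// Array((1,(a,None)), (2,(b,Some(x)))) -- order may vary; the element type is
// (Int, (String, Option[String]))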


Re: Map tuple to case class in Dataset

2016-06-01 Thread Michael Armbrust
That error looks like it's caused by the class being defined in the REPL
itself.  $line29.$read$ is the name of the outer object that is being used
to compile the line containing case class Test(a: Int).

Is this EMR or the Apache 1.6.1 release?
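
One mitigation that is often suggested for REPL wrapper problems of this kind
(hedged: nothing in this thread confirms it for this particular EMR setup) is
to define the case class inside a plain top-level object in a single :paste
block, so it is not captured by the per-line wrapper:

// inside a single :paste block in spark-shell
object Shapes extends Serializable {
  case class Test(a: Int)
}
import Shapes.Test

Seq(1, 2).toDS.map(t => Test(t)).show()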

On Wed, Jun 1, 2016 at 8:05 AM, Tim Gautier <tim.gaut...@gmail.com> wrote:

> I spun up another EC2 cluster today with Spark 1.6.1 and I still get the
> error.
>
> scala>   case class Test(a: Int)
> defined class Test
>
> scala>   Seq(1,2).toDS.map(t => Test(t)).show
> 16/06/01 15:04:21 WARN scheduler.TaskSetManager: Lost task 39.0 in stage
> 0.0 (TID 39, ip-10-2-2-203.us-west-2.compute.internal):
> java.lang.NoClassDefFoundError: Could not initialize class $line29.$read$
> at
> $line33.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:35)
> at
> $line33.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:35)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> 16/06/01 15:04:21 INFO scheduler.TaskSetManager: Starting task 39.1 in
> stage 0.0 (TID 40, ip-10-2-2-111.us-west-2.compute.internal, partition
> 39,PROCESS_LOCAL, 2386 bytes)
> 16/06/01 15:04:21 WARN scheduler.TaskSetManager: Lost task 19.0 in stage
> 0.0 (TID 19, ip-10-2-2-203.us-west-2.compute.internal):
> java.lang.ExceptionInInitializerError
> at $line29.$read$$iwC.(:7)
> at $line29.$read.(:24)
> at $line29.$read$.(:28)
> at $line29.$read$.()
> at
> $line33.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:35)
> at
> $line33.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:35)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at $line3.$read$$iwC$$iwC.(:15)
> at $line3.$read$$iwC.(:24)
> at $line3.$read.(:26)
> at $line3.$read$.(:30)
> at $line3.$read$.()
> ... 18 more
>
>
> On Tue, May 31, 2016 at 8:48 PM Tim Gautier <tim.gaut...@gmail.com> wrote:
>
>> That's really odd. I copied that code directly out of the shell and it
>> errored out on me, several times. I wonder if something I did previously
>> caused some instability. I'll see if it happens again tomorrow.
>>
>> On Tue, May 31, 2016, 8:37 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Using spark-shell of 1.6.1 :
>>>
>>> scala> case class Test(a: Int)
>>> defined class Test
>>>
>>> scala> Seq(1,2).toDS.map(t => Test(t)).show
>>> +---+
>>> |  a|
>>> +---+
>>> |  1|
>>> |  2|
>>> +---+
>>>
>>> FYI
>>>
>>> On Tue, May 31, 2016 at 7:35 PM, Tim Gautier <tim.gaut...@gmail.com>
>>> wrote:
>>>
>>>> 1.6.1 The exception is a null pointer exception. I'll paste the whole
>>>> thing after I fire my cluster up again tomorrow.
>>>>
>>>> I take it by the responses that this is supposed to work?
>>>>
>>>> Anyone know when the next version is coming out? I ke

Re: Map tuple to case class in Dataset

2016-05-31 Thread Michael Armbrust
Version of Spark? What is the exception?

On Tue, May 31, 2016 at 4:17 PM, Tim Gautier  wrote:

> How should I go about mapping from say a Dataset[(Int,Int)] to a
> Dataset[]?
>
> I tried to use a map, but it throws exceptions:
>
> case class Test(a: Int)
> Seq(1,2).toDS.map(t => Test(t)).show
>
> Thanks,
> Tim
>


Re: Undocumented left join constraint?

2016-05-27 Thread Michael Armbrust
Sounds like: https://issues.apache.org/jira/browse/SPARK-15441, for which a
fix is in progress.

Please do keep reporting issues though, these are great!

Michael
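
Until that fix lands, one workaround on 1.6.x (a hedged sketch only, reusing
the test1/test2 Datasets from Tim's example quoted below) is to do the outer
join at the DataFrame level and re-wrap the nullable side by hand:

val joined = test1.toDF.as("t1")
  .join(test2.toDF.as("t2"), $"t1.id" === $"t2.id", "left_outer")

// On 1.6.x DataFrame.map returns an RDD; here an RDD[(Int, Option[Int])]
val result = joined.map { row =>
  (row.getInt(0), if (row.isNullAt(1)) None else Some(row.getInt(1)))
}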

On Fri, May 27, 2016 at 1:01 PM, Tim Gautier  wrote:

> Is it truly impossible to left join a Dataset[T] on the right if T has any
> non-option fields? It seems Spark tries to create Ts with null values in
> all fields when left joining, which results in null pointer exceptions. In
> fact, I haven't found any other way to get around this issue without making
> all fields in T options. Is there any other way?
>
> Example:
>
> case class Test(id: Int)
> val test1 = Seq(Test(1), Test(2), Test(3)).toDS
> val test2 = Seq(Test(2), Test(3), Test(4)).toDS
> test1.as("t1").joinWith(test2.as("t2"), $"t1.id" === $"t2.id",
> "left_outer").show
>
>


Re: HiveContext standalone => without a Hive metastore

2016-05-26 Thread Michael Armbrust
You can also just make sure that each user is using their own directory.  A
rough example can be found in TestHive.

Note: in Spark 2.0 there should be no need to use HiveContext unless you
need to talk to a metastore.
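
A rough sketch of the per-user-directory idea on 1.6: point the embedded Derby
metastore at a directory the user can write to before the metastore is first
touched. The property name is standard Hive configuration, but the path and
the reliance on setConf happening early enough are assumptions, not something
verified in this thread:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.setConf("javax.jdo.option.ConnectionURL",
  s"jdbc:derby:;databaseName=${System.getProperty("user.home")}/metastore_db;create=true")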

On Thu, May 26, 2016 at 1:36 PM, Mich Talebzadeh 
wrote:

> Well make sure than you set up a reasonable RDBMS as metastore. Ours is
> Oracle but you can get away with others. Check the supported list in
>
> hduser@rhes564:: :/usr/lib/hive/scripts/metastore/upgrade> ltr
> total 40
> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 postgres
> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mysql
> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mssql
> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 derby
> drwxr-xr-x 3 hduser hadoop 4096 May 20 18:44 oracle
>
> you have few good ones in the list.  In general the base tables (without
> transactional support) are around 55  (Hive 2) and don't take much space
> (depending on the volume of tables). I attached a E-R diagram.
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 26 May 2016 at 19:09, Gerard Maas  wrote:
>
>> Thanks a lot for the advice!.
>>
>> I found out why the standalone hiveContext would not work:  it was trying
>> to deploy a derby db and the user had no rights to create the dir where
>> there db is stored:
>>
>> Caused by: java.sql.SQLException: Failed to create database
>> 'metastore_db', see the next exception for details.
>>
>>at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>> Source)
>>
>>at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>> Source)
>>
>>... 129 more
>>
>> Caused by: java.sql.SQLException: Directory
>> /usr/share/spark-notebook/metastore_db cannot be created.
>>
>>
>> Now, the new issue is that we can't start more than 1 context at the same
>> time. I think we will need to setup a proper metastore.
>>
>>
>> -kind regards, Gerard.
>>
>>
>>
>>
>> On Thu, May 26, 2016 at 3:06 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> To use HiveContext witch is basically an sql api within Spark without
>>> proper hive set up does not make sense. It is a super set of Spark
>>> SQLContext
>>>
>>> In addition simple things like registerTempTable may not work.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 26 May 2016 at 13:01, Silvio Fiorito 
>>> wrote:
>>>
 Hi Gerard,



 I’ve never had an issue using the HiveContext without a hive-site.xml
 configured. However, one issue you may have is if multiple users are
 starting the HiveContext from the same path, they’ll all be trying to store
 the default Derby metastore in the same location. Also, if you want them to
 be able to persist permanent table metadata for SparkSQL then you’ll want
 to set up a true metastore.



 The other thing it could be is Hive dependency collisions from the
 classpath, but that shouldn’t be an issue since you said it’s standalone
 (not a Hadoop distro right?).



 Thanks,

 Silvio



 *From: *Gerard Maas 
 *Date: *Thursday, May 26, 2016 at 5:28 AM
 *To: *spark users 
 *Subject: *HiveContext standalone => without a Hive metastore



 Hi,



 I'm helping some folks setting up an analytics cluster with  Spark.

 They want to use the HiveContext to enable the Window functions on
 DataFrames(*) but they don't have any Hive installation, nor they need one
 at the moment (if not necessary for this feature)



 When we try to create a Hive context, we get the following error:



 > val sqlContext = new
 org.apache.spark.sql.hive.HiveContext(sparkContext)

 java.lang.RuntimeException: java.lang.RuntimeException: Unable to
 instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

at
 org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)



 Is my HiveContext failing b/c it wants to connect to an unconfigured
  Hive Metastore?



 Is there  a way to instantiate a HiveContext for the sake of Window
 support without an underlying Hive deployment?



 The docs are explicit in saying that that is should be the case: [1]



Re: feedback on dataset api explode

2016-05-25 Thread Michael Armbrust
These APIs predate Datasets / encoders, so that is why they are Row instead
of objects.  We should probably rethink that.

Honestly, I usually end up using the column expression version of explode
now that it exists (i.e. explode($"arrayCol").as("Item")).  It would be
great to understand more why you are using these instead.
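
A small, self-contained sketch of that column-expression form (the id/items
column names are made up for illustration); it also carries the other columns
along with each exploded element, which touches on point 2 below:

import org.apache.spark.sql.functions.explode

val df = Seq((1, Seq("a", "b")), (2, Seq("c"))).toDF("id", "items")
df.select($"id", explode($"items").as("item")).show()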

On Wed, May 25, 2016 at 8:49 AM, Koert Kuipers  wrote:

> we currently have 2 explode definitions in Dataset:
>
>  def explode[A <: Product : TypeTag](input: Column*)(f: Row =>
> TraversableOnce[A]): DataFrame
>
>  def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f:
> A => TraversableOnce[B]): DataFrame
>
> 1) the separation of the functions into their own argument lists is nice,
> but unfortunately scala's type inference doesn't handle this well, meaning
> that the generic types always have to be explicitly provided. i assume this
> was done to allow the "input" to be a varargs in the first method, and then
> kept the same in the second for reasons of symmetry.
>
> 2) i am surprised the first definition returns a DataFrame. this seems to
> suggest DataFrame usage (so DataFrame to DataFrame), but there is no way to
> specify the output column names, which limits its usability for DataFrames.
> i frequently end up using the first definition for DataFrames anyhow
> because of the need to return more than 1 column (and the data has columns
> unknown at compile time that i need to carry along making flatMap on
> Dataset clumsy/unusable), but relying on the output columns being called _1
> and _2 and renaming then afterwards seems like an anti-pattern.
>
> 3) using Row objects isn't very pretty. why not f: A => TraversableOnce[B]
> or something like that for the first definition? how about:
>  def explode[A: TypeTag, B: TypeTag](input: Seq[Column], output:
> Seq[Column])(f: A => TraversableOnce[B]): DataFrame
>
> best,
> koert
>

