Feedback: Feature request

2015-08-26 Thread Murphy, James
Hey all,

In working with the DecisionTree classifier, I found it difficult to extract 
rules that could easily facilitate visualization with libraries like D3.

So for example, using print(model.toDebugString()), I get the following
result:

  If (feature 0 <= -35.0)
   If (feature 24 <= 176.0)
    Predict: 2.1
   If (feature 24 <= 176.0)
    Predict: 4.2
   Else (feature 24 > 176.0)
    Predict: 6.3
  Else (feature 0 > -35.0)
   If (feature 24 <= 11.0)
    Predict: 4.5
   Else (feature 24 > 11.0)
    Predict: 10.2

But ideally, I could see results in a more parseable format like JSON:


{
  "node": [
    {
      "name": "node1",
      "rule": "feature 0 <= -35.0",
      "children": [
        {
          "name": "node2",
          "rule": "feature 24 <= 176.0",
          "children": [
            { "name": "node4", "rule": "feature 20 < 116.0",  "predict": 2.1 },
            { "name": "node5", "rule": "feature 20 <= 116.0", "predict": 4.2 },
            { "name": "node5", "rule": "feature 20 > 116.0",  "predict": 6.3 }
          ]
        },
        {
          "name": "node3",
          "rule": "feature 0 > -35.0",
          "children": [
            { "name": "node7", "rule": "feature 3 <= 11.0", "predict": 4.5 },
            { "name": "node8", "rule": "feature 3 > 11.0",  "predict": 10.2 }
          ]
        }
      ]
    }
  ]
}

Food for thought!

Thanks,

Jim
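
For readers wanting to experiment with the idea above, here is a minimal sketch of walking an MLlib tree into nested JSON. It assumes the Spark 1.4-era DecisionTreeModel/Node API (topNode, split, leftNode/rightNode, predict) and handles continuous splits only; field names may differ across versions.

    import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}

    // Recursively render a trained tree's nodes as a nested JSON string.
    // Categorical splits (split.categories) would need their own rule text.
    def nodeToJson(node: Node): String = {
      if (node.isLeaf) {
        s"""{"name":"node${node.id}","predict":${node.predict.predict}}"""
      } else {
        val split = node.split.get
        val rule = s"feature ${split.feature} <= ${split.threshold}"
        val children = Seq(node.leftNode, node.rightNode).flatten.map(nodeToJson).mkString(",")
        s"""{"name":"node${node.id}","rule":"$rule","children":[$children]}"""
      }
    }

    def modelToJson(model: DecisionTreeModel): String = nodeToJson(model.topNode)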



Re: Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Interestingly, if there is nothing running on the dev spark-shell, it recovers
successfully and regains the lost executors. Attaching the log for that.
Notice the "Registering block manager ..." statements at the very end, after
all executors were lost.

On Wed, Aug 26, 2015 at 11:27 AM, Sadhan Sood sadhan.s...@gmail.com wrote:

 Attaching log for when the dev job gets stuck (once all its executors are
 lost due to preemption). This is a spark-shell job running in yarn-client
 mode.

 On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Hi All,

 We've set up our spark cluster on aws running on yarn (running on hadoop
 2.3) with fair scheduling and preemption turned on. The cluster is shared
 for prod and dev work where prod runs with a higher fair share and can
 preempt dev jobs if there are not enough resources available for it.
 It appears that dev jobs which get preempted often become unstable after
 losing some executors, and the whole job gets stuck (without making any
 progress) or ends up getting restarted (and hence losing all the work done).
 Has someone encountered this before? Is the solution just to set
 spark.task.maxFailures
 to a really high value to recover from task failures in such scenarios? Are
 there other approaches that people have taken for Spark multi-tenancy that
 work better in such scenarios?

 Thanks,
 Sadhan





spark_job_recovers.log
Description: Binary data


Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Cody Koeninger
I'd be less concerned about what the streaming ui shows than what's
actually going on with the job.  When you say you were losing messages, how
were you observing that?  The UI, or actual job output?

The log lines you posted indicate that the checkpoint was restored and
those offsets were processed; what are the log lines for the following
KafkaRDD ?


On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang suchenz...@gmail.com wrote:

 Compared offsets, and it continues from checkpoint loading:

 15/08/26 11:24:54 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
 (install-json,4,825772921,825773536),
 (install-json,1,831654775,831655076),
 (install-json,0,1296018451,1296018810),
 (install-json,2,824785282,824785696), (install-json,3,
 811428882,811429181)]

 15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 0 offsets 1296018451 - 1296018810
 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 4 offsets 825773536 - 825907428
 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 2 offsets 824785696 - 824889957
 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 3 offsets 811429181 - 811529084
 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 1 offsets 831655076 - 831729964
 ...

 But for some reason the streaming UI shows it as computing 0 events.

 Removing the call to checkpoint does remove the queueing of 0 event
 batches, since offsets just skip to the latest (checked that the first
 part.fromOffset in the restarted job is larger than the last
 part.untilOffset before restart).




 On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger c...@koeninger.org
 wrote:

 When the kafka rdd is actually being iterated on the worker, there should
 be an info line of the form

 log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
   s"offsets ${part.fromOffset} -> ${part.untilOffset}")


 You should be able to compare that to log of offsets during checkpoint
 loading, to see if they line up.

 Just out of curiosity, does removing the call to checkpoint on the stream
 affect anything?



 On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang suchenz...@gmail.com
 wrote:

 Thanks for the suggestions! I tried the following:

 I removed

 createOnError = true

 And reran the same process to reproduce. Double checked that checkpoint
 is loading:

 15/08/26 10:10:40 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
 (install-json,4,825400856,825401058),
 (install-json,1,831453228,831453396),
 (install-json,0,1295759888,1295760378),
 (install-json,2,824443526,82409), (install-json,3,
 811222580,811222874)]
 15/08/26 10:10:40 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791),
 (install-json,4,825401058,825401249),
 (install-json,1,831453396,831453603),
 (install-json,0,1295760378,1295760809),
 (install-json,2,82409,824445510), (install-json,3,
 811222874,811223285)]
 ...

 And the same issue is appearing as before (with 0 event batches getting
 queued corresponding to dropped messages). Our kafka brokers are on version
 0.8.2.0, if that makes a difference.

 Also as a sanity check, I took out the ZK updates and reran (just in
 case that was somehow causing problems), and that didn't change anything as
 expected.

 Furthermore, the 0 event batches seem to take longer to process than
 batches with the regular load of events: processing time for 0 event
 batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
 event batches is consistently < 1s. Why would that happen?


 As for the checkpoint call:

 directKStream.checkpoint(checkpointDuration)

 was an attempt to set the checkpointing interval (at some multiple of
 the batch interval), whereas StreamingContext.checkpoint seems like it will
 only set the checkpoint directory.



 Thanks for all the help,

 Susan


 On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org
 wrote:

 The first thing that stands out to me is
 createOnError = true

 Are you sure the checkpoint is actually loading, as opposed to failing
 and starting the job anyway?  There should be info lines that look like

 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
 Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)


 You should be able to tell from those whether the offset ranges being
 loaded from the checkpoint look reasonable.

 Also, is there a reason you're calling

 directKStream.checkpoint(checkpointDuration)

 Just calling checkpoint on the streaming context should be sufficient
 to save the metadata



 On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang suchenz...@gmail.com
 wrote:


Re: Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
Using TABLESAMPLE(0.1) is actually way worse. Spark first spends 12 minutes
to discover all split files on all hosts (for some reason) before it even
starts the job, and then it creates 3.5 million tasks (the partition has
~32k split files).

On Wed, Aug 26, 2015 at 9:36 AM, Jörn Franke jornfra...@gmail.com wrote:


 Have you tried tablesample? You find the exact syntax in the
 documentation, but it does exactly what you want

 Le mer. 26 août 2015 à 18:12, Thomas Dudziak tom...@gmail.com a écrit :

 Sorry, I meant without reading from all splits. This is a single
 partition in the table.

 On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak tom...@gmail.com wrote:

 I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows
 from and I don't particularly care which rows. Doing a LIMIT unfortunately
 results in two stages where the first stage reads the whole table, and the
 second then performs the limit with a single worker, which is not very
 efficient.
 Is there a better way to sample a subset of rows in Spark, ideally in a
 single stage and without reading all partitions?

 cheers,
 Tom





query avro hive table in spark sql

2015-08-26 Thread gpatcham
Hi,

I'm trying to query a Hive table which is based on Avro in Spark SQL and I am
seeing the errors below.

15/08/26 17:51:12 WARN avro.AvroSerdeUtils: Encountered AvroSerdeException
determining schema. Returning signal schema to indicate problem
org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Neither
avro.schema.literal nor avro.schema.url specified, can't determine table
schema
at
org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrThrowException(AvroSerdeUtils.java:68)
at
org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrReturnErrorSchema(AvroSerdeUtils.java:93)
at
org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:60)
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:375)
at
org.apache.hadoop.hive.ql.metadata.Partition.getDeserializer(Partition.java:249)


It's not able to determine the schema. The Hive table points to the Avro schema
using a URL. I'm stuck and couldn't find more info on this.

Any pointers ?
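
For reference, the property the SerDe complains about is normally attached to the table (or partition) metadata. Below is a hedged sketch of inspecting and setting it through HiveContext; the table name and schema path are hypothetical, and since the stack trace goes through Partition.getDeserializer, partition-level properties may be worth checking as well.

    // Inspect what the metastore actually holds for the table:
    hiveContext.sql("SHOW TBLPROPERTIES mydb.my_avro_table").show()

    // If avro.schema.url is missing, it can be set at the table level like this:
    hiveContext.sql(
      "ALTER TABLE mydb.my_avro_table SET TBLPROPERTIES " +
      "('avro.schema.url'='hdfs:///schemas/my_avro_table.avsc')")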



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/query-avro-hive-table-in-spark-sql-tp24462.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from
and I don't particularly care which rows. Doing a LIMIT unfortunately
results in two stages where the first stage reads the whole table, and the
second then performs the limit with a single worker, which is not very
efficient.
Is there a better way to sample a subset of rows in Spark, ideally in a
single stage and without reading all partitions?

cheers,
Tom


Re: Efficient sampling from a Hive table

2015-08-26 Thread Jörn Franke
Have you tried tablesample? You find the exact syntax in the documentation,
but it does exactly what you want
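
For anyone following along, the Hive block-sampling syntax being referred to looks roughly like the following when issued through HiveContext. The table name and fraction are illustrative, and how much data is actually skipped depends on the storage format and Hive version.

    val sampled = hiveContext.sql(
      "SELECT * FROM mydb.big_table TABLESAMPLE(0.1 PERCENT) s")
    println(sampled.count())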

Le mer. 26 août 2015 à 18:12, Thomas Dudziak tom...@gmail.com a écrit :

 Sorry, I meant without reading from all splits. This is a single partition
 in the table.

 On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak tom...@gmail.com wrote:

 I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows
 from and I don't particularly care which rows. Doing a LIMIT unfortunately
 results in two stages where the first stage reads the whole table, and the
 second then performs the limit with a single worker, which is not very
 efficient.
 Is there a better way to sample a subset of rows in Spark, ideally in a
 single stage and without reading all partitions?

 cheers,
 Tom





Re: Spark 1.3.1 saveAsParquetFile hangs on app exit

2015-08-26 Thread cingram
spark-shell-hang-on-exit.tdump
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24461/spark-shell-hang-on-exit.tdump
  





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-saveAsParquetFile-hangs-on-app-exit-tp24460p24461.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Susan Zhang
Thanks for the suggestions! I tried the following:

I removed

createOnError = true

And reran the same process to reproduce. Double checked that checkpoint is
loading:

15/08/26 10:10:40 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
(install-json,4,825400856,825401058), (install-json,1,831453228,831453396),
(install-json,0,1295759888,1295760378),
(install-json,2,824443526,82409), (install-json,3,
811222580,811222874)]
15/08/26 10:10:40 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791),
(install-json,4,825401058,825401249), (install-json,1,831453396,831453603),
(install-json,0,1295760378,1295760809),
(install-json,2,82409,824445510), (install-json,3,
811222874,811223285)]
...

And the same issue is appearing as before (with 0 event batches getting
queued corresponding to dropped messages). Our kafka brokers are on version
0.8.2.0, if that makes a difference.

Also as a sanity check, I took out the ZK updates and reran (just in case
that was somehow causing problems), and that didn't change anything as
expected.

Furthermore, the 0 event batches seem to take longer to process than
batches with the regular load of events: processing time for 0 event
batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
event batches is consistently < 1s. Why would that happen?


As for the checkpoint call:

directKStream.checkpoint(checkpointDuration)

was an attempt to set the checkpointing interval (at some multiple of the
batch interval), whereas StreamingContext.checkpoint seems like it will
only set the checkpoint directory.
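
For reference, a minimal sketch of the two calls being contrasted above (batch interval, checkpoint interval and directory are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("checkpoint-demo")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs:///checkpoints/checkpoint-demo")  // sets the checkpoint directory (metadata + data)

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.checkpoint(Seconds(20))  // per-DStream checkpoint interval, a multiple of the batch interval

    lines.count().print()
    ssc.start()
    ssc.awaitTermination()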



Thanks for all the help,

Susan


On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org wrote:

 The first thing that stands out to me is
 createOnError = true

 Are you sure the checkpoint is actually loading, as opposed to failing and
 starting the job anyway?  There should be info lines that look like

 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
 Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)


 You should be able to tell from those whether the offset ranges being
 loaded from the checkpoint look reasonable.

 Also, is there a reason you're calling

 directKStream.checkpoint(checkpointDuration)

 Just calling checkpoint on the streaming context should be sufficient to
 save the metadata



 On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang suchenz...@gmail.com wrote:

 Sure thing!

 The main looks like:


 --


  val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

  val kafkaConf = Map(
    "zookeeper.connect" -> zookeeper,
    "group.id" -> options.group,
    "zookeeper.connection.timeout.ms" -> "1",
    "auto.commit.interval.ms" -> "1000",
    "rebalance.max.retries" -> "25",
    "bootstrap.servers" -> kafkaBrokers
  )

  val ssc = StreamingContext.getOrCreate(checkpointDirectory,
    () => {
      createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
    }, createOnError = true)

  ssc.start()
  ssc.awaitTermination()



 --


 And createContext is defined as:



 --


  val batchDuration = Seconds(5)
  val checkpointDuration = Seconds(20)

  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

  def createContext(kafkaConf: Map[String, String],
      checkpointDirectory: String,
      topic: String,
      numThreads: Int,
      isProd: Boolean): StreamingContext = {

    val sparkConf = new SparkConf().setAppName("***")
    val ssc = new StreamingContext(sparkConf, batchDuration)
    ssc.checkpoint(checkpointDirectory)

    val topicSet = topic.split(",").toSet
    val groupId = kafkaConf.getOrElse("group.id", "")

    val directKStream = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
    directKStream.checkpoint(checkpointDuration)

    val table = ***

    directKStream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.flatMap(rec => someFunc(rec))
        .reduceByKey((i1: Long, i2: Long) => if (i1 < i2) i1 else i2)
        .foreachPartition { partitionRec =>
          val dbWrite = DynamoDBWriter()
          partitionRec.foreach {
            /* Update Dynamo Here */
          }
        }

      /** Set up ZK Connection **/
      val props = new Properties()
      kafkaConf.foreach(param => props.put(param._1, param._2))

      props.setProperty(AUTO_OFFSET_COMMIT, "false")

      val consumerConfig = new 

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Susan Zhang
Compared offsets, and it continues from checkpoint loading:

15/08/26 11:24:54 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
(install-json,4,825772921,825773536), (install-json,1,831654775,831655076),
(install-json,0,1296018451,1296018810),
(install-json,2,824785282,824785696), (install-json,3,
811428882,811429181)]

15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
partition 0 offsets 1296018451 - 1296018810
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 4 offsets 825773536 - 825907428
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 2 offsets 824785696 - 824889957
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 3 offsets 811429181 - 811529084
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 1 offsets 831655076 - 831729964
...

But for some reason the streaming UI shows it as computing 0 events.

Removing the call to checkpoint does remove the queueing of 0 event
batches, since offsets just skip to the latest (checked that the first
part.fromOffset in the restarted job is larger than the last
part.untilOffset before restart).




On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger c...@koeninger.org wrote:

 When the kafka rdd is actually being iterated on the worker, there should
 be an info line of the form

 log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
   s"offsets ${part.fromOffset} -> ${part.untilOffset}")


 You should be able to compare that to log of offsets during checkpoint
 loading, to see if they line up.

 Just out of curiosity, does removing the call to checkpoint on the stream
 affect anything?



 On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang suchenz...@gmail.com wrote:

 Thanks for the suggestions! I tried the following:

 I removed

 createOnError = true

 And reran the same process to reproduce. Double checked that checkpoint
 is loading:

 15/08/26 10:10:40 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
 (install-json,4,825400856,825401058),
 (install-json,1,831453228,831453396),
 (install-json,0,1295759888,1295760378),
 (install-json,2,824443526,82409), (install-json,3,
 811222580,811222874)]
 15/08/26 10:10:40 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791),
 (install-json,4,825401058,825401249),
 (install-json,1,831453396,831453603),
 (install-json,0,1295760378,1295760809),
 (install-json,2,82409,824445510), (install-json,3,
 811222874,811223285)]
 ...

 And the same issue is appearing as before (with 0 event batches getting
 queued corresponding to dropped messages). Our kafka brokers are on version
 0.8.2.0, if that makes a difference.

 Also as a sanity check, I took out the ZK updates and reran (just in case
 that was somehow causing problems), and that didn't change anything as
 expected.

 Furthermore, the 0 event batches seem to take longer to process than
 batches with the regular load of events: processing time for 0 event
 batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
 event batches is consistently < 1s. Why would that happen?


 As for the checkpoint call:

 directKStream.checkpoint(checkpointDuration)

 was an attempt to set the checkpointing interval (at some multiple of the
 batch interval), whereas StreamingContext.checkpoint seems like it will
 only set the checkpoint directory.



 Thanks for all the help,

 Susan


 On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org
 wrote:

 The first thing that stands out to me is
 createOnError = true

 Are you sure the checkpoint is actually loading, as opposed to failing
 and starting the job anyway?  There should be info lines that look like

 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
 Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)


 You should be able to tell from those whether the offset ranges being
 loaded from the checkpoint look reasonable.

 Also, is there a reason you're calling

 directKStream.checkpoint(checkpointDuration)

 Just calling checkpoint on the streaming context should be sufficient to
 save the metadata



 On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang suchenz...@gmail.com
 wrote:

 Sure thing!

 The main looks like:


 --


  val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

  val kafkaConf = Map(
    "zookeeper.connect" -> zookeeper,
    "group.id" -> options.group,
    "zookeeper.connection.timeout.ms" -> "1",
    "auto.commit.interval.ms" -> "1000",
    "rebalance.max.retries" -> "25",
    "bootstrap.servers" -> 

Re: Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Attaching log for when the dev job gets stuck (once all its executors are
lost due to preemption). This is a spark-shell job running in yarn-client
mode.

On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood sadhan.s...@gmail.com wrote:

 Hi All,

 We've set up our spark cluster on aws running on yarn (running on hadoop
 2.3) with fair scheduling and preemption turned on. The cluster is shared
 for prod and dev work where prod runs with a higher fair share and can
 preempt dev jobs if there are not enough resources available for it.
 It appears that dev jobs which get preempted often become unstable after
 losing some executors, and the whole job gets stuck (without making any
 progress) or ends up getting restarted (and hence losing all the work done).
 Has someone encountered this before? Is the solution just to set
 spark.task.maxFailures
 to a really high value to recover from task failures in such scenarios? Are
 there other approaches that people have taken for Spark multi-tenancy that
 work better in such scenarios?

 Thanks,
 Sadhan



spark_job_stuck.log
Description: Binary data


Can RDD be shared accross the cluster by other drivers?

2015-08-26 Thread Tao Lu
Hi, Guys,

Is it possible that RDD created by driver A be used driver B?

Thanks!


Re: Question on take function - Spark Java API

2015-08-26 Thread Pankaj Wahane
Thanks Sonal.. I shall try doing that..

 On 26-Aug-2015, at 1:05 pm, Sonal Goyal sonalgoy...@gmail.com wrote:
 
  You can try using wholeTextFiles, which will give you a pair RDD of (fileName,
  content). flatMap through this and manipulate the content.
 
 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co/ 
 Check out Reifier at Spark Summit 2015 
 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/
 
  http://in.linkedin.com/in/sonalgoyal
 
 
 
 On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane pankaj.wah...@qiotec.com 
 mailto:pankaj.wah...@qiotec.com wrote:
 Hi community members,
 
 
 Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
 
 Question:
 
  I have multiple files in a folder, and the first line in each file is the
  name of the asset that the file belongs to. The second line is the CSV header
  row, and data starts from the third row.
 
 Ex: File 1
 
 TestAsset01
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,123,456,789
 11-01-2015 15:00:01,123,456,789
 . . .
 
 Ex: File 2
 
 TestAsset02
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,1230,4560,7890
 11-01-2015 15:00:01,1230,4560,7890
 . . .
 
  I have nearly 1000 files in each folder, totaling ~10G.
  
  I am using the Apache Spark Java API to read all these files.
 
 Following is code extract that I am using:
 
  try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      Map<String, String> readingTypeMap = getReadingTypesMap(sc);
      // Read file
      JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
      // Get asset
      String asset = data.take(1).get(0);
      // Extract time series data
      JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
      // Strip header
      String header = actualData.take(1).get(0);
      String[] headers = header.split(DELIMERTER);
      // Extract actual data
      JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
      // Extract valid records
      JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
      // Find granularity
      Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
      // Transform to TSD objects
      JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset,
              readingTypeMap, headers, granularity);

      // Save to Cassandra
      javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
              "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();

      System.out.println("Total Records: " + timeSeriesLines.count());
      System.out.println("Valid Records: " + validated.count());
  }
  Within the TimeSeriesData object I need to set the asset name for the reading,
  so I need the output of data.take(1) to be different for different files.
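
A minimal sketch of the wholeTextFiles approach suggested above, in Scala for brevity (the folder path and CSV parsing are illustrative):

    val files = sc.wholeTextFiles("hdfs:///data/assets/*")   // RDD of (fileName, fileContent) pairs
    val records = files.flatMap { case (fileName, content) =>
      val lines = content.split("\n")
      val asset = lines.head.trim                   // first line: asset name
      val header = lines(1).split(",")              // second line: CSV header
      lines.drop(2).map(line => (asset, header.zip(line.split(",")).toMap))
    }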
 
 
 Thank You.
 
 Best Regards,
 Pankaj
 
 
 
 


Re: SparkSQL saveAsParquetFile does not preserve AVRO schema

2015-08-26 Thread storm
Note:
In the code (org.apache.spark.sql.parquet.DefaultSource) I've found this:

val relation = if (doInsertion) {
  // This is a hack. We always set nullable/containsNull/valueContainsNull to true
  // for the schema of a parquet data.
  val df =
    sqlContext.createDataFrame(
      data.queryExecution.toRdd,
      data.schema.asNullable)
  val createdRelation =
    createRelation(sqlContext, parameters, df.schema).asInstanceOf[ParquetRelation2]
  createdRelation.insert(df, overwrite = mode == SaveMode.Overwrite)
  createdRelation
}

The culprit is data.schema.asNullable. What's the real reason for this?
Why not simply use the existing schema nullable flags?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-saveAsParquetFile-does-not-preserve-AVRO-schema-tp2p24454.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




reduceByKey not working on JavaPairDStream

2015-08-26 Thread Deepesh Maheshwari
Hi,
I have applied mapToPair and then a reduceByKey on a DStream to obtain a
JavaPairDStream<String, Map<String, Object>>.
I have to apply a flatMapToPair and reduceByKey on the DStream obtained
above.
But I do not see any logs from the reduceByKey operation.
Can anyone explain why this is happening?


find My Code Below -



    /**
     * GroupLevel1 Groups - articleId, host and tags
     */
    JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
            .mapToPair(
                new PairFunction<Map<String, Object>, String, Map<String, Object>>() {

                    private static final long serialVersionUID = 5196132687044875422L;

                    @Override
                    public Tuple2<String, Map<String, Object>> call(
                            Map<String, Object> map) throws Exception {
                        String host = (String) map.get("host");
                        String articleId = (String) map.get("articleId");
                        List tags = (List) map.get("tags");

                        if (host == null || articleId == null) {
                            logger.error("*** Error Doc \n" + map);
                        }
                        String key = "articleId_" + articleId + "_host_" + host + "_tags_" + tags.toString();

                        //logger.info(key);
                        System.out.println("Printing Key - " + key);

                        map.put("articlecount", 1L);

                        return new Tuple2<String, Map<String, Object>>(key, map);
                    }
                })
            .reduceByKey(
                new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Map<String, Object> call(
                            Map<String, Object> map1,
                            Map<String, Object> map2) throws Exception {
                        Long count1 = (Long) map1.get("articlecount");
                        Long count2 = (Long) map2.get("articlecount");

                        map1.put("articlecount", count1 + count2);
                        return map1;
                    }
                });













    /**
     * Grouping level 1 groups on articleId+host+tags.
     * Tags can be multiple for an article.
     * Grouping level 2 does -
     *   1. For each tag in a row, find occurrence of that tag in other rows.
     *   2. If one tag found in another row, then add the articleCount of current and new row
     *      and put as articleCount for that tag.
     * Note -
     *   Idea behind this grouping is to get all article counts that contain a particular tag
     *   and preserve this value.
     */


    JavaPairDStream<String, Map<String, Object>> groupLevel2 =
        groupLevel1.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
            @Override
            public Iterable<Tuple2<String, Map<String, Object>>> call(Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {

                System.out.println("group level 2 tuple 1 - " + stringMapTuple2._1());
                System.out.println("group level 2 tuple 2 - " + stringMapTuple2._2());
                ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
                ArrayList tagKeyList = new ArrayList();
                String host = (String) stringMapTuple2._2().get("host");
                StringBuilder key;
                for (String tag : tagList) {
                    key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
                    System.out.println("generated Key - " + key);
                    tagKeyList.add(new Tuple2<String, Map<String, Object>>(key.toString(), stringMapTuple2._2()));
                }
                return tagKeyList;
            }
        });

    groupLevel2 = groupLevel2.reduceByKey(new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
        @Override
        public Map<String, Object> call(Map<String, Object> dataMap1, Map<String, Object> dataMap2) throws Exception {
            System.out.println("Type of article map in 1 " + dataMap1.get("articleId").getClass());
            System.out.println("Type of article map in 2 " + dataMap2.get("articleId").getClass());
            Map<String, String> articleMap1 = (Map<String, String>) dataMap1.get("articleId");
            Map<String, String> articleMap2 = (Map<String, String>) dataMap2.get("articleId");

            if (articleMap1 == null || articleMap1.isEmpty()) {
                System.out.println("returning because map 1 null");
 

BlockNotFoundException when running spark word count on Tachyon

2015-08-26 Thread Todd

I am using Tachyon in the Spark program below, but I encounter a
BlockNotFoundException.
Does someone know what's wrong, and is there a guide on how to configure
Spark to work with Tachyon? Thanks!

conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998")
conf.set("spark.externalBlockStore.baseDir", "/spark")
val sc = new SparkContext(conf)
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd.persist(StorageLevel.OFF_HEAP)
val count = rdd.count()
val sum = rdd.reduce(_ + _)
println(s"The count: $count, The sum is: $sum")


15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have 
all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in 
stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 
5, localhost): java.lang.RuntimeException: 
org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)






Re: MLlib Prefixspan implementation

2015-08-26 Thread alexis GILLAIN
A first use case of gap constraint is included in the article.
Another application would be customer-shopping sequence analysis where you
want to put a constraint on the duration between two purchases for them to
be considered as a pertinent sequence.

Additional question regarding the code: what's the point of using
ReversedPrefix in localPrefixSpan? The prefix is used neither in finding
frequent items of a projected database nor in computing a new projected
database, so it looks like it's appended in reverse order just to be
reversed when transformed to a sequence.

2015-08-25 12:15 GMT+08:00 Feynman Liang fli...@databricks.com:

 CCing the mailing list again.

 It's currently not on the radar. Do you have a use case for it? I can
 bring it up during 1.6 roadmap planning tomorrow.

 On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN ila...@hotmail.com
 wrote:

 Hi,

 I just realized the article I mentioned is cited in the jira and not in
 the code so I guess you didn't use this result.

 Do you plan to implement sequence with timestamp and gap constraint as in
 :

 https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf

 2015-08-25 7:06 GMT+08:00 Feynman Liang fli...@databricks.com:

 Hi Alexis,

 Unfortunately, both of the papers you referenced appear to be
 translations and are quite difficult to understand. We followed
 http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan.
 Perhaps you can find the relevant lines in there so I can elaborate further?

 Feynman

 On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN ila...@hotmail.com
 wrote:

 I want to use prefixspan so I had a look at the code and the cited
 paper : Distributed PrefixSpan Algorithm Based on MapReduce.

  There is a result in the paper I didn't really understand and I
  couldn't find where it is used in the code.

  Suppose a sequence database S = {1, 2, ..., n}, a sequence a... is a
  length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a
  prefix of a length-(L-1) sequential pattern a...a, when the support count
  of a is not less than min_support, it is equal to obtaining a length-L
  sequential pattern  a ... a  from projected databases that obtaining a
  length-L sequential pattern  a ... a  from a sequence database S.

  According to the paper, it's supposed to add a pruning step in the
  reduce function but I couldn't find where.

 This result seems to come from a previous paper : Wang Linlin, Fan
 Jun. Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan
 [J]. Computer Engineering, 2009, 35(23): 56-61 but it didn't help me to
 understand it and how it can improve the algorithm.







Re: BlockNotFoundException when running spark word count on Tachyon

2015-08-26 Thread Dibyendu Bhattacharya
Sometime back I was playing with Spark and Tachyon and I also found this
issue. The issue here is that TachyonBlockManager puts the blocks in
WriteType.TRY_CACHE configuration. Because of this, blocks are evicted
from the Tachyon cache when memory is full, and when Spark tries to find the block
it throws BlockNotFoundException.

To solve this I tried Hierarchical Storage on Tachyon
(http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that seems to have
worked: I did not see any Spark job fail due to
BlockNotFoundException.
Below are the Hierarchical Storage settings which I used:

  -Dtachyon.worker.hierarchystore.level.max=2
  -Dtachyon.worker.hierarchystore.level0.alias=MEM
  -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER

-Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
  -Dtachyon.worker.hierarchystore.level1.alias=HDD
  -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
  -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
  -Dtachyon.worker.allocate.strategy=MAX_FREE
  -Dtachyon.worker.evict.strategy=LRU

Regards,
Dibyendu

On Wed, Aug 26, 2015 at 12:25 PM, Todd bit1...@163.com wrote:


 I am using Tachyon in the Spark program below, but I encounter a
 BlockNotFoundException.
 Does someone know what's wrong, and is there a guide on how to configure
 Spark to work with Tachyon? Thanks!

 conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998")
 conf.set("spark.externalBlockStore.baseDir", "/spark")
 val sc = new SparkContext(conf)
 import org.apache.spark.storage.StorageLevel
 val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
 rdd.persist(StorageLevel.OFF_HEAP)
 val count = rdd.count()
 val sum = rdd.reduce(_ + _)
 println(s"The count: $count, The sum is: $sum")


 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
 have all completed, from pool
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage
 0.0 (TID 5, localhost): java.lang.RuntimeException:
 org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
 at
 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
 at
 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
 at
 

RE: SparkR: exported functions

2015-08-26 Thread Felix Cheung
I believe that is done explicitly while the final API is being figured out.
For the moment you could use DataFrame read.df()
 
 From: csgilles...@gmail.com
 Date: Tue, 25 Aug 2015 18:26:50 +0100
 Subject: SparkR: exported functions
 To: user@spark.apache.org
 
 Hi,
 
 I've just started playing about with SparkR (Spark 1.4.1), and noticed
 that a number of the functions haven't been exported. For example,
 the textFile function
 
 https://github.com/apache/spark/blob/master/R/pkg/R/context.R
 
 isn't exported, i.e. the function isn't in the NAMESPACE file. This is 
 obviously
 due to the ' missing in the roxygen2 directives.
 
 Is this intentional?
 
 Thanks
 
 Colin
 
 
  

Re: BlockNotFoundException when running spark word count on Tachyon

2015-08-26 Thread Dibyendu Bhattacharya
The URL seems to have changed; here is the current one:
http://tachyon-project.org/documentation/Tiered-Storage-on-Tachyon.html



On Wed, Aug 26, 2015 at 12:32 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Sometime back I was playing with Spark and Tachyon and I also found this
 issue. The issue here is that TachyonBlockManager puts the blocks in
 WriteType.TRY_CACHE configuration. Because of this, blocks are evicted
 from the Tachyon cache when memory is full, and when Spark tries to find the
 block it throws BlockNotFoundException.

 To solve this I tried Hierarchical Storage on Tachyon
 (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that seems to have
 worked: I did not see any Spark job fail due to
 BlockNotFoundException.
 Below are the Hierarchical Storage settings which I used:

   -Dtachyon.worker.hierarchystore.level.max=2
   -Dtachyon.worker.hierarchystore.level0.alias=MEM
   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER

 -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
   -Dtachyon.worker.hierarchystore.level1.alias=HDD
   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
   -Dtachyon.worker.allocate.strategy=MAX_FREE
   -Dtachyon.worker.evict.strategy=LRU

 Regards,
 Dibyendu

 On Wed, Aug 26, 2015 at 12:25 PM, Todd bit1...@163.com wrote:


 I am using Tachyon in the Spark program below, but I encounter a
 BlockNotFoundException.
 Does someone know what's wrong, and is there a guide on how to
 configure Spark to work with Tachyon? Thanks!

 conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998")
 conf.set("spark.externalBlockStore.baseDir", "/spark")
 val sc = new SparkContext(conf)
 import org.apache.spark.storage.StorageLevel
 val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
 rdd.persist(StorageLevel.OFF_HEAP)
 val count = rdd.count()
 val sum = rdd.reduce(_ + _)
 println(s"The count: $count, The sum is: $sum")


 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
 tasks have all completed, from pool
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage
 0.0 (TID 5, localhost): java.lang.RuntimeException:
 org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
 at
 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
 at
 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 

Re: MLlib Prefixspan implementation

2015-08-26 Thread Feynman Liang
ReversedPrefix is used because Scala's List is a linked list, which has
constant-time prepend to the head but linear-time append to the tail.

I'm aware that there are use cases for the gap constraints. My question was
more about whether any users of Spark/MLlib have an immediate application
for these features.
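
For readers following the thread, a minimal usage sketch of the PrefixSpan API under discussion (assumes Spark 1.5's org.apache.spark.mllib.fpm.PrefixSpan and an existing SparkContext sc; the data is illustrative):

    import org.apache.spark.mllib.fpm.PrefixSpan

    // Each element is a sequence; each inner array is an itemset within that sequence.
    val sequences = sc.parallelize(Seq(
      Array(Array(1, 2), Array(3)),
      Array(Array(1), Array(3, 2), Array(1, 2)),
      Array(Array(1, 2), Array(5)),
      Array(Array(6))
    ), 2).cache()

    val prefixSpan = new PrefixSpan()
      .setMinSupport(0.5)
      .setMaxPatternLength(5)
    val model = prefixSpan.run(sequences)

    model.freqSequences.collect().foreach { fs =>
      println(fs.sequence.map(_.mkString("[", ",", "]")).mkString(",") + " -> " + fs.freq)
    }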

On Wed, Aug 26, 2015 at 12:10 AM, alexis GILLAIN ila...@hotmail.com wrote:

 A first use case of gap constraint is included in the article.
 Another application would be customer-shopping sequence analysis where you
 want to put a constraint on the duration between two purchases for them to
 be considered as a pertinent sequence.

 Additional question regarding the code : what's the point of using 
 ReversedPrefix
 in localprefispan ? The prefix is used neither in finding frequent items
 of a projected database or computing a new projected database so it looks
 like it's appended in inverse order just to be reversed when transformed to
 a sequence.

 2015-08-25 12:15 GMT+08:00 Feynman Liang fli...@databricks.com:

 CCing the mailing list again.

 It's currently not on the radar. Do you have a use case for it? I can
 bring it up during 1.6 roadmap planning tomorrow.

 On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN ila...@hotmail.com
 wrote:

 Hi,

 I just realized the article I mentioned is cited in the jira and not in
 the code so I guess you didn't use this result.

 Do you plan to implement sequence with timestamp and gap constraint as
 in :


 https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf

 2015-08-25 7:06 GMT+08:00 Feynman Liang fli...@databricks.com:

 Hi Alexis,

 Unfortunately, both of the papers you referenced appear to be
 translations and are quite difficult to understand. We followed
 http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan.
 Perhaps you can find the relevant lines in there so I can elaborate 
 further?

 Feynman

 On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN ila...@hotmail.com
 wrote:

 I want to use prefixspan so I had a look at the code and the cited
 paper : Distributed PrefixSpan Algorithm Based on MapReduce.

 There is a result in the paper I didn't really undertstand and I
 could'nt find where it is used in the code.

 Suppose a sequence database S = {­1­,2...­n}, a sequence a... is a
 length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a
 prefix of a length-(L-1) sequential pattern a...a, when the support 
 count
 of a is not less than min_support, it is equal to obtaining a length-L
 sequential pattern  a ... a  from projected databases that obtaining a
 length-L sequential pattern  a ... a  from a sequence database S.

 According to the paper It's supposed to add a pruning step in the
 reduce function but I couldn't find where.

 This result seems to come from a previous paper : Wang Linlin, Fan
 Jun. Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan
 [J]. Computer Engineering, 2009, 35(23): 56-61 but it didn't help me to
 understand it and how it can improve the algorithm.








Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Hi All,

We've set up our spark cluster on aws running on yarn (running on hadoop
2.3) with fair scheduling and preemption turned on. The cluster is shared
for prod and dev work where prod runs with a higher fair share and can
preempt dev jobs if there are not enough resources available for it.
It appears that dev jobs which get preempted often become unstable after
losing some executors, and the whole job gets stuck (without making any
progress) or ends up getting restarted (and hence losing all the work done).
Has someone encountered this before? Is the solution just to set
spark.task.maxFailures
to a really high value to recover from task failures in such scenarios? Are
there other approaches that people have taken for Spark multi-tenancy that
work better in such scenarios?

Thanks,
Sadhan
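
For reference, a hedged sketch of the setting mentioned above (the value is illustrative; raising it only tolerates more task failures after executors are lost, it does not prevent preemption itself):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("dev-spark-shell-job")
      .set("spark.task.maxFailures", "32")   // default is 4; counted per task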


Re: JDBC Streams

2015-08-26 Thread Cody Koeninger
Yes

On Wed, Aug 26, 2015 at 10:23 AM, Chen Song chen.song...@gmail.com wrote:

 Thanks Cody.

 Are you suggesting putting the cache in a global context in each executor
 JVM, in a Scala object for example, and then having a scheduled task refresh
 the cache (or have it triggered by the expiry if using Guava)?

 Chen

 On Wed, Aug 26, 2015 at 10:51 AM, Cody Koeninger c...@koeninger.org
 wrote:

 If your data only changes every few days, why not restart the job every
 few days, and just broadcast the data?

 Or you can keep a local per-jvm cache with an expiry (e.g. guava cache)
 to avoid many mysql reads
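
A minimal sketch of the per-JVM cache approach described above (assumes Guava on the classpath; the MySQL lookup and expiry are placeholders):

    import java.util.concurrent.TimeUnit
    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    object ReferenceData {
      // Hypothetical JDBC/MySQL lookup; replace with the real query.
      private def loadFromMySql(key: String): String = ???

      // One cache per executor JVM; an entry is reloaded lazily after it expires.
      lazy val cache: LoadingCache[String, String] = CacheBuilder.newBuilder()
        .expireAfterWrite(30, TimeUnit.MINUTES)
        .build(new CacheLoader[String, String] {
          override def load(key: String): String = loadFromMySql(key)
        })
    }

    // Used inside the streaming job, e.g.:
    // stream.map(record => (record, ReferenceData.cache.get(keyFor(record))))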

 On Wed, Aug 26, 2015 at 9:46 AM, Chen Song chen.song...@gmail.com
 wrote:

 Piggyback on this question.

 I have a similar use case but a bit different. My job is consuming a
 stream from Kafka and I need to join the Kafka stream with some reference
 table from MySQL (kind of data validation and enrichment). I need to
 process this stream every 1 min. The data in MySQL is not changed very
 often, maybe once a few days.

 So my requirement is:

 * I cannot easily use broadcast variable because the data does change,
 although not very often.
 * I am not sure if it is good practice to read data from MySQL in every
 batch (in my case, 1 min).

 Anyone has done this before, any suggestions and feedback is appreciated.

 Chen


 On Sun, Jul 5, 2015 at 11:50 AM, Ashic Mahtab as...@live.com wrote:

 If it is indeed a reactive use case, then Spark Streaming would be a
 good choice.

 One approach worth considering - is it possible to receive a message
 via kafka (or some other queue). That'd not need any polling, and you could
 use standard consumers. If polling isn't an issue, then writing a custom
 receiver will work fine. The way a receiver works is this:

 * Your receiver has a receive() function, where you'd typically start a
 loop. In your loop, you'd fetch items, and call store(entry).
 * You control everything in the receiver. If you're listening on a
 queue, you receive messages, store() and ack your queue. If you're polling,
 it's up to you to ensure delays between db calls.
 * The things you store() go on to make up the rdds in your DStream. So,
 intervals, windowing, etc. apply to those. The receiver is the boundary
 between your data source and the DStream RDDs. In other words, if your
 interval is 15 seconds with no windowing, then the things that went to
 store() every 15 seconds are bunched up into an RDD of your DStream. That's
 kind of a simplification, but should give you the idea that your db
 polling interval and streaming interval are not tied together.

 -Ashic.
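
A minimal sketch of the custom-receiver pattern described in the list above (the polling delay and fetchNewRows query are placeholders):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class JdbcPollingReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      override def onStart(): Unit = {
        // Receiving happens on a thread you own; store() hands items to Spark Streaming.
        new Thread("jdbc-poller") {
          override def run(): Unit = receive()
        }.start()
      }

      override def onStop(): Unit = { /* nothing to clean up in this sketch */ }

      private def receive(): Unit = {
        while (!isStopped()) {
          fetchNewRows().foreach(row => store(row))  // placeholder query results
          Thread.sleep(60 * 1000)                    // DB polling delay, independent of the batch interval
        }
      }

      // Hypothetical CDC/JDBC query returning new rows since the last poll.
      private def fetchNewRows(): Seq[String] = Seq.empty
    }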

 --
 Date: Mon, 6 Jul 2015 01:12:34 +1000
 Subject: Re: JDBC Streams
 From: guha.a...@gmail.com
 To: as...@live.com
 CC: ak...@sigmoidanalytics.com; user@spark.apache.org


 Hi

 Thanks for the reply. Here is my situation: I have a DB which enables
 synchronous CDC; think of this as a DB trigger which writes changed values
 to a table as soon as something changes in the production table. My job
 will need to pick up the data as soon as it arrives, which can be every 1
 min interval. Ideally it will pick up the changes, transform them into
 JSON and put them to Kinesis. In short, I am emulating a Kinesis producer
 with a DB source (don't even ask why, let's say these are the constraints :)
 )

 Please advise: (a) is Spark a good choice here? (b) what's your
 suggestion either way?

 I understand I can easily do it using a simple java/python app but I am a
 little worried about managing scaling/fault tolerance, and that's where my
 concern is.

 TIA
 Ayan

 On Mon, Jul 6, 2015 at 12:51 AM, Ashic Mahtab as...@live.com wrote:

 Hi Ayan,
 How continuous is your workload? As Akhil points out, with streaming,
 you'll give up at least one core for receiving, will need at most one more
 core for processing. Unless you're running on something like Mesos, this
 means that those cores are dedicated to your app, and can't be leveraged by
 other apps / jobs.

 If it's something periodic (once an hour, once every 15 minutes, etc.),
 then I'd simply write a normal spark application, and trigger it
 periodically. There are many things that can take care of that - sometimes
 a simple cronjob is enough!

 --
 Date: Sun, 5 Jul 2015 22:48:37 +1000
 Subject: Re: JDBC Streams
 From: guha.a...@gmail.com
 To: ak...@sigmoidanalytics.com
 CC: user@spark.apache.org


 Thanks Akhil. In case I go with Spark Streaming, I guess I have to
 implement a custom receiver and Spark Streaming will call this receiver
 every batch interval, is that correct? Any gotchas you see in this plan?
 TIA...Best, Ayan

 On Sun, Jul 5, 2015 at 5:40 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 If you want a long running application, then go with spark streaming
 (which kind of blocks your resources). On the other hand, if you use job
 server then you can actually use the resources (CPUs) for other jobs also
 when your dbjob is not using them.

 Thanks
 Best 

spark streaming 1.3 kafka buffer size

2015-08-26 Thread Shushant Arora
What's the default buffer in spark streaming 1.3 for kafka messages?

Say in this run it has to fetch messages from offset 1 to 1. Will it
fetch them all in one go, or does it internally fetch messages in batches of
a few messages?

Is there any setting to configure the number of offsets fetched in one batch?


Re: Building spark-examples takes too much time using Maven

2015-08-26 Thread Ted Yu
Can you provide a bit more information ?

Are Spark artifacts packaged by you have the same names / paths (in maven
repo) as the ones published by Apache Spark ?

Is Zinc running on the machine where you performed the build ?

Cheers

On Wed, Aug 26, 2015 at 7:56 AM, Muhammad Haseeb Javed 
11besemja...@seecs.edu.pk wrote:

 I checked out the master branch and started playing around with the
 examples. I want to build a jar  of the examples as I wish run them using
 the modified spark jar that I have. However, packaging spark-examples takes
 too much time as maven tries to download the jar dependencies rather than
 use the jar that are already present int the system as I extended and
 packaged spark itself locally?



Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-26 Thread Dhaval Patel
Thanks Davies. HiveContext seems neat to use :)

On Thu, Aug 20, 2015 at 3:02 PM, Davies Liu dav...@databricks.com wrote:

 As Aram said, there two options in Spark 1.4,

 1) Use the HiveContext, then you get datediff from Hive:
 df.selectExpr("datediff(d2, d1)")
 2) Use Python UDF:
 ```
  from datetime import date
  df = sqlContext.createDataFrame([(date(2008, 8, 18), date(2008, 9,
 26))], ['d1', 'd2'])
  from pyspark.sql.functions import udf
  from pyspark.sql.types import IntegerType
  diff = udf(lambda a, b: (a - b).days, IntegerType())
  df.select(diff(df.d1, df.d2)).show()
  +-----------------------+
  |PythonUDF#lambda(d1,d2)|
  +-----------------------+
  |                    -39|
  +-----------------------+
 ```

 On Thu, Aug 20, 2015 at 7:45 AM, Aram Mkrtchyan
 aram.mkrtchyan...@gmail.com wrote:
  Hi,
 
  hope this will help you
 
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._
  import java.sql.Timestamp
  import org.joda.time.{DateTime, Days}
 
  val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")
 
  val dateDiff = udf[Long, Timestamp, Timestamp]((value1, value2) =>
    Days.daysBetween(new DateTime(value2.getTime), new DateTime(value1.getTime)).getDays)
  df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()
 
  or you can write sql query using hiveql's datediff function.
   https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
 
  On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel dhaval1...@gmail.com
 wrote:
 
  More update on this question..I am using spark 1.4.1.
 
  I was just reading documentation of spark 1.5 (still in development)
 and I
  think there will be a new func *datediff* that will solve the issue. So
  please let me know if there is any work-around until spark 1.5 is out
 :).
 
  pyspark.sql.functions.datediff(end, start)[source]
 
  Returns the number of days from start to end.
 
   df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')],
 ['d1',
   'd2'])
   df.select(datediff(df.d2, df.d1).alias('diff')).collect()
  [Row(diff=32)]
 
  New in version 1.5.
 
 
  On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel dhaval1...@gmail.com
  wrote:
 
  Apologies, sent too early accidentally. Actual message is below
  
 
  A dataframe has 2 datecolumns (datetime type) and I would like to add
  another column that would have difference between these two dates.
 Dataframe
  snippet is below.
 
  new_df.show(5)
  +-----------+----------+--------------+
  |      PATID|   SVCDATE|next_diag_date|
  +-----------+----------+--------------+
  |12345655545|2012-02-13|    2012-02-13|
  |12345655545|2012-02-13|    2012-02-13|
  |12345655545|2012-02-13|    2012-02-27|
  +-----------+----------+--------------+
 
 
 
  Here is what I have tried so far:
 
  - new_df.withColumn('SVCDATE2',
  (new_df.next_diag_date-new_df.SVCDATE)).show()
  Error: DateType does not support numeric operations
 
  - new_df.withColumn('SVCDATE2',
  (new_df.next_diag_date-new_df.SVCDATE).days).show()
  Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);
 
 
  However this simple python code works fine with pySpark:
 
  from datetime import date
  d0 = date(2008, 8, 18)
  d1 = date(2008, 9, 26)
  delta = d0 - d1
  print (d0 - d1).days
 
  # -39
 
 
  Any suggestions would be appreciated! Also is there a way to add a new
  column in dataframe without using column expression (e.g. like in
 pandas or
  R. df$new_col = 'new col value')?
 
 
  Thanks,
  Dhaval
 
 
 
  On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel dhaval1...@gmail.com
  wrote:
 
  new_df.withColumn('SVCDATE2',
  (new_df.next_diag_date-new_df.SVCDATE).days).show()
 
  +-----------+----------+--------------+
  |      PATID|   SVCDATE|next_diag_date|
  +-----------+----------+--------------+
  |12345655545|2012-02-13|    2012-02-13|
  |12345655545|2012-02-13|    2012-02-13|
  |12345655545|2012-02-13|    2012-02-27|
  +-----------+----------+--------------+
 
 
 
 



Re: spark streaming 1.3 kafka buffer size

2015-08-26 Thread Cody Koeninger
see http://kafka.apache.org/documentation.html#consumerconfigs

fetch.message.max.bytes

in the kafka params passed to the constructor
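
For example (the broker list, topic name and the 50 MB cap below are made-up
placeholder values; ssc is assumed to be an existing StreamingContext):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String](
  "metadata.broker.list"    -> "broker1:9092,broker2:9092",
  "fetch.message.max.bytes" -> (50 * 1024 * 1024).toString  // per-partition fetch size cap
)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))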


On Wed, Aug 26, 2015 at 10:39 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 whats the default buffer in spark streaming 1.3 for  kafka messages.

 Say In this run it has to fetch messages from offset 1 to 1. will it
 fetch all in one go or internally it fetches messages in  few messages
 batch.

 Is there any setting to configure this no of offsets fetched in one batch?



Re: Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
Sorry, I meant without reading from all splits. This is a single partition
in the table.

On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak tom...@gmail.com wrote:

 I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from
 and I don't particularly care which rows. Doing a LIMIT unfortunately
 results in two stages where the first stage reads the whole table, and the
 second then performs the limit with a single worker, which is not very
 efficient.
 Is there a better way to sample a subset of rows in Spark, ideally in a
 single stage and without reading all partitions?

 cheers,
 Tom



Persisting sorted parquet tables for future sort merge joins

2015-08-26 Thread Jason
I want to persist a large _sorted_ table to Parquet on S3 and then read
this in and join it using the Sorted Merge Join strategy against another
large sorted table.

The problem is: even though I sort these tables on the join key beforehand,
once I persist them to Parquet, they lose the information about their
sortedness. Is there any way to hint to Spark that they do not need to be
re-sorted the next time I read them in?

I've been trying this on 1.5 and I keep getting plans looking like:
[== Physical Plan ==]
[TungstenProject [pos#28400,workf...#28399]]
[ SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]]
[ TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28403,pos#28400)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:/sorted.parquet][pos#2848424]]
[ TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28399,pos#28332)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:exploded_sorted.parquet][pos#2.399]]

As you can see, this plan exchanges and sorts the data before performing
the SortMergeJoin even though these parquet tables are already sorted.

Thanks,
Jason


Re: How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-26 Thread Mike Trienis
Thanks for your response Yana,

I can increase the MaxPermSize parameter and it will allow me to run the
unit test a few more times before I run out of memory.

However, the primary issue is that running the same unit test in the same
JVM (multiple times) results in increased memory (each run of the unit
test) and I believe it has something to do with HiveContext not reclaiming
memory after it is finished (or I'm not shutting it down properly).

It could very well be related to sbt, however, it's not clear to me.
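
For reference, a simplified sketch of the kind of test I mean (assuming
ScalaTest; the suite name and query are illustrative, and whether stopping the
context in afterAll is enough to reclaim the memory is exactly what I'm unsure
about):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class HiveQuerySuite extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _
  private var hive: HiveContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hive-test"))
    hive = new HiveContext(sc)
  }

  test("simple query") {
    assert(hive.sql("SELECT 1").collect().length === 1)
  }

  override def afterAll(): Unit = {
    if (sc != null) sc.stop()  // attempt to release the context between runs
  }
}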


On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 The PermGen space error is controlled with MaxPermSize parameter. I run
 with this in my pom, I think copied pretty literally from Spark's own
 tests... I don't know what the sbt equivalent is but you should be able to
 pass it...possibly via SBT_OPTS?


   <plugin>
     <groupId>org.scalatest</groupId>
     <artifactId>scalatest-maven-plugin</artifactId>
     <version>1.0</version>
     <configuration>
       <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
       <parallel>false</parallel>
       <junitxml>.</junitxml>
       <filereports>SparkTestSuite.txt</filereports>
       <argLine>-Xmx3g -XX:MaxPermSize=256m -XX:ReservedCodeCacheSize=512m</argLine>
       <stderr/>
       <systemProperties>
         <java.awt.headless>true</java.awt.headless>
         <spark.testing>1</spark.testing>
         <spark.ui.enabled>false</spark.ui.enabled>
         <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
       </systemProperties>
     </configuration>
     <executions>
       <execution>
         <id>test</id>
         <goals>
           <goal>test</goal>
         </goals>
       </execution>
     </executions>
   </plugin>
   </plugins>


 On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hello,

 I am using sbt and created a unit test where I create a `HiveContext` and
 execute some query and then return. Each time I run the unit test the JVM
 will increase its memory usage until I get the error:

 Internal error when running tests: java.lang.OutOfMemoryError: PermGen
 space
 Exception in thread Thread-2 java.io.EOFException

 As a work-around, I can fork a new JVM each time I run the unit test,
 however, it seems like a bad solution as takes a while to run the unit
 test.

 By the way, I tried to importing the TestHiveContext:

- import org.apache.spark.sql.hive.test.TestHiveContext

 However, it suffers from the same memory issue. Has anyone else suffered
 from the same problem? Note that I am running these unit tests on my mac.

 Cheers, Mike.





Just Released V1.0.4 Low Level Receiver Based Kafka-Spark-Consumer in Spark Packages having built-in Back Pressure Controller

2015-08-26 Thread Dibyendu Bhattacharya
Dear All,

Just now released the 1.0.4 version of Low Level Receiver based
Kafka-Spark-Consumer in spark-packages.org .  You can find the latest
release here :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Here is github location : https://github.com/dibbhatt/kafka-spark-consumer

This consumer now has a built-in PID (Proportional, Integral, Derivative)
rate controller to control Spark back-pressure.

This consumer implements the rate-limiting logic not by controlling the
number of messages per block (as is done in Spark's out-of-the-box
consumers), but by the size of the blocks per batch. I.e. for any given
batch, this consumer controls the rate limit by controlling the size of the
batches. As Spark memory usage is driven by block size rather than by the
number of messages, I think rate limiting by block size is more appropriate.
E.g. let's assume Kafka contains everything from very small messages (say a
few hundred bytes) to larger messages (a few hundred KB) for the same topic.
If we control the rate limit by number of messages, block sizes may vary
drastically based on what type of messages get pulled per block. Whereas,
if I control my rate limiting by block size, my block size remains constant
across batches (even though the number of messages differs across blocks),
which helps to tune my memory settings more precisely, as I know exactly how
much memory each block is going to consume.


This consumer has its own PID (Proportional, Integral, Derivative)
controller built in and controls Spark back-pressure by modifying the size
of the block it consumes at run time. The PID controller's rate feedback
mechanism is built using ZooKeeper. Again, the logic to control
back-pressure works not by controlling the number of messages (as is done
in Spark 1.5, SPARK-7398) but by altering the size of the block consumed
per batch from Kafka. As the back-pressure is built into the consumer, this
consumer can be used with any version of Spark if anyone wants a
back-pressure controlling mechanism in their existing Spark / Kafka
environment.
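
For anyone curious, the general shape of a PID update for this kind of rate
control is sketched below. This is a generic illustration with made-up gains
and a made-up method name, not the actual code of this consumer:

case class PidRateController(kp: Double, ki: Double, kd: Double) {
  private var integral = 0.0
  private var lastError = 0.0

  // Given the current block size and the target vs. observed processing rate,
  // return the block size to use for the next batch.
  def nextBlockSize(currentSize: Double, targetRate: Double, observedRate: Double): Double = {
    val error = targetRate - observedRate
    integral += error
    val derivative = error - lastError
    lastError = error
    math.max(0.0, currentSize + kp * error + ki * integral + kd * derivative)
  }
}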

Regards,
Dibyendu


Re: JDBC Streams

2015-08-26 Thread Jörn Franke
I would use Sqoop. It has been designed exactly for these types of
scenarios. Spark streaming does not make sense here

Le dim. 5 juil. 2015 à 1:59, ayan guha guha.a...@gmail.com a écrit :

 Hi All

 I have a requireent to connect to a DB every few minutes and bring data to
 HBase. Can anyone suggest if spark streaming would be appropriate for this
 senario or I shoud look into jobserver?

 Thanks in advance


 --
 Best Regards,
 Ayan Guha



Re: Does the driver program always run local to where you submit the job from?

2015-08-26 Thread Marcelo Vanzin
On Wed, Aug 26, 2015 at 2:03 PM, Jerry jerry.c...@gmail.com wrote:
 Assuming you're submitting the job from a terminal: when main() is called, if I
 try to open a file locally, can I assume the machine is always the one I
 submitted the job from?

See the --deploy-mode option. client works as you describe;
cluster does not.

-- 
Marcelo




Help! Stuck using withColumn

2015-08-26 Thread Saif.A.Ellafi
This simple comand call:

val final_df = data.select("month_balance").withColumn("month_date", 
data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Susan Zhang
Ah, I was using the UI coupled with the job logs, which indicated that offsets
were being processed even though they corresponded to 0 events. Looks like
I wasn't matching up timestamps correctly: the 0 event batches were
queued/processed when offsets were getting skipped:

15/08/26 11:26:05 INFO storage.BlockManager: Removing RDD 0
15/08/26 11:26:05 INFO kafka.KafkaRDD: Beginning offset 831729964 is the
same as ending offset skipping install-json 1
15/08/26 11:26:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0
non-empty blocks out of 6 blocks
15/08/26 11:26:08 INFO storage.BlockManager: Removing RDD 1

But eventually processing of offset 831729964 would resume:

15/08/26 11:27:18 INFO kafka.KafkaRDD: Computing topic install-json,
partition 1 offsets 831729964 - 831729976

Lesson learned: will be more focused on reading the job logs properly in
the future.


Thanks for all the help on this!


On Wed, Aug 26, 2015 at 12:16 PM, Cody Koeninger c...@koeninger.org wrote:

 I'd be less concerned about what the streaming ui shows than what's
 actually going on with the job.  When you say you were losing messages, how
 were you observing that?  The UI, or actual job output?

 The log lines you posted indicate that the checkpoint was restored and
 those offsets were processed; what are the log lines for the following
 KafkaRDD ?


 On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang suchenz...@gmail.com wrote:

 Compared offsets, and it continues from checkpoint loading:

 15/08/26 11:24:54 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
 (install-json,4,825772921,825773536),
 (install-json,1,831654775,831655076),
 (install-json,0,1296018451,1296018810),
 (install-json,2,824785282,824785696), (install-json,3,
 811428882,811429181)]

 15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 0 offsets 1296018451 - 1296018810
 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 4 offsets 825773536 - 825907428
 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 2 offsets 824785696 - 824889957
 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 3 offsets 811429181 - 811529084
 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
 partition 1 offsets 831655076 - 831729964
 ...

 But for some reason the streaming UI shows it as computing 0 events.

 Removing the call to checkpoint does remove the queueing of 0 event
 batches, since offsets just skip to the latest (checked that the first
 part.fromOffset in the restarted job is larger than the last
 part.untilOffset before restart).




 On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger c...@koeninger.org
 wrote:

 When the kafka rdd is actually being iterated on the worker, there
 should be an info line of the form

 log.info(sComputing topic ${part.topic}, partition
 ${part.partition}  +

   soffsets ${part.fromOffset} - ${part.untilOffset})


 You should be able to compare that to log of offsets during checkpoint
 loading, to see if they line up.

 Just out of curiosity, does removing the call to checkpoint on the
 stream affect anything?



 On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang suchenz...@gmail.com
 wrote:

 Thanks for the suggestions! I tried the following:

 I removed

 createOnError = true

 And reran the same process to reproduce. Double checked that checkpoint
 is loading:

 15/08/26 10:10:40 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
 (install-json,4,825400856,825401058),
 (install-json,1,831453228,831453396),
 (install-json,0,1295759888,1295760378),
 (install-json,2,824443526,82409), (install-json,3,
 811222580,811222874)]
 15/08/26 10:10:40 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791),
 (install-json,4,825401058,825401249),
 (install-json,1,831453396,831453603),
 (install-json,0,1295760378,1295760809),
 (install-json,2,82409,824445510), (install-json,3,
 811222874,811223285)]
 ...

 And the same issue is appearing as before (with 0 event batches getting
 queued corresponding to dropped messages). Our kafka brokers are on version
 0.8.2.0, if that makes a difference.

 Also as a sanity check, I took out the ZK updates and reran (just in
 case that was somehow causing problems), and that didn't change anything as
 expected.

 Furthermore, the 0 event batches seem to take longer to process than
 batches with the regular load of events: processing time for 0 event
 batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
 event batches is consistently  1s. Why would that happen?


 As for the checkpoint call:

 directKStream.checkpoint(checkpointDuration)

 was an attempt to set the 

suggest configuration for debugging spark streaming, kafka

2015-08-26 Thread Joanne Contact
Hi, I have an Ubuntu box with 4GB memory and dual cores. Do you think it
won't be enough to run spark streaming and kafka? I'm trying to install
standalone-mode spark and kafka so I can debug them in an IDE. Do I need to
install hadoop?

Thanks!

J




Re: Does the driver program always run local to where you submit the job from?

2015-08-26 Thread Jerry
Thanks!

On Wed, Aug 26, 2015 at 2:06 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Wed, Aug 26, 2015 at 2:03 PM, Jerry jerry.c...@gmail.com wrote:
  Assuming your submitting the job from terminal; when main() is called,
 if I
  try to open a file locally, can I assume the machine is always the one I
  submitted the job from?

 See the --deploy-mode option. client works as you describe;
 cluster does not.

 --
 Marcelo



RE: Help! Stuck using withColumn

2015-08-26 Thread Saif.A.Ellafi
I can reproduce this even simpler with the following:

val gf = sc.parallelize(Array(3,6,4,7,3,4,5,5,31,4,5,2)).toDF("ASD")
val ff = sc.parallelize(Array(4,6,2,3,5,1,4,6,23,6,4,7)).toDF("GFD")

gf.withColumn("DSA", ff.col("GFD"))

org.apache.spark.sql.AnalysisException: resolved attribute(s) GFD#421 missing 
from ASD#419 in operator !Project [ASD#419,GFD#421 AS DSA#422];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)


From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Wednesday, August 26, 2015 6:47 PM
To: user@spark.apache.org
Subject: Help! Stuck using withColumn

This simple comand call:

val final_df = data.select("month_balance").withColumn("month_date", 
data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





Spark.ml vs Spark.mllib

2015-08-26 Thread njoshi
Hi,

We are in the process of developing a new product/Spark application. While
the official Spark 1.4.1 page
http://spark.apache.org/docs/latest/ml-guide.html invites users and
developers to use *Spark.mllib* and optionally contribute to *Spark.ml*, this
http://stackoverflow.com/questions/30231840/difference-between-org-apache-spark-ml-classification-and-org-apache-spark-mllib
StackOverflow post refers to the design doc, saying that Spark.mllib will
be deprecated eventually.

Could you please confirm which of these is true, and whether we need to worry
if we are planning to develop the app using Spark.mllib? What would be the
timeline for this migration?

Thanks in advance,
Nikhil



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ml-vs-Spark-mllib-tp24465.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




SPARK_DIST_CLASSPATH, primordial class loader app ClassNotFound

2015-08-26 Thread Night Wolf
Hey all,

I'm trying to do some stuff with a YAML file in the Spark driver using
SnakeYAML library in scala.

When I put the snakeyaml v1.14 jar on the SPARK_DIST_CLASSPATH and try to
de-serialize some objects from YAML into classes in my app JAR on the
driver (only the driver), I get the exception below.

Yet when I don't have the snakeyaml jar on the SPARK_DIST_CLASSPATH but
instead create a fat jar for my application (so it has the snakeyaml jar
baked inside), then everything works fine. If I have both the jar on
DIST_CLASSPATH and the fat jar (built with sbt assembly), it still fails
with the same exception.

I'm guessing that jars on SPARK_DIST_CLASSPATH end up in the 'primordial'
class loader; because SnakeYAML then lives in this class loader, it can't
find my application jar classes, since they are loaded at a subsequent
point by a different class loader.

What is the workaround for this?

Thanks
~N


Exception in thread main com.sai.cfg.InvalidConfigurationPropertyException
at
com.sai.cfg.ConfigurationParser$.getConfig(ConfigurationParser.scala:184)
at
com.sai.cfg.ConfigurationParser$.getGlobalConfig(ConfigurationParser.scala:171)
at
com.sai.cfg.ConfigurationParser.globalConfig$lzycompute(ConfigurationParser.scala:26)
at
com.sai.cfg.ConfigurationParser.globalConfig(ConfigurationParser.scala:26)
at
com.sai.cfg.ConfigurationParser.init(ConfigurationParser.scala:44)
at
com.sai.strategy.StrategyEngineMain$.delayedEndpoint$au$com$quantium$personalisation$strategy$StrategyEngineMain$1(StrategyEngineMain.scala:39)
at
com.sai.strategy.StrategyEngineMain$delayedInit$body.apply(StrategyEngineMain.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at
com.sai.strategy.StrategyEngineMain$.main(StrategyEngineMain.scala:15)
at
com.sai.strategy.StrategyEngineMain.main(StrategyEngineMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.yaml.snakeyaml.error.YAMLException: Class not found:
com.sai.cfg.models.GlobalConfiguration
at
org.yaml.snakeyaml.constructor.Constructor.getClassForNode(Constructor.java:647)
at
org.yaml.snakeyaml.constructor.Constructor$ConstructYamlObject.getConstructor(Constructor.java:330)
at
org.yaml.snakeyaml.constructor.Constructor$ConstructYamlObject.construct(Constructor.java:340)
at
org.yaml.snakeyaml.constructor.BaseConstructor.constructObject(BaseConstructor.java:182)
at
org.yaml.snakeyaml.constructor.BaseConstructor.constructDocument(BaseConstructor.java:141)
at
org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:127)
at org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:481)
at org.yaml.snakeyaml.Yaml.load(Yaml.java:400)
at
com.sai.cfg.ConfigurationParser$.getConfig(ConfigurationParser.scala:181)
... 24 more


Re: Question on take function - Spark Java API

2015-08-26 Thread Sonal Goyal
You can try using wholeTextFile which will give you a pair rdd of fileName,
content. flatMap through this and manipulate the content.
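
A rough Scala sketch of the idea (the Java API has an equivalent
wholeTextFiles method; folderPath and the tuple shape of the output are just
illustrative):

val files = sc.wholeTextFiles(folderPath)   // RDD[(fileName, fileContent)]

val records = files.flatMap { case (fileName, content) =>
  val lines  = content.split("\n")
  val asset  = lines.head.trim              // first line: asset name for this file
  val header = lines(1).split(",")          // second line: csv header
  lines.drop(2).map(row => (asset, header, row))  // data rows keep their own file's asset
}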

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015
https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/

http://in.linkedin.com/in/sonalgoyal



On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane pankaj.wah...@qiotec.com
wrote:

 Hi community members,


 Apache Spark is Fantastic and very easy to learn.. Awesome work!!!

 *Question:*

 I have multiple files in a folder, and the first line in each file is the
 name of the asset that the file belongs to. The second line is the csv header
 row and data starts from the third row.

 Ex: File 1

 TestAsset01
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,123,456,789
 11-01-2015 15:00:01,123,456,789
 . . .

 Ex: File 2

 TestAsset02
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,1230,4560,7890
 11-01-2015 15:00:01,1230,4560,7890
 . . .

 I have got nearly 1000 files in each folder, sizing ~10G.

 I am using the Apache Spark Java API to read all these files.

 Following is code extract that I am using:

 try (JavaSparkContext sc = new JavaSparkContext(conf)) {
     Map<String, String> readingTypeMap = getReadingTypesMap(sc);
     // Read files
     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
     // Get asset
     String asset = data.take(1).get(0);
     // Extract time series data
     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
     // Strip header
     String header = actualData.take(1).get(0);
     String[] headers = header.split(DELIMERTER);
     // Extract actual data
     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
     // Extract valid records
     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
     // Find granularity
     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
     // Transform to TSD objects
     JavaRDD<TimeSeriesData> tsdFlatMap =
         transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);

     // Save to Cassandra
     javaFunctions(tsdFlatMap)
         .writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
             "time_series_data",
             mapToRow(TimeSeriesData.class))
         .saveToCassandra();

     System.out.println("Total Records: " + timeSeriesLines.count());
     System.out.println("Valid Records: " + validated.count());
 }

 Within the TimeSeriesData object I need to set the asset name for each reading,
 so I need the output of data.take(1) to be different for different files.

 Thank You.

 Best Regards,
 Pankaj







Re:Re:Re: How to increase data scale in Spark SQL Perf

2015-08-26 Thread Todd

Sorry  for the noise, It's my bad...I have worked it out now.

At 2015-08-26 13:20:57, Todd bit1...@163.com wrote:



I think the answer is no. I only see such messages on the console, and #2 is the
thread stack trace.
My thinking is that Spark SQL Perf forks many dsdgen processes to generate
data when the scale factor is increased, which eventually exhausts the JVM.
When the thread exception is thrown on the console and I leave it there for a
while (about 15 min), I will eventually see OutOfMemory occur.

Can you guys try to run it if you have the environment ? I think you may 
reproduce it. Thanks!







At 2015-08-26 13:01:34, Ted Yu yuzhih...@gmail.com wrote:

The error in #1 below was not informative. 


Are you able to get more detailed error message ?


Thanks




On Aug 25, 2015, at 6:57 PM, Todd bit1...@163.com wrote:




Thanks Ted Yu.

Following are the error message:
1. The exception that is shown on the UI is :
Exception in thread Thread-113 Exception in thread Thread-126 Exception in 
thread Thread-64 Exception in thread Thread-90 Exception in thread 
Thread-117 Exception in thread Thread-80 Exception in thread Thread-115 
Exception in thread ResponseProcessor for block 
BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984 Exception in 
thread qtp1270119920-57 Exception in thread Thread-77 Exception in thread 
Thread-132 Exception in thread Thread-68 Exception in thread Thread-61 
Exception in thread Thread-70 Exception in thread qtp1270119920-52 
Exception in thread Thread-88 Exception in thread qtp318933312-47 Exception 
in thread qtp1270119920-56

2. jstack the process, I see bunch of following message:

Thread 31258: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
(Interpreted frame)
 - 
scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
 @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
(Interpreted frame)


Thread 31257: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
(Interpreted frame)
 - 
scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
 @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
(Interpreted frame)







At 2015-08-25 19:32:56, Ted Yu yuzhih...@gmail.com wrote:

Looks like you were attaching images to your email which didn't go through.


Consider using third party site for images - or paste error in text.


Cheers


On Tue, Aug 25, 2015 at 4:22 AM, Todd bit1...@163.com wrote:

Hi,
The spark sql perf itself contains benchmark data generation. I am using spark 
shell to run the spark sql perf to generate the data with 10G memory for both 
driver and executor.
When I increase the scalefactor to be 30,and run the job, Then I got the 
following error:



When I jstack it to see the status of the thread. I see the following: looks it 
is waiting for the process that the spark job kicks off.








Re: DataFrame Parquet Writer doesn't keep schema

2015-08-26 Thread Petr Novak
The same as
https://mail.google.com/mail/#label/Spark%2Fuser/14f64c75c15f5ccd

Please follow the discussion there.

On Tue, Aug 25, 2015 at 5:02 PM, Petr Novak oss.mli...@gmail.com wrote:

 Hi all,
 when I read parquet files with required fields aka nullable=false they
 are read correctly. Then I save them (df.write.parquet) and read again all
 my fields are saved and read as optional, aka nullable=true. Which means I
 suddenly have files with incompatible schemas. This happens on 1.3.0-1.4.1
 and even on 1.5.1-rc1.

 Should I set some write option to keep nullability? Is there a specific
 reason why nullability is always overriden to true?

 Many thanks,
 Peter





Re: Relation between threads and executor core

2015-08-26 Thread Jem Tucker
Hi Samya,

When submitting an application with spark-submit the cores per executor can
be set with --executor-cores, meaning you can run that many tasks per
executor concurrently. The page below has some more details on submitting
applications:

https://spark.apache.org/docs/latest/submitting-applications.html

thanks,

Jem

On Wed, Aug 26, 2015 at 9:47 AM Samya samya.ma...@amadeus.com wrote:

 Hi All,

 Few basic queries :-
 1. Is there a way we can control the number of threads per executor core?
 2. Does this parameter “executor-cores” also has say in deciding how many
 threads to be run?

 Regards,
 Sam



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





RE: Relation between threads and executor core

2015-08-26 Thread Samya MAITI
Thanks Jem, I do understand your suggestion. Actually --executor-cores alone 
doesn’t control the number of tasks, but is also governed by spark.task.cpus 
(amount of cores dedicated for each task’s execution).

Reframing my Question, How many threads can be spawned per executor core? Is it 
in user control?

Regards,
Sam

From: Jem Tucker [mailto:jem.tuc...@gmail.com]
Sent: Wednesday, August 26, 2015 2:26 PM
To: Samya MAITI samya.ma...@amadeus.com; user@spark.apache.org
Subject: Re: Relation between threads and executor core

Hi Samya,

When submitting an application with spark-submit the cores per executor can be 
set with --executor-cores, meaning you can run that many tasks per executor 
concurrently. The page below has some more details on submitting applications:

https://spark.apache.org/docs/latest/submitting-applications.html

thanks,

Jem

On Wed, Aug 26, 2015 at 9:47 AM Samya 
samya.ma...@amadeus.commailto:samya.ma...@amadeus.com wrote:
Hi All,

Few basic queries :-
1. Is there a way we can control the number of threads per executor core?
2. Does this parameter “executor-cores” also has say in deciding how many
threads to be run?

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Performance issue with Spark join

2015-08-26 Thread lucap
Hi,

I'm trying to perform an ETL using Spark, but as soon as I start performing
joins performance degrades a lot. Let me explain what I'm doing and what I
found out until now.

First of all, I'm reading avro files that are on a Cloudera cluster, using
commands like this:
val tab1 = sc.hadoopFile("hdfs:///path/to/file",
classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
classOf[org.apache.hadoop.io.NullWritable], 10)

After this, I'm applying some filter functions to the data (to reproduce the
"where" clauses of the original query) and then I'm using one map for each
table in order to translate RDD elements into (key, record) format. Let's say
I'm doing this:
val elabTab1 = tab1.filter(...).map(...)

It is important to notice that if I do something like elabTab1.first or
elabTab1.count, the task is performed in a short time, let's say around
Impala's time. Now I need to do the following:
val joined = elabTab1.leftOuterJoin(elabTab2)
Then I tried something like joined.count to test performance, but it
degraded really a lot (let's say that a count on a single table takes about 4
seconds and the count on the joined table takes 12 minutes). I think there's
a problem with the configuration, but what might it be?

I'll give you some more information:
1] Spark is running on YARN on a Cloudera cluster
2] I'm starting spark-shell with a command like "spark-shell
--executor-cores 5 --executor-memory 10G", which gives the shell approx 10
vcores and 25 GB of memory
3] The task seems stuck for a long time after the map tasks, with the
following message in the console: "Asked to send map output locations for
shuffle ... to ..."
4] If I open the stderr of the executors, I can read plenty of messages like
the following: "Thread ... spilling in-memory map of ... MB to disk", where
the MBs are in the order of 300-400
5] I tried to raise the number of executors, but the situation didn't seem
to change much. I also tried to change the number of splits of the avro
files (currently set to 10), but that didn't seem to change much either
6] Tables aren't particularly big; the biggest one should be a few GBs

Regards,
Luca



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-with-Spark-join-tp24458.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Performance issue with Spark join

2015-08-26 Thread Hemant Bhanawat
Spark joins are different from traditional database joins because of the
lack of support for indexes. Spark has to shuffle data between various
nodes to perform joins. Hence joins are bound to be much slower than count,
which is just a parallel scan of the data.

Still, to ensure that nothing is wrong with the setup, you may want to look
at your Spark Task UI. You may want to look at the Shuffle Reads and
Shuffle write parameters.

On Wed, Aug 26, 2015 at 3:08 PM, lucap luca-pi...@hotmail.it wrote:

 Hi,

 I'm trying to perform an ETL using Spark, but as soon as I start performing
 joins performance degrades a lot. Let me explain what I'm doing and what I
 found out until now.

 First of all, I'm reading avro files that are on a Cloudera cluster, using
 commands like this:
 /val tab1 = sc.hadoopFile(hdfs:///path/to/file,
 classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
 classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
 classOf[org.apache.hadoop.io.NullWritable], 10)/

 After this, I'm applying some filter functions to data (to reproduce
 where
 clauses of the original query) and then I'm using one map for each table in
 order to translate RDD elements in (key,record) format. Let's say I'm doing
 this:
 /val elabTab1 = tab1.filter(...).map()/

 It is important to notice that if I do something like /elabTab1.first/ or
 /elabTab1.count/ the task is performed in a short time, let's say around
 impala's time. Now I need to do the following:
 /val joined = elabTab1.leftOuterJoin(elabTab2)/
 Then I tried something like /joined.count/ to test performance, but it
 degraded really a lot (let's say that a count on a single table takes like
 4
 seconds and the count on the joined table takes 12 minutes). I think
 there's
 a problem with the configuration, but what might it be?

 I'll give you some more information:
 1] Spark is running on YARN on a Cloudera cluster
 2] I'm starting spark-shell with a command like /spark-shell
 --executor-cores 5 --executor-memory 10G/ that gives the shell approx 10
 vcores and 25 GB of memory
 3] The task seems still for a lot of time after the map tasks, with the
 following message in console: /Asked to send map output locations for
 shuffle ... to .../
 4] If I open the stderr of the executors, I can read plenty of messages
 like
 the following: /Thread ... spilling in-memory map of ... MB to disk/, where
 MBs are in the order of 300-400
 5] I tried to raise the number of executors, but the situation didn't seem
 to change much. I also tried to change the number of splits of the avro
 files (currently set to 10), but it didn't seem to change much as well
 6] Tables aren't particularly big, the bigger one should be few GBs

 Regards,
 Luca



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-with-Spark-join-tp24458.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to increase data scale in Spark SQL Perf

2015-08-26 Thread Ted Yu
Mind sharing how you fixed the issue ?

Cheers



 On Aug 26, 2015, at 1:50 AM, Todd bit1...@163.com wrote:
 
 
 Sorry  for the noise, It's my bad...I have worked it out now. 
 
 At 2015-08-26 13:20:57, Todd bit1...@163.com wrote:
 
 I think the answer is No. I only see such message on the console..and #2 is 
 the thread stack trace。
 I am thinking is that in Spark SQL Perf forks many dsdgen process to generate 
 data when the scalafactor is increased which at last exhaust the JVM
 When thread exception is thrown on the console and I leave it there for some 
 while(15min about),then eventually I will see OutOfMemory occur
 
 Can you guys try to run it if you have the environment ? I think you may 
 reproduce it. Thanks!
 
 
 
 
 
 At 2015-08-26 13:01:34, Ted Yu yuzhih...@gmail.com wrote:
 The error in #1 below was not informative. 
 
 Are you able to get more detailed error message ?
 
 Thanks
 
 
 
 On Aug 25, 2015, at 6:57 PM, Todd bit1...@163.com wrote:
 
 
 Thanks Ted Yu.
 
 Following are the error message:
 1. The exception that is shown on the UI is :
 Exception in thread Thread-113 Exception in thread Thread-126 Exception 
 in thread Thread-64 Exception in thread Thread-90 Exception in thread 
 Thread-117 Exception in thread Thread-80 Exception in thread 
 Thread-115 Exception in thread ResponseProcessor for block 
 BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984 Exception 
 in thread qtp1270119920-57 Exception in thread Thread-77 Exception in 
 thread Thread-132 Exception in thread Thread-68 Exception in thread 
 Thread-61 Exception in thread Thread-70 Exception in thread 
 qtp1270119920-52 Exception in thread Thread-88 Exception in thread 
 qtp318933312-47 Exception in thread qtp1270119920-56 
 
 2. jstack the process, I see bunch of following message:
 
 Thread 31258: (state = BLOCKED)
  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
  - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
  - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
  - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
 (Interpreted frame)
  - 
 scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
  @bci=11, line=142 (Interpreted frame)
  - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
 (Interpreted frame)
 
 
 Thread 31257: (state = BLOCKED)
  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
  - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
  - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
  - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
 (Interpreted frame)
  - 
 scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
  @bci=11, line=142 (Interpreted frame)
  - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
 (Interpreted frame)
 
 
 
 
 
 At 2015-08-25 19:32:56, Ted Yu yuzhih...@gmail.com wrote:
 Looks like you were attaching images to your email which didn't go through.
 
 Consider using third party site for images - or paste error in text.
 
 Cheers
 
 On Tue, Aug 25, 2015 at 4:22 AM, Todd bit1...@163.com wrote:
 Hi,
 The spark sql perf itself contains benchmark data generation. I am using 
 spark shell to run the spark sql perf to generate the data with 10G memory 
 for both driver and executor. 
 When I increase the scalefactor to be 30,and run the job, Then I got the 
 following error:
 
 
 
 When I jstack it to see the status of the thread. I see the following: 
 looks it is waiting for the process that the spark job kicks off.
 
 
 
 
 


Custom Offset Management

2015-08-26 Thread Deepesh Maheshwari
Hi Folks,

My Spark application interacts with Kafka to get data through the Java API.
I am using the Direct Approach (No Receivers), which uses Kafka's simple
consumer API to read data.
So, Kafka offsets need to be handled explicitly.

In case of a Spark failure I need to save the Kafka offset state for
resuming from the failure point.
I am saving these points in MongoDB.

Please tell me how to initialize the Kafka DirectStream with saved offset
points.
I want to initialize the Kafka stream in Spark Streaming with the required
offset points.

There is a method I found on the web:

KafkaUtils.createDirectStream(jssc, String.class, String.class,
StringDecoder.class, StringDecoder.class, kafkaParams,
topicsSet, fromOffsets, arg8);

arg8 - kafka.message.MessageAndMetadata

Please tell me how to handle and initialize this.
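
A sketch of what this looks like with the Scala API (the Java overload quoted
above mirrors it; the topic names, partitions and offset values below are
placeholders you would load from MongoDB, and ssc / kafkaParams are assumed to
already exist):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Built from the offsets persisted in MongoDB.
val fromOffsets: Map[TopicAndPartition, Long] = Map(
  TopicAndPartition("mytopic", 0) -> 12345L,
  TopicAndPartition("mytopic", 1) -> 67890L
)

// The message handler decides what each record in the stream looks like.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))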

Regards,
Deepesh


Build k-NN graph for large dataset

2015-08-26 Thread Jaonary Rabarisoa
Dear all,

I'm trying to find an efficient way to build a k-NN graph for a large
dataset. Precisely, I have a large set of high dimensional vectors (say d
 1) and I want to build a graph where those high dimensional points
are the vertices and each one is linked to its k nearest neighbors based on
some kind of similarity defined on the vertex space.
My problem is to implement an efficient algorithm to compute the weight
matrix of the graph. I need to compute N*N similarities, and the only way
I know is to use a cartesian operation followed by a map operation on the RDD.
But this is very slow when N is large. Is there a cleverer way to
do this for an arbitrary similarity function?
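
For concreteness, the cartesian-based approach I mean looks roughly like this
(points and sim are placeholders for my data and similarity function), and it
is this O(N^2) step that becomes unusable for large N:

// points: RDD[(Long, Vector)] of (vertex id, feature vector); sim returns a Double
val pairs = points.cartesian(points)
  .filter { case ((i, _), (j, _)) => i != j }
  .map { case ((i, vi), (j, vj)) => (i, (j, sim(vi, vj))) }

val k = 10
val knnEdges = pairs.groupByKey()               // all candidate neighbours per vertex
  .mapValues(_.toSeq.sortBy(-_._2).take(k))     // keep the k most similar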

Cheers,

Jao


Re: Relation between threads and executor core

2015-08-26 Thread Jem Tucker
Sam,

This may be of interest, as far as i can see it suggests that a spark
'task' is always executed as a single thread in the JVM.

http://0x0fff.com/spark-architecture/

Thanks,

Jem



On Wed, Aug 26, 2015 at 10:06 AM Samya MAITI samya.ma...@amadeus.com
wrote:

 Thanks Jem, I do understand your suggestion. Actually --executor-cores
 alone doesn’t control the number of tasks, but is also governed by
 *spark.task.cpus* (amount of cores dedicated for each task’s execution).



 Reframing my Question*, How many threads can be spawned per executor
 core? Is it in user control? *



 Regards,

 Sam



 *From:* Jem Tucker [mailto:jem.tuc...@gmail.com]
 *Sent:* Wednesday, August 26, 2015 2:26 PM
 *To:* Samya MAITI samya.ma...@amadeus.com; user@spark.apache.org
 *Subject:* Re: Relation between threads and executor core



 Hi Samya,



 When submitting an application with spark-submit the cores per executor
 can be set with --executor-cores, meaning you can run that many tasks per
 executor concurrently. The page below has some more details on submitting
 applications:



 https://spark.apache.org/docs/latest/submitting-applications.html



 thanks,



 Jem



 On Wed, Aug 26, 2015 at 9:47 AM Samya samya.ma...@amadeus.com wrote:

 Hi All,

 Few basic queries :-
 1. Is there a way we can control the number of threads per executor core?
 2. Does this parameter “executor-cores” also has say in deciding how many
 threads to be run?

 Regards,
 Sam



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Build k-NN graph for large dataset

2015-08-26 Thread Robin East
You could try dimensionality reduction (PCA or SVD) first. I would imagine that 
even if you could successfully compute similarities in the high-dimensional 
space you would probably run into the curse of dimensionality.
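
A minimal sketch of that first step with MLlib (assuming the points are
already available as an RDD of mllib Vectors called vectors; 50 components is
an arbitrary choice):

import org.apache.spark.mllib.linalg.distributed.RowMatrix

val mat = new RowMatrix(vectors)             // vectors: RDD[org.apache.spark.mllib.linalg.Vector]
val pc = mat.computePrincipalComponents(50)  // top 50 principal components
val projected: RowMatrix = mat.multiply(pc)  // rows are now 50-dimensional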
 On 26 Aug 2015, at 12:35, Jaonary Rabarisoa jaon...@gmail.com wrote:
 
 Dear all,
 
 I'm trying to find an efficient way to build a k-NN graph for a large 
 dataset. Precisely, I have a large set of high dimensional vector (say d  
 1) and I want to build a graph where those high dimensional points are 
 the vertices and each one is linked to the k-nearest neighbor based on some 
 kind similarity defined on the vertex spaces. 
 My problem is to implement an efficient algorithm to compute the weight 
 matrix of the graph. I need to compute a N*N similarities and the only way I 
 know is to use cartesian operation follow by map operation on RDD. But, 
 this is very slow when the N is large. Is there a more cleaver way to do this 
 for an arbitrary similarity function ? 
 
 Cheers,
 
 Jao





JobScheduler: Error generating jobs for time for custom InputDStream

2015-08-26 Thread Juan Rodríguez Hortalá
Hi,

I've developed a ScalaCheck property for testing Spark Streaming
transformations. To do that I had to develop a custom InputDStream, which
is very similar to QueueInputDStream but has a method for adding new test
cases for dstreams, which are objects of type Seq[Seq[A]], to the DStream.
You can see the code at
https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala.
I have developed a few properties that run in local mode
https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala.
The problem is that when the batch interval is too small, and the machine
cannot complete the batches fast enough, I get the following exceptions in
the Spark log

15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time
1440580922500 ms
java.lang.NullPointerException
at
org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
at
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at

Re: Issue with building Spark v1.4.1-rc4 with Scala 2.11

2015-08-26 Thread Ted Yu
Have you run dev/change-version-to-2.11.sh ?

Cheers

On Wed, Aug 26, 2015 at 7:07 AM, Felix Neutatz neut...@googlemail.com
wrote:

 Hi everybody,

 I tried to build Spark v1.4.1-rc4 with Scala 2.11:
 ../apache-maven-3.3.3/bin/mvn -Dscala-2.11 -DskipTests clean install

 Before running this, I deleted:
 ../.m2/repository/org/apache/spark
 ../.m2/repository/org/spark-project

 My changes to the code:
 I just changed line 174 of org.apache.spark.executor.Executor$TaskRunner
 to:
 logInfo(s"test Executor is trying to kill $taskName (TID $taskId)")

 Everything builds without an error, but I have an issue.

 When I look into the jar of spark-core_2.10, I can see the changed string
 in Executor$TaskRunner$$anonfun$kill$1.class. But when I look
 into spark-core_2.11 the corresponding string didn't change. It seems like
 it downloads the jar from maven.

 Do you know what I did wrong?

 I also tried to run mvn -Dscala-2.11 -DskipTests clean install on the
 current master and got the following error:

 [ERROR] Failed to execute goal
 org.apache.maven.plugins:maven-enforcer-plugin:1.4:enforce
 (enforce-versions) on project spark-parent_2.10: Some Enforcer rules have
 failed. Look above for specific messages explaining why the rule failed. -
 [Help 1]
 [ERROR]
 [ERROR] To see the full stack trace of the errors, re-run Maven with the
 -e switch.
 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
 [ERROR]
 [ERROR] For more information about the errors and possible solutions,
 please read the following articles:
 [ERROR] [Help 1]
 http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

 Thank you for your help.

 Best regards,
 Felix




Re: Finding the number of executors.

2015-08-26 Thread Virgil Palanciuc
As I was writing a long-ish message to explain how it doesn't work, it
dawned on me that maybe the driver connects to executors only after there's
some work to do (while I was trying to find the number of executors BEFORE
starting the actual work).

So the solution was to simply execute a dummy task (
sparkContext.parallelize(1 until 1000, 200).reduce(_+_) ) before attempting
to retrieve the executors. It works now :)

Virgil.

On Sat, Aug 22, 2015 at 12:44 AM, Du Li l...@yahoo-inc.com wrote:

 Following is a method that retrieves the list of executors registered to a
 spark context. It worked perfectly with spark-submit in standalone mode for
 my project.

 /**
* A simplified method that just returns the current active/registered
 executors
* excluding the driver.
* @param sc
*   The spark context to retrieve registered executors.
* @return
* A list of executors each in the form of host:port.
*/
   def currentActiveExecutors(sc: SparkContext): Seq[String] = {
 val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
 val driverHost: String = sc.getConf.get("spark.driver.host")
 allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
   }
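Putting the two suggestions together, a rough usage sketch (assuming the helper above is in scope on the driver):

    // run a throwaway action first so executors have a chance to register
    sc.parallelize(1 until 1000, 200).reduce(_ + _)

    val executors = currentActiveExecutors(sc)   // helper defined above
    println(s"running on ${executors.size} executors: ${executors.mkString(", ")}")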




 On Friday, August 21, 2015 1:53 PM, Virgil Palanciuc virg...@gmail.com
 wrote:


 Hi Akhil,

 I'm using spark 1.4.1.
 Number of executors is not in the command line, not in the 
 getExecutorMemoryStatus
 (I already mentioned that I tried that, works in spark-shell but not when
 executed via spark-submit). I tried looking at defaultParallelism too,
 it's 112 (7 executors * 16 cores) when ran via spark-shell, but just 2 when
 ran via spark-submit.

 But the scheduler obviously knows this information. It *must* know it. How
 can I access it? Other that parsing the HTML of the WebUI, that is...
 that's pretty much guaranteed to work, and maybe I'll do that, but it's
 extremely convoluted.

 Regards,
 Virgil.

 On Fri, Aug 21, 2015 at 11:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Which version of Spark are you using? There was a discussion about this over
 here

 http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html

 http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E
 On Aug 21, 2015 7:42 AM, Virgil Palanciuc vir...@palanciuc.eu wrote:

 Is there any reliable way to find out the number of executors
 programmatically - regardless of how the job is run? A method that
 preferably works for spark-standalone, yarn, mesos, regardless of whether the
 code runs from the shell or not?

 Things that I tried and don't work:
 - sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell,
 does not work if task submitted via  spark-submit
 - sparkContext.getConf.getInt("spark.executor.instances", 1) - doesn't
 work unless explicitly configured
 - call to http://master:8080/json (this used to work, but doesn't
 anymore?)

 I guess I could parse the output html from the Spark UI... but that seems
 dumb. Is there really no better way?

 Thanks,
 Virgil.








application logs for long lived job on YARN

2015-08-26 Thread Chen Song
When running a long-lived job on YARN, like Spark Streaming, I found that
container logs are gone after a few days on executor nodes, although the job itself
is still running.


I am using cdh5.4.0 and have aggregated logs enabled. Because the local
logs are gone on executor nodes, I don't see any aggregated logs on hdfs
after the job is killed or failed.

Is there a YARN config to keep the logs from being deleted for long-lived
streaming job?

-- 
Chen Song


Spark 1.3.1 saveAsParquetFile hangs on app exit

2015-08-26 Thread cingram
I have a simple test that is hanging when using s3a with spark 1.3.1. Is
there something I need to do to cleanup the S3A file system? The write to S3
appears to have worked but this job hangs in the spark-shell and using
spark-submit. Any help would be greatly appreciated. TIA.

import sqlContext.implicits._
import com.datastax.spark.connector._
case class LU(userid: String, timestamp: Long, lat: Double, lon: Double)
val uid = "testuser"
val lue = sc.cassandraTable[LU]("test", "foo").where("userid=?", uid).toDF
lue.saveAsParquetFile("s3a://twc-scratch/craig_lues")



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-saveAsParquetFile-hangs-on-app-exit-tp24460.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.3.1 saveAsParquetFile hangs on app exit

2015-08-26 Thread Cheng Lian

Could you please show jstack result of the hanged process? Thanks!

Cheng

On 8/26/15 10:46 PM, cingram wrote:

I have a simple test that is hanging when using s3a with spark 1.3.1. Is
there something I need to do to cleanup the S3A file system? The write to S3
appears to have worked but this job hangs in the spark-shell and using
spark-submit. Any help would be greatly appreciated. TIA.

import sqlContext.implicits._
import com.datastax.spark.connector._
case class LU(userid: String, timestamp: Long, lat: Double, lon: Double)
val uid = "testuser"
val lue = sc.cassandraTable[LU]("test", "foo").where("userid=?", uid).toDF
lue.saveAsParquetFile("s3a://twc-scratch/craig_lues")



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-saveAsParquetFile-hangs-on-app-exit-tp24460.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.









Fwd: Issue with building Spark v1.4.1-rc4 with Scala 2.11

2015-08-26 Thread Felix Neutatz
Hi everybody,

I tried to build Spark v1.4.1-rc4 with Scala 2.11:
../apache-maven-3.3.3/bin/mvn -Dscala-2.11 -DskipTests clean install

Before running this, I deleted:
../.m2/repository/org/apache/spark
../.m2/repository/org/spark-project

My changes to the code:
I just changed line 174 of org.apache.spark.executor.Executor$TaskRunner
to:
logInfo(s"test Executor is trying to kill $taskName (TID $taskId)")

Everything builds without an error, but I have an issue.

When I look into the jar of spark-core_2.10, I can see the changed string
in Executor$TaskRunner$$anonfun$kill$1.class. But when I look
into spark-core_2.11 the corresponding string didn't change. It seems like
it downloads the jar from maven.

Do you know what I did wrong?

I also tried to run mvn -Dscala-2.11 -DskipTests clean install on the
current master and got the following error:

[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-enforcer-plugin:1.4:enforce
(enforce-versions) on project spark-parent_2.10: Some Enforcer rules have
failed. Look above for specific messages explaining why the rule failed. -
[Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions,
please read the following articles:
[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Thank you for your help.

Best regards,
Felix


Re: reduceByKey not working on JavaPairDStream

2015-08-26 Thread Sean Owen
I don't see that you invoke any action in this code. It won't do
anything unless you tell it to perform an action that requires the
transformations.
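For example, a rough Scala sketch (the code in the thread is Java, and `stream`/`ssc` below are placeholders, not names from it): an output operation plus starting the context is what actually triggers the lineage above it.

    // a transformation alone only builds lineage; it runs nothing by itself
    val counts = stream.map(r => (r, 1L)).reduceByKey(_ + _)

    // an output operation (print, saveAs..., foreachRDD) forces execution
    counts.print()

    ssc.start()              // and nothing at all happens before the context is started
    ssc.awaitTermination()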

On Wed, Aug 26, 2015 at 7:05 AM, Deepesh Maheshwari
deepesh.maheshwar...@gmail.com wrote:
 Hi,
 I have applied mapToPair and then a reduceByKey on a DStream to obtain a
 JavaPairDStream<String, Map<String, Object>>.
 I have to apply a flatMapToPair and reduceByKey on the DStream obtained
 above.
 But I do not see any logs from the reduceByKey operation.
 Can anyone explain why this is happening?


 Find my code below -

  /***
   * GroupLevel1 Groups - articleId, host and tags
   */
  JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
          .mapToPair(
                  new PairFunction<Map<String, Object>, String, Map<String, Object>>() {

                      private static final long serialVersionUID = 5196132687044875422L;

                      @Override
                      public Tuple2<String, Map<String, Object>> call(
                              Map<String, Object> map) throws Exception {
                          String host = (String) map.get("host");
                          String articleId = (String) map.get("articleId");
                          List tags = (List) map.get("tags");

                          if (host == null || articleId == null) {
                              logger.error("*** Error Doc \n" + map);
                          }
                          String key = "articleId_" + articleId +
                                  "_host_" + host + "_tags_" + tags.toString();

                          //logger.info(key);
                          System.out.println("Printing Key - " + key);
                          map.put("articlecount", 1L);

                          return new Tuple2<String, Map<String, Object>>(key, map);
                      }
                  })
          .reduceByKey(
                  new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {

                      private static final long serialVersionUID = 1L;

                      @Override
                      public Map<String, Object> call(
                              Map<String, Object> map1,
                              Map<String, Object> map2) throws Exception {
                          Long count1 = (Long) map1.get("articlecount");
                          Long count2 = (Long) map2.get("articlecount");

                          map1.put("articlecount", count1 + count2);
                          return map1;
                      }
                  });





  /***
   * Grouping level 1 groups on articleId+host+tags
   * Tags can be multiple for an article.
   * Grouping level 2 does -
   *  1. For each tag in a row, find occurrence of that tag in other rows.
   *  2. If one tag found in another row, then add the articleCount of
   *     current and new row and put as articleCount for that tag.
   *  Note -
   *  Idea behind this grouping is to get all article counts that
   *  contain a particular tag and preserve this value.
   */

  JavaPairDStream<String, Map<String, Object>> groupLevel2 =
          groupLevel1.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
      @Override
      public Iterable<Tuple2<String, Map<String, Object>>> call(Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {
          System.out.println("group level 2 tuple 1 - " + stringMapTuple2._1());
          System.out.println("group level 2 tuple 2 - " + stringMapTuple2._2());
          ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
          ArrayList tagKeyList = new ArrayList();
          String host = (String) stringMapTuple2._2().get("host");
          StringBuilder key;
          for (String tag : tagList) {
              key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
              System.out.println("generated Key - " + key);
              tagKeyList.add(new Tuple2<String, Map<String, Object>>(key.toString(), stringMapTuple2._2()));
          }
          return tagKeyList;
      }
  });

  groupLevel2 = groupLevel2.reduceByKey(new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
      @Override
      public Map<String, Object> call(Map<String, Object> dataMap1, Map<String, Object> dataMap2) throws Exception {
          System.out.println("Type of article map in 1 " + dataMap1.get("articleId").getClass());
          System.out.println("Type of article map in 2 " + dataMap2.get("articleId").getClass());

Relation between threads and executor core

2015-08-26 Thread Samya
Hi All,

Few basic queries :-
1. Is there a way we can control the number of threads per executor core?
2. Does the “executor-cores” parameter also have a say in deciding how many
threads will be run?

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re:Re: How to increase data scale in Spark SQL Perf

2015-08-26 Thread Todd
Increase the number of executors, :-)



At 2015-08-26 16:57:48, Ted Yu yuzhih...@gmail.com wrote:

Mind sharing how you fixed the issue ?


Cheers




On Aug 26, 2015, at 1:50 AM, Todd bit1...@163.com wrote:



Sorry for the noise, it's my bad... I have worked it out now.

At 2015-08-26 13:20:57, Todd bit1...@163.com wrote:



I think the answer is no. I only see such messages on the console, and #2 is the 
thread stack trace.
My thinking is that Spark SQL Perf forks many dsdgen processes to generate 
data when the scale factor is increased, which eventually exhausts the JVM.
When the thread exception is thrown on the console and I leave it there for a 
while (about 15 min), I eventually see an OutOfMemory error occur.

Can you guys try to run it if you have the environment ? I think you may 
reproduce it. Thanks!







At 2015-08-26 13:01:34, Ted Yu yuzhih...@gmail.com wrote:

The error in #1 below was not informative. 


Are you able to get more detailed error message ?


Thanks




On Aug 25, 2015, at 6:57 PM, Todd bit1...@163.com wrote:




Thanks Ted Yu.

Following are the error message:
1. The exception that is shown on the UI is :
Exception in thread Thread-113 Exception in thread Thread-126 Exception in 
thread Thread-64 Exception in thread Thread-90 Exception in thread 
Thread-117 Exception in thread Thread-80 Exception in thread Thread-115 
Exception in thread ResponseProcessor for block 
BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984 Exception in 
thread qtp1270119920-57 Exception in thread Thread-77 Exception in thread 
Thread-132 Exception in thread Thread-68 Exception in thread Thread-61 
Exception in thread Thread-70 Exception in thread qtp1270119920-52 
Exception in thread Thread-88 Exception in thread qtp318933312-47 Exception 
in thread qtp1270119920-56

2. jstack the process, I see bunch of following message:

Thread 31258: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
(Interpreted frame)
 - 
scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
 @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
(Interpreted frame)


Thread 31257: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
(Interpreted frame)
 - 
scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
 @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
(Interpreted frame)







At 2015-08-25 19:32:56, Ted Yu yuzhih...@gmail.com wrote:

Looks like you were attaching images to your email which didn't go through.


Consider using third party site for images - or paste error in text.


Cheers


On Tue, Aug 25, 2015 at 4:22 AM, Todd bit1...@163.com wrote:

Hi,
The spark sql perf itself contains benchmark data generation. I am using spark 
shell to run the spark sql perf to generate the data with 10G memory for both 
driver and executor.
When I increase the scale factor to 30 and run the job, I get the 
following error:



When I run jstack to see the status of the threads, I see the following; it looks like it 
is waiting for the process that the Spark job kicks off.








Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Cody Koeninger
The first thing that stands out to me is
createOnError = true

Are you sure the checkpoint is actually loading, as opposed to failing and
starting the job anyway?  There should be info lines that look like

INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)


You should be able to tell from those whether the offset ranges being
loaded from the checkpoint look reasonable.

Also, is there a reason you're calling

directKStream.checkpoint(checkpointDuration)

Just calling checkpoint on the streaming context should be sufficient to
save the metadata



On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang suchenz...@gmail.com wrote:

 Sure thing!

 The main looks like:


 --


 val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

 val kafkaConf = Map(
   "zookeeper.connect" -> zookeeper,
   "group.id" -> options.group,
   "zookeeper.connection.timeout.ms" -> "1",
   "auto.commit.interval.ms" -> "1000",
   "rebalance.max.retries" -> "25",
   "bootstrap.servers" -> kafkaBrokers
 )

 val ssc = StreamingContext.getOrCreate(checkpointDirectory,
   () => {
     createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
   }, createOnError = true)

 ssc.start()
 ssc.awaitTermination()



 --


 And createContext is defined as:



 --


 val batchDuration = Seconds(5)
 val checkpointDuration = Seconds(20)

 private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

 def createContext(kafkaConf: Map[String, String],
 checkpointDirectory: String,
 topic: String,
 numThreads: Int,
 isProd: Boolean)
   : StreamingContext = {

 val sparkConf = new SparkConf().setAppName("***")
 val ssc = new StreamingContext(sparkConf, batchDuration)
 ssc.checkpoint(checkpointDirectory)

 val topicSet = topic.split(",").toSet
 val groupId = kafkaConf.getOrElse("group.id", "")

 val directKStream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
 directKStream.checkpoint(checkpointDuration)

 val table = ***

 directKStream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd.flatMap(rec => someFunc(rec))
     .reduceByKey((i1: Long, i2: Long) => if (i1  i2) i1 else i2)
     .foreachPartition { partitionRec =>
       val dbWrite = DynamoDBWriter()
       partitionRec.foreach {
         /* Update Dynamo Here */
       }
     }

   /** Set up ZK Connection **/
   val props = new Properties()
   kafkaConf.foreach(param => props.put(param._1, param._2))

   props.setProperty(AUTO_OFFSET_COMMIT, "false")

   val consumerConfig = new ConsumerConfig(props)
   assert(!consumerConfig.autoCommitEnable)

   val zkClient = new ZkClient(consumerConfig.zkConnect,
 consumerConfig.zkSessionTimeoutMs,
 consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

   offsetRanges.foreach { osr =>
     val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
     val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
     ZkUtils.updatePersistentPath(zkClient, zkPath,
       osr.untilOffset.toString)
   }
 }
 ssc
   }



 On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Sounds like something's not set up right... can you post a minimal code
 example that reproduces the issue?

 On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang suchenz...@gmail.com
 wrote:

 Yeah. All messages are lost while the streaming job was down.

 On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Are you actually losing messages then?

 On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang suchenz...@gmail.com
 wrote:

 No; first batch only contains messages received after the second job
 starts (messages come in at a steady rate of about 400/second).

 On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Does the first batch after restart contain all the messages received
 while the job was down?

 On Tue, Aug 25, 2015 at 12:53 PM, suchenzang suchenz...@gmail.com
 wrote:

 Hello,

 I'm using direct spark streaming (from kafka) with checkpointing, and
 everything works well until a restart. When I shut down (^C) the
 first
 streaming job, wait 1 minute, then re-submit, there is somehow a
 series of 0
 event batches that get queued (corresponding to the 1 minute when
 the job
 was down). Eventually, the batches would resume processing, and I
 would see
 that each batch has roughly 2000 events.

 I see that at the beginning of the 

Re: Build k-NN graph for large dataset

2015-08-26 Thread Kristina Rogale Plazonic
If you don't want to compute all N^2 similarities, you need to implement
some kind of blocking first. For example, LSH (locally sensitive hashing).
A quick search gave this link to a Spark implementation:

http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing
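A very rough sketch of the blocking idea with random-hyperplane signatures (toy data, a single hash table, no banding; this is not a tuned LSH implementation):

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD
    import scala.util.Random

    val d = 200                                    // dimensionality (placeholder)
    val rng = new Random(42)
    val planes = Array.fill(16)(Array.fill(d)(rng.nextGaussian()))

    // toy stand-in for the real data: (id, feature vector) pairs
    val points: RDD[(Long, Vector)] = sc.parallelize(
      (0L until 100L).map(i => (i, Vectors.dense(Array.fill(d)(rng.nextGaussian())))))

    // sign pattern of the dot products with the random hyperplanes
    def signature(v: Vector): String =
      planes.map { p =>
        if (v.toArray.zip(p).map { case (a, b) => a * b }.sum >= 0) '1' else '0'
      }.mkString

    // candidate pairs are only formed inside a bucket, instead of the full cartesian
    val buckets = points.map { case (id, v) => (signature(v), (id, v)) }.groupByKey()

Exact similarities are then computed only among points sharing a bucket, which is where the N^2 saving comes from.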

On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa jaon...@gmail.com
wrote:

 Dear all,

 I'm trying to find an efficient way to build a k-NN graph for a large
 dataset. Precisely, I have a large set of high dimensional vectors (say d
  1) and I want to build a graph where those high dimensional points
 are the vertices and each one is linked to its k nearest neighbors based on
 some kind of similarity defined on the vertex space.
 My problem is to implement an efficient algorithm to compute the weight
 matrix of the graph. I need to compute N*N similarities and the only way
 I know is to use a cartesian operation followed by a map operation on RDD.
 But, this is very slow when N is large. Is there a more clever way to
 do this for an arbitrary similarity function ?

 Cheers,

 Jao



Re: Custom Offset Management

2015-08-26 Thread Cody Koeninger
That argument takes a function from MessageAndMetadata to whatever you want
your stream to contain.

See

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala#L57
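In the Scala API the call looks roughly like this (a sketch only: ssc, kafkaParams and the offsets below are placeholders; in your case the offsets would be read back from MongoDB):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // offsets previously persisted; topic name and numbers are placeholders
    val fromOffsets: Map[TopicAndPartition, Long] = Map(
      TopicAndPartition("mytopic", 0) -> 12345L,
      TopicAndPartition("mytopic", 1) -> 67890L)

    // this is the "arg8": it maps each MessageAndMetadata to what the stream carries
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    // ssc: StreamingContext and kafkaParams: Map[String, String] assumed to exist
    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)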

On Wed, Aug 26, 2015 at 7:55 AM, Deepesh Maheshwari 
deepesh.maheshwar...@gmail.com wrote:

 Hi Folks,

 My Spark application interacts with Kafka to get data through the Java
 API.
 I am using the Direct Approach (No Receivers), which uses Kafka’s simple
 consumer API to read data.
 So, Kafka offsets need to be handled explicitly.

 In case of Spark failure i need to save the offset state of kafka for
 resuming from the failure point.
 I am saving these points in MongoDB.

 Please tell he how to initialize Kafka DirectStream with saved offset
 points.
 I want to initialize kafka stream in Spark Streaming with required offset
 points.

 There is a method I found on the web:

 KafkaUtils.createDirectStream(jssc, String.class, String.class,
 StringDecoder.class, StringDecoder.class, kafkaParams,
 topicsSet, fromOffsets, arg8);

 arg8 - kafka.message.MessageAndMetadata

 Please tell me how to handle and initialize this.

 Regards,
 Deepesh



Setting number of CORES from inside the Topology (JAVA code )

2015-08-26 Thread anshu shukla
Hey ,

I need to set the number of cores from inside the topology. It works
fine when set in spark-env.sh, but I am unable to do it by setting a key/value
on the conf.

SparkConf sparkConf = new
SparkConf().setAppName("JavaCustomReceiver").setMaster("local[4]");

if (toponame.equals("IdentityTopology"))
{
    sparkConf.setExecutorEnv("SPARK_WORKER_CORES", "1");
}
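If the goal is to cap cores per executor from code, a rough sketch (assuming a standalone or YARN deployment; note that with master local[4] the number inside the master string is what actually controls parallelism, and SPARK_WORKER_CORES is a worker-daemon setting rather than a per-app one):

    val sparkConf = new SparkConf()
      .setAppName("JavaCustomReceiver")
      .setMaster("local[4]")

    if (toponame.equals("IdentityTopology")) {
      sparkConf.set("spark.executor.cores", "1")   // cores per executor (standalone/YARN)
      sparkConf.set("spark.cores.max", "1")        // total cores for the app (standalone)
    }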




-- 
Thanks  Regards,
Anshu Shukla


Building spark-examples takes too much time using Maven

2015-08-26 Thread Muhammad Haseeb Javed
I checked out the master branch and started playing around with the
examples. I want to build a jar of the examples as I wish to run them using
the modified Spark jar that I have. However, packaging spark-examples takes
too much time as Maven tries to download the jar dependencies rather than
use the jars that are already present on the system, since I extended and
packaged Spark itself locally.


Re: Build k-NN graph for large dataset

2015-08-26 Thread Michael Malak
Yes. And a paper that describes using grids (actually varying grids) is 
http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf
 In the Spark GraphX In Action book that Robin East and I are writing, we 
implement a drastically simplified version of this in chapter 7, which should 
become available in the MEAP mid-September. 
http://www.manning.com/books/spark-graphx-in-action

  From: Kristina Rogale Plazonic kpl...@gmail.com
 To: Jaonary Rabarisoa jaon...@gmail.com 
Cc: user user@spark.apache.org 
 Sent: Wednesday, August 26, 2015 7:24 AM
 Subject: Re: Build k-NN graph for large dataset
   
If you don't want to compute all N^2 similarities, you need to implement some 
kind of blocking first. For example, LSH (locally sensitive hashing). A quick 
search gave this link to a Spark implementation: 
http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing


On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Dear all,
I'm trying to find an efficient way to build a k-NN graph for a large dataset. 
Precisely, I have a large set of high dimensional vector (say d  1) and 
I want to build a graph where those high dimensional points are the vertices 
and each one is linked to the k-nearest neighbor based on some kind similarity 
defined on the vertex spaces. 
My problem is to implement an efficient algorithm to compute the weight matrix 
of the graph. I need to compute a N*N similarities and the only way I know is 
to use cartesian operation follow by map operation on RDD. But, this is 
very slow when the N is large. Is there a more cleaver way to do this for an 
arbitrary similarity function ? 
Cheers,
Jao



  

Re: Build k-NN graph for large dataset

2015-08-26 Thread Charlie Hack
+1 to all of the above esp.  Dimensionality reduction and locality sensitive 
hashing / min hashing. 


There's also an algorithm implemented in MLlib called DIMSUM which was 
developed at Twitter for this purpose. I've been meaning to try it and would be 
interested to hear about results you get. 





https://blog.twitter.com/2014/all-pairs-similarity-via-dimsum
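DIMSUM is exposed in MLlib as columnSimilarities(threshold) on RowMatrix. A rough sketch with toy data (note it estimates cosine similarity between columns, so data points would need to be laid out as columns, i.e. the transposed matrix):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    // toy data: each data point is a COLUMN here
    val mat = new RowMatrix(sc.parallelize(Seq(
      Vectors.dense(1.0, 0.0, 2.0),
      Vectors.dense(0.0, 1.0, 1.0),
      Vectors.dense(3.0, 0.0, 0.5))))

    val sims = mat.columnSimilarities(0.1)      // CoordinateMatrix of (i, j, similarity)
    sims.entries.collect().foreach(println)

The threshold trades accuracy for speed: higher values sample more aggressively and skip low-similarity pairs.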

Charlie

—
Sent from Mailbox

On Wednesday, Aug 26, 2015 at 09:57, Michael Malak 
michaelma...@yahoo.com.invalid, wrote:


Yes. And a paper that describes using grids (actually varying grids) is 
http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf
 In the Spark GraphX In Action book that Robin East and I are writing, we 
implement a drastically simplified version of this in chapter 7, which should 
become available in the MEAP mid-September. 
http://www.manning.com/books/spark-graphx-in-action
If you don't want to compute all N^2 similarities, you need to implement some 
kind of blocking first. For example, LSH (locally sensitive hashing). A quick 
search gave this link to a Spark implementation: 

http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing












On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Dear all,


I'm trying to find an efficient way to build a k-NN graph for a large dataset. 
Precisely, I have a large set of high dimensional vector (say d  1) and 
I want to build a graph where those high dimensional points are the vertices 
and each one is linked to the k-nearest neighbor based on some kind similarity 
defined on the vertex spaces. 
My problem is to implement an efficient algorithm to compute the weight matrix 
of the graph. I need to compute a N*N similarities and the only way I know is 
to use cartesian operation follow by map operation on RDD. But, this is 
very slow when the N is large. Is there a more cleaver way to do this for an 
arbitrary similarity function ? 




Cheers,




Jao

Re: DataFrame/JDBC very slow performance

2015-08-26 Thread Dhaval Patel
Thanks Michael, much appreciated!

Nothing should be held in memory for a query like this (other than a single
count per partition), so I don't think that is the problem.  There is
likely an error buried somewhere.

Regarding your comments above - I don't get any error but just get NULL as the
return value. I have tried digging deeper in the logs etc. but couldn't spot
anything. Are there any other suggestions for spotting such buried errors?

Thanks,
Dhaval

On Mon, Aug 24, 2015 at 6:38 PM, Michael Armbrust mich...@databricks.com
wrote:

 Much appreciated! I am not comparing with select count(*) for
 performance, but it was one simple thing I tried to check the performance
 :). I think it now makes sense since Spark tries to extract all records
 before doing the count. I thought having an aggregated function query
 submitted over JDBC/Teradata would let Teradata do the heavy lifting.


 We currently only push down filters since there is a lot of variability in
 what types of aggregations various databases support.  You can manually
 pushdown whatever you want by replacing the table name with a subquery
 (i.e. (SELECT ... FROM ...))

- How come my second query for (5B) records didn't return anything
 even after a long processing? If I understood correctly, Spark would try to
 fit it in memory and if not then might use disk space, which I have
 available?


 Nothing should be held in memory for a query like this (other than a
 single count per partition), so I don't think that is the problem.  There
 is likely an error buried somewhere.


  - Am I supposed to do any Spark related tuning to make it work?

 My main need is to access data from these large table(s) on demand and
 provide aggregated and calculated results much quicker; for that I was
 trying out Spark. As a next step I am thinking of exporting the data to Parquet files
 and giving it a try. Do you have any suggestions for dealing with the problem?


 Exporting to parquet will likely be a faster option than trying to query
 through JDBC, since we have many more opportunities for parallelism here.



Spark-on-YARN LOCAL_DIRS location

2015-08-26 Thread michael.england
Hi,

I am having issues with /tmp space filling up during Spark jobs because 
Spark-on-YARN uses the yarn.nodemanager.local-dirs for shuffle space. I noticed 
this message appears when submitting Spark-on-YARN jobs:

WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by 
the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone 
and LOCAL_DIRS in YARN).

I can’t find much documentation on where to set the LOCAL_DIRS property. Please 
can someone advise whether this is a yarn-env.sh or a spark-env.sh property and 
whether it would then use the directory specified by this env variable as a 
shuffle area instead of the default yarn.nodemanager.local-dirs location?

Thanks,
Mike





Dataframe collect() work but count() fails

2015-08-26 Thread Srikanth
Hello,

I'm seeing a strange behavior where count() on a DataFrame errors as shown
below but collect() works fine.
This is what I tried from spark-shell. solrRDD.queryShards() returns a
JavaRDD.

val rdd = solrRDD.queryShards(sc, query, "_version_", 2).rdd
 rdd: org.apache.spark.rdd.RDD[org.apache.solr.common.SolrDocument] =
 MapPartitionsRDD[3] at flatMap at SolrRDD.java:335



 scala val schema = solrRDD.getQuerySchema(query)
 schema: org.apache.spark.sql.types.StructType =
 StructType(StructField(ApplicationType,StringType,true),
 StructField(Language,StringType,true),
 StructField(MfgCode,StringType,true),
 StructField(OpSystemCode,StringType,true),
 StructField(ProductCode,StringType,true),
 StructField(ProductName,StringType,true),
 StructField(ProductVersion,StringType,true),
 StructField(_version_,LongType,true), StructField(id,StringType,true))



 scala val rows = rdd.map(doc => RowFactory.create(schema.fieldNames.map(f
 => doc.getFirstValue(f))) ) //Convert RDD[SolrDocument] to RDD[Row]
 scala val df = sqlContext.createDataFrame(rows, schema)


scala val data = df.collect
 data: Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@2135773a],
 [[Ljava.lang.Object;@3d2691de], [[Ljava.lang.Object;@2f32a52f],
 [[Ljava.lang.Object;@25fac8de]



 scala df.count
 15/08/26 14:53:28 WARN TaskSetManager: Lost task 1.3 in stage 6.0 (TID 42,
 172.19.110.1): java.lang.AssertionError: assertion failed: Row column
 number mismatch, expected 9 columns, but got 1.
 Row content: [[Ljava.lang.Object;@1d962eb2]
 at scala.Predef$.assert(Predef.scala:179)
 at
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:140)
 at
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124)
 at
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)


Any idea what is wrong here?

Srikanth
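One likely cause, not confirmed in the thread: RowFactory.create is a varargs method, so passing the Scala Array as a single argument yields a one-column Row wrapping the whole array, which matches both the collect() output ([[Ljava.lang.Object;@...]) and the "expected 9 columns, but got 1" assertion. A rough sketch of the fix, reusing the names from the snippet above:

    import org.apache.spark.sql.Row

    // one column per field: expand the array instead of passing it as one value
    val rows = rdd.map(doc => Row.fromSeq(schema.fieldNames.map(f => doc.getFirstValue(f))))
    // equivalent: RowFactory.create(schema.fieldNames.map(f => doc.getFirstValue(f)): _*)
    val df = sqlContext.createDataFrame(rows, schema)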


Re: Spark cluster multi tenancy

2015-08-26 Thread Jerrick Hoang
Would be interested to know the answer too.

On Wed, Aug 26, 2015 at 11:45 AM, Sadhan Sood sadhan.s...@gmail.com wrote:

 Interestingly, if there is nothing running on dev spark-shell, it recovers
 successfully and regains the lost executors. Attaching the log for that.
 Notice, the Registering block manager .. statements in the very end after
 all executors were lost.

 On Wed, Aug 26, 2015 at 11:27 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Attaching log for when the dev job gets stuck (once all its executors are
 lost due to preemption). This is a spark-shell job running in yarn-client
 mode.

 On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood sadhan.s...@gmail.com
 wrote:

 Hi All,

 We've set up our spark cluster on aws running on yarn (running on hadoop
 2.3) with fair scheduling and preemption turned on. The cluster is shared
 for prod and dev work where prod runs with a higher fair share and can
 preempt dev jobs if there are not enough resources available for it.
 It appears that dev jobs which get preempted often get unstable after
 losing some executors and the whole jobs gets stuck (without making any
 progress) or end up getting restarted (and hence losing all the work done).
 Has someone encountered this before ? Is the solution just to set 
 spark.task.maxFailures
 to a really high value to recover from task failures in such scenarios? Are
 there other approaches that people have taken for spark multi tenancy that
 works better in such scenario?

 Thanks,
 Sadhan








Re: Differing performance in self joins

2015-08-26 Thread Michael Armbrust
-dev +user

I'd suggest running .explain() on both dataframes to understand the
performance better.  The problem is likely that we have a pattern that
looks for cases where you have an equality predicate where either side can
be evaluated using one side of the join.  We turn this into a hash join.

((df("eday") - laggard("p_eday")) === 1) is pretty tricky for us to
understand, and so the pattern misses the possible optimized plan.
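One possible way to stay on the optimized path, sketched roughly here (at the cost of duplicating the lagged side once per allowed offset), is to turn the 1..7-day window into plain equalities the planner can hash-join on:

    // expand each lagged row into one copy per allowed offset, then join on equality
    val lags = (1 to 7).map(i => laggard.withColumn("p_eday_next", laggard("p_eday") + i))
                       .reduce(_.unionAll(_))

    df.join(lags, (df("series") === lags("p_series")) &&
                  (df("eday")   === lags("p_eday_next"))).count()

For the single-day lag the equality form you already have should remain the fast one.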

On Wed, Aug 26, 2015 at 6:10 PM, David Smith das...@gmail.com wrote:

 I've noticed that two queries, which return identical results, have very
 different performance. I'd be interested in any hints about how avoid
 problems like this.

 The DataFrame df contains a string field series and an integer eday,
 the
 number of days since (or before) the 1970-01-01 epoch.

 I'm doing some analysis over a sliding date window and, for now, avoiding
 UDAFs. I'm therefore using a self join. First, I create

 val laggard = df.withColumnRenamed("series",
 "p_series").withColumnRenamed("eday", "p_eday")

 Then, the following query runs in 16s:

 df.join(laggard, (df("series") === laggard("p_series")) && (df("eday") ===
 (laggard("p_eday") + 1))).count

 while the following query runs in 4 - 6 minutes:

 df.join(laggard, (df("series") === laggard("p_series")) && ((df("eday") -
 laggard("p_eday")) === 1)).count

 It's worth noting that the series term is necessary to keep the query from
 doing a complete cartesian product over the data.

 Ideally, I'd like to look at lags of more than one day, but the following
 is
 equally slow:

 df.join(laggard, (df("series") === laggard("p_series")) && (df("eday") -
 laggard("p_eday")).between(1,7)).count

 Any advice about the general principle at work here would be welcome.

 Thanks,
 David



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/Differing-performance-in-self-joins-tp13864.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




Re: Help! Stuck using withColumn

2015-08-26 Thread Silvio Fiorito
Hi Saif,

In both cases you’re referencing columns that don’t exist in the current 
DataFrame.

The first email you did a select and then a withColumn for ‘month_date_cur' on 
the resulting DF, but that column does not exist, because you did a select for 
only ‘month_balance’.

In the second email you’re using 2 different DFs and trying to select a column 
from one in a withColumn on the other; that just wouldn’t work. Also, there are 
no explicit column names given to either DF, so that column doesn’t exist.

Did you intend to do a join instead?

Thanks,
Silvio
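If the intent really is to put the two columns side by side rather than join on a key, one rough option is to zip the underlying RDDs and rebuild a DataFrame (a sketch only; zip requires both sides to have identical partitioning and row counts):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.StructType
    import sqlContext.implicits._            // already in scope in spark-shell

    val gf = sc.parallelize(Array(3, 6, 4, 7, 3, 4, 5, 5, 31, 4, 5, 2)).toDF("ASD")
    val ff = sc.parallelize(Array(4, 6, 2, 3, 5, 1, 4, 6, 23, 6, 4, 7)).toDF("GFD")

    // stitch the rows together positionally and merge the two schemas
    val zipped   = gf.rdd.zip(ff.rdd).map { case (r1, r2) => Row.fromSeq(r1.toSeq ++ r2.toSeq) }
    val combined = sqlContext.createDataFrame(zipped, StructType(gf.schema.fields ++ ff.schema.fields))
    combined.show()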

From: saif.a.ell...@wellsfargo.com
Date: Wednesday, August 26, 2015 at 6:06 PM
To: saif.a.ell...@wellsfargo.com,
user@spark.apache.org
Subject: RE: Help! Stuck using withColumn

I can reproduce this even simpler with the following:

val gf = sc.parallelize(Array(3,6,4,7,3,4,5,5,31,4,5,2)).toDF("ASD")
val ff = sc.parallelize(Array(4,6,2,3,5,1,4,6,23,6,4,7)).toDF("GFD")

gf.withColumn("DSA", ff.col("GFD"))

org.apache.spark.sql.AnalysisException: resolved attribute(s) GFD#421 missing 
from ASD#419 in operator !Project [ASD#419,GFD#421 AS DSA#422];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)


From: saif.a.ell...@wellsfargo.com
Sent: Wednesday, August 26, 2015 6:47 PM
To: user@spark.apache.org
Subject: Help! Stuck using withColumn

This simple comand call:

val final_df = data.select("month_balance").withColumn("month_date", 
data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





Re: Join with multiple conditions (In reference to SPARK-7197)

2015-08-26 Thread Michal Monselise
Davies, I created an issue - SPARK-10246
https://issues.apache.org/jira/browse/SPARK-10246

On Tue, Aug 25, 2015 at 12:53 PM, Davies Liu dav...@databricks.com wrote:

 It's good to support this, could you create a JIRA for it and target for
 1.6?

 On Tue, Aug 25, 2015 at 11:21 AM, Michal Monselise
 michal.monsel...@gmail.com wrote:
 
  Hello All,
 
  PySpark currently has two ways of performing a join: specifying a join
 condition or column names.
 
  I would like to perform a join using a list of columns that appear in
 both the left and right DataFrames. I have created an example in this
 question on Stack Overflow.
 
  Basically, I would like to do the following as specified in the
 documentation in  /spark/python/pyspark/sql/dataframe.py row 560 and
 specify a list of column names:
 
   df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
 
  However, this produces an error.
 
  In JIRA issue SPARK-7197, it is mentioned that the syntax is actually
 different from the one specified in the documentation for joining using a
 condition.
 
  Documentation:
   cond = [df.name == df3.name, df.age == df3.age]  df.join(df3,
 cond, 'outer').select(df.name, df3.age).collect()
 
  JIRA Issue:
 
  a.join(b, (a.year==b.year)  (a.month==b.month), 'inner')
 
 
  In other words. the join function cannot take a list.
  I was wondering if you could also clarify what is the correct syntax for
 providing a list of columns.
 
 
  Thanks,
  Michal
 
 



error accessing vertexRDD

2015-08-26 Thread dizzy5112
Hi all, a question on an issue I'm having with a vertexRDD. If I kick off my
spark shell with something like this:



then run:


it will finish and give me the count, but I see a few errors (see below).
This is okay for this small dataset, but when trying with a large data set it
doesn't finish because of the number of errors. This works okay if I kick off
my spark shell with master = local. Any help appreciated.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/error-accessing-vertexRDD-tp24466.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: query avro hive table in spark sql

2015-08-26 Thread Michael Armbrust
I'd suggest looking at
http://spark-packages.org/package/databricks/spark-avro
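A rough sketch of using that package (the package version and the file path are placeholders):

    // start the shell with something like:
    //   --packages com.databricks:spark-avro_2.10:2.0.1   (version is a placeholder)
    val episodes = sqlContext.read.format("com.databricks.spark.avro")
      .load("/path/to/data.avro")          // placeholder path
    episodes.registerTempTable("episodes")
    sqlContext.sql("SELECT * FROM episodes LIMIT 10").show()

Note this reads the Avro files directly rather than going through the Hive metastore, which sidesteps the avro.schema.url resolution problem entirely.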

On Wed, Aug 26, 2015 at 11:32 AM, gpatcham gpatc...@gmail.com wrote:

 Hi,

 I'm trying to query a Hive table which is based on Avro in Spark SQL and am
 seeing the errors below.

 15/08/26 17:51:12 WARN avro.AvroSerdeUtils: Encountered AvroSerdeException
 determining schema. Returning signal schema to indicate problem
 org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Neither
 avro.schema.literal nor avro.schema.url specified, can't determine table
 schema
 at

 org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrThrowException(AvroSerdeUtils.java:68)
 at

 org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrReturnErrorSchema(AvroSerdeUtils.java:93)
 at
 org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:60)
 at

 org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:375)
 at

 org.apache.hadoop.hive.ql.metadata.Partition.getDeserializer(Partition.java:249)


 It's not able to determine the schema. The Hive table points to the Avro schema
 using a URL. I'm stuck and couldn't find more info on this.

 Any pointers ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/query-avro-hive-table-in-spark-sql-tp24462.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-26 Thread Michael Armbrust
I'd suggest setting sbt to fork when running tests.
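A rough sketch of the corresponding build.sbt settings (sbt 0.13 syntax; the memory values are placeholders):

    fork in Test := true
    javaOptions in Test ++= Seq("-Xmx2g", "-XX:MaxPermSize=512m")

With fork enabled each test run gets a fresh JVM, so PermGen growth from repeated HiveContext creation no longer accumulates in the sbt process itself.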

On Wed, Aug 26, 2015 at 10:51 AM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Thanks for your response Yana,

 I can increase the MaxPermSize parameter and it will allow me to run the
 unit test a few more times before I run out of memory.

 However, the primary issue is that running the same unit test in the same
 JVM (multiple times) results in increased memory (each run of the unit
 test) and I believe it has something to do with HiveContext not reclaiming
 memory after it is finished (or I'm not shutting it down properly).

 It could very well be related to sbt, however, it's not clear to me.


 On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 The PermGen space error is controlled with MaxPermSize parameter. I run
 with this in my pom, I think copied pretty literally from Spark's own
 tests... I don't know what the sbt equivalent is but you should be able to
 pass it...possibly via SBT_OPTS?


 <plugin>
   <groupId>org.scalatest</groupId>
   <artifactId>scalatest-maven-plugin</artifactId>
   <version>1.0</version>
   <configuration>
     <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
     <parallel>false</parallel>
     <junitxml>.</junitxml>
     <filereports>SparkTestSuite.txt</filereports>
     <argLine>-Xmx3g -XX:MaxPermSize=256m -XX:ReservedCodeCacheSize=512m</argLine>
     <stderr/>
     <systemProperties>
       <java.awt.headless>true</java.awt.headless>
       <spark.testing>1</spark.testing>
       <spark.ui.enabled>false</spark.ui.enabled>
       <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
     </systemProperties>
   </configuration>
   <executions>
     <execution>
       <id>test</id>
       <goals>
         <goal>test</goal>
       </goals>
     </execution>
   </executions>
 </plugin>
 </plugins>


 On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hello,

 I am using sbt and created a unit test where I create a `HiveContext`
 and execute some query and then return. Each time I run the unit test the
JVM will increase its memory usage until I get the error:

 Internal error when running tests: java.lang.OutOfMemoryError: PermGen
 space
 Exception in thread Thread-2 java.io.EOFException

 As a work-around, I can fork a new JVM each time I run the unit test,
however, it seems like a bad solution as it takes a while to run the unit
 test.

 By the way, I tried to importing the TestHiveContext:

- import org.apache.spark.sql.hive.test.TestHiveContext

 However, it suffers from the same memory issue. Has anyone else suffered
 from the same problem? Note that I am running these unit tests on my mac.

 Cheers, Mike.






Re: JDBC Streams

2015-08-26 Thread Chen Song
Thanks Cody.

Are you suggesting putting the cache in a global context in each executor JVM,
in a Scala object for example, and then having a scheduled task refresh the
cache (or have the refresh triggered by the expiry if using Guava)?

Chen
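Roughly, yes, something along these lines (a sketch only; the JDBC URL, query, table and types are placeholders):

    import java.util.concurrent.TimeUnit
    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    // one instance per executor JVM; the entry is reloaded lazily after expiry
    object ReferenceData {
      private lazy val cache: LoadingCache[String, Map[String, String]] =
        CacheBuilder.newBuilder()
          .expireAfterWrite(30, TimeUnit.MINUTES)
          .build[String, Map[String, String]](
            new CacheLoader[String, Map[String, String]] {
              override def load(key: String): Map[String, String] = loadFromMySql()
            })

      // the whole reference table lives under one well-known key
      def get(): Map[String, String] = cache.get("ref")

      private def loadFromMySql(): Map[String, String] = {
        // placeholder connection details and query
        val conn = java.sql.DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass")
        try {
          val rs = conn.createStatement().executeQuery("SELECT id, value FROM ref_table")
          val buf = scala.collection.mutable.Map[String, String]()
          while (rs.next()) buf += rs.getString("id") -> rs.getString("value")
          buf.toMap
        } finally conn.close()
      }
    }

    // used on the executors, e.g. inside mapPartitions, so each batch sees the cached
    // copy and only refreshes it once the expiry has passed:
    //   stream.mapPartitions { iter => val ref = ReferenceData.get(); iter.map(...) }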

On Wed, Aug 26, 2015 at 10:51 AM, Cody Koeninger c...@koeninger.org wrote:

 If your data only changes every few days, why not restart the job every
 few days, and just broadcast the data?

 Or you can keep a local per-jvm cache with an expiry (e.g. guava cache) to
 avoid many mysql reads

 On Wed, Aug 26, 2015 at 9:46 AM, Chen Song chen.song...@gmail.com wrote:

 Piggyback on this question.

 I have a similar use case but a bit different. My job is consuming a
 stream from Kafka and I need to join the Kafka stream with some reference
 table from MySQL (kind of data validation and enrichment). I need to
 process this stream every 1 min. The data in MySQL is not changed very
 often, maybe once a few days.

 So my requirement is:

 * I cannot easily use broadcast variable because the data does change,
 although not very often.
 * I am not sure if it is good practice to read data from MySQL in every
 batch (in my case, 1 min).

 Anyone has done this before, any suggestions and feedback is appreciated.

 Chen


 On Sun, Jul 5, 2015 at 11:50 AM, Ashic Mahtab as...@live.com wrote:

 If it is indeed a reactive use case, then Spark Streaming would be a
 good choice.

 One approach worth considering - is it possible to receive a message via
 kafka (or some other queue). That'd not need any polling, and you could use
 standard consumers. If polling isn't an issue, then writing a custom
 receiver will work fine. The way a receiver works is this:

 * Your receiver has a receive() function, where you'd typically start a
 loop. In your loop, you'd fetch items, and call store(entry).
 * You control everything in the receiver. If you're listening on a
 queue, you receive messages, store() and ack your queue. If you're polling,
 it's up to you to ensure delays between db calls.
 * The things you store() go on to make up the rdds in your DStream. So,
 intervals, windowing, etc. apply to those. The receiver is the boundary
 between your data source and the DStream RDDs. In other words, if your
 interval is 15 seconds with no windowing, then the things that went to
 store() every 15 seconds are bunched up into an RDD of your DStream. That's
 kind of a simplification, but should give you the idea that your db
 polling interval and streaming interval are not tied together.

 -Ashic.

 --
 Date: Mon, 6 Jul 2015 01:12:34 +1000
 Subject: Re: JDBC Streams
 From: guha.a...@gmail.com
 To: as...@live.com
 CC: ak...@sigmoidanalytics.com; user@spark.apache.org


 Hi

 Thanks for the reply. Here is my situation: I have a DB which enables
 synchronous CDC; think of this as a DB trigger which writes to a table with
 the changed values as soon as something changes in the production table. My job
 will need to pick up the data as soon as it arrives, which can be at every 1
 min interval. Ideally it will pick up the changes, transform them into
 JSON and put them to Kinesis. In short, I am emulating a Kinesis producer
 with a DB source (don't even ask why, let's say these are the constraints :) )

 Please advise: (a) is Spark a good choice here? (b) What's your suggestion
 either way?

 I understand I can easily do it using a simple Java/Python app, but I am a
 little worried about managing scaling/fault tolerance and that's where my
 concern is.

 TIA
 Ayan

 On Mon, Jul 6, 2015 at 12:51 AM, Ashic Mahtab as...@live.com wrote:

 Hi Ayan,
 How continuous is your workload? As Akhil points out, with streaming,
 you'll give up at least one core for receiving, will need at most one more
 core for processing. Unless you're running on something like Mesos, this
 means that those cores are dedicated to your app, and can't be leveraged by
 other apps / jobs.

 If it's something periodic (once an hour, once every 15 minutes, etc.),
 then I'd simply write a normal Spark application and trigger it
 periodically. There are many things that can take care of that; sometimes
 a simple cron job is enough!

 --
 Date: Sun, 5 Jul 2015 22:48:37 +1000
 Subject: Re: JDBC Streams
 From: guha.a...@gmail.com
 To: ak...@sigmoidanalytics.com
 CC: user@spark.apache.org


 Thanks Akhil. In case I go with Spark Streaming, I guess I have to
 implement a custom receiver, and Spark Streaming will call this receiver
 every batch interval; is that correct? Any gotchas you see in this plan?
 TIA... Best, Ayan

 On Sun, Jul 5, 2015 at 5:40 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 If you want a long-running application, then go with Spark Streaming
 (which kind of blocks your resources). On the other hand, if you use a job
 server, then you can actually use the resources (CPUs) for other jobs as well
 when your DB job is not using them.

 Thanks
 Best Regards

 On Sun, Jul 5, 2015 at 5:28 AM, ayan guha guha.a...@gmail.com wrote:

 Hi All

 

spark streaming 1.3 kafka topic error

2015-08-26 Thread Shushant Arora
Hi

My streaming application gets killed with the error below:

15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
[testtopic,193]))
15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
time 144062612 ms
org.apache.spark.SparkException:
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([testtopic,115]))
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at



Kafka params in job logs printed are :
 value.serializer = class
org.apache.kafka.common.serialization.StringSerializer
key.serializer = class
org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full = true
retry.backoff.ms = 100
buffer.memory = 1048576
batch.size = 16384
metrics.sample.window.ms = 3
metadata.max.age.ms = 30
receive.buffer.bytes = 32768
timeout.ms = 3
max.in.flight.requests.per.connection = 5
bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
metric.reporters = []
client.id =
compression.type = none
retries = 0
max.request.size = 1048576
send.buffer.bytes = 131072
acks = all
reconnect.backoff.ms = 10
linger.ms = 0
metrics.num.samples = 2
metadata.fetch.timeout.ms = 6


Is a Kafka broker going down and causing the job to get killed? What's the best
way to handle it?
Will increasing retries and backoff time help, and to what values should those
be set so the streaming application never fails, but rather keeps
retrying after a few seconds and sends an event, so that my custom code can
send a notification that the Kafka broker is down if that is the cause?
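
For reference, a minimal sketch of where those knobs would be set for the direct
stream; the broker list, topic, batch interval, and chosen values are placeholder
assumptions, and the exact behaviour of spark.streaming.kafka.maxRetries and
refresh.leader.backoff.ms should be verified against your Spark/Kafka versions:

  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val conf = new SparkConf()
    .setAppName("kafka-direct-retry-sketch")
    // Driver-side retries when looking up leader offsets (the default is 1).
    .set("spark.streaming.kafka.maxRetries", "5")

  val ssc = new StreamingContext(conf, Seconds(30))

  val kafkaParams = Map(
    "metadata.broker.list"      -> "broker1:9092,broker2:9092,broker3:9092",
    // Back off this long before re-querying for a new partition leader.
    "refresh.leader.backoff.ms" -> "2000"
  )

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("testtopic"))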


Thanks


Re: spark streaming 1.3 kafka buffer size

2015-08-26 Thread Shushant Arora
Can I change the params fetch.message.max.bytes or
spark.streaming.kafka.maxRatePerPartition
at run time across batches?
Say I detected some failure condition in my system and decided to consume, in the
next batch interval, only 10 messages per partition, and if that succeeds I
reset the max limit to unlimited again.

On Wed, Aug 26, 2015 at 9:32 PM, Cody Koeninger c...@koeninger.org wrote:

 see http://kafka.apache.org/documentation.html#consumerconfigs

 fetch.message.max.bytes

 in the kafka params passed to the constructor
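
 A minimal sketch of passing it that way, together with the per-partition rate
 cap asked about above; the broker list, topic, and size are placeholder
 assumptions, and as far as I know both settings are read when the stream is
 created rather than re-read per batch:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // ssc is an existing StreamingContext.
  val kafkaParams = Map(
    "metadata.broker.list"    -> "broker1:9092,broker2:9092",
    // Max bytes of messages fetched per topic-partition in each fetch request.
    "fetch.message.max.bytes" -> (8 * 1024 * 1024).toString
  )

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("testtopic"))

  // The per-partition rate cap is a Spark conf setting, set before the context starts:
  //   sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")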


 On Wed, Aug 26, 2015 at 10:39 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

  What's the default buffer in Spark Streaming 1.3 for Kafka messages?

  Say in this run it has to fetch messages from offset 1 to 1. Will it
  fetch all in one go, or does it internally fetch messages in smaller
  batches?

  Is there any setting to configure the number of offsets fetched in one batch?