Feedback: Feature request
Hey all,

In working with the DecisionTree classifier, I found it difficult to extract rules that could easily facilitate visualization with libraries like D3. For example, using print(model.toDebugString()), I get the following result:

If (feature 0 <= -35.0)
 If (feature 24 <= 176.0)
  Predict: 2.1
 If (feature 24 <= 176.0)
  Predict: 4.2
 Else (feature 24 > 176.0)
  Predict: 6.3
Else (feature 0 > -35.0)
 If (feature 24 <= 11.0)
  Predict: 4.5
 Else (feature 24 > 11.0)
  Predict: 10.2

But ideally, I could see results in a more parseable format like JSON:

{ "node": [
  { "name": "node1", "rule": "feature 0 <= -35.0", "children": [
    { "name": "node2", "rule": "feature 24 <= 176.0", "children": [
      { "name": "node4", "rule": "feature 20 > 116.0", "predict": 2.1 },
      { "name": "node5", "rule": "feature 20 <= 116.0", "predict": 4.2 },
      { "name": "node6", "rule": "feature 20 > 116.0", "predict": 6.3 }
    ] },
    { "name": "node3", "rule": "feature 0 > -35.0", "children": [
      { "name": "node7", "rule": "feature 3 <= 11.0", "predict": 4.5 },
      { "name": "node8", "rule": "feature 3 > 11.0", "predict": 10.2 }
    ] }
  ] }
] }

Food for thought!

Thanks, Jim
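[Editor's note: as a stopgap, a minimal sketch of producing such JSON today by walking the tree yourself, assuming the Spark 1.x MLlib model API (model.topNode, Node.split/leftNode/rightNode); the string concatenation is illustrative, not a real JSON library:]

import org.apache.spark.mllib.tree.model.Node

// Recursively render an MLlib decision tree node as D3-friendly JSON.
def toJson(node: Node): String = {
  if (node.isLeaf) {
    s"""{"name": "node${node.id}", "predict": ${node.predict.predict}}"""
  } else {
    val split = node.split.get
    val rule = s"feature ${split.feature} <= ${split.threshold}"
    val children = (node.leftNode.toSeq ++ node.rightNode.toSeq).map(toJson).mkString(", ")
    s"""{"name": "node${node.id}", "rule": "$rule", "children": [$children]}"""
  }
}

// Usage: println(toJson(model.topNode))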
Re: Spark cluster multi tenancy
Interestingly, if there is nothing running on the dev spark-shell, it recovers successfully and regains the lost executors. Attaching the log for that. Notice the "Registering block manager ..." statements at the very end, after all executors were lost. On Wed, Aug 26, 2015 at 11:27 AM, Sadhan Sood sadhan.s...@gmail.com wrote: Attaching the log for when the dev job gets stuck (once all its executors are lost due to preemption). This is a spark-shell job running in yarn-client mode. On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood sadhan.s...@gmail.com wrote: Hi All, We've set up our Spark cluster on AWS running on YARN (on Hadoop 2.3) with fair scheduling and preemption turned on. The cluster is shared between prod and dev work, where prod runs with a higher fair share and can preempt dev jobs if there are not enough resources available. It appears that dev jobs which get preempted often become unstable after losing some executors: the whole job gets stuck (without making any progress) or ends up getting restarted (and hence loses all the work done). Has someone encountered this before? Is the solution just to set spark.task.maxFailures to a really high value to recover from task failures in such scenarios? Are there other approaches that people have taken for Spark multi-tenancy that work better in such scenarios? Thanks, Sadhan (Attachment: spark_job_recovers.log)
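[Editor's note: for reference, the setting mentioned above as a minimal sketch; whether raising it actually helps with preemption-driven executor loss is exactly the open question in this thread, and the value shown is arbitrary:]

import org.apache.spark.SparkConf

// spark.task.maxFailures defaults to 4; raising it allows more task
// retries before the whole job is failed.
val conf = new SparkConf()
  .setAppName("dev-job")  // hypothetical app name
  .set("spark.task.maxFailures", "64")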
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
I'd be less concerned about what the streaming UI shows than what's actually going on with the job. When you say you were losing messages, how were you observing that? The UI, or actual job output? The log lines you posted indicate that the checkpoint was restored and those offsets were processed; what are the log lines for the following KafkaRDD?

On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang suchenz...@gmail.com wrote:

Compared offsets, and it continues from checkpoint loading:

15/08/26 11:24:54 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446), (install-json,4,825772921,825773536), (install-json,1,831654775,831655076), (install-json,0,1296018451,1296018810), (install-json,2,824785282,824785696), (install-json,3,811428882,811429181)]
15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json, partition 0 offsets 1296018451 -> 1296018810
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 4 offsets 825773536 -> 825907428
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 2 offsets 824785696 -> 824889957
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 3 offsets 811429181 -> 811529084
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 1 offsets 831655076 -> 831729964
...

But for some reason the streaming UI shows it as computing 0 events. Removing the call to checkpoint does remove the queueing of 0 event batches, since offsets just skip to the latest (I checked that the first part.fromOffset in the restarted job is larger than the last part.untilOffset before restart).

On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger c...@koeninger.org wrote:

When the Kafka RDD is actually being iterated on the worker, there should be an info line of the form

log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
  s"offsets ${part.fromOffset} -> ${part.untilOffset}")

You should be able to compare that to the log of offsets during checkpoint loading, to see if they line up. Just out of curiosity, does removing the call to checkpoint on the stream affect anything?

On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang suchenz...@gmail.com wrote:

Thanks for the suggestions! I tried the following: I removed createOnError = true and reran the same process to reproduce. Double checked that the checkpoint is loading:

15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528), (install-json,4,825400856,825401058), (install-json,1,831453228,831453396), (install-json,0,1295759888,1295760378), (install-json,2,824443526,82409), (install-json,3,811222580,811222874)]
15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791), (install-json,4,825401058,825401249), (install-json,1,831453396,831453603), (install-json,0,1295760378,1295760809), (install-json,2,82409,824445510), (install-json,3,811222874,811223285)]
...

And the same issue is appearing as before (with 0 event batches getting queued corresponding to dropped messages). Our Kafka brokers are on version 0.8.2.0, if that makes a difference. Also as a sanity check, I took out the ZK updates and reran (just in case that was somehow causing problems), and that didn't change anything, as expected.
Furthermore, the 0 event batches seem to take longer to process than batches with the regular load of events: processing time for 0 event batches can be upwards of 1-2 minutes, whereas processing time for ~2000 event batches is consistently < 1s. Why would that happen?

As for the checkpoint call: directKStream.checkpoint(checkpointDuration) was an attempt to set the checkpointing interval (at some multiple of the batch interval), whereas StreamingContext.checkpoint seems like it will only set the checkpoint directory.

Thanks for all the help, Susan

On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org wrote:

The first thing that stands out to me is createOnError = true. Are you sure the checkpoint is actually loading, as opposed to failing and starting the job anyway? There should be info lines that look like

INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)

You should be able to tell from those whether the offset ranges being loaded from the checkpoint look reasonable. Also, is there a reason you're calling directKStream.checkpoint(checkpointDuration)? Just calling checkpoint on the streaming context should be sufficient to save the metadata.

On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang suchenz...@gmail.com wrote:
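[Editor's note: for readers following along, the two calls being contrasted do different things in the Spark 1.3 streaming API; a minimal sketch, with a placeholder path:]

// Where checkpoint data (metadata and RDDs) is written:
ssc.checkpoint("hdfs:///checkpoints/myapp")

// How often this particular stream's RDDs are checkpointed:
directKStream.checkpoint(Seconds(20))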
Re: Efficient sampling from a Hive table
Using TABLESAMPLE(0.1) is actually way worse. Spark first spends 12 minutes discovering all split files on all hosts (for some reason) before it even starts the job, and then it creates 3.5 million tasks (the partition has ~32k split files). On Wed, Aug 26, 2015 at 9:36 AM, Jörn Franke jornfra...@gmail.com wrote: Have you tried TABLESAMPLE? You can find the exact syntax in the documentation, but it does exactly what you want. On Wed, Aug 26, 2015 at 6:12 PM, Thomas Dudziak tom...@gmail.com wrote: Sorry, I meant without reading from all splits. This is a single partition in the table. On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak tom...@gmail.com wrote: I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from, and I don't particularly care which rows. Doing a LIMIT unfortunately results in two stages, where the first stage reads the whole table and the second then performs the limit with a single worker, which is not very efficient. Is there a better way to sample a subset of rows in Spark, ideally in a single stage and without reading all partitions? cheers, Tom
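[Editor's note: one alternative worth sketching, under the assumption that an approximate row count is acceptable: DataFrame.sample (available in Spark 1.4) is applied as partitions are read, so it avoids the single-worker second stage that LIMIT introduces, though it still scans the splits it touches. The table name is hypothetical:]

// Sample roughly 10% (~100m of ~1b rows) without replacement.
val sampled = sqlContext.table("my_table").sample(withReplacement = false, fraction = 0.1)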
query avro hive table in spark sql
Hi, I'm trying to query a Hive table which is based on Avro in Spark SQL, and am seeing the errors below.

15/08/26 17:51:12 WARN avro.AvroSerdeUtils: Encountered AvroSerdeException determining schema. Returning signal schema to indicate problem
org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Neither avro.schema.literal nor avro.schema.url specified, can't determine table schema
 at org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrThrowException(AvroSerdeUtils.java:68)
 at org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrReturnErrorSchema(AvroSerdeUtils.java:93)
 at org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:60)
 at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:375)
 at org.apache.hadoop.hive.ql.metadata.Partition.getDeserializer(Partition.java:249)

It's not able to determine the schema. The Hive table is pointing to the Avro schema using a URL. I'm stuck and couldn't find more info on this. Any pointers?
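[Editor's note: one thing worth checking, offered only as a hedged guess given the Partition.getDeserializer frame in the trace: for partitioned Avro tables, individual partitions can lack the schema property even when the table-level definition has it, so the per-partition deserializer can't resolve a schema. Table, partition, and path names below are hypothetical:]

// Run through HiveContext; attaches the schema URL at the partition level.
sqlContext.sql(
  """ALTER TABLE my_avro_table PARTITION (ds='2015-08-26')
    |SET SERDEPROPERTIES ('avro.schema.url'='hdfs:///schemas/my_avro_table.avsc')""".stripMargin)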
Efficient sampling from a Hive table
I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from, and I don't particularly care which rows. Doing a LIMIT unfortunately results in two stages, where the first stage reads the whole table and the second then performs the limit with a single worker, which is not very efficient. Is there a better way to sample a subset of rows in Spark, ideally in a single stage and without reading all partitions? cheers, Tom
Re: Efficient sampling from a Hive table
Have you tried TABLESAMPLE? You can find the exact syntax in the documentation, but it does exactly what you want. On Wed, Aug 26, 2015 at 6:12 PM, Thomas Dudziak tom...@gmail.com wrote: Sorry, I meant without reading from all splits. This is a single partition in the table. On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak tom...@gmail.com wrote: I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from, and I don't particularly care which rows. Doing a LIMIT unfortunately results in two stages, where the first stage reads the whole table and the second then performs the limit with a single worker, which is not very efficient. Is there a better way to sample a subset of rows in Spark, ideally in a single stage and without reading all partitions? cheers, Tom
Re: Spark 1.3.1 saveAsParquetFile hangs on app exit
spark-shell-hang-on-exit.tdump http://apache-spark-user-list.1001560.n3.nabble.com/file/n24461/spark-shell-hang-on-exit.tdump
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Thanks for the suggestions! I tried the following: I removed createOnError = true and reran the same process to reproduce. Double checked that the checkpoint is loading:

15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528), (install-json,4,825400856,825401058), (install-json,1,831453228,831453396), (install-json,0,1295759888,1295760378), (install-json,2,824443526,82409), (install-json,3,811222580,811222874)]
15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791), (install-json,4,825401058,825401249), (install-json,1,831453396,831453603), (install-json,0,1295760378,1295760809), (install-json,2,82409,824445510), (install-json,3,811222874,811223285)]
...

And the same issue is appearing as before (with 0 event batches getting queued corresponding to dropped messages). Our Kafka brokers are on version 0.8.2.0, if that makes a difference. Also as a sanity check, I took out the ZK updates and reran (just in case that was somehow causing problems), and that didn't change anything, as expected. Furthermore, the 0 event batches seem to take longer to process than batches with the regular load of events: processing time for 0 event batches can be upwards of 1-2 minutes, whereas processing time for ~2000 event batches is consistently < 1s. Why would that happen? As for the checkpoint call: directKStream.checkpoint(checkpointDuration) was an attempt to set the checkpointing interval (at some multiple of the batch interval), whereas StreamingContext.checkpoint seems like it will only set the checkpoint directory. Thanks for all the help, Susan

On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org wrote:

The first thing that stands out to me is createOnError = true. Are you sure the checkpoint is actually loading, as opposed to failing and starting the job anyway? There should be info lines that look like

INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)

You should be able to tell from those whether the offset ranges being loaded from the checkpoint look reasonable. Also, is there a reason you're calling directKStream.checkpoint(checkpointDuration)? Just calling checkpoint on the streaming context should be sufficient to save the metadata.

On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang suchenz...@gmail.com wrote: Sure thing!
The main looks like:

--
val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

val kafkaConf = Map(
  "zookeeper.connect" -> zookeeper,
  "group.id" -> options.group,
  "zookeeper.connection.timeout.ms" -> "1",
  "auto.commit.interval.ms" -> "1000",
  "rebalance.max.retries" -> "25",
  "bootstrap.servers" -> kafkaBrokers
)

val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => {
  createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
}, createOnError = true)

ssc.start()
ssc.awaitTermination()
--

And createContext is defined as:

--
val batchDuration = Seconds(5)
val checkpointDuration = Seconds(20)

private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

def createContext(kafkaConf: Map[String, String],
                  checkpointDirectory: String,
                  topic: String,
                  numThreads: Int,
                  isProd: Boolean): StreamingContext = {

  val sparkConf = new SparkConf().setAppName("***")
  val ssc = new StreamingContext(sparkConf, batchDuration)
  ssc.checkpoint(checkpointDirectory)

  val topicSet = topic.split(",").toSet
  val groupId = kafkaConf.getOrElse("group.id", "")

  val directKStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
  directKStream.checkpoint(checkpointDuration)

  val table = ***

  directKStream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.flatMap(rec => someFunc(rec))
      .reduceByKey((i1: Long, i2: Long) => if (i1 < i2) i1 else i2)
      .foreachPartition { partitionRec =>
        val dbWrite = DynamoDBWriter()
        partitionRec.foreach {
          /* Update Dynamo Here */
        }
      }

    /** Set up ZK Connection **/
    val props = new Properties()
    kafkaConf.foreach(param => props.put(param._1, param._2))
    props.setProperty(AUTO_OFFSET_COMMIT, "false")
    val consumerConfig = new
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Compared offsets, and it continues from checkpoint loading:

15/08/26 11:24:54 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446), (install-json,4,825772921,825773536), (install-json,1,831654775,831655076), (install-json,0,1296018451,1296018810), (install-json,2,824785282,824785696), (install-json,3,811428882,811429181)]
15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json, partition 0 offsets 1296018451 -> 1296018810
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 4 offsets 825773536 -> 825907428
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 2 offsets 824785696 -> 824889957
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 3 offsets 811429181 -> 811529084
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 1 offsets 831655076 -> 831729964
...

But for some reason the streaming UI shows it as computing 0 events. Removing the call to checkpoint does remove the queueing of 0 event batches, since offsets just skip to the latest (I checked that the first part.fromOffset in the restarted job is larger than the last part.untilOffset before restart).

On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger c...@koeninger.org wrote:

When the Kafka RDD is actually being iterated on the worker, there should be an info line of the form

log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
  s"offsets ${part.fromOffset} -> ${part.untilOffset}")

You should be able to compare that to the log of offsets during checkpoint loading, to see if they line up. Just out of curiosity, does removing the call to checkpoint on the stream affect anything?

On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang suchenz...@gmail.com wrote:

Thanks for the suggestions! I tried the following: I removed createOnError = true and reran the same process to reproduce. Double checked that the checkpoint is loading:

15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528), (install-json,4,825400856,825401058), (install-json,1,831453228,831453396), (install-json,0,1295759888,1295760378), (install-json,2,824443526,82409), (install-json,3,811222580,811222874)]
15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791), (install-json,4,825401058,825401249), (install-json,1,831453396,831453603), (install-json,0,1295760378,1295760809), (install-json,2,82409,824445510), (install-json,3,811222874,811223285)]
...

And the same issue is appearing as before (with 0 event batches getting queued corresponding to dropped messages). Our Kafka brokers are on version 0.8.2.0, if that makes a difference. Also as a sanity check, I took out the ZK updates and reran (just in case that was somehow causing problems), and that didn't change anything, as expected. Furthermore, the 0 event batches seem to take longer to process than batches with the regular load of events: processing time for 0 event batches can be upwards of 1-2 minutes, whereas processing time for ~2000 event batches is consistently < 1s. Why would that happen?
As for the checkpoint call: directKStream.checkpoint(checkpointDuration) was an attempt to set the checkpointing interval (at some multiple of the batch interval), whereas StreamingContext.checkpoint seems like it will only set the checkpoint directory. Thanks for all the help, Susan

On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org wrote:

The first thing that stands out to me is createOnError = true. Are you sure the checkpoint is actually loading, as opposed to failing and starting the job anyway? There should be info lines that look like

INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)

You should be able to tell from those whether the offset ranges being loaded from the checkpoint look reasonable. Also, is there a reason you're calling directKStream.checkpoint(checkpointDuration)? Just calling checkpoint on the streaming context should be sufficient to save the metadata.

On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang suchenz...@gmail.com wrote: Sure thing! The main looks like:

--
val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

val kafkaConf = Map(
  "zookeeper.connect" -> zookeeper,
  "group.id" -> options.group,
  "zookeeper.connection.timeout.ms" -> "1",
  "auto.commit.interval.ms" -> "1000",
  "rebalance.max.retries" -> "25",
  "bootstrap.servers" ->
Re: Spark cluster multi tenancy
Attaching the log for when the dev job gets stuck (once all its executors are lost due to preemption). This is a spark-shell job running in yarn-client mode. On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood sadhan.s...@gmail.com wrote: Hi All, We've set up our Spark cluster on AWS running on YARN (on Hadoop 2.3) with fair scheduling and preemption turned on. The cluster is shared between prod and dev work, where prod runs with a higher fair share and can preempt dev jobs if there are not enough resources available. It appears that dev jobs which get preempted often become unstable after losing some executors: the whole job gets stuck (without making any progress) or ends up getting restarted (and hence loses all the work done). Has someone encountered this before? Is the solution just to set spark.task.maxFailures to a really high value to recover from task failures in such scenarios? Are there other approaches that people have taken for Spark multi-tenancy that work better in such scenarios? Thanks, Sadhan (Attachment: spark_job_stuck.log)
Can an RDD be shared across the cluster by other drivers?
Hi, guys. Is it possible that an RDD created by driver A be used by driver B? Thanks!
Re: Question on take function - Spark Java API
Thanks Sonal, I shall try doing that. On 26-Aug-2015, at 1:05 pm, Sonal Goyal sonalgoy...@gmail.com wrote: You can try using wholeTextFiles, which will give you a pair RDD of (fileName, content). flatMap through this and manipulate the content. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/ http://in.linkedin.com/in/sonalgoyal On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane pankaj.wah...@qiotec.com wrote:

Hi community members,

Apache Spark is fantastic and very easy to learn. Awesome work!

Question: I have multiple files in a folder, and the first line in each file is the name of the asset that the file belongs to. The second line is the CSV header row, and data starts from the third row. Ex:

File 1
TestAsset01
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,123,456,789
11-01-2015 15:00:01,123,456,789
...

File 2
TestAsset02
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,1230,4560,7890
11-01-2015 15:00:01,1230,4560,7890
...

I have got nearly 1000 files in each folder, sizing ~10G. I am using the Apache Spark Java API to read all these files. Following is the code extract that I am using:

try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    Map<String, String> readingTypeMap = getReadingTypesMap(sc);
    // Read file
    JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
    // Get asset
    String asset = data.take(1).get(0);
    // Extract time series data
    JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
    // Strip header
    String header = actualData.take(1).get(0);
    String[] headers = header.split(DELIMERTER);
    // Extract actual data
    JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
    // Extract valid records
    JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
    // Find granularity
    Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
    // Transform to TSD objects
    JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset,
        readingTypeMap, headers, granularity);
    // Save to Cassandra
    javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
        "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();
    System.out.println("Total Records: " + timeSeriesLines.count());
    System.out.println("Valid Records: " + validated.count());
}

Within the TimeSeriesData object I need to set the asset name for the reading, so I need the output of data.take(1) to be different for different files.

Thank You. Best Regards, Pankaj
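[Editor's note: a minimal sketch of the wholeTextFiles approach suggested above, shown in Scala; the path and parsing details are illustrative, and the same pattern is available through JavaPairRDD in the Java API:]

// Each element is (fileName, entireFileContent), so the asset name on
// line 1 stays attached to that file's own rows.
val perFile = sc.wholeTextFiles("hdfs:///data/folder")
val rows = perFile.flatMap { case (file, content) =>
  val lines = content.split("\n")
  val asset = lines.head.trim                 // line 1: asset name
  val header = lines(1).split(",")            // line 2: CSV header
  lines.drop(2).map(row => (asset, header.zip(row.split(",")).toMap))
}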
Re: SparkSQL saveAsParquetFile does not preserve AVRO schema
Note: In the code (org.apache.spark.sql.parquet.DefaultSource) I've found this:

val relation = if (doInsertion) {
  // This is a hack. We always set nullable/containsNull/valueContainsNull to true
  // for the schema of a parquet data.
  val df =
    sqlContext.createDataFrame(
      data.queryExecution.toRdd,
      data.schema.asNullable)
  val createdRelation =
    createRelation(sqlContext, parameters, df.schema).asInstanceOf[ParquetRelation2]
  createdRelation.insert(df, overwrite = mode == SaveMode.Overwrite)
  createdRelation
}

The culprit is data.schema.asNullable. What's the real reason for this? Why not simply use the existing schema nullable flags?
reduceByKey not working on JavaPairDStream
Hi, I have applied mapToPair and then a reduceByKey on a DStream to obtain a JavaPairDStream<String, Map<String, Object>>. I have to apply a flatMapToPair and reduceByKey on the DStream obtained above, but I do not see any logs from the reduceByKey operation. Can anyone explain why this is happening? Find my code below:

/**
 * GroupLevel1 groups - articleId, host and tags
 */
JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
    .mapToPair(
        new PairFunction<Map<String, Object>, String, Map<String, Object>>() {
          private static final long serialVersionUID = 5196132687044875422L;

          @Override
          public Tuple2<String, Map<String, Object>> call(
              Map<String, Object> map) throws Exception {
            String host = (String) map.get("host");
            String articleId = (String) map.get("articleId");
            List tags = (List) map.get("tags");
            if (host == null || articleId == null) {
              logger.error("*** Error Doc \n" + map);
            }
            String key = "articleId_" + articleId + "_host_" + host + "_tags_" + tags.toString();
            // logger.info(key);
            System.out.println("Printing Key - " + key);
            map.put("articlecount", 1L);
            return new Tuple2<String, Map<String, Object>>(key, map);
          }
        })
    .reduceByKey(
        new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
          private static final long serialVersionUID = 1L;

          @Override
          public Map<String, Object> call(
              Map<String, Object> map1, Map<String, Object> map2) throws Exception {
            Long count1 = (Long) map1.get("articlecount");
            Long count2 = (Long) map2.get("articlecount");
            map1.put("articlecount", count1 + count2);
            return map1;
          }
        });

/**
 * Grouping level 1 groups on articleId+host+tags.
 * Tags can be multiple for an article.
 * Grouping level 2 does:
 * 1. For each tag in a row, find occurrences of that tag in other rows.
 * 2. If one tag is found in another row, then add the articleCount of the current and new row and put it as articleCount for that tag.
 * Note: the idea behind this grouping is to get all article counts that contain a particular tag and preserve this value.
 */
JavaPairDStream<String, Map<String, Object>> groupLevel2 = groupLevel1.flatMapToPair(
    new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
      @Override
      public Iterable<Tuple2<String, Map<String, Object>>> call(
          Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {
        System.out.println("group level 2 tuple 1 - " + stringMapTuple2._1());
        System.out.println("group level 2 tuple 2 - " + stringMapTuple2._2());
        ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
        ArrayList tagKeyList = new ArrayList();
        String host = (String) stringMapTuple2._2().get("host");
        StringBuilder key;
        for (String tag : tagList) {
          key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
          System.out.println("generated Key - " + key);
          tagKeyList.add(new Tuple2<String, Map<String, Object>>(key.toString(), stringMapTuple2._2()));
        }
        return tagKeyList;
      }
    });

groupLevel2 = groupLevel2.reduceByKey(
    new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
      @Override
      public Map<String, Object> call(
          Map<String, Object> dataMap1, Map<String, Object> dataMap2) throws Exception {
        System.out.println("Type of article map in 1 " + dataMap1.get("articleId").getClass());
        System.out.println("Type of article map in 2 " + dataMap2.get("articleId").getClass());
        Map<String, String> articleMap1 = (Map<String, String>) dataMap1.get("articleId");
        Map<String, String> articleMap2 = (Map<String, String>) dataMap2.get("articleId");
        if (articleMap1 == null || articleMap1.isEmpty()) {
          System.out.println("returning because map 1 null");
BlockNotFoundException when running spark word count on Tachyon
I am using Tachyon in the Spark program below, but I encounter a BlockNotFoundException. Does someone know what's wrong, and also is there a guide on how to configure Spark to work with Tachyon? Thanks!

conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998")
conf.set("spark.externalBlockStore.baseDir", "/spark")
val sc = new SparkContext(conf)
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd.persist(StorageLevel.OFF_HEAP)
val count = rdd.count()
val sum = rdd.reduce(_ + _)
println(s"The count: $count, The sum is: $sum")

15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost): java.lang.RuntimeException: org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
Re: MLlib Prefixspan implementation
A first use case of the gap constraint is included in the article. Another application would be customer shopping-sequence analysis, where you want to put a constraint on the duration between two purchases for them to be considered a pertinent sequence. Additional question regarding the code: what's the point of using ReversedPrefix in LocalPrefixSpan? The prefix is used neither in finding frequent items of a projected database nor in computing a new projected database, so it looks like it's appended in inverse order just to be reversed when transformed to a sequence. 2015-08-25 12:15 GMT+08:00 Feynman Liang fli...@databricks.com: CCing the mailing list again. It's currently not on the radar. Do you have a use case for it? I can bring it up during 1.6 roadmap planning tomorrow. On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN ila...@hotmail.com wrote: Hi, I just realized the article I mentioned is cited in the JIRA and not in the code, so I guess you didn't use this result. Do you plan to implement sequences with timestamps and gap constraints as in: https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf 2015-08-25 7:06 GMT+08:00 Feynman Liang fli...@databricks.com: Hi Alexis, Unfortunately, both of the papers you referenced appear to be translations and are quite difficult to understand. We followed http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan. Perhaps you can find the relevant lines in there so I can elaborate further? Feynman On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN ila...@hotmail.com wrote: I want to use PrefixSpan, so I had a look at the code and the cited paper: "Distributed PrefixSpan Algorithm Based on MapReduce". There is a result in the paper I didn't really understand, and I couldn't find where it is used in the code: "Suppose a sequence database S = {1,2...n}, a sequence a... is a length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a prefix of a length-(L-1) sequential pattern a...a, when the support count of a is not less than min_support, it is equal to obtaining a length-L sequential pattern a...a from projected databases that obtaining a length-L sequential pattern a...a from a sequence database S." According to the paper it's supposed to add a pruning step in the reduce function, but I couldn't find where. This result seems to come from a previous paper: Wang Linlin, Fan Jun. Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan [J]. Computer Engineering, 2009, 35(23): 56-61, but it didn't help me to understand it and how it can improve the algorithm.
Re: BlockNotFoundException when running spark word count on Tachyon
Sometime back I was playing with Spark and Tachyon and I also found this issue. The issue here is that TachyonBlockManager puts the blocks with the WriteType.TRY_CACHE configuration. Because of this, blocks are evicted from the Tachyon cache when memory is full, and when Spark tries to find the block it throws BlockNotFoundException. To solve this I tried Hierarchical Storage on Tachyon (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that seems to have worked: I did not see any Spark job fail due to BlockNotFoundException. Below are the hierarchical storage settings which I used:

-Dtachyon.worker.hierarchystore.level.max=2
-Dtachyon.worker.hierarchystore.level0.alias=MEM
-Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
-Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
-Dtachyon.worker.hierarchystore.level1.alias=HDD
-Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
-Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
-Dtachyon.worker.allocate.strategy=MAX_FREE
-Dtachyon.worker.evict.strategy=LRU

Regards, Dibyendu

On Wed, Aug 26, 2015 at 12:25 PM, Todd bit1...@163.com wrote:

I am using Tachyon in the Spark program below, but I encounter a BlockNotFoundException. Does someone know what's wrong, and also is there a guide on how to configure Spark to work with Tachyon? Thanks!

conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998")
conf.set("spark.externalBlockStore.baseDir", "/spark")
val sc = new SparkContext(conf)
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd.persist(StorageLevel.OFF_HEAP)
val count = rdd.count()
val sum = rdd.reduce(_ + _)
println(s"The count: $count, The sum is: $sum")

15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost): java.lang.RuntimeException: org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at
RE: SparkR: exported functions
I believe that is done explicitly while the final API is being figured out. For the moment you could use DataFrame read.df(). From: csgilles...@gmail.com Date: Tue, 25 Aug 2015 18:26:50 +0100 Subject: SparkR: exported functions To: user@spark.apache.org Hi, I've just started playing about with SparkR (Spark 1.4.1), and noticed that a number of the functions haven't been exported. For example, the textFile function https://github.com/apache/spark/blob/master/R/pkg/R/context.R isn't exported, i.e. the function isn't in the NAMESPACE file. This is obviously due to the missing #' in the roxygen2 directives. Is this intentional? Thanks, Colin
Re: BlockNotFoundException when running spark word count on Tachyon
The URL seems to have changed. Here is the current one: http://tachyon-project.org/documentation/Tiered-Storage-on-Tachyon.html

On Wed, Aug 26, 2015 at 12:32 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Sometime back I was playing with Spark and Tachyon and I also found this issue. The issue here is that TachyonBlockManager puts the blocks with the WriteType.TRY_CACHE configuration. Because of this, blocks are evicted from the Tachyon cache when memory is full, and when Spark tries to find the block it throws BlockNotFoundException. To solve this I tried Hierarchical Storage on Tachyon (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that seems to have worked: I did not see any Spark job fail due to BlockNotFoundException. Below are the hierarchical storage settings which I used:

-Dtachyon.worker.hierarchystore.level.max=2
-Dtachyon.worker.hierarchystore.level0.alias=MEM
-Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
-Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
-Dtachyon.worker.hierarchystore.level1.alias=HDD
-Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
-Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
-Dtachyon.worker.allocate.strategy=MAX_FREE
-Dtachyon.worker.evict.strategy=LRU

Regards, Dibyendu

On Wed, Aug 26, 2015 at 12:25 PM, Todd bit1...@163.com wrote:

I am using Tachyon in the Spark program below, but I encounter a BlockNotFoundException. Does someone know what's wrong, and also is there a guide on how to configure Spark to work with Tachyon? Thanks!

conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998")
conf.set("spark.externalBlockStore.baseDir", "/spark")
val sc = new SparkContext(conf)
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd.persist(StorageLevel.OFF_HEAP)
val count = rdd.count()
val sum = rdd.reduce(_ + _)
println(s"The count: $count, The sum is: $sum")

15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost): java.lang.RuntimeException: org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101) at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at
Re: MLlib Prefixspan implementation
ReversedPrefix is used because Scala's List is a linked list, which has constant-time prepend at the head but linear-time append at the tail. I'm aware that there are use cases for the gap constraints. My question was more about whether any users of Spark/MLlib have an immediate application for these features. On Wed, Aug 26, 2015 at 12:10 AM, alexis GILLAIN ila...@hotmail.com wrote: A first use case of the gap constraint is included in the article. Another application would be customer shopping-sequence analysis, where you want to put a constraint on the duration between two purchases for them to be considered a pertinent sequence. Additional question regarding the code: what's the point of using ReversedPrefix in LocalPrefixSpan? The prefix is used neither in finding frequent items of a projected database nor in computing a new projected database, so it looks like it's appended in inverse order just to be reversed when transformed to a sequence. 2015-08-25 12:15 GMT+08:00 Feynman Liang fli...@databricks.com: CCing the mailing list again. It's currently not on the radar. Do you have a use case for it? I can bring it up during 1.6 roadmap planning tomorrow. On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN ila...@hotmail.com wrote: Hi, I just realized the article I mentioned is cited in the JIRA and not in the code, so I guess you didn't use this result. Do you plan to implement sequences with timestamps and gap constraints as in: https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf 2015-08-25 7:06 GMT+08:00 Feynman Liang fli...@databricks.com: Hi Alexis, Unfortunately, both of the papers you referenced appear to be translations and are quite difficult to understand. We followed http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan. Perhaps you can find the relevant lines in there so I can elaborate further? Feynman On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN ila...@hotmail.com wrote: I want to use PrefixSpan, so I had a look at the code and the cited paper: "Distributed PrefixSpan Algorithm Based on MapReduce". There is a result in the paper I didn't really understand, and I couldn't find where it is used in the code: "Suppose a sequence database S = {1,2...n}, a sequence a... is a length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a prefix of a length-(L-1) sequential pattern a...a, when the support count of a is not less than min_support, it is equal to obtaining a length-L sequential pattern a...a from projected databases that obtaining a length-L sequential pattern a...a from a sequence database S." According to the paper it's supposed to add a pruning step in the reduce function, but I couldn't find where. This result seems to come from a previous paper: Wang Linlin, Fan Jun. Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan [J]. Computer Engineering, 2009, 35(23): 56-61, but it didn't help me to understand it and how it can improve the algorithm.
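[Editor's note: to make the trade-off concrete, a tiny illustrative sketch:]

// Building the prefix reversed with an immutable List: each extension is an
// O(1) prepend, and a single O(n) reverse at the end recovers sequence order.
var reversedPrefix = List.empty[Int]
for (item <- Seq(1, 2, 3)) reversedPrefix = item :: reversedPrefix
val sequence = reversedPrefix.reverse  // List(1, 2, 3)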
Spark cluster multi tenancy
Hi All, We've set up our Spark cluster on AWS running on YARN (on Hadoop 2.3) with fair scheduling and preemption turned on. The cluster is shared between prod and dev work, where prod runs with a higher fair share and can preempt dev jobs if there are not enough resources available. It appears that dev jobs which get preempted often become unstable after losing some executors: the whole job gets stuck (without making any progress) or ends up getting restarted (and hence loses all the work done). Has someone encountered this before? Is the solution just to set spark.task.maxFailures to a really high value to recover from task failures in such scenarios? Are there other approaches that people have taken for Spark multi-tenancy that work better in such scenarios? Thanks, Sadhan
Re: JDBC Streams
Yes

On Wed, Aug 26, 2015 at 10:23 AM, Chen Song chen.song...@gmail.com wrote:

Thanks Cody. Are you suggesting putting the cache in a global context in each executor JVM, in a Scala object for example, and then having a scheduled task refresh the cache (or having it triggered by the expiry in Guava)? Chen

On Wed, Aug 26, 2015 at 10:51 AM, Cody Koeninger c...@koeninger.org wrote:

If your data only changes every few days, why not restart the job every few days and just broadcast the data? Or you can keep a local per-JVM cache with an expiry (e.g. a Guava cache) to avoid many MySQL reads.

On Wed, Aug 26, 2015 at 9:46 AM, Chen Song chen.song...@gmail.com wrote:

Piggybacking on this question. I have a similar use case, but a bit different. My job is consuming a stream from Kafka, and I need to join the Kafka stream with a reference table from MySQL (a kind of data validation and enrichment). I need to process this stream every 1 min. The data in MySQL is not changed very often, maybe once every few days. So my requirements are:

* I cannot easily use a broadcast variable, because the data does change, although not very often.
* I am not sure if it is good practice to read data from MySQL in every batch (in my case, every 1 min).

Has anyone done this before? Any suggestions and feedback are appreciated. Chen

On Sun, Jul 5, 2015 at 11:50 AM, Ashic Mahtab as...@live.com wrote:

If it is indeed a reactive use case, then Spark Streaming would be a good choice. One approach worth considering: is it possible to receive a message via Kafka (or some other queue)? That'd not need any polling, and you could use standard consumers. If polling isn't an issue, then writing a custom receiver will work fine. The way a receiver works is this:

* Your receiver has a receive() function, where you'd typically start a loop. In your loop, you'd fetch items and call store(entry).
* You control everything in the receiver. If you're listening on a queue, you receive messages, store() and ack your queue. If you're polling, it's up to you to ensure delays between DB calls.
* The things you store() go on to make up the RDDs in your DStream. So intervals, windowing, etc. apply to those. The receiver is the boundary between your data source and the DStream RDDs. In other words, if your interval is 15 seconds with no windowing, then the things that went to store() every 15 seconds are bunched up into an RDD of your DStream.

That's kind of a simplification, but it should give you the idea that your DB polling interval and streaming interval are not tied together.

-Ashic.

--
Date: Mon, 6 Jul 2015 01:12:34 +1000
Subject: Re: JDBC Streams
From: guha.a...@gmail.com
To: as...@live.com
CC: ak...@sigmoidanalytics.com; user@spark.apache.org

Hi, Thanks for the reply. Here is my situation: I have a DB which enables synchronous CDC; think of this as a DB trigger which writes changed values to a table as soon as something changes in the production table. My job will need to pick up the data as soon as it arrives, which can be at every 1 min interval. Ideally it will pick up the changes, transform them into JSON and put them to Kinesis. In short, I am emulating a Kinesis producer with a DB source (don't even ask why; let's say these are the constraints :) ). Please advise: (a) is Spark a good choice here, and (b) what's your suggestion either way? I understand I can easily do it using a simple Java/Python app, but I am a little worried about managing scaling/fault tolerance, and that's where my concern is.
TIA Ayan

On Mon, Jul 6, 2015 at 12:51 AM, Ashic Mahtab as...@live.com wrote:

Hi Ayan, How continuous is your workload? As Akhil points out, with streaming you'll give up at least one core for receiving and will need at most one more core for processing. Unless you're running on something like Mesos, this means that those cores are dedicated to your app and can't be leveraged by other apps/jobs. If it's something periodic (once an hour, once every 15 minutes, etc.), then I'd simply write a normal Spark application and trigger it periodically. There are many things that can take care of that; sometimes a simple cronjob is enough!

--
Date: Sun, 5 Jul 2015 22:48:37 +1000
Subject: Re: JDBC Streams
From: guha.a...@gmail.com
To: ak...@sigmoidanalytics.com
CC: user@spark.apache.org

Thanks Akhil. In case I go with Spark Streaming, I guess I have to implement a custom receiver, and Spark Streaming will call this receiver every batch interval; is that correct? Any gotchas you see in this plan? TIA... Best, Ayan

On Sun, Jul 5, 2015 at 5:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

If you want a long-running application, then go with Spark Streaming (which kind of blocks your resources). On the other hand, if you use a job server then you can actually use the resources (CPUs) for other jobs when your DB job is not using them. Thanks Best
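[Editor's note: sketching Cody's per-JVM cache suggestion from earlier in the thread, using Guava from Scala; loadReference is a hypothetical JDBC helper, and the one-hour expiry is an arbitrary assumption:]

import java.util.concurrent.TimeUnit
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// One instance per executor JVM; entries are re-read from MySQL after the
// expiry, so reference data is at most an hour stale without per-batch queries.
object ReferenceData {
  lazy val cache: LoadingCache[String, Map[String, String]] =
    CacheBuilder.newBuilder()
      .expireAfterWrite(1, TimeUnit.HOURS)
      .build(new CacheLoader[String, Map[String, String]] {
        override def load(tableKey: String): Map[String, String] = loadReference(tableKey)
      })

  def loadReference(tableKey: String): Map[String, String] = ??? // hypothetical JDBC read
}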
spark streaming 1.3 kafka buffer size
What's the default buffer size in Spark Streaming 1.3 for Kafka messages? Say in this run it has to fetch messages from offset 1 to 1. Will it fetch all in one go, or does it internally fetch messages in batches of a few messages? Is there any setting to configure this number of offsets fetched in one batch?
Re: Building spark-examples takes too much time using Maven
Can you provide a bit more information? Do the Spark artifacts you packaged have the same names / paths (in the Maven repo) as the ones published by Apache Spark? Is Zinc running on the machine where you performed the build? Cheers On Wed, Aug 26, 2015 at 7:56 AM, Muhammad Haseeb Javed 11besemja...@seecs.edu.pk wrote: I checked out the master branch and started playing around with the examples. I want to build a jar of the examples, as I wish to run them using the modified Spark jar that I have. However, packaging spark-examples takes too much time, as Maven tries to download the jar dependencies rather than use the jars that are already present on the system, since I extended and packaged Spark itself locally.
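[Editor's note: one approach that often avoids the re-downloads, offered as a hedged sketch and assuming the modified build keeps the stock org.apache.spark Maven coordinates: first run ./build/mvn -DskipTests install so the modified artifacts land in the local ~/.m2 repository, then build only the examples module offline with ./build/mvn -o -pl examples package, where -o forces offline resolution and -pl limits the build to the examples module.]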
Re: How to add a new column with date duration from 2 date columns in a dataframe
Thanks Davies. HiveContext seems neat to use :) On Thu, Aug 20, 2015 at 3:02 PM, Davies Liu dav...@databricks.com wrote: As Aram said, there are two options in Spark 1.4: 1) Use the HiveContext, then you get datediff from Hive: df.selectExpr("datediff(d2, d1)") 2) Use a Python UDF: ```
from datetime import date
df = sqlContext.createDataFrame([(date(2008, 8, 18), date(2008, 9, 26))], ['d1', 'd2'])

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
diff = udf(lambda a, b: (a - b).days, IntegerType())
df.select(diff(df.d1, df.d2)).show()

+-----------------------+
|PythonUDF#lambda(d1,d2)|
+-----------------------+
|                    -39|
+-----------------------+
``` On Thu, Aug 20, 2015 at 7:45 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Hi, hope this will help you:
import org.apache.spark.sql.functions._
import sqlContext.implicits._
import java.sql.Timestamp
import org.joda.time.{DateTime, Days}  // needed for Days.daysBetween below

val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")
val dateDiff = udf[Long, Timestamp, Timestamp]((value1, value2) =>
  Days.daysBetween(new DateTime(value2.getTime), new DateTime(value1.getTime)).getDays)
df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()
or you can write a sql query using hiveql's datediff function. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel dhaval1...@gmail.com wrote: More update on this question.. I am using spark 1.4.1. I was just reading the documentation of spark 1.5 (still in development) and I think there will be a new func *datediff* that will solve the issue. So please let me know if there is any work-around until spark 1.5 is out :). pyspark.sql.functions.datediff(end, start)[source] Returns the number of days from start to end.
df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
df.select(datediff(df.d2, df.d1).alias('diff')).collect()
[Row(diff=32)]
New in version 1.5. On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel dhaval1...@gmail.com wrote: Apologies, sent too early accidentally. Actual message is below. A dataframe has 2 date columns (datetime type) and I would like to add another column that would have the difference between these two dates. Dataframe snippet is below.
new_df.show(5)
+-----------+----------+--------------+
|      PATID|   SVCDATE|next_diag_date|
+-----------+----------+--------------+
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-27|
+-----------+----------+--------------+
Here is what I have tried so far:
- new_df.withColumn('SVCDATE2', (new_df.next_diag_date-new_df.SVCDATE)).show()
  Error: DateType does not support numeric operations
- new_df.withColumn('SVCDATE2', (new_df.next_diag_date-new_df.SVCDATE).days).show()
  Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);
However this simple python code works fine with pySpark:
from datetime import date
d0 = date(2008, 8, 18)
d1 = date(2008, 9, 26)
delta = d0 - d1
print (d0 - d1).days # -39
Any suggestions would be appreciated! Also, is there a way to add a new column in a dataframe without using a column expression (e.g. like in pandas or R: df$new_col = 'new col value')? Thanks, Dhaval On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel dhaval1...@gmail.com wrote:
new_df.withColumn('SVCDATE2', (new_df.next_diag_date-new_df.SVCDATE).days).show()
+-----------+----------+--------------+
|      PATID|   SVCDATE|next_diag_date|
+-----------+----------+--------------+
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-27|
+-----------+----------+--------------+
Re: spark streaming 1.3 kafka buffer size
see http://kafka.apache.org/documentation.html#consumerconfigs fetch.message.max.bytes in the kafka params passed to the constructor On Wed, Aug 26, 2015 at 10:39 AM, Shushant Arora shushantaror...@gmail.com wrote: What's the default buffer size in Spark Streaming 1.3 for Kafka messages? Say in this run it has to fetch messages from offset 1 to 1. Will it fetch them all in one go, or does it internally fetch messages in smaller batches? Is there any setting to configure the number of offsets fetched in one batch?
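A hedged sketch of where that parameter goes, using the Spark 1.3 direct stream constructor; `ssc`, the broker address, the topic name, and the 1 MB value are all illustrative assumptions:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// fetch.message.max.bytes caps how many bytes one fetch request returns
// per partition; the value below is an example, not a recommendation.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",
  "fetch.message.max.bytes" -> (1024 * 1024).toString)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))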
Re: Efficient sampling from a Hive table
Sorry, I meant without reading from all splits. This is a single partition in the table. On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak tom...@gmail.com wrote: I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from, and I don't particularly care which rows. Doing a LIMIT unfortunately results in two stages where the first stage reads the whole table, and the second then performs the limit with a single worker, which is not very efficient. Is there a better way to sample a subset of rows in Spark, ideally in a single stage and without reading all partitions? cheers, Tom
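One possible direction, sketched under the assumption that any subset of splits is acceptable: PartitionPruningRDD (a Spark developer API) drops partitions before tasks are scheduled, so only the kept splits are ever read. The table name and the number of kept splits are illustrative:

import org.apache.spark.rdd.PartitionPruningRDD

// Keep only the first 40 input splits (pick the count proportional to the
// fraction of rows wanted); no tasks are launched for the pruned partitions.
val full = sqlContext.table("big_table").rdd  // assumes a Hive-backed table
val pruned = PartitionPruningRDD.create(full, partitionId => partitionId < 40)
val sampleSize = pruned.count()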
Persisting sorted parquet tables for future sort merge joins
I want to persist a large _sorted_ table to Parquet on S3 and then read it in and join it, using the sort-merge join strategy, against another large sorted table. The problem is: even though I sort these tables on the join key beforehand, once I persist them to Parquet they lose the information about their sortedness. Is there any way to hint to Spark that they do not need to be re-sorted the next time I read them in? I've been trying this on 1.5 and I keep getting plans looking like:
== Physical Plan ==
TungstenProject [pos#28400,workf...#28399]
 SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]
  TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0
   TungstenExchange hashpartitioning(CHROM#28403,pos#28400)
    ConvertToUnsafe
     Scan ParquetRelation[file:/sorted.parquet][pos#2848424]
  TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0
   TungstenExchange hashpartitioning(CHROM#28399,pos#28332)
    ConvertToUnsafe
     Scan ParquetRelation[file:exploded_sorted.parquet][pos#2.399]
As you can see, this plan exchanges and sorts the data before performing the SortMergeJoin even though these parquet tables are already sorted. Thanks, Jason
Re: How to unit test HiveContext without OutOfMemoryError (using sbt)
Thanks for your response Yana, I can increase the MaxPermSize parameter and it will allow me to run the unit test a few more times before I run out of memory. However, the primary issue is that running the same unit test in the same JVM (multiple times) results in increased memory with each run of the unit test, and I believe it has something to do with HiveContext not reclaiming memory after it is finished (or I'm not shutting it down properly). It could very well be related to sbt; however, it's not clear to me. On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: The PermGen space error is controlled with the MaxPermSize parameter. I run with this in my pom, I think copied pretty literally from Spark's own tests... I don't know what the sbt equivalent is but you should be able to pass it...possibly via SBT_OPTS?
<plugin>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest-maven-plugin</artifactId>
  <version>1.0</version>
  <configuration>
    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
    <parallel>false</parallel>
    <junitxml>.</junitxml>
    <filereports>SparkTestSuite.txt</filereports>
    <argLine>-Xmx3g -XX:MaxPermSize=256m -XX:ReservedCodeCacheSize=512m</argLine>
    <stderr/>
    <systemProperties>
      <java.awt.headless>true</java.awt.headless>
      <spark.testing>1</spark.testing>
      <spark.ui.enabled>false</spark.ui.enabled>
      <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
    </systemProperties>
  </configuration>
  <executions>
    <execution>
      <id>test</id>
      <goals>
        <goal>test</goal>
      </goals>
    </execution>
  </executions>
</plugin>
On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hello, I am using sbt and created a unit test where I create a `HiveContext` and execute some query and then return. Each time I run the unit test the JVM will increase its memory usage until I get the error: Internal error when running tests: java.lang.OutOfMemoryError: PermGen space Exception in thread "Thread-2" java.io.EOFException As a work-around, I can fork a new JVM each time I run the unit test; however, it seems like a bad solution as it takes a while to run the unit test. By the way, I tried importing the TestHiveContext: - import org.apache.spark.sql.hive.test.TestHiveContext However, it suffers from the same memory issue. Has anyone else suffered from the same problem? Note that I am running these unit tests on my mac. Cheers, Mike.
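Since the thread asks for the sbt equivalent, here is a hedged sketch of how one might fork the test JVM with the same flags in build.sbt (standard sbt 0.13 keys; the values simply mirror the pom above):

// build.sbt -- fork a fresh JVM for the test run so PermGen is reclaimed
// when the run finishes, instead of accumulating inside the sbt JVM.
fork in Test := true
javaOptions in Test ++= Seq(
  "-Xmx3g",
  "-XX:MaxPermSize=256m",
  "-XX:ReservedCodeCacheSize=512m")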
Just Released V1.0.4 Low Level Receiver Based Kafka-Spark-Consumer in Spark Packages having built-in Back Pressure Controller
Dear All, Just now released the 1.0.4 version of the Low Level Receiver based Kafka-Spark-Consumer on spark-packages.org. You can find the latest release here: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Here is the github location: https://github.com/dibbhatt/kafka-spark-consumer This consumer now has a built-in PID (Proportional, Integral, Derivative) rate controller to control the Spark back-pressure. This consumer implements the rate limiting logic not by controlling the number of messages per block (as is done in Spark's out-of-the-box consumers), but by the size of the blocks per batch, i.e. for any given batch, this consumer controls the rate limit by controlling the size of the batches. As Spark memory is driven by block size rather than the number of messages, I think a rate limit by block size is more appropriate. E.g. assume Kafka contains messages of very small sizes (say a few hundred bytes) up to larger messages (a few hundred KB) for the same topic. Now if we control the rate limit by the number of messages, block sizes may vary drastically based on what type of messages get pulled per block. Whereas, if I control my rate limiting by the size of the block, my block size remains constant across batches (even though the number of messages differs across blocks), and that helps to tune my memory settings more correctly as I know exactly how much memory my block is going to consume. This consumer has its own PID (Proportional, Integral, Derivative) controller built in and controls the Spark back-pressure by modifying the size of the block it consumes at run time. The PID controller rate feedback mechanism is built using ZooKeeper. Again, the logic to control back-pressure is not by controlling the number of messages (as is done in Spark 1.5, SPARK-7398) but by altering the size of the block consumed per batch from Kafka. As the back-pressure is built into the consumer, this consumer can be used with any version of Spark if anyone wants a back-pressure controlling mechanism in their existing Spark / Kafka environment. Regards, Dibyendu
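For readers unfamiliar with PID control, a minimal, hedged sketch of the kind of update rule being described; the gains, state handling, and clamping here are purely illustrative and are not taken from the kafka-spark-consumer code:

case class PidState(integral: Double, lastError: Double)

// One step of a PID controller: given the target and observed consumption
// rates, produce the next block size and the carried-over controller state.
def nextBlockSize(currentSize: Double, targetRate: Double, observedRate: Double,
                  dtSeconds: Double, kP: Double, kI: Double, kD: Double,
                  state: PidState): (Double, PidState) = {
  val error      = targetRate - observedRate
  val integral   = state.integral + error * dtSeconds
  val derivative = (error - state.lastError) / dtSeconds
  val correction = kP * error + kI * integral + kD * derivative
  (math.max(0.0, currentSize + correction), PidState(integral, error))
}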
Re: JDBC Streams
I would use Sqoop. It has been designed exactly for these types of scenarios. Spark Streaming does not make sense here. On Sun, Jul 5, 2015 at 1:59 AM, ayan guha guha.a...@gmail.com wrote: Hi All I have a requirement to connect to a DB every few minutes and bring data to HBase. Can anyone suggest whether Spark Streaming would be appropriate for this scenario or whether I should look into the job server? Thanks in advance -- Best Regards, Ayan Guha
Re: Does the driver program always run local to where you submit the job from?
On Wed, Aug 26, 2015 at 2:03 PM, Jerry jerry.c...@gmail.com wrote: Assuming you're submitting the job from a terminal: when main() is called, if I try to open a file locally, can I assume the machine is always the one I submitted the job from? See the --deploy-mode option. client works as you describe; cluster does not. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Help! Stuck using withColumn
This simple command call: val final_df = data.select("month_balance").withColumn("month_date", data.col("month_date_curr")) Is throwing: org.apache.spark.sql.AnalysisException: resolved attribute(s) month_date_curr#324 missing from month_balance#234 in operator !Project [month_balance#234, month_date_curr#324 AS month_date_curr#408]; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42) at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931) at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595) at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Ah, I was using the UI coupled with the job logs indicating that offsets were being processed even though it corresponded to 0 events. Looks like I wasn't matching up timestamps correctly: the 0 event batches were queued/processed when offsets were getting skipped: 15/08/26 11:26:05 INFO storage.BlockManager: Removing RDD 0 15/08/26 11:26:05 INFO kafka.KafkaRDD: Beginning offset 831729964 is the same as ending offset skipping install-json 1 15/08/26 11:26:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 6 blocks 15/08/26 11:26:08 INFO storage.BlockManager: Removing RDD 1 But eventually processing of offset 831729964 would resume: 15/08/26 11:27:18 INFO kafka.KafkaRDD: Computing topic install-json, partition 1 offsets 831729964 - 831729976 Lesson learned: will be more focused on reading the job logs properly in the future. Thanks for all the help on this! On Wed, Aug 26, 2015 at 12:16 PM, Cody Koeninger c...@koeninger.org wrote: I'd be less concerned about what the streaming ui shows than what's actually going on with the job. When you say you were losing messages, how were you observing that? The UI, or actual job output? The log lines you posted indicate that the checkpoint was restored and those offsets were processed; what are the log lines for the following KafkaRDD ? On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang suchenz...@gmail.com wrote: Compared offsets, and it continues from checkpoint loading: 15/08/26 11:24:54 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446), (install-json,4,825772921,825773536), (install-json,1,831654775,831655076), (install-json,0,1296018451,1296018810), (install-json,2,824785282,824785696), (install-json,3, 811428882,811429181)] 15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json, partition 0 offsets 1296018451 - 1296018810 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 4 offsets 825773536 - 825907428 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 2 offsets 824785696 - 824889957 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 3 offsets 811429181 - 811529084 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 1 offsets 831655076 - 831729964 ... But for some reason the streaming UI shows it as computing 0 events. Removing the call to checkpoint does remove the queueing of 0 event batches, since offsets just skip to the latest (checked that the first part.fromOffset in the restarted job is larger than the last part.untilOffset before restart). On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger c...@koeninger.org wrote: When the kafka rdd is actually being iterated on the worker, there should be an info line of the form log.info(sComputing topic ${part.topic}, partition ${part.partition} + soffsets ${part.fromOffset} - ${part.untilOffset}) You should be able to compare that to log of offsets during checkpoint loading, to see if they line up. Just out of curiosity, does removing the call to checkpoint on the stream affect anything? On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang suchenz...@gmail.com wrote: Thanks for the suggestions! I tried the following: I removed createOnError = true And reran the same process to reproduce. 
Double checked that checkpoint is loading: 15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528), (install-json,4,825400856,825401058), (install-json,1,831453228,831453396), (install-json,0,1295759888,1295760378), (install-json,2,824443526,82409), (install-json,3, 811222580,811222874)] 15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791), (install-json,4,825401058,825401249), (install-json,1,831453396,831453603), (install-json,0,1295760378,1295760809), (install-json,2,82409,824445510), (install-json,3, 811222874,811223285)] ... And the same issue is appearing as before (with 0 event batches getting queued corresponding to dropped messages). Our kafka brokers are on version 0.8.2.0, if that makes a difference. Also as a sanity check, I took out the ZK updates and reran (just in case that was somehow causing problems), and that didn't change anything as expected. Furthermore, the 0 event batches seem to take longer to process than batches with the regular load of events: processing time for 0 event batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000 event batches is consistently 1s. Why would that happen? As for the checkpoint call: directKStream.checkpoint(checkpointDuration) was an attempt to set the
suggest configuration for debugging spark streaming, kafka
Hi I have an Ubuntu box with 4GB memory and dual cores. Do you think it won't be enough to run Spark Streaming and Kafka? I'm trying to install Spark in standalone mode with Kafka so I can debug them in an IDE. Do I need to install Hadoop? Thanks! J - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does the driver program always run local to where you submit the job from?
Thanks! On Wed, Aug 26, 2015 at 2:06 PM, Marcelo Vanzin van...@cloudera.com wrote: On Wed, Aug 26, 2015 at 2:03 PM, Jerry jerry.c...@gmail.com wrote: Assuming you're submitting the job from a terminal: when main() is called, if I try to open a file locally, can I assume the machine is always the one I submitted the job from? See the --deploy-mode option. client works as you describe; cluster does not. -- Marcelo
RE: Help! Stuck using withColumn
I can reproduce this even simpler with the following:
val gf = sc.parallelize(Array(3,6,4,7,3,4,5,5,31,4,5,2)).toDF("ASD")
val ff = sc.parallelize(Array(4,6,2,3,5,1,4,6,23,6,4,7)).toDF("GFD")
gf.withColumn("DSA", ff.col("GFD"))
org.apache.spark.sql.AnalysisException: resolved attribute(s) GFD#421 missing from ASD#419 in operator !Project [ASD#419,GFD#421 AS DSA#422]; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42) at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931) at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595) at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039) From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com] Sent: Wednesday, August 26, 2015 6:47 PM To: user@spark.apache.org Subject: Help! Stuck using withColumn This simple command call: val final_df = data.select("month_balance").withColumn("month_date", data.col("month_date_curr")) Is throwing: org.apache.spark.sql.AnalysisException: resolved attribute(s) month_date_curr#324 missing from month_balance#234 in operator !Project [month_balance#234, month_date_curr#324 AS month_date_curr#408]; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42) at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931) at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595) at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)
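The error arises because withColumn can only reference columns of the DataFrame it is called on. One hedged workaround is to align the two frames row-by-row and rebuild; a minimal sketch, assuming both sides have the same number of partitions and the same number of rows per partition (a requirement of RDD.zip, satisfied here since both arrays have 12 elements and were parallelized the same way):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Pair up rows positionally and concatenate them under a merged schema.
val zipped = gf.rdd.zip(ff.rdd).map { case (l, r) => Row.fromSeq(l.toSeq ++ r.toSeq) }
val schema = StructType(gf.schema.fields ++ ff.schema.fields)
val combined = sqlContext.createDataFrame(zipped, schema)
combined.show()  // columns ASD and GFD side by side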
Spark.ml vs Spark.mllib
Hi, We are in the process of developing a new product/Spark application. While the official Spark 1.4.1 page http://spark.apache.org/docs/latest/ml-guide.html invites users and developers to use *Spark.mllib* and optionally contribute to *Spark.ml*, this http://stackoverflow.com/questions/30231840/difference-between-org-apache-spark-ml-classification-and-org-apache-spark-mllib StackOverflow post refers to the design doc, saying that Spark.mllib will be deprecated eventually. Could you please confirm which of these is true, and whether we need to worry if we are planning to develop the app using Spark.mllib? What would be the timeline for this migration? Thanks in advance, Nikhil -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ml-vs-Spark-mllib-tp24465.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SPARK_DIST_CLASSPATH, primordial class loader app ClassNotFound
Hey all, I'm trying to do some stuff with a YAML file in the Spark driver, using the SnakeYAML library in Scala. When I put the snakeyaml v1.14 jar on the SPARK_DIST_CLASSPATH and try to de-serialize some objects from YAML into classes in my app JAR on the driver (only the driver), I get the exception below. Yet when I don't have the snakeyaml jar on the SPARK_DIST_CLASSPATH but instead create a fat jar for my application (so it has the snakeyaml jar baked inside), then everything works fine. If I have both the jar on DIST_CLASSPATH and fat-jarred (with sbt assembly), it still fails with the same exception. I'm guessing that SPARK_DIST_CLASSPATH jars end up in the 'primordial' class loader; because SnakeYAML then lives in this class loader, it can't find my application jar classes because they are loaded at a subsequent point/different class loader. What is the workaround for this? Thanks ~N Exception in thread "main" com.sai.cfg.InvalidConfigurationPropertyException at com.sai.cfg.ConfigurationParser$.getConfig(ConfigurationParser.scala:184) at com.sai.cfg.ConfigurationParser$.getGlobalConfig(ConfigurationParser.scala:171) at com.sai.cfg.ConfigurationParser.globalConfig$lzycompute(ConfigurationParser.scala:26) at com.sai.cfg.ConfigurationParser.globalConfig(ConfigurationParser.scala:26) at com.sai.cfg.ConfigurationParser.<init>(ConfigurationParser.scala:44) at com.sai.strategy.StrategyEngineMain$.delayedEndpoint$au$com$quantium$personalisation$strategy$StrategyEngineMain$1(StrategyEngineMain.scala:39) at com.sai.strategy.StrategyEngineMain$delayedInit$body.apply(StrategyEngineMain.scala:15) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.App$class.main(App.scala:76) at com.sai.strategy.StrategyEngineMain$.main(StrategyEngineMain.scala:15) at com.sai.strategy.StrategyEngineMain.main(StrategyEngineMain.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: org.yaml.snakeyaml.error.YAMLException: Class not found: com.sai.cfg.models.GlobalConfiguration at org.yaml.snakeyaml.constructor.Constructor.getClassForNode(Constructor.java:647) at org.yaml.snakeyaml.constructor.Constructor$ConstructYamlObject.getConstructor(Constructor.java:330) at org.yaml.snakeyaml.constructor.Constructor$ConstructYamlObject.construct(Constructor.java:340) at org.yaml.snakeyaml.constructor.BaseConstructor.constructObject(BaseConstructor.java:182) at org.yaml.snakeyaml.constructor.BaseConstructor.constructDocument(BaseConstructor.java:141) at org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:127) at
org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:481) at org.yaml.snakeyaml.Yaml.load(Yaml.java:400) at com.sai.cfg.ConfigurationParser$.getConfig(ConfigurationParser.scala:181) ... 24 more
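One hedged workaround for this kind of parent/child loader split: SnakeYAML ships a CustomClassLoaderConstructor that lets you hand it the loader that can actually see the application classes. A minimal sketch, assuming the target classes live in the app jar and `configReader` is a java.io.Reader over the YAML file:

import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor

// Resolve classes through the loader that loaded the application code,
// not the (parent) loader that loaded snakeyaml itself.
val appLoader = Thread.currentThread().getContextClassLoader
val yaml = new Yaml(new CustomClassLoaderConstructor(appLoader))
val cfg = yaml.load(configReader)  // configReader: java.io.Reader (assumed)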
Re: Question on take function - Spark Java API
You can try using wholeTextFiles, which will give you a pair RDD of (fileName, content). flatMap through this and manipulate the content. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/ http://in.linkedin.com/in/sonalgoyal On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane pankaj.wah...@qiotec.com wrote: Hi community members, Apache Spark is fantastic and very easy to learn.. Awesome work!!! *Question:* I have multiple files in a folder and the first line in each file is the name of the asset that the file belongs to. The second line is the csv header row and data starts from the third row. Ex: File 1
TestAsset01
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,123,456,789
11-01-2015 15:00:01,123,456,789
. . .
Ex: File 2
TestAsset02
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,1230,4560,7890
11-01-2015 15:00:01,1230,4560,7890
. . .
I have got nearly 1000 files in each folder, sizing ~10G. I am using the Apache Spark Java API to read all these files. Following is the code extract that I am using:
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    Map<String, String> readingTypeMap = getReadingTypesMap(sc);
    // Read file
    JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
    // Get asset
    String asset = data.take(1).get(0);
    // Extract time series data
    JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
    // Strip header
    String header = actualData.take(1).get(0);
    String[] headers = header.split(DELIMERTER);
    // Extract actual data
    JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
    // Extract valid records
    JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
    // Find granularity
    Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
    // Transform to TSD objects
    JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
    // Save to Cassandra
    javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"), "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();
    System.out.println("Total Records: " + timeSeriesLines.count());
    System.out.println("Valid Records: " + validated.count());
}
Within the TimeSeriesData object I need to set the asset name for the reading, so I need the output of data.take(1) to be different for different files. Thank You. Best Regards, Pankaj QIO Technologies Limited is a limited company registered in England Wales at 1 Curzon Street, London, England, W1J 5HD, with registered number 09368431 This message and the information contained within it is intended solely for the addressee and may contain confidential or privileged information. If you have received this message in error please notify QIO Technologies Limited immediately and then permanently delete this message. If you are not the intended addressee then you must not copy, transmit, disclose or rely on the information contained in this message or in any attachment to it, all such use is prohibited to maximum extent possible by law.
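A minimal sketch of Sonal's suggestion (shown in the Scala API for brevity); wholeTextFiles keeps each file's content attached to its path, so the per-file asset name on line one stays with its own records. `folder` is the input directory and is assumed:

// One (path, content) pair per file; suitable for small files, since each
// file's whole content is held in memory as a single string.
val perFile = sc.wholeTextFiles(folder)
val records = perFile.flatMap { case (_, content) =>
  val lines  = content.split("\n")
  val asset  = lines.head.trim            // line 1: asset name
  val header = lines(1)                   // line 2: csv header
  lines.drop(2).map(row => (asset, row))  // data rows tagged with their asset
}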
Re:Re:Re: How to increase data scale in Spark SQL Perf
Sorry for the noise, it's my bad... I have worked it out now. At 2015-08-26 13:20:57, Todd bit1...@163.com wrote: I think the answer is no. I only see such messages on the console, and #2 is the thread stack trace. I am thinking that Spark SQL Perf forks many dsdgen processes to generate data when the scale factor is increased, which at last exhausts the JVM. When the thread exceptions are thrown on the console and I leave it there for a while (about 15 min), then eventually I will see OutOfMemory occur. Can you guys try to run it if you have the environment? I think you may reproduce it. Thanks! At 2015-08-26 13:01:34, Ted Yu yuzhih...@gmail.com wrote: The error in #1 below was not informative. Are you able to get a more detailed error message? Thanks On Aug 25, 2015, at 6:57 PM, Todd bit1...@163.com wrote: Thanks Ted Yu. Following are the error messages: 1. The exception that is shown on the UI is: Exception in thread "Thread-113" Exception in thread "Thread-126" Exception in thread "Thread-64" Exception in thread "Thread-90" Exception in thread "Thread-117" Exception in thread "Thread-80" Exception in thread "Thread-115" Exception in thread "ResponseProcessor for block BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984" Exception in thread "qtp1270119920-57" Exception in thread "Thread-77" Exception in thread "Thread-132" Exception in thread "Thread-68" Exception in thread "Thread-61" Exception in thread "Thread-70" Exception in thread "qtp1270119920-52" Exception in thread "Thread-88" Exception in thread "qtp318933312-47" Exception in thread "qtp1270119920-56" 2. jstack the process, I see a bunch of the following messages:
Thread 31258: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame)
 - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame)
Thread 31257: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame)
 - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame)
At 2015-08-25 19:32:56, Ted Yu yuzhih...@gmail.com wrote: Looks like you were attaching images to your email, which didn't go through. Consider using a third party site for images - or paste the error as text. Cheers On Tue, Aug 25, 2015 at 4:22 AM, Todd bit1...@163.com wrote: Hi, The spark sql perf itself contains benchmark data generation. I am using the spark shell to run spark sql perf to generate the data with 10G memory for both driver and executor. When I increase the scale factor to 30 and run the job, I get the following error: When I jstack it to see the status of the threads, I see the following: it looks like it is waiting for the process that the spark job kicks off.
Re: DataFrame Parquet Writer doesn't keep schema
The same as https://mail.google.com/mail/#label/Spark%2Fuser/14f64c75c15f5ccd Please follow the discussion there. On Tue, Aug 25, 2015 at 5:02 PM, Petr Novak oss.mli...@gmail.com wrote: Hi all, when I read parquet files with required fields aka nullable=false they are read correctly. Then I save them (df.write.parquet) and read again all my fields are saved and read as optional, aka nullable=true. Which means I suddenly have files with incompatible schemas. This happens on 1.3.0-1.4.1 and even on 1.5.1-rc1. Should I set some write option to keep nullability? Is there a specific reason why nullability is always overriden to true? Many thanks, Peter
Re: Relation between threads and executor core
Hi Samya, When submitting an application with spark-submit, the cores per executor can be set with --executor-cores, meaning you can run that many tasks per executor concurrently. The page below has some more details on submitting applications: https://spark.apache.org/docs/latest/submitting-applications.html thanks, Jem On Wed, Aug 26, 2015 at 9:47 AM Samya samya.ma...@amadeus.com wrote: Hi All, A few basic queries :- 1. Is there a way we can control the number of threads per executor core? 2. Does this parameter “executor-cores” also have a say in deciding how many threads to be run? Regards, Sam -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Relation between threads and executor core
Thanks Jem, I do understand your suggestion. Actually --executor-cores alone doesn't control the number of concurrent tasks; it is also governed by spark.task.cpus (the number of cores dedicated to each task's execution). Reframing my question: How many threads can be spawned per executor core? Is it in user control? Regards, Sam From: Jem Tucker [mailto:jem.tuc...@gmail.com] Sent: Wednesday, August 26, 2015 2:26 PM To: Samya MAITI samya.ma...@amadeus.com; user@spark.apache.org Subject: Re: Relation between threads and executor core Hi Samya, When submitting an application with spark-submit, the cores per executor can be set with --executor-cores, meaning you can run that many tasks per executor concurrently. The page below has some more details on submitting applications: https://spark.apache.org/docs/latest/submitting-applications.html thanks, Jem On Wed, Aug 26, 2015 at 9:47 AM Samya samya.ma...@amadeus.com wrote: Hi All, A few basic queries :- 1. Is there a way we can control the number of threads per executor core? 2. Does this parameter “executor-cores” also have a say in deciding how many threads to be run? Regards, Sam -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Performance issue with Spark join
Hi, I'm trying to perform an ETL using Spark, but as soon as I start performing joins performance degrades a lot. Let me explain what I'm doing and what I have found out so far. First of all, I'm reading avro files that are on a Cloudera cluster, using commands like this:
val tab1 = sc.hadoopFile("hdfs:///path/to/file", classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]], classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], 10)
After this, I'm applying some filter functions to the data (to reproduce the where clauses of the original query) and then I'm using one map for each table in order to translate RDD elements into (key, record) format. Let's say I'm doing this:
val elabTab1 = tab1.filter(...).map(...)
It is important to notice that if I do something like elabTab1.first or elabTab1.count, the task is performed in a short time, let's say around impala's time. Now I need to do the following:
val joined = elabTab1.leftOuterJoin(elabTab2)
Then I tried something like joined.count to test performance, but it degraded really a lot (let's say that a count on a single table takes like 4 seconds and the count on the joined table takes 12 minutes). I think there's a problem with the configuration, but what might it be? I'll give you some more information: 1] Spark is running on YARN on a Cloudera cluster 2] I'm starting spark-shell with a command like spark-shell --executor-cores 5 --executor-memory 10G, which gives the shell approx 10 vcores and 25 GB of memory 3] The task seems stalled for a lot of time after the map tasks, with the following message in the console: "Asked to send map output locations for shuffle ... to ..." 4] If I open the stderr of the executors, I can read plenty of messages like the following: "Thread ... spilling in-memory map of ... MB to disk", where the MBs are in the order of 300-400 5] I tried to raise the number of executors, but the situation didn't seem to change much. I also tried to change the number of splits of the avro files (currently set to 10), but that didn't seem to change much either 6] The tables aren't particularly big; the bigger one should be a few GBs Regards, Luca -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-with-Spark-join-tp24458.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Performance issue with Spark join
Spark joins are different from traditional database joins because of the lack of index support. Spark has to shuffle data between nodes to perform joins, so joins are bound to be much slower than a count, which is just a parallel scan of the data. Still, to ensure that nothing is wrong with the setup, you may want to look at your Spark task UI, in particular at the shuffle read and shuffle write figures. On Wed, Aug 26, 2015 at 3:08 PM, lucap luca-pi...@hotmail.it wrote: Hi, I'm trying to perform an ETL using Spark, but as soon as I start performing joins performance degrades a lot. Let me explain what I'm doing and what I have found out so far. First of all, I'm reading avro files that are on a Cloudera cluster, using commands like this:
val tab1 = sc.hadoopFile("hdfs:///path/to/file", classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]], classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], 10)
After this, I'm applying some filter functions to the data (to reproduce the where clauses of the original query) and then I'm using one map for each table in order to translate RDD elements into (key, record) format. Let's say I'm doing this:
val elabTab1 = tab1.filter(...).map(...)
It is important to notice that if I do something like elabTab1.first or elabTab1.count, the task is performed in a short time, let's say around impala's time. Now I need to do the following:
val joined = elabTab1.leftOuterJoin(elabTab2)
Then I tried something like joined.count to test performance, but it degraded really a lot (let's say that a count on a single table takes like 4 seconds and the count on the joined table takes 12 minutes). I think there's a problem with the configuration, but what might it be? I'll give you some more information: 1] Spark is running on YARN on a Cloudera cluster 2] I'm starting spark-shell with a command like spark-shell --executor-cores 5 --executor-memory 10G, which gives the shell approx 10 vcores and 25 GB of memory 3] The task seems stalled for a lot of time after the map tasks, with the following message in the console: "Asked to send map output locations for shuffle ... to ..." 4] If I open the stderr of the executors, I can read plenty of messages like the following: "Thread ... spilling in-memory map of ... MB to disk", where the MBs are in the order of 300-400 5] I tried to raise the number of executors, but the situation didn't seem to change much. I also tried to change the number of splits of the avro files (currently set to 10), but that didn't seem to change much either 6] The tables aren't particularly big; the bigger one should be a few GBs Regards, Luca -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-with-Spark-join-tp24458.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
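When one side of the join fits in memory, a map-side (broadcast) join avoids the shuffle entirely; a hedged sketch over the thread's elabTab1/elabTab2, assuming elabTab2 is the small side and can be collected on the driver:

// Ship the small side to every executor once, then join locally per record.
val smallSide = elabTab2.collectAsMap()  // assumes elabTab2 is small enough
val smallBc   = sc.broadcast(smallSide)
val joined = elabTab1.map { case (key, rec) =>
  (key, (rec, smallBc.value.get(key)))   // Option mirrors leftOuterJoin semantics
}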
Re: How to increase data scale in Spark SQL Perf
Mind sharing how you fixed the issue? Cheers On Aug 26, 2015, at 1:50 AM, Todd bit1...@163.com wrote: Sorry for the noise, it's my bad... I have worked it out now. At 2015-08-26 13:20:57, Todd bit1...@163.com wrote: I think the answer is no. I only see such messages on the console, and #2 is the thread stack trace. I am thinking that Spark SQL Perf forks many dsdgen processes to generate data when the scale factor is increased, which at last exhausts the JVM. When the thread exceptions are thrown on the console and I leave it there for a while (about 15 min), then eventually I will see OutOfMemory occur. Can you guys try to run it if you have the environment? I think you may reproduce it. Thanks! At 2015-08-26 13:01:34, Ted Yu yuzhih...@gmail.com wrote: The error in #1 below was not informative. Are you able to get a more detailed error message? Thanks On Aug 25, 2015, at 6:57 PM, Todd bit1...@163.com wrote: Thanks Ted Yu. Following are the error messages: 1. The exception that is shown on the UI is: Exception in thread "Thread-113" Exception in thread "Thread-126" Exception in thread "Thread-64" Exception in thread "Thread-90" Exception in thread "Thread-117" Exception in thread "Thread-80" Exception in thread "Thread-115" Exception in thread "ResponseProcessor for block BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984" Exception in thread "qtp1270119920-57" Exception in thread "Thread-77" Exception in thread "Thread-132" Exception in thread "Thread-68" Exception in thread "Thread-61" Exception in thread "Thread-70" Exception in thread "qtp1270119920-52" Exception in thread "Thread-88" Exception in thread "qtp318933312-47" Exception in thread "qtp1270119920-56" 2. jstack the process, I see a bunch of the following messages:
Thread 31258: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame)
 - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame)
Thread 31257: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame)
 - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame)
At 2015-08-25 19:32:56, Ted Yu yuzhih...@gmail.com wrote: Looks like you were attaching images to your email, which didn't go through. Consider using a third party site for images - or paste the error as text. Cheers On Tue, Aug 25, 2015 at 4:22 AM, Todd bit1...@163.com wrote: Hi, The spark sql perf itself contains benchmark data generation. I am using the spark shell to run spark sql perf to generate the data with 10G memory for both driver and executor. When I increase the scale factor to 30 and run the job, I get the following error: When I jstack it to see the status of the threads, I see the following: it looks like it is waiting for the process that the spark job kicks off.
Custom Offset Management
Hi Folks, My Spark application interacts with Kafka to get data through the Java API. I am using the direct approach (no receivers), which uses Kafka's simple consumer API to read data, so Kafka offsets need to be handled explicitly. In case of a Spark failure I need to save the offset state of Kafka for resuming from the failure point. I am saving these points in MongoDB. Please tell me how to initialize the Kafka direct stream with the saved offset points. I want to initialize the Kafka stream in Spark Streaming with the required offset points. There is a method I found on the web: KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet, fromOffsets, arg8); arg8 - kafka.message.MessageAndMetadata Please tell me how to handle and initialize this. Regards, Deepesh
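A hedged sketch (shown in the Scala API; the Java overload is analogous) of resuming a direct stream from stored offsets; `ssc`, `kafkaParams`, and the loadOffsetsFromMongo helper are assumptions, not part of the original question:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper: reads the last committed offsets out of MongoDB.
def loadOffsetsFromMongo(): Map[TopicAndPartition, Long] = ???

val fromOffsets = loadOffsetsFromMongo()
// The message handler decides what each record becomes; (key, value) here.
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)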
Build k-NN graph for large dataset
Dear all, I'm trying to find an efficient way to build a k-NN graph for a large dataset. Precisely, I have a large set of high dimensional vectors (say d 1) and I want to build a graph where those high dimensional points are the vertices and each one is linked to its k nearest neighbors based on some kind of similarity defined on the vertex space. My problem is to implement an efficient algorithm to compute the weight matrix of the graph. I need to compute N*N similarities and the only way I know is to use a cartesian operation followed by a map operation on the RDD. But this is very slow when N is large. Is there a cleverer way to do this for an arbitrary similarity function? Cheers, Jao
Re: Relation between threads and executor core
Sam, This may be of interest; as far as I can see it suggests that a Spark 'task' is always executed as a single thread in the JVM. http://0x0fff.com/spark-architecture/ Thanks, Jem On Wed, Aug 26, 2015 at 10:06 AM Samya MAITI samya.ma...@amadeus.com wrote: Thanks Jem, I do understand your suggestion. Actually --executor-cores alone doesn't control the number of concurrent tasks; it is also governed by *spark.task.cpus* (the number of cores dedicated to each task's execution). Reframing my question: *How many threads can be spawned per executor core? Is it in user control?* Regards, Sam *From:* Jem Tucker [mailto:jem.tuc...@gmail.com] *Sent:* Wednesday, August 26, 2015 2:26 PM *To:* Samya MAITI samya.ma...@amadeus.com; user@spark.apache.org *Subject:* Re: Relation between threads and executor core Hi Samya, When submitting an application with spark-submit, the cores per executor can be set with --executor-cores, meaning you can run that many tasks per executor concurrently. The page below has some more details on submitting applications: https://spark.apache.org/docs/latest/submitting-applications.html thanks, Jem On Wed, Aug 26, 2015 at 9:47 AM Samya samya.ma...@amadeus.com wrote: Hi All, A few basic queries :- 1. Is there a way we can control the number of threads per executor core? 2. Does this parameter “executor-cores” also have a say in deciding how many threads to be run? Regards, Sam -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Build k-NN graph for large dataset
You could try dimensionality reduction (PCA or SVD) first. I would imagine that even if you could successfully compute similarities in the high-dimensional space, you would probably run into the curse of dimensionality. On 26 Aug 2015, at 12:35, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm trying to find an efficient way to build a k-NN graph for a large dataset. Precisely, I have a large set of high dimensional vectors (say d 1) and I want to build a graph where those high dimensional points are the vertices and each one is linked to its k nearest neighbors based on some kind of similarity defined on the vertex space. My problem is to implement an efficient algorithm to compute the weight matrix of the graph. I need to compute N*N similarities and the only way I know is to use a cartesian operation followed by a map operation on the RDD. But this is very slow when N is large. Is there a cleverer way to do this for an arbitrary similarity function? Cheers, Jao - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
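A minimal sketch of the PCA suggestion using MLlib's RowMatrix; the input `vectors` (an RDD[Vector]) and the choice of 50 components are assumptions for illustration:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Project the high-dimensional points onto the top principal components
// before computing pairwise similarities.
val mat = new RowMatrix(vectors)             // vectors: RDD[Vector] (assumed)
val pc  = mat.computePrincipalComponents(50) // 50 is illustrative
val projected = mat.multiply(pc)             // RowMatrix of reduced vectors
val reduced   = projected.rows               // RDD[Vector] for the k-NN step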
JobScheduler: Error generating jobs for time for custom InputDStream
Hi, I've developed a ScalaCheck property for testing Spark Streaming transformations. To do that I had to develop a custom InputDStream, which is very similar to QueueInputDStream but has a method for adding new test cases for dstreams, which are objects of type Seq[Seq[A]], to the DStream. You can see the code at https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala. I have developed a few properties that run in local mode https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala. The problem is that when the batch interval is too small, and the machine cannot complete the batches fast enough, I get the following exceptions in the Spark log 15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time 1440580922500 ms java.lang.NullPointerException at org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587) at org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587) at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654) at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654) at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668) at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666) at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at 
scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at
Re: Issue with building Spark v1.4.1-rc4 with Scala 2.11
Have you run dev/change-version-to-2.11.sh ? Cheers On Wed, Aug 26, 2015 at 7:07 AM, Felix Neutatz neut...@googlemail.com wrote: Hi everybody, I tried to build Spark v1.4.1-rc4 with Scala 2.11: ../apache-maven-3.3.3/bin/mvn -Dscala-2.11 -DskipTests clean install Before running this, I deleted: ../.m2/repository/org/apache/spark ../.m2/repository/org/spark-project My changes to the code: I just changed line 174 of org.apache.spark.executor.Executor$TaskRunner to: logInfo(s"test Executor is trying to kill $taskName (TID $taskId)") Everything builds without an error, but I have an issue. When I look into the jar of spark-core_2.10, I can see the changed string in Executor$TaskRunner$$anonfun$kill$1.class. But when I look into spark-core_2.11, the corresponding string didn't change. It seems like it downloads the jar from Maven. Do you know what I did wrong? I also tried to run mvn -Dscala-2.11 -DskipTests clean install on the current master and got the following error: [ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:1.4:enforce (enforce-versions) on project spark-parent_2.10: Some Enforcer rules have failed. Look above for specific messages explaining why the rule failed. - [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException Thank you for your help. Best regards, Felix
Re: Finding the number of executors.
As I was writing a long-ish message to explain how it doesn't work, it dawned on me that maybe the driver connects to executors only after there's some work to do (while I was trying to find the number of executors BEFORE starting the actual work). So the solution was to simply execute a dummy task ( sparkContext.parallelize(1 until 1000, 200).reduce(_+_) ) before attempting to retrieve the executors. It works now :) Virgil. On Sat, Aug 22, 2015 at 12:44 AM, Du Li l...@yahoo-inc.com wrote: Following is a method that retrieves the list of executors registered to a spark context. It worked perfectly with spark-submit in standalone mode for my project.
/**
 * A simplified method that just returns the current active/registered executors
 * excluding the driver.
 * @param sc
 *   The spark context to retrieve registered executors.
 * @return
 *   A list of executors, each in the form of host:port.
 */
def currentActiveExecutors(sc: SparkContext): Seq[String] = {
  val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
  val driverHost: String = sc.getConf.get("spark.driver.host")
  allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
}
On Friday, August 21, 2015 1:53 PM, Virgil Palanciuc virg...@gmail.com wrote: Hi Akhil, I'm using spark 1.4.1. The number of executors is not in the command line, not in getExecutorMemoryStatus (I already mentioned that I tried that; it works in spark-shell but not when executed via spark-submit). I tried looking at defaultParallelism too; it's 112 (7 executors * 16 cores) when run via spark-shell, but just 2 when run via spark-submit. But the scheduler obviously knows this information. It *must* know it. How can I access it? Other than parsing the HTML of the WebUI, that is... that's pretty much guaranteed to work, and maybe I'll do that, but it's extremely convoluted. Regards, Virgil. On Fri, Aug 21, 2015 at 11:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Which version of spark are you using? There was a discussion over here http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E On Aug 21, 2015 7:42 AM, Virgil Palanciuc vir...@palanciuc.eu wrote: Is there any reliable way to find out the number of executors programmatically - regardless of how the job is run? A method that preferably works for spark-standalone, yarn, mesos, regardless of whether the code runs from the shell or not? Things that I tried and don't work: - sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell, does not work if the task is submitted via spark-submit - sparkContext.getConf.getInt("spark.executor.instances", 1) - doesn't work unless explicitly configured - a call to http://master:8080/json (this used to work, but doesn't anymore?) I guess I could parse the output html from the Spark UI... but that seems dumb. Is there really no better way? Thanks, Virgil.
application logs for long lived job on YARN
When running a long-lived job on YARN, like Spark Streaming, I found that the container logs are gone after a few days on the executor nodes, although the job itself is still running. I am using cdh5.4.0 and have aggregated logs enabled. Because the local logs are gone on the executor nodes, I don't see any aggregated logs on hdfs after the job is killed or fails. Is there a YARN config to keep the logs from being deleted for a long-lived streaming job? -- Chen Song
Spark 1.3.1 saveAsParquetFile hangs on app exit
I have a simple test that is hanging when using s3a with spark 1.3.1. Is there something I need to do to clean up the S3A file system? The write to S3 appears to have worked, but this job hangs in the spark-shell and using spark-submit. Any help would be greatly appreciated. TIA.

  import sqlContext.implicits._
  import com.datastax.spark.connector._

  case class LU(userid: String, timestamp: Long, lat: Double, lon: Double)

  val uid = "testuser"
  val lue = sc.cassandraTable[LU]("test", "foo").where("userid=?", uid).toDF
  lue.saveAsParquetFile("s3a://twc-scratch/craig_lues")
Re: Spark 1.3.1 saveAsParquetFile hangs on app exit
Could you please show the jstack result of the hung process? Thanks! Cheng On 8/26/15 10:46 PM, cingram wrote: I have a simple test that is hanging when using s3a with spark 1.3.1. Is there something I need to do to clean up the S3A file system? The write to S3 appears to have worked, but this job hangs in the spark-shell and using spark-submit. Any help would be greatly appreciated. TIA.

  import sqlContext.implicits._
  import com.datastax.spark.connector._

  case class LU(userid: String, timestamp: Long, lat: Double, lon: Double)

  val uid = "testuser"
  val lue = sc.cassandraTable[LU]("test", "foo").where("userid=?", uid).toDF
  lue.saveAsParquetFile("s3a://twc-scratch/craig_lues")
Fwd: Issue with building Spark v1.4.1-rc4 with Scala 2.11
Hi everybody, I tried to build Spark v1.4.1-rc4 with Scala 2.11: ../apache-maven-3.3.3/bin/mvn -Dscala-2.11 -DskipTests clean install Before running this, I deleted: ../.m2/repository/org/apache/spark ../.m2/repository/org/spark-project My changes to the code: I just changed line 174 of org.apache.spark.executor.Executor$TaskRunner to:

  logInfo(s"test Executor is trying to kill $taskName (TID $taskId)")

Everything builds without an error, but I have an issue. When I look into the jar of spark-core_2.10, I can see the changed string in Executor$TaskRunner$$anonfun$kill$1.class. But when I look into spark-core_2.11 the corresponding string didn't change. It seems like it downloads the jar from maven. Do you know what I did wrong? I also tried to run mvn -Dscala-2.11 -DskipTests clean install on the current master and got the following error:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:1.4:enforce (enforce-versions) on project spark-parent_2.10: Some Enforcer rules have failed. Look above for specific messages explaining why the rule failed. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Thank you for your help. Best regards, Felix
Re: reduceByKey not working on JavaPairDStream
I don't see that you invoke any action in this code. It won't do anything unless you tell it to perform an action that requires the transformations. On Wed, Aug 26, 2015 at 7:05 AM, Deepesh Maheshwari deepesh.maheshwar...@gmail.com wrote: Hi, I have applied mapToPair and then a reduceByKey on a DStream to obtain a JavaPairDStream<String, Map<String, Object>>. I have to apply a flatMapToPair and reduceByKey on the DStream obtained above. But I do not see any logs from the reduceByKey operation. Can anyone explain why this is happening..? Find my code below -

  /***
   * GroupLevel1 Groups - articleId, host and tags
   */
  JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
      .mapToPair(
          new PairFunction<Map<String, Object>, String, Map<String, Object>>() {
              private static final long serialVersionUID = 5196132687044875422L;

              @Override
              public Tuple2<String, Map<String, Object>> call(
                      Map<String, Object> map) throws Exception {
                  String host = (String) map.get("host");
                  String articleId = (String) map.get("articleId");
                  List tags = (List) map.get("tags");
                  if (host == null || articleId == null) {
                      logger.error("*** Error Doc \n" + map);
                  }
                  String key = "articleId_" + articleId + "_host_" + host + "_tags_" + tags.toString();
                  //logger.info(key);
                  System.out.println("Printing Key - " + key);
                  map.put("articlecount", 1L);
                  return new Tuple2<String, Map<String, Object>>(key, map);
              }
          })
      .reduceByKey(
          new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
              private static final long serialVersionUID = 1L;

              @Override
              public Map<String, Object> call(
                      Map<String, Object> map1,
                      Map<String, Object> map2) throws Exception {
                  Long count1 = (Long) map1.get("articlecount");
                  Long count2 = (Long) map2.get("articlecount");
                  map1.put("articlecount", count1 + count2);
                  return map1;
              }
          });

  /***
   * Grouping level 1 groups on articleId+host+tags
   * Tags can be multiple for an article.
   * Grouping level 2 does -
   * 1. For each tag in a row, find occurrence of that tag in other rows.
   * 2. If one tag found in another row, then add the articleCount of current and new row and put as articleCount for that tag.
   * Note -
   * Idea behind this grouping is to get all article counts that contain a particular tag and preserve this value.
   */
  JavaPairDStream<String, Map<String, Object>> groupLevel2 = groupLevel1.flatMapToPair(
      new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
          @Override
          public Iterable<Tuple2<String, Map<String, Object>>> call(Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {
              System.out.println("group level 2 tuple 1 - " + stringMapTuple2._1());
              System.out.println("group level 2 tuple 2 - " + stringMapTuple2._2());
              ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
              ArrayList tagKeyList = new ArrayList();
              String host = (String) stringMapTuple2._2().get("host");
              StringBuilder key;
              for (String tag : tagList) {
                  key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
                  System.out.println("generated Key - " + key);
                  tagKeyList.add(new Tuple2<String, Map<String, Object>>(key.toString(), stringMapTuple2._2()));
              }
              return tagKeyList;
          }
      });

  groupLevel2 = groupLevel2.reduceByKey(
      new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
          @Override
          public Map<String, Object> call(Map<String, Object> dataMap1, Map<String, Object> dataMap2) throws Exception {
              System.out.println("Type of article map in 1 " + dataMap1.get("articleId").getClass());
              System.out.println("Type of article map in 2 " + dataMap2.get("articleId").getClass());
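As a minimal illustration of the point above (shown in Scala for brevity; the same methods exist on JavaPairDStream and JavaStreamingContext): DStream transformations are lazy, so the mapToPair/reduceByKey lineage only runs once an output operation is registered and the context is started.

  groupLevel2.print()     // any output operation forces the transformations to execute
  ssc.start()             // nothing runs until the streaming context is started
  ssc.awaitTermination()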
Relation between threads and executor core
Hi All, A few basic queries :- 1. Is there a way we can control the number of threads per executor core? 2. Does the parameter “executor-cores” also have a say in deciding how many threads are run? Regards, Sam
Re:Re: How to increase data scale in Spark SQL Perf
Increase the number of executors, :-) At 2015-08-26 16:57:48, Ted Yu yuzhih...@gmail.com wrote: Mind sharing how you fixed the issue ? Cheers On Aug 26, 2015, at 1:50 AM, Todd bit1...@163.com wrote: Sorry for the noise, it's my bad... I have worked it out now. At 2015-08-26 13:20:57, Todd bit1...@163.com wrote: I think the answer is No. I only see such messages on the console, and #2 is the thread stack trace. My thinking is that Spark SQL Perf forks many dsdgen processes to generate data when the scale factor is increased, which at last exhausts the JVM. When the thread exception is thrown on the console and I leave it there for a while (about 15 min), eventually I will see OutOfMemory occur. Can you guys try to run it if you have the environment ? I think you may reproduce it. Thanks! At 2015-08-26 13:01:34, Ted Yu yuzhih...@gmail.com wrote: The error in #1 below was not informative. Are you able to get a more detailed error message ? Thanks On Aug 25, 2015, at 6:57 PM, Todd bit1...@163.com wrote: Thanks Ted Yu. Following are the error messages: 1. The exception that is shown on the UI is: Exception in thread Thread-113 Exception in thread Thread-126 Exception in thread Thread-64 Exception in thread Thread-90 Exception in thread Thread-117 Exception in thread Thread-80 Exception in thread Thread-115 Exception in thread ResponseProcessor for block BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984 Exception in thread qtp1270119920-57 Exception in thread Thread-77 Exception in thread Thread-132 Exception in thread Thread-68 Exception in thread Thread-61 Exception in thread Thread-70 Exception in thread qtp1270119920-52 Exception in thread Thread-88 Exception in thread qtp318933312-47 Exception in thread qtp1270119920-56 2. When I jstack the process, I see a bunch of the following messages:

Thread 31258: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame)
 - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame)

Thread 31257: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 (Interpreted frame)
 - scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp() @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 (Interpreted frame)

At 2015-08-25 19:32:56, Ted Yu yuzhih...@gmail.com wrote: Looks like you were attaching images to your email, which didn't go through. Consider using a third party site for images - or paste the error as text. Cheers On Tue, Aug 25, 2015 at 4:22 AM, Todd bit1...@163.com wrote: Hi, The spark sql perf suite itself contains benchmark data generation. I am using spark shell to run the spark sql perf to generate the data, with 10G memory for both driver and executor. When I increase the scale factor to 30 and run the job, I get the following error: When I jstack it to see the status of the thread,
I see the following: it looks like it is waiting for the process that the Spark job kicks off.
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
The first thing that stands out to me is createOnError = true. Are you sure the checkpoint is actually loading, as opposed to failing and starting the job anyway? There should be info lines that look like INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 144059718 ms [(test,1,162,220) You should be able to tell from those whether the offset ranges being loaded from the checkpoint look reasonable. Also, is there a reason you're calling directKStream.checkpoint(checkpointDuration)? Just calling checkpoint on the streaming context should be sufficient to save the metadata. On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang suchenz...@gmail.com wrote: Sure thing! The main looks like:

  val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

  val kafkaConf = Map(
    "zookeeper.connect" -> zookeeper,
    "group.id" -> options.group,
    "zookeeper.connection.timeout.ms" -> "1",
    "auto.commit.interval.ms" -> "1000",
    "rebalance.max.retries" -> "25",
    "bootstrap.servers" -> kafkaBrokers
  )

  val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => {
    createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
  }, createOnError = true)

  ssc.start()
  ssc.awaitTermination()

And createContext is defined as:

  val batchDuration = Seconds(5)
  val checkpointDuration = Seconds(20)
  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

  def createContext(kafkaConf: Map[String, String], checkpointDirectory: String, topic: String,
                    numThreads: Int, isProd: Boolean): StreamingContext = {
    val sparkConf = new SparkConf().setAppName(***)
    val ssc = new StreamingContext(sparkConf, batchDuration)
    ssc.checkpoint(checkpointDirectory)

    val topicSet = topic.split(",").toSet
    val groupId = kafkaConf.getOrElse("group.id", "")

    val directKStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
    directKStream.checkpoint(checkpointDuration)

    val table = ***

    directKStream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.flatMap(rec => someFunc(rec))
        .reduceByKey((i1: Long, i2: Long) => if (i1 < i2) i1 else i2)
        .foreachPartition { partitionRec =>
          val dbWrite = DynamoDBWriter()
          partitionRec.foreach {
            /* Update Dynamo Here */
          }
        }

      /** Set up ZK Connection **/
      val props = new Properties()
      kafkaConf.foreach(param => props.put(param._1, param._2))
      props.setProperty(AUTO_OFFSET_COMMIT, "false")
      val consumerConfig = new ConsumerConfig(props)
      assert(!consumerConfig.autoCommitEnable)
      val zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
        consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

      offsetRanges.foreach { osr =>
        val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
        val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
      }
    }
    ssc
  }

On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger c...@koeninger.org wrote: Sounds like something's not set up right... can you post a minimal code example that reproduces the issue? On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang suchenz...@gmail.com wrote: Yeah. All messages are lost while the streaming job is down. On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger c...@koeninger.org wrote: Are you actually losing messages then? On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang suchenz...@gmail.com wrote: No; the first batch only contains messages received after the second job starts (messages come in at a steady rate of about 400/second).
On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger c...@koeninger.org wrote: Does the first batch after restart contain all the messages received while the job was down? On Tue, Aug 25, 2015 at 12:53 PM, suchenzang suchenz...@gmail.com wrote: Hello, I'm using direct spark streaming (from kafka) with checkpointing, and everything works well until a restart. When I shut down (^C) the first streaming job, wait 1 minute, then re-submit, there is somehow a series of 0 event batches that get queued (corresponding to the 1 minute when the job was down). Eventually, the batches would resume processing, and I would see that each batch has roughly 2000 events. I see that at the beginning of the
Re: Build k-NN graph for large dataset
If you don't want to compute all N^2 similarities, you need to implement some kind of blocking first. For example, LSH (locality-sensitive hashing). A quick search gave this link to a Spark implementation: http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm trying to find an efficient way to build a k-NN graph for a large dataset. Precisely, I have a large set of high dimensional vectors (say d 1) and I want to build a graph where those high dimensional points are the vertices and each one is linked to its k-nearest neighbors based on some kind of similarity defined on the vertex space. My problem is to implement an efficient algorithm to compute the weight matrix of the graph. I need to compute N*N similarities and the only way I know is to use a cartesian operation followed by a map operation on the RDD. But this is very slow when N is large. Is there a more clever way to do this for an arbitrary similarity function? Cheers, Jao
Re: Custom Offset Management
That argument takes a function from MessageAndMetadata to whatever you want your stream to contain. See https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala#L57 On Wed, Aug 26, 2015 at 7:55 AM, Deepesh Maheshwari deepesh.maheshwar...@gmail.com wrote: Hi Folks, My Spark application interacts with kafka to get data through the Java Api. I am using the Direct Approach (No Receivers), which uses Kafka's simple consumer API to read data. So, kafka offsets need to be handled explicitly. In case of Spark failure I need to save the offset state of kafka for resuming from the failure point. I am saving these points in MongoDB. Please tell me how to initialize the Kafka DirectStream with saved offset points. I want to initialize the kafka stream in Spark Streaming with the required offset points. There is a method I found on the web: KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet, fromOffsets, arg8); arg8 - kafka.message.MessageAndMetadata Please tell me how to handle and initialize this. Regards, Deepesh
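A hedged Scala sketch of what the linked example does, adapted to this use case; loadOffsetsFromMongo() is a hypothetical stand-in for the MongoDB read, and kafkaParams is assumed to be the usual broker config map:

  import kafka.common.TopicAndPartition
  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // e.g. Map(TopicAndPartition("mytopic", 0) -> 12345L), read back from MongoDB
  val fromOffsets: Map[TopicAndPartition, Long] = loadOffsetsFromMongo()

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets,
    // the messageHandler ("arg8"): turns each MessageAndMetadata into what the stream should contain
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))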
Setting number of CORES from inside the Topology (JAVA code )
Hey, I need to set the number of cores from inside the topology. It's working fine by setting it in spark-env.sh, but I am unable to do it by setting a key/value on the conf.

  SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[4]");
  if (toponame.equals("IdentityTopology")) {
      sparkConf.setExecutorEnv("SPARK_WORKER_CORES", "1");
  }

-- Thanks Regards, Anshu Shukla
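For what it's worth, executor environment variables like SPARK_WORKER_CORES are read when the worker daemon starts, so setting them from the driver's conf is unlikely to take effect. A hedged sketch of doing it through conf properties instead (shown in Scala; the keys are the point, and whether spark.executor.cores applies depends on the cluster manager and Spark version):

  val sparkConf = new SparkConf()
    .setAppName("JavaCustomReceiver")
    .set("spark.executor.cores", "1")  // cores per executor
    .set("spark.cores.max", "4")       // total cores for the app (standalone mode)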
Building spark-examples takes too much time using Maven
I checked out the master branch and started playing around with the examples. I want to build a jar of the examples, as I wish to run them using the modified spark jar that I have. However, packaging spark-examples takes too much time: maven tries to download the jar dependencies rather than use the jars that are already present in the system after I extended and packaged spark itself locally. Is there a way around this?
Re: Build k-NN graph for large dataset
Yes. And a paper that describes using grids (actually varying grids) is http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf In the Spark GraphX In Action book that Robin East and I are writing, we implement a drastically simplified version of this in chapter 7, which should become available in the MEAP mid-September. http://www.manning.com/books/spark-graphx-in-action From: Kristina Rogale Plazonic kpl...@gmail.com To: Jaonary Rabarisoa jaon...@gmail.com Cc: user user@spark.apache.org Sent: Wednesday, August 26, 2015 7:24 AM Subject: Re: Build k-NN graph for large dataset If you don't want to compute all N^2 similarities, you need to implement some kind of blocking first. For example, LSH (locality-sensitive hashing). A quick search gave this link to a Spark implementation: http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm trying to find an efficient way to build a k-NN graph for a large dataset. Precisely, I have a large set of high dimensional vectors (say d 1) and I want to build a graph where those high dimensional points are the vertices and each one is linked to its k-nearest neighbors based on some kind of similarity defined on the vertex space. My problem is to implement an efficient algorithm to compute the weight matrix of the graph. I need to compute N*N similarities and the only way I know is to use a cartesian operation followed by a map operation on the RDD. But this is very slow when N is large. Is there a more clever way to do this for an arbitrary similarity function? Cheers, Jao
Re: Build k-NN graph for large dataset
+1 to all of the above, esp. dimensionality reduction and locality-sensitive hashing / min hashing. There's also an algorithm implemented in MLlib called DIMSUM, which was developed at Twitter for this purpose. I've been meaning to try it and would be interested to hear about the results you get. https://blog.twitter.com/2014/all-pairs-similarity-via-dimsum Charlie — Sent from Mailbox On Wednesday, Aug 26, 2015 at 09:57, Michael Malak michaelma...@yahoo.com.invalid, wrote: Yes. And a paper that describes using grids (actually varying grids) is http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf In the Spark GraphX In Action book that Robin East and I are writing, we implement a drastically simplified version of this in chapter 7, which should become available in the MEAP mid-September. http://www.manning.com/books/spark-graphx-in-action If you don't want to compute all N^2 similarities, you need to implement some kind of blocking first. For example, LSH (locality-sensitive hashing). A quick search gave this link to a Spark implementation: http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm trying to find an efficient way to build a k-NN graph for a large dataset. Precisely, I have a large set of high dimensional vectors (say d 1) and I want to build a graph where those high dimensional points are the vertices and each one is linked to its k-nearest neighbors based on some kind of similarity defined on the vertex space. My problem is to implement an efficient algorithm to compute the weight matrix of the graph. I need to compute N*N similarities and the only way I know is to use a cartesian operation followed by a map operation on the RDD. But this is very slow when N is large. Is there a more clever way to do this for an arbitrary similarity function? Cheers, Jao
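A minimal sketch of the DIMSUM route mentioned above, via MLlib's RowMatrix.columnSimilarities. Note it computes cosine similarities between columns, so the data points would need to be laid out as columns (or the matrix transposed); the threshold argument switches on the sampling-based approximation:

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.linalg.distributed.RowMatrix

  val rows = sc.parallelize(Seq(
    Vectors.dense(1.0, 0.0, 2.0),
    Vectors.dense(0.0, 3.0, 4.0)))
  val mat = new RowMatrix(rows)

  val exact  = mat.columnSimilarities()     // brute-force all pairs
  val approx = mat.columnSimilarities(0.1)  // DIMSUM sampling; entries below ~0.1 may be dropped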
Re: DataFrame/JDBC very slow performance
Thanks Michael, much appreciated! Nothing should be held in memory for a query like this (other than a single count per partition), so I don't think that is the problem. There is likely an error buried somewhere. For your above comments - I don't get any error but just get NULL as the return value. I have tried digging deeper in the logs etc. but couldn't spot anything. Are there any other suggestions for spotting such buried errors? Thanks, Dhaval On Mon, Aug 24, 2015 at 6:38 PM, Michael Armbrust mich...@databricks.com wrote: Much appreciated! I am not comparing with select count(*) for performance, but it was one simple thing I tried to check the performance :). I think it now makes sense, since Spark tries to extract all records before doing the count. I thought having an aggregate function query submitted over JDBC/Teradata would let Teradata do the heavy lifting. We currently only push down filters, since there is a lot of variability in what types of aggregations various databases support. You can manually push down whatever you want by replacing the table name with a subquery (i.e. (SELECT ... FROM ...)) - How come my second query for (5B) records didn't return anything even after a long processing? If I understood correctly, Spark would try to fit it in memory and if not then might use disk space, which I have available? Nothing should be held in memory for a query like this (other than a single count per partition), so I don't think that is the problem. There is likely an error buried somewhere. - Am I supposed to do any Spark related tuning to make it work? My main need is to access data from these large table(s) on demand and provide aggregated and calculated results much more quickly; for that I was trying out Spark. Next step I am thinking to export the data into Parquet files and give it a try. Do you have any suggestions for dealing with the problem? Exporting to parquet will likely be a faster option than trying to query through JDBC, since we have many more opportunities for parallelism here.
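A hedged sketch of the subquery trick described above, so the aggregation runs inside Teradata and only the small result crosses JDBC (the url and table names are placeholders):

  val agg = sqlContext.read.format("jdbc").options(Map(
    "url"     -> "jdbc:teradata://host/DATABASE=mydb",  // placeholder connection string
    "dbtable" -> "(SELECT col, count(*) AS cnt FROM bigtable GROUP BY col) t"  // derived table instead of a table name
  )).load()
  agg.show()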
Spark-on-YARN LOCAL_DIRS location
Hi, I am having issues with /tmp space filling up during Spark jobs because Spark-on-YARN uses the yarn.nodemanager.local-dirs for shuffle space. I noticed this message appears when submitting Spark-on-YARN jobs: WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). I can’t find much documentation on where to set the LOCAL_DIRS property. Please can someone advise whether this is a yarn-env.sh or a spark-env.sh property, and whether it would then use the directory specified by this env variable as a shuffle area instead of the default yarn.nodemanager.local-dirs location? Thanks, Mike
Dataframe collect() work but count() fails
Hello, I'm seeing a strange behavior where count() on a DataFrame errors as shown below, but collect() works fine. This is what I tried from spark-shell. solrRDD.queryShards() returns a JavaRDD.

  val rdd = solrRDD.queryShards(sc, query, "_version_", 2).rdd
  rdd: org.apache.spark.rdd.RDD[org.apache.solr.common.SolrDocument] = MapPartitionsRDD[3] at flatMap at SolrRDD.java:335

  scala> val schema = solrRDD.getQuerySchema(query)
  schema: org.apache.spark.sql.types.StructType = StructType(StructField(ApplicationType,StringType,true), StructField(Language,StringType,true), StructField(MfgCode,StringType,true), StructField(OpSystemCode,StringType,true), StructField(ProductCode,StringType,true), StructField(ProductName,StringType,true), StructField(ProductVersion,StringType,true), StructField(_version_,LongType,true), StructField(id,StringType,true))

  scala> val rows = rdd.map(doc => RowFactory.create(schema.fieldNames.map(f => doc.getFirstValue(f)))) // Convert RDD[SolrDocument] to RDD[Row]
  scala> val df = sqlContext.createDataFrame(rows, schema)
  scala> val data = df.collect
  data: Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@2135773a], [[Ljava.lang.Object;@3d2691de], [[Ljava.lang.Object;@2f32a52f], [[Ljava.lang.Object;@25fac8de]

  scala> df.count
  15/08/26 14:53:28 WARN TaskSetManager: Lost task 1.3 in stage 6.0 (TID 42, 172.19.110.1): java.lang.AssertionError: assertion failed: Row column number mismatch, expected 9 columns, but got 1. Row content: [[Ljava.lang.Object;@1d962eb2]
   at scala.Predef$.assert(Predef.scala:179)
   at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:140)
   at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124)
   at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)

Any idea what is wrong here? Srikanth
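One hedged guess at the cause, given the "expected 9 columns, but got 1 ... [Ljava.lang.Object;" message: in Scala, passing an Array to the Java varargs method RowFactory.create() without ": _*" passes the whole array as a single Object, so every Row ends up with one column (an Object[]). collect doesn't validate the row shape, but the in-memory columnar conversion in the stack trace does. Building the Row from a Seq avoids it:

  import org.apache.spark.sql.Row

  val rows = rdd.map(doc => Row.fromSeq(schema.fieldNames.map(f => doc.getFirstValue(f))))
  // or, equivalently: RowFactory.create(schema.fieldNames.map(f => doc.getFirstValue(f)): _*)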
Re: Spark cluster multi tenancy
Would be interested to know the answer too. On Wed, Aug 26, 2015 at 11:45 AM, Sadhan Sood sadhan.s...@gmail.com wrote: Interestingly, if there is nothing running on dev spark-shell, it recovers successfully and regains the lost executors. Attaching the log for that. Notice, the Registering block manager .. statements in the very end after all executors were lost. On Wed, Aug 26, 2015 at 11:27 AM, Sadhan Sood sadhan.s...@gmail.com wrote: Attaching log for when the dev job gets stuck (once all its executors are lost due to preemption). This is a spark-shell job running in yarn-client mode. On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood sadhan.s...@gmail.com wrote: Hi All, We've set up our spark cluster on aws running on yarn (running on hadoop 2.3) with fair scheduling and preemption turned on. The cluster is shared for prod and dev work where prod runs with a higher fair share and can preempt dev jobs if there are not enough resources available for it. It appears that dev jobs which get preempted often get unstable after losing some executors and the whole jobs gets stuck (without making any progress) or end up getting restarted (and hence losing all the work done). Has someone encountered this before ? Is the solution just to set spark.task.maxFailures to a really high value to recover from task failures in such scenarios? Are there other approaches that people have taken for spark multi tenancy that works better in such scenario? Thanks, Sadhan
Re: Differing performance in self joins
-dev +user I'd suggest running .explain() on both dataframes to understand the performance better. The problem is likely that we have a pattern that looks for cases where you have an equality predicate where either side can be evaluated using one side of the join. We turn this into a hash join. (df("eday") - laggard("p_eday")) === 1 is pretty tricky for us to understand, and so the pattern misses the possible optimized plan. On Wed, Aug 26, 2015 at 6:10 PM, David Smith das...@gmail.com wrote: I've noticed that two queries, which return identical results, have very different performance. I'd be interested in any hints about how to avoid problems like this. The DataFrame df contains a string field series and an integer eday, the number of days since (or before) the 1970-01-01 epoch. I'm doing some analysis over a sliding date window and, for now, avoiding UDAFs. I'm therefore using a self join. First, I create

  val laggard = df.withColumnRenamed("series", "p_series").withColumnRenamed("eday", "p_eday")

Then, the following query runs in 16s:

  df.join(laggard, (df("series") === laggard("p_series")) && (df("eday") === (laggard("p_eday") + 1))).count

while the following query runs in 4 - 6 minutes:

  df.join(laggard, (df("series") === laggard("p_series")) && ((df("eday") - laggard("p_eday")) === 1)).count

It's worth noting that the series term is necessary to keep the query from doing a complete cartesian product over the data. Ideally, I'd like to look at lags of more than one day, but the following is equally slow:

  df.join(laggard, (df("series") === laggard("p_series")) && (df("eday") - laggard("p_eday")).between(1, 7)).count

Any advice about the general principle at work here would be welcome. Thanks, David
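Following the advice above, a minimal way to compare the two plans (the column names are the ones from the thread):

  val fast = df.join(laggard, (df("series") === laggard("p_series")) &&
                              (df("eday") === (laggard("p_eday") + 1)))
  val slow = df.join(laggard, (df("series") === laggard("p_series")) &&
                              ((df("eday") - laggard("p_eday")) === 1))

  fast.explain()  // expect a join keyed on (series, eday)
  slow.explain()  // expect a join keyed only on series, with the lag arithmetic left as a post-join filter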
Re: Help! Stuck using withColumn
Hi Saif, In both cases you’re referencing columns that don’t exist in the current DataFrame. In the first email you did a select and then a withColumn for ‘month_date_curr’ on the resulting DF, but that column does not exist, because you selected only ‘month_balance’. In the second email you’re using 2 different DFs and trying to select a column from one in a withColumn on the other; that just wouldn’t work. Also, there are no explicit column names given to either DF, so that column doesn’t exist. Did you intend to do a join instead? Thanks, Silvio From: saif.a.ell...@wellsfargo.com Date: Wednesday, August 26, 2015 at 6:06 PM To: saif.a.ell...@wellsfargo.com, user@spark.apache.org Subject: RE: Help! Stuck using withColumn I can reproduce this even simpler with the following:

  val gf = sc.parallelize(Array(3,6,4,7,3,4,5,5,31,4,5,2)).toDF("ASD")
  val ff = sc.parallelize(Array(4,6,2,3,5,1,4,6,23,6,4,7)).toDF("GFD")
  gf.withColumn("DSA", ff.col("GFD"))

org.apache.spark.sql.AnalysisException: resolved attribute(s) GFD#421 missing from ASD#419 in operator !Project [ASD#419,GFD#421 AS DSA#422]; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42) at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931) at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595) at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039) From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com] Sent: Wednesday, August 26, 2015 6:47 PM To: user@spark.apache.org Subject: Help!
Stuck using withColumn This simple command call:

  val final_df = data.select("month_balance").withColumn("month_date", data.col("month_date_curr"))

Is throwing: org.apache.spark.sql.AnalysisException: resolved attribute(s) month_date_curr#324 missing from month_balance#234 in operator !Project [month_balance#234, month_date_curr#324 AS month_date_curr#408]; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42) at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931) at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595) at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)
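A hedged sketch of what a join-like fix for the toy example could look like. Since the two DataFrames share no key, this pairs rows by position with RDD.zip, which assumes both sides have the same partitioning and element counts (true for two equal-length parallelized arrays), and assumes import sqlContext.implicits._ is in scope as in the repro:

  val gf = sc.parallelize(Array(3, 6, 4, 7, 3, 4, 5, 5, 31, 4, 5, 2))
  val ff = sc.parallelize(Array(4, 6, 2, 3, 5, 1, 4, 6, 23, 6, 4, 7))

  val df = gf.zip(ff).toDF("ASD", "GFD")   // both columns now live in one DataFrame
  df.withColumn("DSA", df("GFD")).show()   // referencing a column of the same DF works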
Re: Join with multiple conditions (In reference to SPARK-7197)
Davies, I created an issue - SPARK-10246 https://issues.apache.org/jira/browse/SPARK-10246 On Tue, Aug 25, 2015 at 12:53 PM, Davies Liu dav...@databricks.com wrote: It's good to support this; could you create a JIRA for it and target it for 1.6? On Tue, Aug 25, 2015 at 11:21 AM, Michal Monselise michal.monsel...@gmail.com wrote: Hello All, PySpark currently has two ways of performing a join: specifying a join condition or column names. I would like to perform a join using a list of columns that appear in both the left and right DataFrames. I have created an example in this question on Stack Overflow. Basically, I would like to do the following as specified in the documentation in /spark/python/pyspark/sql/dataframe.py row 560 and specify a list of column names: df.join(df4, ['name', 'age']).select(df.name, df.age).collect() However, this produces an error. In JIRA issue SPARK-7197, it is mentioned that the syntax is actually different from the one specified in the documentation for joining using a condition. Documentation: cond = [df.name == df3.name, df.age == df3.age] df.join(df3, cond, 'outer').select(df.name, df3.age).collect() JIRA Issue: a.join(b, (a.year==b.year) & (a.month==b.month), 'inner') In other words, the join function cannot take a list. I was wondering if you could also clarify what the correct syntax is for providing a list of columns. Thanks, Michal
error accessing vertexRDD
Hi all, a question on an issue I'm having with a vertexRDD. If I kick off my spark shell with something like this: then run: it will finish and give me the count, but I see a few errors (see below). This is okay for this small dataset, but when trying with a large dataset it doesn't finish because of the number of errors. This works okay if I kick off my spark shell with master = local. Any help appreciated
Re: query avro hive table in spark sql
I'd suggest looking at http://spark-packages.org/package/databricks/spark-avro On Wed, Aug 26, 2015 at 11:32 AM, gpatcham gpatc...@gmail.com wrote: Hi, I'm trying to query a hive table which is based on avro in spark SQL and am seeing the below errors.

15/08/26 17:51:12 WARN avro.AvroSerdeUtils: Encountered AvroSerdeException determining schema. Returning signal schema to indicate problem org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Neither avro.schema.literal nor avro.schema.url specified, can't determine table schema
 at org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrThrowException(AvroSerdeUtils.java:68)
 at org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrReturnErrorSchema(AvroSerdeUtils.java:93)
 at org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:60)
 at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:375)
 at org.apache.hadoop.hive.ql.metadata.Partition.getDeserializer(Partition.java:249)

It's not able to determine the schema. The Hive table is pointing to an avro schema using a url. I'm stuck and couldn't find more info on this. Any pointers?
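A hedged sketch of the spark-avro route, reading the table's underlying Avro files directly and skipping the Hive SerDe; the warehouse path is a placeholder, and the package coordinates depend on your Spark/Scala version (e.g. --packages com.databricks:spark-avro_2.10:2.0.1 for 1.4-era Spark):

  val df = sqlContext.read.format("com.databricks.spark.avro")
    .load("/user/hive/warehouse/mytable")  // placeholder: the table's HDFS location
  df.registerTempTable("mytable_avro")
  sqlContext.sql("SELECT count(*) FROM mytable_avro").show()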
Re: How to unit test HiveContext without OutOfMemoryError (using sbt)
I'd suggest setting sbt to fork when running tests. On Wed, Aug 26, 2015 at 10:51 AM, Mike Trienis mike.trie...@orcsol.com wrote: Thanks for your response Yana, I can increase the MaxPermSize parameter and it will allow me to run the unit test a few more times before I run out of memory. However, the primary issue is that running the same unit test in the same JVM (multiple times) results in increased memory with each run of the unit test, and I believe it has something to do with HiveContext not reclaiming memory after it is finished (or I'm not shutting it down properly). It could very well be related to sbt; however, it's not clear to me. On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: The PermGen space error is controlled with the MaxPermSize parameter. I run with this in my pom, I think copied pretty literally from Spark's own tests... I don't know what the sbt equivalent is, but you should be able to pass it... possibly via SBT_OPTS?

  <plugin>
    <groupId>org.scalatest</groupId>
    <artifactId>scalatest-maven-plugin</artifactId>
    <version>1.0</version>
    <configuration>
      <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
      <parallel>false</parallel>
      <junitxml>.</junitxml>
      <filereports>SparkTestSuite.txt</filereports>
      <argLine>-Xmx3g -XX:MaxPermSize=256m -XX:ReservedCodeCacheSize=512m</argLine>
      <stderr/>
      <systemProperties>
        <java.awt.headless>true</java.awt.headless>
        <spark.testing>1</spark.testing>
        <spark.ui.enabled>false</spark.ui.enabled>
        <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
      </systemProperties>
    </configuration>
    <executions>
      <execution>
        <id>test</id>
        <goals>
          <goal>test</goal>
        </goals>
      </execution>
    </executions>
  </plugin>

On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hello, I am using sbt and created a unit test where I create a `HiveContext` and execute some query and then return. Each time I run the unit test the JVM will increase its memory usage until I get the error: Internal error when running tests: java.lang.OutOfMemoryError: PermGen space Exception in thread Thread-2 java.io.EOFException As a work-around, I can fork a new JVM each time I run the unit test; however, it seems like a bad solution as it takes a while to run the unit test. By the way, I tried importing the TestHiveContext: - import org.apache.spark.sql.hive.test.TestHiveContext However, it suffers from the same memory issue. Has anyone else suffered from the same problem? Note that I am running these unit tests on my mac. Cheers, Mike.
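For sbt, a minimal build.sbt sketch of the fork suggestion, so each test run gets a fresh JVM and PermGen is reclaimed between runs (the sizes are illustrative, and note javaOptions only apply to forked JVMs):

  fork in Test := true
  javaOptions in Test ++= Seq("-Xmx3g", "-XX:MaxPermSize=256m", "-XX:ReservedCodeCacheSize=512m")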
Re: JDBC Streams
Thanks Cody. Are you suggesting putting the cache in a global context in each executor JVM, in a Scala object for example, and then having a scheduled task refresh the cache (or have refresh triggered by the Guava expiry)? Chen On Wed, Aug 26, 2015 at 10:51 AM, Cody Koeninger c...@koeninger.org wrote: If your data only changes every few days, why not restart the job every few days, and just broadcast the data? Or you can keep a local per-jvm cache with an expiry (e.g. guava cache) to avoid many mysql reads On Wed, Aug 26, 2015 at 9:46 AM, Chen Song chen.song...@gmail.com wrote: Piggybacking on this question. I have a similar use case but a bit different. My job is consuming a stream from Kafka and I need to join the Kafka stream with some reference table from MySQL (kind of data validation and enrichment). I need to process this stream every 1 min. The data in MySQL is not changed very often, maybe once every few days. So my requirements are: * I cannot easily use a broadcast variable because the data does change, although not very often. * I am not sure if it is good practice to read data from MySQL in every batch (in my case, 1 min). Has anyone done this before? Any suggestions and feedback are appreciated. Chen On Sun, Jul 5, 2015 at 11:50 AM, Ashic Mahtab as...@live.com wrote: If it is indeed a reactive use case, then Spark Streaming would be a good choice. One approach worth considering - is it possible to receive a message via kafka (or some other queue)? That'd not need any polling, and you could use standard consumers. If polling isn't an issue, then writing a custom receiver will work fine. The way a receiver works is this: * Your receiver has a receive() function, where you'd typically start a loop. In your loop, you'd fetch items, and call store(entry). * You control everything in the receiver. If you're listening on a queue, you receive messages, store() and ack your queue. If you're polling, it's up to you to ensure delays between db calls. * The things you store() go on to make up the rdds in your DStream. So, intervals, windowing, etc. apply to those. The receiver is the boundary between your data source and the DStream RDDs. In other words, if your interval is 15 seconds with no windowing, then the things that went to store() every 15 seconds are bunched up into an RDD of your DStream. That's kind of a simplification, but it should give you the idea that your db polling interval and streaming interval are not tied together. -Ashic. -- Date: Mon, 6 Jul 2015 01:12:34 +1000 Subject: Re: JDBC Streams From: guha.a...@gmail.com To: as...@live.com CC: ak...@sigmoidanalytics.com; user@spark.apache.org Hi Thanks for the reply. Here is my situation: I have a DB which enables synchronous CDC; think of this as a DB trigger which writes to a table with the changed values as soon as something changes in the production table. My job will need to pick up the data as soon as it arrives, which can be at a 1 min interval. Ideally it will pick up the changes, transform them into a json and put it to kinesis. In short, I am emulating a Kinesis producer with a DB source (don't even ask why; let's say these are the constraints :) ) Please advise (a) is spark a good choice here (b) what's your suggestion either way. I understand I can easily do it using a simple java/python app, but I am a little worried about managing scaling/fault tolerance, and that's where my concern is. TIA Ayan On Mon, Jul 6, 2015 at 12:51 AM, Ashic Mahtab as...@live.com wrote: Hi Ayan, How continuous is your workload?
As Akhil points out, with streaming, you'll give up at least one core for receiving, will need at most one more core for processing. Unless you're running on something like Mesos, this means that those cores are dedicated to your app, and can't be leveraged by other apps / jobs. If it's something periodic (once an hour, once every 15 minutes, etc.), then I'd simply write a normal spark application, and trigger it periodically. There are many things that can take care of that - sometimes a simple cronjob is enough! -- Date: Sun, 5 Jul 2015 22:48:37 +1000 Subject: Re: JDBC Streams From: guha.a...@gmail.com To: ak...@sigmoidanalytics.com CC: user@spark.apache.org Thanks Akhil. In case I go with spark streaming, I guess I have to implement a custom receiver and spark streaming will call this receiver every batch interval, is that correct? Any gotchas you see in this plan? TIA... Best, Ayan On Sun, Jul 5, 2015 at 5:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote: If you want a long running application, then go with spark streaming (which kind of blocks your resources). On the other hand, if you use job server then you can actually use the resources (CPUs) for other jobs also when your dbjob is not using them. Thanks Best Regards On Sun, Jul 5, 2015 at 5:28 AM, ayan guha guha.a...@gmail.com wrote: Hi All
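A hedged sketch of the per-JVM cache idea from earlier in this thread: a singleton object (one instance per executor JVM) fronting the MySQL reference table with a Guava cache and a TTL, so each micro-batch reads from memory and MySQL is hit at most once per expiry interval. loadFromMySql() is a hypothetical stand-in for the actual JDBC read:

  import java.util.concurrent.TimeUnit
  import com.google.common.cache.{CacheBuilder, CacheLoader}

  object ReferenceData {
    private val cache = CacheBuilder.newBuilder()
      .expireAfterWrite(6, TimeUnit.HOURS)  // data changes once every few days, so an hours-level TTL
      .build(new CacheLoader[String, Map[String, String]] {
        override def load(key: String): Map[String, String] = loadFromMySql()
      })

    def get(): Map[String, String] = cache.get("ref")  // single well-known key

    private def loadFromMySql(): Map[String, String] = ???  // hypothetical JDBC read of the reference table
  }

  // on the executors, e.g. inside the stream processing:
  // stream.mapPartitions { iter => val ref = ReferenceData.get(); iter.map(rec => enrich(rec, ref)) }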
spark streaming 1.3 kafka topic error
Hi My streaming application gets killed with the below error

15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream: ArrayBuffer(kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100], [testtopic,193]))
15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for time 144062612 ms
org.apache.spark.SparkException: ArrayBuffer(kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,115]))
 at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at

The kafka params printed in the job logs are: value.serializer = class org.apache.kafka.common.serialization.StringSerializer key.serializer = class org.apache.kafka.common.serialization.StringSerializer block.on.buffer.full = true retry.backoff.ms = 100 buffer.memory = 1048576 batch.size = 16384 metrics.sample.window.ms = 3 metadata.max.age.ms = 30 receive.buffer.bytes = 32768 timeout.ms = 3 max.in.flight.requests.per.connection = 5 bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092] metric.reporters = [] client.id = compression.type = none retries = 0 max.request.size = 1048576 send.buffer.bytes = 131072 acks = all reconnect.backoff.ms = 10 linger.ms = 0 metrics.num.samples = 2 metadata.fetch.timeout.ms = 6

Is the kafka broker going down and the job getting killed because of it? What's the best way to handle it? Will increasing retries and backoff time help, and what values should those be set to so that the streaming application never fails, but instead keeps retrying after a few seconds and sends an event so that my custom code can notify that the kafka broker is down, if that is the cause? Thanks
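For reference, a hedged sketch of the knobs that appear to govern this retry behaviour in the 1.3 direct stream (the values are illustrative, and both are set before the stream is created):

  val conf = new SparkConf()
    .set("spark.streaming.kafka.maxRetries", "5")  // driver-side retries when finding leader offsets (default 1)

  val kafkaParams = Map(
    "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
    "refresh.leader.backoff.ms" -> "2000")  // back off longer before refetching a leader after NotLeaderForPartitionException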
Re: spark streaming 1.3 kafka buffer size
Can I change the params fetch.message.max.bytes or spark.streaming.kafka.maxRatePerPartition at run time, across batches? Say I detect some fail condition in my system and decide to consume in the next batch interval only 10 messages per partition, and if that succeeds I reset the max limit to unlimited again. On Wed, Aug 26, 2015 at 9:32 PM, Cody Koeninger c...@koeninger.org wrote: see http://kafka.apache.org/documentation.html#consumerconfigs fetch.message.max.bytes in the kafka params passed to the constructor On Wed, Aug 26, 2015 at 10:39 AM, Shushant Arora shushantaror...@gmail.com wrote: What's the default buffer in spark streaming 1.3 for kafka messages? Say in this run it has to fetch messages from offset 1 to 1. Will it fetch all in one go, or does it internally fetch the messages in smaller batches? Is there any setting to configure the number of offsets fetched in one batch?
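A hedged sketch of Cody's pointer, passing the fetch size through the kafkaParams given to the direct stream constructor. As far as I can tell these params are captured once when the stream is created, so they are not adjustable per batch at run time:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  val kafkaParams = Map(
    "metadata.broker.list" -> "broker1:9092",
    "fetch.message.max.bytes" -> (8 * 1024 * 1024).toString)  // per-partition fetch buffer, 8MB here

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("testtopic"))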