Spark + Mesos + HDFS resource split

2015-04-27 Thread Ankur Chauhan

Hi,

I am building a mesos cluster for the purposes of using it to run
spark workloads (in addition to other frameworks). I am under the
impression that it is preferable/recommended to run the HDFS datanode
process and the Spark slave on the same physical node (or EC2 instance or VM).

My question is: what is the recommended resource split? How much
memory and CPU should I preallocate for HDFS, and how much should I set
aside as allocatable by Mesos? Is there a rule-of-thumb recommendation
around this?
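
For illustration, a hedged sketch of what a static split can look like on the
Mesos side (the numbers and hostnames are placeholders, not a recommendation):
the agent only offers frameworks the resources passed via --resources, so
whatever you leave out stays available for co-located daemons such as the HDFS
datanode.

    # hypothetical 16-core / 64 GB node: hold back ~2 cores and 8 GB for the
    # datanode and OS, and offer the rest to Mesos frameworks
    mesos-slave --master=zk://zk1:2181/mesos \
                --resources="cpus:14;mem:57344"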

-- Ankur Chauhan




Re: Cassandra Connection Issue with Spark-jobserver

2015-04-27 Thread Noorul Islam K M

Are you using DSE Spark? If so, are you pointing spark-jobserver to use DSE
Spark?

Thanks and Regards
Noorul

Anand anand.vi...@monotype.com writes:

 I am new to the Spark world and Job Server.

 My code:

 package spark.jobserver

 import java.nio.ByteBuffer

 import scala.collection.JavaConversions._
 import scala.collection.mutable.ListBuffer
 import scala.collection.immutable.Map

 import org.apache.cassandra.hadoop.ConfigHelper
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
 import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
 import org.apache.cassandra.utils.ByteBufferUtil
 import org.apache.hadoop.mapreduce.Job

 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import scala.util.Try

 object CassandraCQLTest extends SparkJob {

   def main(args: Array[String]) {
     val sc = new SparkContext("local[4]", "CassandraCQLTest")
     sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar")
     val config = ConfigFactory.parseString("")
     val results = runJob(sc, config)
     println("Result is " + results)
   }

   override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
     Try(config.getString("input.string"))
       .map(x => SparkJobValid)
       .getOrElse(SparkJobInvalid("No input.string config param"))
   }

   override def runJob(sc: SparkContext, config: Config): Any = {
     val cHost: String = "localhost"
     val cPort: String = "9160"
     val KeySpace = "retail"
     val InputColumnFamily = "ordercf"
     val OutputColumnFamily = "salecount"

     val job = new Job()
     job.setInputFormatClass(classOf[CqlPagingInputFormat])
     ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
     ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
     ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
     CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")

     /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */

     /** An UPDATE writes one or more columns to a record in a Cassandra column family */
     val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
     CqlConfigHelper.setOutputCql(job.getConfiguration(), query)

     job.setOutputFormatClass(classOf[CqlOutputFormat])
     ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
     ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

     val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
       classOf[CqlPagingInputFormat],
       classOf[java.util.Map[String, ByteBuffer]],
       classOf[java.util.Map[String, ByteBuffer]])

     val productSaleRDD = casRdd.map {
       case (key, value) =>
         (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
     }
     val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
     aggregatedRDD.collect().foreach {
       case (productId, saleCount) => println(productId + ":" + saleCount)
     }

     val casoutputCF = aggregatedRDD.map {
       case (productId, saleCount) =>
         val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
         val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
         var outColFamVal = new ListBuffer[ByteBuffer]
         outColFamVal += ByteBufferUtil.bytes(saleCount)
         val outVal: java.util.List[ByteBuffer] = outColFamVal
         (outKey, outVal)
     }

     casoutputCF.saveAsNewAPIHadoopFile(
       KeySpace,
       classOf[java.util.Map[String, ByteBuffer]],
       classOf[java.util.List[ByteBuffer]],
       classOf[CqlOutputFormat],
       job.getConfiguration()
     )
     casRdd.count
   }
 }

 When I push the jar using spark-jobserver and execute it, I get this on the
 spark-jobserver terminal:
 job-server[ERROR] Exception in thread pool-1-thread-1
 java.lang.NoClassDefFoundError:
 org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
 job-server[ERROR] at
 spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46)
 job-server[ERROR] at
 spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21)
 job-server[ERROR] at
 spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)
 job-server[ERROR] at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 job-server[ERROR] at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 

spark-defaults.conf

2015-04-27 Thread James King
I renamed spark-defaults.conf.template to spark-defaults.conf
and invoked

spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh

But I still get

failed to launch org.apache.spark.deploy.worker.Worker:
--properties-file FILE   Path to a custom Spark properties file.
 Default is conf/spark-defaults.conf.

But I'm thinking it should pick up the default spark-defaults.conf from
conf dir

Am I expecting or doing something wrong?

Regards
jk


Disable partition discovery

2015-04-27 Thread Cosmin Cătălin Sanda
How can one disable *Partition discovery* in *Spark 1.3.0 *when using
*sqlContext.parquetFile*?

Alternatively, is there a way to load *.parquet* files without *Partition
discovery*?

Cosmin


Re: Parquet error reading data that contains array of structs

2015-04-27 Thread Jianshi Huang
FYI,

Parquet schema output:

message pig_schema {
  optional binary cust_id (UTF8);
  optional int32 part_num;
  optional group ip_list (LIST) {
repeated group ip_t {
  optional binary ip (UTF8);
}
  }
  optional group vid_list (LIST) {
repeated group vid_t {
  optional binary vid (UTF8);
}
  }
  optional group fso_list (LIST) {
repeated group fso_t {
  optional binary fso (UTF8);
}
  }
}


And Parquet meta output:

creator: [parquet-mr (build ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7),
parquet-mr version 1.6.0rc7 (build
ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr]
extra:   pig.schema = cust_id: chararray,part_num: int,ip_list: {ip_t:
(ip: chararray)},vid_list: {vid_t: (vid: chararray)},fso_list: {fso_t:
(fso: chararray)}

file schema: pig_schema

cust_id: OPTIONAL BINARY O:UTF8 R:0 D:1
part_num:OPTIONAL INT32 R:0 D:1
ip_list: OPTIONAL F:1
.ip_t:   REPEATED F:1
..ip:OPTIONAL BINARY O:UTF8 R:1 D:3
vid_list:OPTIONAL F:1
.vid_t:  REPEATED F:1
..vid:   OPTIONAL BINARY O:UTF8 R:1 D:3
fso_list:OPTIONAL F:1
.fso_t:  REPEATED F:1
..fso:   OPTIONAL BINARY O:UTF8 R:1 D:3

row group 1: RC:1201092 TS:537930256 OFFSET:4

cust_id:  BINARY GZIP DO:0 FPO:4 SZ:10629422/27627221/2.60 VC:1201092
ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:10629426 SZ:358/252/0.70 VC:1201092
ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip: BINARY GZIP DO:0 FPO:10629784 SZ:41331065/180501686/4.37
VC:10540378 ENC:PLAIN,RLE
vid_list:
.vid_t:
..vid:BINARY GZIP DO:0 FPO:51960849 SZ:58820404/254819721/4.33
VC:11011894 ENC:PLAIN,RLE
fso_list:
.fso_t:
..fso:BINARY GZIP DO:0 FPO:110781253 SZ:21363255/74981376/3.51
VC:5612655 ENC:PLAIN,RLE

row group 2: RC:1830769 TS:1045506907 OFFSET:132144508

cust_id:  BINARY GZIP DO:0 FPO:132144508 SZ:17720131/42110882/2.38
VC:1830769 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:149864639 SZ:486/346/0.71 VC:1830769
ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip: BINARY GZIP DO:0 FPO:149865125 SZ:37687630/342050955/9.08
VC:20061916 ENC:PLAIN,PLAIN_DICTIONARY,RLE
vid_list:
.vid_t:
..vid:BINARY GZIP DO:0 FPO:187552755 SZ:56498124/516700215/9.15
VC:22410351 ENC:PLAIN,PLAIN_DICTIONARY,RLE
fso_list:
.fso_t:
..fso:BINARY GZIP DO:0 FPO:244050879 SZ:20110276/144644509/7.19
VC:10739272 ENC:PLAIN,PLAIN_DICTIONARY,RLE

row group 3: RC:22445 TS:4304290 OFFSET:264161155

cust_id:  BINARY GZIP DO:0 FPO:264161155 SZ:221527/516312/2.33 VC:22445
ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:264382682 SZ:102/64/0.63 VC:22445
ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip: BINARY GZIP DO:0 FPO:264382784 SZ:483962/1204312/2.49
VC:123097 ENC:PLAIN_DICTIONARY,RLE
vid_list:
.vid_t:
..vid:BINARY GZIP DO:0 FPO:264866746 SZ:622977/2122080/3.41
VC:133136 ENC:PLAIN,PLAIN_DICTIONARY,RLE
fso_list:
.fso_t:
..fso:BINARY GZIP DO:0 FPO:265489723 SZ:240588/461522/1.92 VC:62173
ENC:PLAIN_DICTIONARY,RLE

Jianshi


On Mon, Apr 27, 2015 at 12:40 PM, Cheng Lian lian.cs@gmail.com wrote:

  Had an offline discussion with Jianshi, the dataset was generated by Pig.

 Jianshi - Could you please attach the output of parquet-schema
 path-to-parquet-file? I guess this is a Parquet format
 backwards-compatibility issue. Parquet hadn't standardized the representation
 of LIST and MAP until recently, thus many systems made their own choices and
 are not easily interoperable. In earlier days, Spark SQL used LIST and
 MAP formats similar to Avro, which were unfortunately not chosen as the
 current standard format. Details can be found here:
 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md This
 document also defines backwards-compatibility rules to handle legacy
 Parquet data written by old Parquet implementations in various systems.

 So ideally, now Spark SQL should always write data following the standard,
 and implement all backwards-compatibility rules to read legacy data. JIRA
 issue for this is https://issues.apache.org/jira/browse/SPARK-6774

 I'm working on a PR https://github.com/apache/spark/pull/5422 for this.
 To fix SPARK-6774, we need to implement backwards-compatibility rules in
 both record converter and schema converter together. This PR has fixed the
 former, but I still need some time to finish the latter part and add tests.

 Cheng

 On 4/25/15 2:22 AM, Yin Huai wrote:

 oh, I missed that. It is fixed in 1.3.0.

  Also, Jianshi, the dataset was not generated by Spark SQL, right?

 On Fri, Apr 24, 2015 at 11:09 AM, Ted Yu yuzhih...@gmail.com wrote:

 Yin:
 Fix 

How to distribute Spark computation recipes

2015-04-27 Thread Olivier Girardot
Hi everyone,
I know that any RDD is related to its SparkContext and the associated
variables (broadcast, accumulators), but I'm looking for a way to
serialize/deserialize full RDD computations.

@rxin Spark SQL is, in a way, already doing this, but the parsers are
private[sql]; is there any way to reuse this work to get Logical/Physical
Plans in and out of Spark?

Regards,

Olivier.


Re: Spark streaming action running the same work in parallel

2015-04-27 Thread ColinMc
I was able to get it working. Instead of using customers.flatMap to return
alerts, I had to use the following:

customers.foreachRDD(new Function<JavaPairRDD<String, Iterable<QueueEvent>>, Void>() {
    @Override
    public Void call(final JavaPairRDD<String, Iterable<QueueEvent>> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Iterable<QueueEvent>>>>() {
            @Override
            public void call(final Iterator<Tuple2<String, Iterable<QueueEvent>>> i)
                    throws Exception {
            }
        });
        return null;
    }
});

This made sure that we only sent one alert per event for a customer. My unit
test showed that there was one RDD that had both customers with their events
as partitions.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-action-running-the-same-work-in-parallel-tp22613p22665.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark-defaults.conf

2015-04-27 Thread Zoltán Zvara
You should distribute your configuration file to workers and set the
appropriate environment variables, like HADOOP_HOME, SPARK_HOME,
HADOOP_CONF_DIR, SPARK_CONF_DIR.

On Mon, Apr 27, 2015 at 12:56 PM James King jakwebin...@gmail.com wrote:

 I renamed spark-defaults.conf.template to spark-defaults.conf
 and invoked

 spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh

 But I still get

 failed to launch org.apache.spark.deploy.worker.Worker:
 --properties-file FILE   Path to a custom Spark properties file.
  Default is conf/spark-defaults.conf.

 But I'm thinking it should pick up the default spark-defaults.conf from
 conf dir

 Am I expecting or doing something wrong?

 Regards
 jk





Spark 1.2.1: How to convert SchemaRDD to CassandraRDD?

2015-04-27 Thread Tash Chainar
Hi all, given the following:

import com.datastax.spark.connector.SelectableColumnRef;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import org.apache.spark.sql.SchemaRDD;
import static com.datastax.spark.connector.util.JavaApiHelper.toScalaSeq;
import scala.collection.Seq;

SchemaRDD schemaRDD = cassandraSQLContext.sql("select user.id as user_id from user");
schemaRDD.cache();
schemaRDD.collect();

I want to do SELECT on schemaRDD as
Seq<SelectableColumnRef> columnRefs =
    toScalaSeq(CassandraJavaUtil.toSelectableColumnRefs("user_id"));
CassandraRDD<UUID> rdd = schemaRDD.select(columnRefs);

but I get a type mismatch at schemaRDD.select():
incompatible types: Seq<SelectableColumnRef> cannot be converted to
Seq<Expression>

so obviously I need to convert SchemaRDD to CassandraRDD. How can it be
done?
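
Not an authoritative answer, but one observation: SchemaRDD.select expects
Catalyst expressions, so it will never hand back a CassandraRDD. If what you
actually need is a CassandraRDD with the connector's own select, a hedged
sketch of building it directly from the connector (Scala API shown; the
keyspace, table and column names are assumptions taken from the query above):

    // build a CassandraRDD straight from the connector instead of converting a SchemaRDD
    import com.datastax.spark.connector._

    val userIds = sc.cassandraTable("my_keyspace", "user")  // CassandraRDD[CassandraRow]
      .select("user_id")                                    // connector-side projection
      .map(row => row.getUUID("user_id"))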


Re: Cassandra Connection Issue with Spark-jobserver

2015-04-27 Thread Anand
I was able to fix the issues by providing the right versions of the
cassandra-all and thrift libraries.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-Connection-Issue-with-Spark-jobserver-tp22587p22664.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




ReduceByKey and sorting within partitions

2015-04-27 Thread Marco
Hi,

I'm trying, after reducing by key, to get data ordered among partitions
(like RangePartitioner) and within partitions (like sortByKey or
repartitionAndSortWithinPartitions), pushing the sorting down to the
shuffle machinery of the reduce phase.

I think, but maybe I'm wrong, that the correct way to do that is to have
combineByKey call the setKeyOrdering function on the ShuffledRDD that it
returns.

Am I wrong? Can it be done by a combination of other transformations with
the same efficiency?

Thanks,
Marco




Re: Submitting to a cluster behind a VPN, configuring different IP address

2015-04-27 Thread TimMalt
Hi, and what can I do when I am on Windows? 
It does not allow me to set the hostname to some IP

Thanks,
Tim



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-to-a-cluster-behind-a-VPN-configuring-different-IP-address-tp9360p22674.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: hive-thriftserver maven artifact

2015-04-27 Thread Ted Yu
This is available for 1.3.1:
http://mvnrepository.com/artifact/org.apache.spark/spark-hive-thriftserver_2.10

FYI

On Mon, Feb 16, 2015 at 7:24 AM, Marco marco@gmail.com wrote:

 Ok, so will it only be available in the next version (1.3.0)?

 2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 I searched for 'spark-hive-thriftserver_2.10' on this page:
 http://mvnrepository.com/artifact/org.apache.spark

 Looks like it is not published.

 On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote:

 Hi,

 I am referring to https://issues.apache.org/jira/browse/SPARK-4925
 (Hive Thriftserver Maven Artifact). Can somebody point me (URL) to the
 artifact in a public repository ? I have not found it @Maven Central.

 Thanks,
 Marco





 --
 Viele Grüße,
 Marco



Re: Group by order by

2015-04-27 Thread Richard Marscher
It's not related to Spark, but the concept of what you are trying to do
with the data. Grouping by ID means consolidating data for each ID down to
1 row per ID. You can sort by time after this point yes, but you would need
to either take each ID and time value pair OR do some aggregate operation
on the time. That's what the error message is explaining. Maybe you can
describe what you want your results to look like?

Here is some detail about the underlying operations here:

Example Data:
ID | Time     | SomeVal

1  | 02-02-15 | 4
1  | 02-03-15 | 5
2  | 02-02-15 | 4
2  | 02-02-15 | 5
2  | 02-05-15 | 2

A.

So if you do Group By ID this means 1 row per ID like below:

ID

1
2

To include Time in this projection you need to aggregate it with a function
to a single value. Then and only then can you use it in the projection and
sort on it.

SELECT id, max(time) FROM sample GROUP BY id SORT BY max(time) desc;

ID | max(time)
2  | 02-05-15
1  | 02-03-15

B.

Or if you do Group by ID, time then you get 1 row per ID and time pair:

ID | Time
1  | 02-02-15
1  | 02-03-15
2  | 02-02-15
2  | 02-05-15

Notice both rows with ID `2` and time `02-02-15` group down to 1 row in the
results here. In this case you can sort the results by time without using
an aggregate function.

SELECT id, time FROM sample GROUP BY id, time SORT BY time desc;

ID | Time
2  | 02-05-15
1  | 02-03-15
1  | 02-02-15
2  | 02-02-15


On Mon, Apr 27, 2015 at 3:28 PM, Ulanov, Alexander alexander.ula...@hp.com
wrote:

  Hi Richard,



 There are several values of time per id. Is there a way to perform group
 by id and sort by time in Spark?



 Best regards, Alexander



 *From:* Richard Marscher [mailto:rmarsc...@localytics.com]
 *Sent:* Monday, April 27, 2015 12:20 PM
 *To:* Ulanov, Alexander
 *Cc:* user@spark.apache.org
 *Subject:* Re: Group by order by



 Hi,



 that error seems to indicate the basic query is not properly expressed. If
 you group by just ID, then that means it would need to aggregate all the
 time values into one value per ID, so you can't sort by it. Thus it tries
 to suggest an aggregate function for time so you can have 1 value per ID
 and properly sort it.



 On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander 
 alexander.ula...@hp.com wrote:

  Hi,



 Could you suggest what is the best way to do “group by x order by y” in
 Spark?



 When I try to perform it with Spark SQL I get the following error (Spark
 1.3):



 val results = sqlContext.sql("select * from sample group by id order by time")

 org.apache.spark.sql.AnalysisException: expression 'time' is neither
 present in the group by, nor is it an aggregate function. Add to group by
 or wrap in first() if you don't care which value you get.;

 at
 org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)



 Is there a way to do it with just RDD?



 Best regards, Alexander





bug: numClasses is not a valid argument of LogisticRegressionWithSGD

2015-04-27 Thread Pagliari, Roberto
With the Python APIs, the available arguments I got (using inspect module) are 
the following:

['cls', 'data', 'iterations', 'step', 'miniBatchFraction', 'initialWeights', 
'regParam', 'regType', 'intercept']

numClasses is not available. Can someone comment on this?

Thanks,







Automatic Cache in SparkSQL

2015-04-27 Thread Wenlei Xie
Hi,

I am trying to answer a simple query with SparkSQL over the Parquet file.
When execute the query several times, the first run will take about 2s
while the later run will take 0.1s.

Looking at the log file, it seems the later runs don't load the data
from disk. However, I didn't enable any cache explicitly. Is there any
automatic cache used by SparkSQL? Is there any way to check this?

Thank you!

Best,
Wenlei


Fwd: Change ivy cache for spark on Windows

2015-04-27 Thread Burak Yavuz
+user

-- Forwarded message --
From: Burak Yavuz brk...@gmail.com
Date: Mon, Apr 27, 2015 at 1:59 PM
Subject: Re: Change ivy cache for spark on Windows
To: mj jone...@gmail.com


Hi,

In your conf file (SPARK_HOME\conf\spark-defaults.conf) you can set:

`spark.jars.ivy \your\path`


Best,
Burak

On Mon, Apr 27, 2015 at 1:49 PM, mj jone...@gmail.com wrote:

 Hi,

 I'm having trouble using the --packages option for spark-shell.cmd - I have
 to use Windows at work and have been issued a username with a space in it
 that means when I use the --packages option it fails with this message:

 Exception in thread main java.net.URISyntaxException: Illegal character
 in path at index 13: C:/Users/My Name/.ivy2/jars/spark-csv_2.10.jar

 The command I'm trying to run is:
 .\spark-shell.cmd --packages com.databricks:spark-csv_2.10:1.0.3

 I've tried creating an ivysettings.xml file with the content below in my
 .ivy2 directory, but spark doesn't seem to pick it up. Does anyone have any
 ideas of how to get around this issue?

 <ivysettings>
   <caches defaultCacheDir="c:\ivy_cache"/>
 </ivysettings>




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Change-ivy-cache-for-spark-on-Windows-tp22675.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Super slow caching in 1.3?

2015-04-27 Thread Christian Perez
Michael,

There is only one schema: both versions have 200 string columns in one file.

On Mon, Apr 20, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote:
 Now this is very important:



 “Normal RDDs” refers to “batch RDDs”. However, the default in-memory
 representation of RDDs which are part of a DStream is “serialized” rather than
 actual (hydrated) objects. The Spark documentation states that
 “serialization” is required for space and garbage-collection efficiency (but
 creates higher CPU load) – which makes sense considering the large number of
 RDDs which get discarded in a streaming app.



 So what does Databricks actually recommend as the object-oriented model for RDD
 elements used in Spark Streaming apps – flat or not – and can you provide a
 detailed description / spec of both?



 From: Michael Armbrust [mailto:mich...@databricks.com]
 Sent: Thursday, April 16, 2015 7:23 PM
 To: Evo Eftimov
 Cc: Christian Perez; user


 Subject: Re: Super slow caching in 1.3?



 Here are the types that we specialize; other types will be much slower.
 This is only for Spark SQL; normal RDDs do not serialize data that is
 cached.  I'll also note that until yesterday we were missing FloatType:

 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154



 Christian, can you provide the schema of the fast and slow datasets?



 On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 Michael what exactly do you mean by flattened version/structure here e.g.:

 1. An Object with only primitive data types as attributes
 2. An Object with  no more than one level of other Objects as attributes
 3. An Array/List of primitive types
 4. An Array/List of Objects

 This question is in general about RDDs not necessarily RDDs in the context
 of SparkSQL

 When answering can you also score how bad the performance of each of the
 above options is


 -Original Message-
 From: Christian Perez [mailto:christ...@svds.com]
 Sent: Thursday, April 16, 2015 6:09 PM
 To: Michael Armbrust
 Cc: user
 Subject: Re: Super slow caching in 1.3?

 Hi Michael,

 Good question! We checked 1.2 and found that it is also slow caching the
 same flat parquet file. Caching other file formats of the same data was
 faster by up to a factor of ~2. Note that the parquet file was created in
 Impala but the other formats were written by Spark SQL.

 Cheers,

 Christian

 On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com
 wrote:
 Do you think you are seeing a regression from 1.2?  Also, are you
 caching nested data or flat rows?  The in-memory caching is not really
 designed for nested data and so performs pretty slowly here (it's just
 falling back to kryo and even then there are some locking issues).

 If so, would it be possible to try caching a flattened version?

 CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable

 On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com
 wrote:

 Hi all,

 Has anyone else noticed very slow time to cache a Parquet file? It
 takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
 on M2 EC2 instances. Or are my expectations way off...

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd






 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd






-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




Change ivy cache for spark on Windows

2015-04-27 Thread mj
Hi,

I'm having trouble using the --packages option for spark-shell.cmd - I have
to use Windows at work and have been issued a username with a space in it
that means when I use the --packages option it fails with this message:

Exception in thread main java.net.URISyntaxException: Illegal character
in path at index 13: C:/Users/My Name/.ivy2/jars/spark-csv_2.10.jar

The command I'm trying to run is:
.\spark-shell.cmd --packages com.databricks:spark-csv_2.10:1.0.3

I've tried creating an ivysettings.xml file with the content below in my
.ivy2 directory, but spark doesn't seem to pick it up. Does anyone have any
ideas of how to get around this issue?

<ivysettings>
  <caches defaultCacheDir="c:\ivy_cache"/>
</ivysettings>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Change-ivy-cache-for-spark-on-Windows-tp22675.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




[SQL][1.3.1][JAVA]Use UDF in DataFrame join

2015-04-27 Thread Shuai Zheng
Hi All,

 

I want to ask how to use a UDF in the join function on a DataFrame. It
always gives me a "cannot resolve column name" error.

Anyone can give me an example on how to run this in java?

 

My code is like:

 

edmData.join(yb_lookup,
    edmData.col("year(YEARBUILT)").equalTo(yb_lookup.col("yb_class_vdm")));

 

But it won't work in Java. I understand the col function should only take a
column name, but is there a way to specify a simple expression like this in the
join statement?
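
Not a definitive answer, but a hedged sketch of one workaround, assuming the
year UDF is registered with the SQLContext: materialize the UDF result as a
real column first (selectExpr goes through the SQL parser, so registered UDFs
should resolve there), then join on plain column references. Scala shown for
brevity; the Java DataFrame API exposes the same methods.

    // compute the UDF into a column, then join on ordinary columns
    val edmWithYear = edmData.selectExpr("*", "year(YEARBUILT) AS yb_year")
    val joined = edmWithYear.join(
      yb_lookup,
      edmWithYear("yb_year") === yb_lookup("yb_class_vdm"))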

 

Regards,

 

Shuai



Re: How to distribute Spark computation recipes

2015-04-27 Thread Reynold Xin
The code themselves are the recipies, no?


On Mon, Apr 27, 2015 at 2:49 AM, Olivier Girardot 
o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 I know that any RDD is related to its SparkContext and the associated
 variables (broadcast, accumulators), but I'm looking for a way to
 serialize/deserialize full RDD computations ?

 @rxin Spark SQL is, in a way, already doing this but the parsers are
 private[sql], is there any way to reuse this work to get Logical/Physical
 Plans in  out of Spark ?

 Regards,

 Olivier.



[SQL][1.3.1][JAVA] UDF in java cause Task not serializable

2015-04-27 Thread Shuai Zheng
Hi All,

 

Basically I try to define a simple UDF and use it in a query, but it gives
me "Task not serializable".

 

   public void test() {
      RiskGroupModelDefinition model = registeredRiskGroupMap.get(this.modelId);
      RiskGroupModelDefinition edm = this.createEdm();
      JavaSparkContext ctx = this.createSparkContext();
      SQLContext sql = new SQLContext(ctx);

      sql.udf().register("year", new UDF1<Date, Integer>() {
         @Override
         public Integer call(Date d) throws Exception {
            return d.getYear();
         }
      }, new org.apache.spark.sql.types.IntegerType());

      /** Retrieve all tables for EDM */
      DataFrame property = sql.parquetFile(edm.toS3nPath("property")).filter("ISVALID = 1");
      property.registerTempTable("p");

      DataFrame yb_lookup = sql.parquetFile(model.toS3nPath("yb_lookup")).as("yb");
      yb_lookup.registerTempTable("yb");

      sql.sql("select * from p left join yb on year(p.YEARBUILT)=yb.yb_class_vdm").count();

      ctx.stop();
   }

 

If I remove the UDF and just use p.YEARBUILT=yb.yb_class_vdm, the SQL runs
without any problem. But after I add the UDF to the query (as in the code
above), I get the exception below:

 

 

Exception in thread main
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,
tree:

Aggregate false, [], [Coalesce(SUM(PartialCount#43L),0) AS count#41L]

Exchange SinglePartition

  Aggregate true, [], [COUNT(1) AS PartialCount#43L]

   Project []

HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter,
None

 Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)

  Project [YEARBUILT#14]

   Filter ISVALID#18

PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at
newParquet.scala:573

 Exchange (HashPartitioning [yb_class_vdm#40L], 200)

  PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at
newParquet.scala:573

 

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)

   at
org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)

   at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)

   at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)

   at org.apache.spark.sql.DataFrame.count(DataFrame.scala:899)

   at
com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.test(Vulna
bilityEncodeExecutor.java:137)

   at
com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.main(Vulna
bilityEncodeExecutor.java:488)

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
execute, tree:

Exchange SinglePartition

Aggregate true, [], [COUNT(1) AS PartialCount#43L]

  Project []

   HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter,
None

Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)

 Project [YEARBUILT#14]

  Filter ISVALID#18

   PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at
newParquet.scala:573

Exchange (HashPartitioning [yb_class_vdm#40L], 200)

 PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at
newParquet.scala:573

 

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)

   at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)

   at
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.
scala:126)

   at
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.
scala:125)

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)

   ... 6 more

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
execute, tree:

Aggregate true, [], [COUNT(1) AS PartialCount#43L]

Project []

  HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter,
None

   Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)

Project [YEARBUILT#14]

 Filter ISVALID#18

  PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at
newParquet.scala:573

   Exchange (HashPartitioning [yb_class_vdm#40L], 200)

PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at
newParquet.scala:573

 

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)

   at
org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)

   at
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.sc
ala:101)

   at
org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.sc
ala:49)

   at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)

   ... 10 more

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
execute, tree:

Exchange 
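
A hedged observation, not a confirmed diagnosis: a Java anonymous inner class
keeps a reference to its enclosing instance, so a UDF defined this way can drag
a non-serializable outer object into the serialized task. One way to rule that
out is to register a UDF that captures nothing from the enclosing class, e.g. a
static nested class implementing UDF1, or in Scala terms something like this
minimal sketch:

    // the function literal closes over nothing, so serializing it cannot pull
    // in a non-serializable outer instance
    import java.sql.Date
    import org.apache.spark.sql.SQLContext

    def registerYearUdf(sqlContext: SQLContext): Unit =
      sqlContext.udf.register("year", (d: Date) => d.getYear)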

RE: Group by order by

2015-04-27 Thread Ulanov, Alexander
Thanks, it should be
“select id, time, min(x1), min(x2), … from data group by id, time order by time”

(“min” or other aggregate function to pick other fields)

Forgot to mention that (id, time) is my primary key and I took for granted that 
it worked in my MySQL example.
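
For the "just RDD" part of the original question, a hedged sketch without Spark
SQL, assuming an RDD named rows of (id, time, x1, x2) tuples and min as the
aggregate:

    // group by the (id, time) composite key, aggregate the remaining fields,
    // then produce a time-ordered result
    val grouped = rows
      .map { case (id, time, x1, x2) => ((id, time), (x1, x2)) }
      .reduceByKey { case ((a1, a2), (b1, b2)) => (math.min(a1, b1), math.min(a2, b2)) }
      .sortBy { case ((_, time), _) => time }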

Best regards, Alexander


From: Richard Marscher [mailto:rmarsc...@localytics.com]
Sent: Monday, April 27, 2015 12:47 PM
To: Ulanov, Alexander
Cc: user@spark.apache.org
Subject: Re: Group by order by

It's not related to Spark, but the concept of what you are trying to do with 
the data. Grouping by ID means consolidating data for each ID down to 1 row per 
ID. You can sort by time after this point yes, but you would need to either 
take each ID and time value pair OR do some aggregate operation on the time. 
That's what the error message is explaining. Maybe you can describe what you 
want your results to look like?

Here is some detail about the underlying operations here:

Example Data:
ID | Time     | SomeVal

1  | 02-02-15 | 4
1  | 02-03-15 | 5
2  | 02-02-15 | 4
2  | 02-02-15 | 5
2  | 02-05-15 | 2

A.

So if you do Group By ID this means 1 row per ID like below:

ID

1
2

To include Time in this projection you need to aggregate it with a function to 
a single value. Then and only then can you use it in the projection and sort on 
it.

SELECT id, max(time) FROM sample GROUP BY id SORT BY max(time) desc;

ID | max(time)
2  | 02-05-15
1  | 02-03-15

B.

Or if you do Group by ID, time then you get 1 row per ID and time pair:

ID | Time
1  | 02-02-15
1  | 02-03-15
2  | 02-02-15
2  | 02-05-15

Notice both rows with ID `2` and time `02-02-15` group down to 1 row in the 
results here. In this case you can sort the results by time without using an 
aggregate function.

SELECT id, time FROM sample GROUP BY id, time SORT BY time desc;

ID | Time
2  | 02-05-15
1  | 02-03-15
1  | 02-02-15
2  | 02-02-15

On Mon, Apr 27, 2015 at 3:28 PM, Ulanov, Alexander alexander.ula...@hp.com wrote:
Hi Richard,

There are several values of time per id. Is there a way to perform group by id 
and sort by time in Spark?

Best regards, Alexander

From: Richard Marscher [mailto:rmarsc...@localytics.com]
Sent: Monday, April 27, 2015 12:20 PM
To: Ulanov, Alexander
Cc: user@spark.apache.org
Subject: Re: Group by order by

Hi,

that error seems to indicate the basic query is not properly expressed. If you 
group by just ID, then that means it would need to aggregate all the time 
values into one value per ID, so you can't sort by it. Thus it tries to suggest 
an aggregate function for time so you can have 1 value per ID and properly sort 
it.

On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander alexander.ula...@hp.com wrote:
Hi,

Could you suggest what is the best way to do “group by x order by y” in Spark?

When I try to perform it with Spark SQL I get the following error (Spark 1.3):

val results = sqlContext.sql("select * from sample group by id order by time")
org.apache.spark.sql.AnalysisException: expression 'time' is neither present in 
the group by, nor is it an aggregate function. Add to group by or wrap in 
first() if you don't care which value you get.;
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)

Is there a way to do it with just RDD?

Best regards, Alexander




Re: Spark timeout issue

2015-04-27 Thread Akhil Das
You need to look deeper into your worker logs; if you look closely you may
find GC errors, IO exceptions, etc. that are triggering the timeout.

Thanks
Best Regards

On Mon, Apr 27, 2015 at 3:18 AM, Deepak Gopalakrishnan dgk...@gmail.com
wrote:

 Hello Patrick,

 Sure. I've posted this on user as well. Will be cool to get a response.

 Thanks
 Deepak

 On Mon, Apr 27, 2015 at 2:58 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 Hi Deepak - please direct this to the user@ list. This list is for
 development of Spark itself.

 On Sun, Apr 26, 2015 at 12:42 PM, Deepak Gopalakrishnan
 dgk...@gmail.com wrote:
  Hello All,
 
  I'm trying to process a 3.5GB file in standalone mode using Spark. I could
  run my Spark job successfully on a 100MB file and it works as expected. But
  when I try to run it on the 3.5GB file, I run into the below error:
 
 
  15/04/26 12:45:50 INFO BlockManagerMaster: Updated info of block
 taskresult_83
  15/04/26 12:46:46 WARN AkkaUtils: Error sending message [message =
  Heartbeat(2,[Lscala.Tuple2;@790223d3,BlockManagerId(2,
  master.spark.com, 39143))] in 1 attempts
  java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]
  at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at
 org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
  at
 org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
  15/04/26 12:47:15 INFO MemoryStore: ensureFreeSpace(26227673) called
  with curMem=265897, maxMem=5556991426
  15/04/26 12:47:15 INFO MemoryStore: Block taskresult_92 stored as
  bytes in memory (estimated size 25.0 MB, free 5.2 GB)
  15/04/26 12:47:16 INFO MemoryStore: ensureFreeSpace(26272879) called
  with curMem=26493570, maxMem=5556991426
  15/04/26 12:47:16 INFO MemoryStore: Block taskresult_94 stored as
  bytes in memory (estimated size 25.1 MB, free 5.1 GB)
  15/04/26 12:47:18 INFO MemoryStore: ensureFreeSpace(26285327) called
  with curMem=52766449, maxMem=5556991426
 
 
  and the job fails.
 
 
  I'm on AWS and have opened all ports. Also, since the 100MB file works,
 it
  should not be a connection issue.  I've a r3 xlarge and 2 m3 large.
 
  Can anyone suggest a way to fix this?




 --
 Regards,
 *Deepak Gopalakrishnan*
 *Mobile*:+918891509774
 *Skype* : deepakgk87
 http://myexps.blogspot.com




Re: How to debug Spark on Yarn?

2015-04-27 Thread ๏̯͡๏
Spark 1.3

1. View stderr/stdout from the executor in the Web UI: when the job is running I
figured out the executor I am supposed to look at, and those two links show 4
special characters in the browser.

2. Tail on Yarn logs:

/apache/hadoop/bin/yarn logs -applicationId
application_1429087638744_151059 | less
Threw me: Application has not completed. Logs are only available after an
application completes


Any other ideas that i can try ?



On Sat, Apr 25, 2015 at 12:07 AM, Sven Krasser kras...@gmail.com wrote:

 On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin van...@cloudera.com
 wrote:


 Spark 1.3 should have links to the executor logs in the UI while the
 application is running. Not yet in the history server, though.


 You're absolutely correct -- didn't notice it until now. This is a great
 addition!

 --
 www.skrasser.com http://www.skrasser.com/?utm_source=sig




-- 
Deepak


Re: Understand the running time of SparkSQL queries

2015-04-27 Thread Akhil Das
Isn't it already available on the driver UI (that runs on 4040)?

Thanks
Best Regards

On Mon, Apr 27, 2015 at 9:55 AM, Wenlei Xie wenlei@gmail.com wrote:

 Hi,

 I am wondering how should we understand the running time of SparkSQL
 queries? For example the physical query plan and the running time on each
 stage? Is there any guide talking about this?

 Thank you!

 Best,
 Wenlei




Re: Question on Spark SQL performance of Range Queries on Large Datasets

2015-04-27 Thread ayan guha
The answer is it depends :)

The fact that query runtime increases indicates more shuffle. You may want
to construct rdds based on keys you use.

You may want to specify what kind of node you are using and how many
executors you are using. You may also want to play around with executor
memory allocations.

Best
Ayan
On 27 Apr 2015 17:59, Mani man...@vt.edu wrote:

 Hi,

 I am a graduate student from Virginia Tech (USA) pursuing my Masters in
 Computer Science. I’ve been researching on parallel and distributed
 databases and their performance for running some Range queries involving
 simple joins and group by on large datasets. As part of my research, I
 tried evaluating query performance of Spark SQL on the data set that I
 have. It would be really great if you could please confirm on the numbers
 that I get from Spark SQL? Following is the type of query that am running,

 Table 1 - 22,000,483 records
 Table 2 - 10,173,311 records

 Query : SELECT b.x, count(b.y) FROM Table1 a, Table2 b WHERE a.y=b.y AND
 a.z=‘' GROUP BY b.x ORDER BY b.x

 Total Running Time
 4 Worker Nodes:177.68s
 8 Worker Nodes: 186.72s

 I am using Apache Spark 1.3.0 with the default configuration. Is the query
 running time reasonable? Is it because of non-availability of indexes
 increasing the query run time? Can you please clarify?

 Thanks
 Mani
 Graduate Student, Department of Computer Science
 Virginia Tech









A problem of using spark streaming to capture network packets

2015-04-27 Thread Hai Shan Wu

Hi Everyone

We use pcap4j to capture network packets and then use spark streaming to
analyze captured packets. However, we met a strange problem.

If we run our application on spark locally (for example, spark-submit
--master local[2]), then the program runs successfully.

If we run our application on a Spark standalone cluster, then the program
tells us that no NIFs were found.

I also attach two test files for clarification.

So anyone can help on this? Thanks in advance!


(See attached file: PcapReceiver.java)(See attached file:
TestPcapSpark.java)

Best regards,

- Haishan

Haishan Wu (吴海珊)

IBM Research - China
Tel: 86-10-58748508
Fax: 86-10-58748330
Email: wuh...@cn.ibm.com
Lotus Notes: Hai Shan Wu/China/IBM

PcapReceiver.java
Description: Binary data


TestPcapSpark.java
Description: Binary data


Re: How to debug Spark on Yarn?

2015-04-27 Thread ๏̯͡๏
1) Application container logs from the RM Web UI never load in the browser; I
eventually have to kill the browser.
2) /apache/hadoop/bin/yarn logs -applicationId
application_1429087638744_151059 | less
emits logs only after the application has completed.

Are there no better ways to see the logs as they are emitted, something
similar to the Hadoop world?


On Mon, Apr 27, 2015 at 1:58 PM, Zoltán Zvara zoltan.zv...@gmail.com
wrote:

 You can check container logs from RM web UI or when log-aggregation is
 enabled with the yarn command. There are other, but less convenient
 options.

 On Mon, Apr 27, 2015 at 8:53 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Spark 1.3

 1. View stderr/stdout from executor from Web UI: when the job is running
 i figured out the executor that am suppose to see, and those two links show
 4 special characters on browser.

 2. Tail on Yarn logs:

 /apache/hadoop/bin/yarn logs -applicationId
 application_1429087638744_151059 | less
 Threw me: Application has not completed. Logs are only available after an
 application completes


 Any other ideas that i can try ?



 On Sat, Apr 25, 2015 at 12:07 AM, Sven Krasser kras...@gmail.com wrote:

 On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin van...@cloudera.com
 wrote:


 Spark 1.3 should have links to the executor logs in the UI while the
 application is running. Not yet in the history server, though.


 You're absolutely correct -- didn't notice it until now. This is a great
 addition!

 --
 www.skrasser.com http://www.skrasser.com/?utm_source=sig




 --
 Deepak




-- 
Deepak


Question on Spark SQL performance of Range Queries on Large Datasets

2015-04-27 Thread Mani
Hi,

I am a graduate student from Virginia Tech (USA) pursuing my Masters in 
Computer Science. I’ve been researching on parallel and distributed databases 
and their performance for running some Range queries involving simple joins and 
group by on large datasets. As part of my research, I tried evaluating query 
performance of Spark SQL on the data set that I have. It would be really great 
if you could please confirm on the numbers that I get from Spark SQL? Following 
is the type of query that am running,

Table 1 - 22,000,483 records
Table 2 - 10,173,311 records

Query : SELECT b.x, count(b.y) FROM Table1 a, Table2 b WHERE a.y=b.y AND 
a.z=‘' GROUP BY b.x ORDER BY b.x

Total Running Time
4 Worker Nodes:177.68s
8 Worker Nodes: 186.72s

I am using Apache Spark 1.3.0 with the default configuration. Is the query 
running time reasonable? Is it because of non-availability of indexes 
increasing the query run time? Can you please clarify?

Thanks
Mani
Graduate Student, Department of Computer Science
Virginia Tech








Re: How to debug Spark on Yarn?

2015-04-27 Thread Zoltán Zvara
You can check container logs from RM web UI or when log-aggregation is
enabled with the yarn command. There are other, but less convenient options.

On Mon, Apr 27, 2015 at 8:53 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Spark 1.3

 1. View stderr/stdout from executor from Web UI: when the job is running i
 figured out the executor that am suppose to see, and those two links show 4
 special characters on browser.

 2. Tail on Yarn logs:

 /apache/hadoop/bin/yarn logs -applicationId
 application_1429087638744_151059 | less
 Threw me: Application has not completed. Logs are only available after an
 application completes


 Any other ideas that i can try ?



 On Sat, Apr 25, 2015 at 12:07 AM, Sven Krasser kras...@gmail.com wrote:

 On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin van...@cloudera.com
 wrote:


 Spark 1.3 should have links to the executor logs in the UI while the
 application is running. Not yet in the history server, though.


 You're absolutely correct -- didn't notice it until now. This is a great
 addition!

 --
 www.skrasser.com http://www.skrasser.com/?utm_source=sig




 --
 Deepak




Re: ReduceByKey and sorting within partitions

2015-04-27 Thread Saisai Shao
Hi Marco,

As far as I know, the current combineByKey() does not expose an argument for
setting keyOrdering on the ShuffledRDD, and ShuffledRDD is package private. If
you can get at the ShuffledRDD through reflection or some other way, the
keyOrdering you set will be pushed down to the shuffle. If you use a
combination of other transformations instead, the result will be the same but
the efficiency may differ: some transformations will be separated into
different stages, which introduces additional shuffles.
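
For completeness, a hedged sketch of the combination-of-transformations route
(not the ShuffledRDD/keyOrdering approach above), assuming an RDD[(K, V)] named
pairs with an Ordering on K: reduce first, then do one explicit
range-partitioned, sorted shuffle, which orders data across and within
partitions at the cost of that extra shuffle.

    import org.apache.spark.RangePartitioner

    val reduced = pairs.reduceByKey(_ + _)
    val partitioner = new RangePartitioner(8, reduced)  // range-partition by key
    val globallySorted = reduced.repartitionAndSortWithinPartitions(partitioner)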

Thanks
Jerry


2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com:

 Hi,

 I'm trying, after reducing by key, to get data ordered among partitions
 (like RangePartitioner) and within partitions (like sortByKey or
 repartitionAndSortWithinPartition) pushing the sorting down to the
 shuffles machinery of the reducing phase.

 I think, but maybe I'm wrong, that the correct way to do that is that
 combineByKey call setKeyOrdering function on the ShuflleRDD that it
 returns.

 Am I wrong? Can be done by a combination of other transformations with
 the same efficiency?

 Thanks,
 Marco





Re: spark-defaults.conf

2015-04-27 Thread James King
Thanks.

I've set SPARK_HOME and SPARK_CONF_DIR appropriately in .bash_profile

But when I start worker like this

spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh

I still get

failed to launch org.apache.spark.deploy.worker.Worker:
 Default is conf/spark-defaults.conf.
  15/04/27 11:51:33 DEBUG Utils: Shutdown hook called
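
A hedged guess based on the Spark 1.3 standalone scripts: that "failed to
launch" output is the Worker's usage text, which is printed when the master URL
argument is missing, so the problem may be the arguments rather than
spark-defaults.conf. In 1.3, start-slave.sh expects a worker number and the
master URL, e.g. (the master host is a placeholder):

    spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh 1 spark://<master-host>:7077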





On Mon, Apr 27, 2015 at 1:15 PM, Zoltán Zvara zoltan.zv...@gmail.com
wrote:

 You should distribute your configuration file to workers and set the
 appropriate environment variables, like HADOOP_HOME, SPARK_HOME,
 HADOOP_CONF_DIR, SPARK_CONF_DIR.

 On Mon, Apr 27, 2015 at 12:56 PM James King jakwebin...@gmail.com wrote:

 I renamed spark-defaults.conf.template to spark-defaults.conf
 and invoked

 spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh

 But I still get

 failed to launch org.apache.spark.deploy.worker.Worker:
 --properties-file FILE   Path to a custom Spark properties file.
  Default is conf/spark-defaults.conf.

 But I'm thinking it should pick up the default spark-defaults.conf from
 conf dir

 Am I expecting or doing something wrong?

 Regards
 jk





Bigints in pyspark

2015-04-27 Thread jamborta
hi all,

I have just come across a problem where I have a table that has a few bigint
columns. It seems that if I read that table into a dataframe and then collect it
in pyspark, the bigints are stored as integers in Python.

(The problem is that when I write it back to another table, I detect the Hive
type programmatically from the Python type, so it turns those columns into
integers.)

Is that intended this way or a bug?

thanks,




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bigints-in-pyspark-tp22668.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SQL UDF returning object of case class; regression from 1.2.0

2015-04-27 Thread Ophir Cohen
A short update: eventually we manually upgraded to 1.3.1 and the problem was
fixed.
On Apr 26, 2015 2:26 PM, Ophir Cohen oph...@gmail.com wrote:

 I happened to hit the following issue that prevents me from using UDFs
 with case classes: https://issues.apache.org/jira/browse/SPARK-6054.

 The issue already fixed for 1.3.1 but we are working on Amazon and it
 looks that Amazon provide deployment of Spark 1.3.1 using their scripts.

 Did someone encounter the issue? Any suggestion will be happily taken,
 either for a workaround or for a way to deploy Spark 1.3.1 on EMR
 Thanks,
 Ophir



Exception in using updateStateByKey

2015-04-27 Thread Sea
Hi, all:
I use the updateStateByKey function in Spark Streaming. I need to store the
states for one minute, so I set spark.cleaner.ttl to 120; the batch duration is
2 seconds, but it throws an exception:




Caused by: 
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does 
not exist: spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443
at 
org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
at 
org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)


at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)



Why?


my code is 


ssc = StreamingContext(sc,2)
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1})
kvs.window(60,2).map(lambda x: analyzeMessage(x[1]))\
.filter(lambda x: x[1] != None).updateStateByKey(updateStateFunc) \
.filter(lambda x: x[1]['isExisted'] != 1) \
.foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))

Re: Exception in using updateStateByKey

2015-04-27 Thread Ted Yu
Which hadoop release are you using ?

Can you check hdfs audit log to see who / when deleted spark/ck/hdfsaudit/
receivedData/0/log-1430139541443-1430139601443 ?

Cheers

On Mon, Apr 27, 2015 at 6:21 AM, Sea 261810...@qq.com wrote:

 Hi, all:
 I use function updateStateByKey in Spark Streaming, I need to store the
 states for one minite,  I set spark.cleaner.ttl to 120, the duration is 2
 seconds, but it throws Exception


 Caused by:
 org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File
 does not exist:
 spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443
 at
 org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
 at
 org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51)
 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499)
 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448)
 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428)
 at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402)
 at
 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468)
 at
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269)
 at
 org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566)
 at
 org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
 at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
 at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:396)
 at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

 at org.apache.hadoop.ipc.Client.call(Client.java:1347)
 at org.apache.hadoop.ipc.Client.call(Client.java:1300)
 at
 org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
 at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
 at
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188)
 at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)

 Why?

 my code is

 ssc = StreamingContext(sc,2)
 kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1})
 kvs.window(60,2).map(lambda x: analyzeMessage(x[1]))\
 .filter(lambda x: x[1] != None).updateStateByKey(updateStateFunc) \
 .filter(lambda x: x[1]['isExisted'] != 1) \
 .foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))





Re: directory loader in windows

2015-04-27 Thread Steve Loughran

This a hadoop-side stack trace

it looks like the code is trying to get the filesystem permissions by running

%HADOOP_HOME%\bin\WINUTILS.EXE  ls -F


and something is triggering a null pointer exception.

There isn't any HADOOP- JIRA with this specific stack trace in it, so it's not 
a known/fixed problem.

At a guess, your environment HADOOP_HOME environment variable isn't point to 
the right place. If that's the case there should have been a warning in the logs




Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.NullPointerException

at java.lang.ProcessBuilder.start(Unknown Source)

at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)

at org.apache.hadoop.util.Shell.run(Shell.java:455)

at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)

at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)

at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)

at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)

at 
org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:582)

at 
org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:557)

at org.apache.hadoop.fs.LocatedFileStatus.init(LocatedFileStatus.java:42)

at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1699)

at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1681)

at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:268)

at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)

at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)

at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)

at org.apache.spark.rdd.RDD.collect(RDD.scala:813)

at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:374)

at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)

at java.lang.reflect.Method.invoke(Unknown Source)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)

at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:207)

at java.lang.Thread.run(Unknown Source)

--
Best Regards,
Ayan Guha







Re: Exception in using updateStateByKey

2015-04-27 Thread Sea
I set it to 240, and it happens again once 240 seconds have passed.




------------------ Original message ------------------
From: "261810726" <261810...@qq.com>
Date: 2015-04-27 (Mon) 10:24
To: "Ted Yu" <yuzhih...@gmail.com>
Subject: Re: Exception in using updateStateByKey



Yes, I can make it larger, but I also want to know whether there is a formula 
to estimate it.




------------------ Original message ------------------
From: "Ted Yu" <yuzhih...@gmail.com>
Date: 2015-04-27 (Mon) 10:20
To: "Sea" <261810...@qq.com>
Subject: Re: Exception in using updateStateByKey



Can you make the value for spark.cleaner.ttl larger?

Cheers


On Mon, Apr 27, 2015 at 7:13 AM, Sea 261810...@qq.com wrote:
My hadoop version is 2.2.0, and the hdfs-audit.log is too large. The problem is 
that when the checkpoint info is deleted (which depends on 'spark.cleaner.ttl'), 
it throws this exception.
 



------------------ Original message ------------------
From: "Ted Yu" <yuzhih...@gmail.com>
Date: 2015-04-27 (Mon) 9:55
To: "Sea" <261810...@qq.com>
Cc: "user" <user@spark.apache.org>
Subject: Re: Exception in using updateStateByKey



Which hadoop release are you using ?

Can you check hdfs audit log to see who / when deleted 
spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443 ?


Cheers


On Mon, Apr 27, 2015 at 6:21 AM, Sea 261810...@qq.com wrote:
Hi, all:
I use function updateStateByKey in Spark Streaming, I need to store the states 
for one minite,  I set spark.cleaner.ttl to 120, the duration is 2 seconds, 
but it throws Exception 




Caused by: 
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does 
not exist: spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443
at 
org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
at 
org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)


at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)



Why?


my code is 


ssc = StreamingContext(sc,2)
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1})
kvs.window(60,2).map(lambda x: analyzeMessage(x[1]))\
.filter(lambda x: x[1] != None).updateStateByKey(updateStateFunc) \
.filter(lambda x: x[1]['isExisted'] != 1) \
.foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))

RE: Slower performance when bigger memory?

2015-04-27 Thread Shuai Zheng
Thanks. May I know your configuration for more/smaller executors on r3.8xlarge, 
i.e. how much memory you eventually decided to give a single executor without 
impacting performance (for example, 64g)?

 

From: Sven Krasser [mailto:kras...@gmail.com] 
Sent: Friday, April 24, 2015 1:59 PM
To: Dean Wampler
Cc: Shuai Zheng; user@spark.apache.org
Subject: Re: Slower performance when bigger memory?

 

FWIW, I ran into a similar issue on r3.8xlarge nodes and opted for more/smaller 
executors. Another observation was that one large executor results in less 
overall read throughput from S3 (using Amazon's EMRFS implementation) in case 
that matters to your application.

-Sven

 

On Thu, Apr 23, 2015 at 10:18 AM, Dean Wampler deanwamp...@gmail.com wrote:

JVM's often have significant GC overhead with heaps bigger than 64GB. You might 
try your experiments with configurations below this threshold.

 

dean




Dean Wampler, Ph.D.

Author: Programming Scala, 2nd Edition 
http://shop.oreilly.com/product/0636920033073.do  (O'Reilly)

Typesafe http://typesafe.com 
@deanwampler http://twitter.com/deanwampler 

http://polyglotprogramming.com

 

On Thu, Apr 23, 2015 at 12:14 PM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I am running some benchmarks on an r3.8xlarge instance. I have a cluster with one 
master (no executor on it) and one slave (r3.8xlarge).

 

My job has 1000 tasks in stage 0.

 

The r3.8xlarge has 244G of memory and 32 cores.

 

If I create 4 executors, each has 8 core+50G memory, each task will take around 
320s-380s. And if I only use one big executor with 32 cores and 200G memory, 
each task will take 760s-900s.

 

And I check the log, looks like the minor GC takes much longer when using 200G 
memory:

 

285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)] 
38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95 
sys=120.65, real=11.25 secs] 

 

And when it uses 50G memory, the minor GC takes only less than 1s.

 

I am trying to work out the best way to configure Spark. For a specific reason I 
am tempted to use a bigger memory on a single executor, provided there is no 
significant performance penalty. But now it looks like there is?

 

Anyone has any idea?

 

Regards,

 

Shuai

 




-- 

www.skrasser.com http://www.skrasser.com/?utm_source=sig 



Re: Super slow caching in 1.3?

2015-04-27 Thread Wenlei Xie
I face a similar issue in Spark 1.2. Caching the schema RDD takes about 50s
for 400MB of data. The schema is similar to the TPC-H LineItem.

Here is the code I tried the cache. I am wondering if there is any setting
missing?

Thank you so much!

lineitemSchemaRDD.registerTempTable("lineitem");
sqlContext.sqlContext().cacheTable("lineitem");
System.out.println(lineitemSchemaRDD.count());


On Mon, Apr 6, 2015 at 8:00 PM, Christian Perez christ...@svds.com wrote:

 Hi all,

 Has anyone else noticed very slow time to cache a Parquet file? It
 takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
 on M2 EC2 instances. Or are my expectations way off...

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Wenlei Xie (谢文磊)

Ph.D. Candidate
Department of Computer Science
456 Gates Hall, Cornell University
Ithaca, NY 14853, USA
Email: wenlei@gmail.com


Why Spark is much faster than Hadoop MapReduce even on disk

2015-04-27 Thread bit1...@163.com
Hi,

I am frequently asked why Spark is still much faster than Hadoop MapReduce on 
disk (without the use of the memory cache). I have no convincing answer for this 
question; could you guys elaborate on this? Thanks!






Re: Why Spark is much faster than Hadoop MapReduce even on disk

2015-04-27 Thread Ilya Ganelin
I believe the typical answer is that Spark is actually a bit slower.
On Mon, Apr 27, 2015 at 7:34 PM bit1...@163.com bit1...@163.com wrote:

 Hi,

 I am frequently asked why Spark is still much faster than Hadoop MapReduce
 on disk (without the use of the memory cache). I have no convincing answer for
 this question; could you guys elaborate on this? Thanks!

 --




New JIRA - [SQL] Can't remove columns from DataFrame or save DataFrame from a join due to duplicate columns

2015-04-27 Thread Don Drake
https://issues.apache.org/jira/browse/SPARK-7182

Can anyone suggest a workaround for the above issue?

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/


RE: Scalability of group by

2015-04-27 Thread Ulanov, Alexander
It works on a smaller dataset of 100 rows. Probably I could find the size when 
it fails using binary search. However, it would not help me because I need to 
work with 2B rows.

From: ayan guha [mailto:guha.a...@gmail.com]
Sent: Monday, April 27, 2015 6:58 PM
To: Ulanov, Alexander
Cc: user@spark.apache.org
Subject: Re: Scalability of group by


Hi

Can you test on a smaller dataset to identify if it is cluster issue or scaling 
issue in spark
On 28 Apr 2015 11:30, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
Hi,

I am running a group by on a dataset of 2B of RDD[Row [id, time, value]] in 
Spark 1.3 as follows:
“select id, time, first(value) from data group by id, time”

My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor is 
allocated with 5GB of memory. However, all executors are being lost during the 
query execution and I get “ExecutorLostFailure”.

Could you suggest what might be the reason for it? Could it be that “group by” 
is implemented as RDD.groupBy so it holds the group by result in memory? What 
is the workaround?

Best regards, Alexander
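
One mitigation often suggested for executor loss during a large SQL aggregation is to spread the shuffle over more, smaller partitions. A minimal sketch, assuming the 2B-row dataset is registered as the temp table "data" as in the query above; the partition count of 2000 is only illustrative:

// Spark 1.3: shuffles triggered by SQL aggregations use
// spark.sql.shuffle.partitions (default 200); raising it makes each
// reduce-side partition smaller and less likely to exhaust executor memory
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
val grouped = sqlContext.sql(
  "select id, time, first(value) from data group by id, time")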


Re: Automatic Cache in SparkSQL

2015-04-27 Thread ayan guha
Spark keeps job data in memory by default, which accounts for the kind of
performance gains you are seeing. Additionally, depending on your query, Spark
runs stages, and at any point in time Spark's code behind the scenes may issue an
explicit cache. If you hit such a scenario you will find those cached objects in
the UI under Storage. Note that if caching is done by Spark itself it may be transient.
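
To see whether a table was explicitly cached, the Spark 1.3 SQLContext API can be queried directly; a minimal sketch, where "mytable" is a hypothetical registered temp table name:

sqlContext.isCached("mytable")      // false unless cacheTable was called on it
sqlContext.cacheTable("mytable")    // explicit in-memory columnar caching
sqlContext.isCached("mytable")      // true
sqlContext.uncacheTable("mytable")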
On 28 Apr 2015 08:00, Wenlei Xie wenlei@gmail.com wrote:

 Hi,

 I am trying to answer a simple query with SparkSQL over the Parquet file.
 When execute the query several times, the first run will take about 2s
 while the later run will take 0.1s.

 By looking at the log file it seems the later runs doesn't load the data
 from disk. However, I didn't enable any cache explicitly. Is there any
 automatic cache used by SparkSQL? Is there anyway to check this?

 Thank you?

 Best,
 Wenlei





Re: Re: Why Spark is much faster than Hadoop MapReduce even on disk

2015-04-27 Thread bit1...@163.com
Is it? I learned elsewhere that Spark is 5-10 times faster than 
Hadoop MapReduce.



bit1...@163.com
 
From: Ilya Ganelin
Date: 2015-04-28 10:55
To: bit1...@163.com; user
Subject: Re: Why Spark is much faster than Hadoop MapReduce even on disk
I believe the typical answer is that Spark is actually a bit slower. 
On Mon, Apr 27, 2015 at 7:34 PM bit1...@163.com bit1...@163.com wrote:
Hi,

I am frequently asked why Spark is still much faster than Hadoop MapReduce on 
disk (without the use of the memory cache). I have no convincing answer for this 
question; could you guys elaborate on this? Thanks!






Spark 1.3.1 JavaStreamingContext - fileStream compile error

2015-04-27 Thread lokeshkumar
Hi Forum

I am facing below compile error when using the fileStream method of the
JavaStreamingContext class.
I have copied the code from JavaAPISuite.java test class of spark test code.

Please help me to find a solution for this.

http://apache-spark-user-list.1001560.n3.nabble.com/file/n22679/47.png 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-JavaStreamingContext-fileStream-compile-error-tp22679.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Scalability of group by

2015-04-27 Thread ayan guha
Hi

Can you test on a smaller dataset to identify if it is cluster issue or
scaling issue in spark
On 28 Apr 2015 11:30, Ulanov, Alexander alexander.ula...@hp.com wrote:

  Hi,



 I am running a group by on a dataset of 2B of RDD[Row [id, time, value]]
 in Spark 1.3 as follows:

 “select id, time, first(value) from data group by id, time”



 My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor
 is allocated with 5GB of memory. However, all executors are being lost
 during the query execution and I get “ExecutorLostFailure”.



 Could you suggest what might be the reason for it? Could it be that “group
 by” is implemented as RDD.groupBy so it holds the group by result in
 memory? What is the workaround?



 Best regards, Alexander



java.lang.UnsupportedOperationException: empty collection

2015-04-27 Thread xweb
I am running following code on Spark 1.3.0. It is from
https://spark.apache.org/docs/1.3.0/ml-guide.html
On running val model1 = lr.fit(training.toDF) I get
java.lang.UnsupportedOperationException: empty collection

what could be the reason?



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.{Row, SQLContext}

val conf = new SparkConf().setAppName("SimpleParamsExample")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Prepare training data.
// We use LabeledPoint, which is a case class.  Spark SQL can convert RDDs
of case classes
// into DataFrames, where it uses the case class metadata to infer the
schema.
val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

// Create a LogisticRegression instance.  This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model.  This uses the parameters stored in lr.
val model1 = lr.fit(training.toDF)


Some more information:

scala> training.toDF
res26: org.apache.spark.sql.DataFrame = [label: double, features: vector]

scala> training.toDF.collect()
res27: Array[org.apache.spark.sql.Row] = Array([1.0,[0.0,1.1,0.1]],
[0.0,[2.0,1.0,-1.0]], [0.0,[2.0,1.3,1.0]], [1.0,[0.0,1.2,-0.5]])




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-UnsupportedOperationException-empty-collection-tp22677.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



1.3.1: Persisting RDD in parquet - Conflicting partition column names

2015-04-27 Thread sranga
Hi

I am getting the following error when persisting an RDD in parquet format to
an S3 location. This is code that was working in the 1.2 version. The
version that it is failing to work is 1.3.1.
Any help is appreciated. 

Caused by: java.lang.AssertionError: assertion failed: Conflicting partition
column names detected:
ArrayBuffer(batch_id)
ArrayBuffer()
at scala.Predef$.assert(Predef.scala:179)
at
org.apache.spark.sql.parquet.ParquetRelation2$.resolvePartitions(newParquet.scala:933)
at
org.apache.spark.sql.parquet.ParquetRelation2$.parsePartitions(newParquet.scala:851)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$7.apply(newParquet.scala:311)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$7.apply(newParquet.scala:303)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:303)
at
org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:692)
at
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129)
at
org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
at
org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:995)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/1-3-1-Persisting-RDD-in-parquet-Conflicting-partition-column-names-tp22678.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why Spark is much faster than Hadoop MapReduce even on disk

2015-04-27 Thread Michael Malak
http://www.datascienceassn.org/content/making-sense-making-sense-performance-data-analytics-frameworks
  
  From: bit1...@163.com bit1...@163.com
 To: user user@spark.apache.org 
 Sent: Monday, April 27, 2015 8:33 PM
 Subject: Why Spark is much faster than Hadoop MapReduce even on disk
   
Hi,
I am frequently asked why Spark is still much faster than Hadoop MapReduce on 
disk (without the use of the memory cache). I have no convincing answer for this 
question; could you guys elaborate on this? Thanks!



  

Re: Spark Cluster Setup

2015-04-27 Thread Denny Lee
Similar to what Dean called out, we built Puppet manifests so we could do
the automation. It's a bit of work to set up, but well worth the effort.

On Fri, Apr 24, 2015 at 11:27 AM Dean Wampler deanwamp...@gmail.com wrote:

 It's mostly manual. You could try automating with something like Chef, of
 course, but there's nothing already available in terms of automation.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Fri, Apr 24, 2015 at 10:33 AM, James King jakwebin...@gmail.com
 wrote:

 Thanks Dean,

 Sure I have that setup locally and testing it with ZK.

 But to start my multiple masters, do I need to go to each host and start one
 there, or is there a better way to do this?

 Regards
 jk

 On Fri, Apr 24, 2015 at 5:23 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 The convention for standalone cluster is to use Zookeeper to manage
 master failover.

 http://spark.apache.org/docs/latest/spark-standalone.html

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Fri, Apr 24, 2015 at 5:01 AM, James King jakwebin...@gmail.com
 wrote:

 I'm trying to find out how to setup a resilient Spark cluster.

 Things I'm thinking about include:

 - How to start multiple masters on different hosts?
 - there isn't a conf/masters file from what I can see


 Thank you.







Driver ID from spark-submit

2015-04-27 Thread Rares Vernica
Hello,

I am trying to use the default Spark cluster manager in a production
environment. I will be submitting jobs with spark-submit. I wonder if the
following is possible:

1. Get the Driver ID from spark-submit. We will use this ID to keep track
of the job and kill it if necessary.

2. Whether it is possible to run spark-submit in a mode where it exits and
returns control to the user immediately after the job is submitted.

Thanks!
Rares
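
For the standalone cluster manager, one way to get a handle on the driver is to submit in cluster deploy mode, which launches the driver inside the cluster, prints a submission/driver ID, and returns. A hedged sketch, with the master URL, jar path, class name and driver ID all placeholders:

# submit; the driver runs in the cluster and an ID is printed on stdout
spark-submit --master spark://master:7077 --deploy-mode cluster \
  --class com.example.MyJob /path/to/my-job.jar

# later, check on or kill the driver using the ID printed at submit time
spark-submit --master spark://master:7077 --status driver-20150427120000-0001
spark-submit --master spark://master:7077 --kill driver-20150427120000-0001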


Re: Exception in using updateStateByKey

2015-04-27 Thread Sea
Maybe I found the solution: do not set 'spark.cleaner.ttl'; just use the function 
'remember' in StreamingContext to set the rememberDuration.
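
A minimal PySpark sketch of that approach, assuming the same 2-second batch interval and 60-second window as in the code quoted below; the 120-second remember duration is illustrative:

ssc = StreamingContext(sc, 2)
# keep generated RDDs (and their checkpoint/log data) for at least the
# longest window, instead of relying on spark.cleaner.ttl
ssc.remember(120)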




------------------ Original message ------------------
From: "Ted Yu" <yuzhih...@gmail.com>
Date: 2015-04-27 (Mon) 10:20
To: "Sea" <261810...@qq.com>
Subject: Re: Exception in using updateStateByKey



Can you make the value for spark.cleaner.ttl larger?

Cheers


On Mon, Apr 27, 2015 at 7:13 AM, Sea 261810...@qq.com wrote:
My hadoop version is 2.2.0, and the hdfs-audit.log is too large. The problem is 
that when the checkpoint info is deleted (which depends on 'spark.cleaner.ttl'), 
it throws this exception.
 



------------------ Original message ------------------
From: "Ted Yu" <yuzhih...@gmail.com>
Date: 2015-04-27 (Mon) 9:55
To: "Sea" <261810...@qq.com>
Cc: "user" <user@spark.apache.org>
Subject: Re: Exception in using updateStateByKey



Which hadoop release are you using ?

Can you check hdfs audit log to see who / when deleted 
spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443 ?


Cheers


On Mon, Apr 27, 2015 at 6:21 AM, Sea 261810...@qq.com wrote:
Hi, all:
I use function updateStateByKey in Spark Streaming, I need to store the states 
for one minite,  I set spark.cleaner.ttl to 120, the duration is 2 seconds, 
but it throws Exception 




Caused by: 
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does 
not exist: spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443
at 
org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
at 
org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)


at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)



Why?


my code is 


ssc = StreamingContext(sc,2)
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1})
kvs.window(60,2).map(lambda x: analyzeMessage(x[1]))\
.filter(lambda x: x[1] != None).updateStateByKey(updateStateFunc) \
.filter(lambda x: x[1]['isExisted'] != 1) \
.foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))

Spark JDBC data source API issue with mysql

2015-04-27 Thread madhu phatak
Hi,
 I have been trying out spark data source api with JDBC. The following is
the code to get DataFrame,

 Try(hc.load("org.apache.spark.sql.jdbc", Map("url" -> dbUrl, "dbtable" -> s"($query)")))


By looking at the test cases, I found that the query has to be inside parentheses,
otherwise it's treated as a table name. But when used with MySQL, the query
inside the ( ) is treated as a derived table, which throws an exception. Is
this the right way to pass queries to the JDBC source, or am I missing
something?
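
For what it's worth, MySQL requires every derived table to carry an alias, so adding one is a common workaround. A hedged sketch reusing hc, dbUrl and query from the snippet above (the alias name tmp is arbitrary):

// "dbtable" can be any expression valid in a FROM clause, so an aliased
// subquery satisfies both the Spark JDBC source and MySQL's parser
Try(hc.load("org.apache.spark.sql.jdbc",
  Map("url" -> dbUrl, "dbtable" -> s"($query) AS tmp")))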


Regards,
Madhukara Phatak
http://datamantra.io/


Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?

2015-04-27 Thread ๏̯͡๏
I downloaded 1.3.1 hadoop 2.4 prebuilt package (tar) from multiple mirrors
and direct link. Each time i untar i get below error

spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty
error message)

tar: Error exit delayed from previous errors


Is it broken ?

-- 
Deepak


RE: ReduceByKey and sorting within partitions

2015-04-27 Thread Ganelin, Ilya
Marco - why do you want data sorted both within and across partitions? If you 
need to take an ordered sequence across all your data you need to either 
aggregate your RDD on the driver and sort it, or use zipWithIndex to apply an 
ordered index to your data that matches the order it was stored on HDFS. You 
can then get the data in order by filtering based on that index. Let me know if 
that's not what you need - thanks!
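
For reference, the closest built-in combination to what Marco describes is the aggregation followed by repartitionAndSortWithinPartitions with a RangePartitioner, which gives ordering across partitions and sorting within each partition, at the cost of a second shuffle. A minimal sketch, where pairs is a placeholder RDD[(K, V)] with numeric values and the 8 output partitions are illustrative:

import org.apache.spark.RangePartitioner

val reduced = pairs.reduceByKey(_ + _)            // first shuffle: aggregation
val byRange = new RangePartitioner(8, reduced)    // key ranges => global order across partitions
val ordered = reduced.repartitionAndSortWithinPartitions(byRange)  // second shuffle: sorted within each partition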



Sent with Good (www.good.com)


-Original Message-
From: Marco [marcope...@gmail.com]
Sent: Monday, April 27, 2015 07:01 AM Eastern Standard Time
To: user@spark.apache.org
Subject: ReduceByKey and sorting within partitions


Hi,

I'm trying, after reducing by key, to get the data ordered among partitions
(like RangePartitioner) and within partitions (like sortByKey or
repartitionAndSortWithinPartitions), pushing the sorting down into the
shuffle machinery of the reduce phase.

I think, but maybe I'm wrong, that the correct way to do that would be for
combineByKey to call the setKeyOrdering function on the ShuffledRDD that it returns.

Am I wrong? Can be done by a combination of other transformations with
the same efficiency?

Thanks,
Marco

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


data locality in spark

2015-04-27 Thread Grandl Robert
Hi guys,
I am running some SQL queries, but all my tasks are reported as either 
NODE_LOCAL or PROCESS_LOCAL. 
In case of Hadoop world, the reduce tasks are RACK or NON_RACK LOCAL because 
they have to aggregate data from multiple hosts. However, in Spark even the 
aggregation stages are reported as NODE/PROCESS LOCAL.
Do I miss something, or why the reduce-like tasks are still NODE/PROCESS LOCAL ?
Thanks,Robert



RE: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?

2015-04-27 Thread Ganelin, Ilya
What command are you using to untar? Are you running out of disk space?



Sent with Good (www.good.com)


-Original Message-
From: ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.com]
Sent: Monday, April 27, 2015 11:44 AM Eastern Standard Time
To: user
Subject: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?

I downloaded 1.3.1 hadoop 2.4 prebuilt package (tar) from multiple mirrors and 
direct link. Each time i untar i get below error


spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty 
error message)

tar: Error exit delayed from previous errors


Is it broken ?

--
Deepak



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: Timeout Error

2015-04-27 Thread Deepak Gopalakrishnan
Hello All,

I dug a little deeper and found this error :

15/04/27 16:05:39 WARN TransportChannelHandler: Exception in
connection from /10.1.0.90:40590
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at 
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at 
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)
15/04/27 16:05:39 ERROR TransportRequestHandler: Error sending result
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=45314884029,
chunkIndex=0}, buffer=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0
lim=26227673 cap=26227673]}} to /10.1.0.90:40590; closing connection
java.nio.channels.ClosedChannelException
15/04/27 16:05:39 ERROR TransportRequestHandler: Error sending result
RpcResponse{requestId=8439869725098873668, response=[B@1bdcdf63} to
/10.1.0.90:40590; closing connection
java.nio.channels.ClosedChannelException
15/04/27 16:05:39 ERROR CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkexecu...@master.spark.com:60802] -
[akka.tcp://sparkdri...@master.spark.com:37195] disassociated!
Shutting down.
15/04/27 16:05:39 WARN ReliableDeliverySupervisor: Association with
remote system [akka.tcp://sparkdri...@master.spark.com:37195] has
failed, address is now gated for [5000] ms. Reason is:
[Disassociated].


On Mon, Apr 27, 2015 at 8:35 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 The configuration key should be spark.akka.askTimeout for this timeout.
 The time unit is seconds.

 Best Regards,
 Shixiong(Ryan) Zhu

 2015-04-26 15:15 GMT-07:00 Deepak Gopalakrishnan dgk...@gmail.com:

 Hello,


 Just to add a bit more context :

 I have done that in the code, but I cannot see it change from 30 seconds
 in the log.

 .set(spark.executor.memory, 10g)

 .set(spark.driver.memory, 20g)

 .set(spark.akka.timeout,6000)

 PS : I understand that 6000 is quite large, but I'm just trying to see if
 it actually changes


 Here is the command that I'm running

  sudo MASTER=spark://master.spark.com:7077
 /opt/spark/spark-1.3.0-bin-hadoop2.4/bin/spark-submit --class
 class-name   --executor-memory 20G --driver-memory 10G  --deploy-mode
 client --conf spark.akka.timeout=6000 --conf spark.akka.askTimeout=6000
 jar file path


 and here is how I load the file:
 JavaPairRDD<String, String> learningRdd = sc.wholeTextFiles(filePath, 10);
 Thanks

 On Mon, Apr 27, 2015 at 3:36 AM, Bryan Cutler cutl...@gmail.com wrote:

 I'm not sure what the expected performance should be for this amount of
 data, but you could try to increase the timeout with the property
 spark.akka.timeout to see if that helps.

 Bryan

 On Sun, Apr 26, 2015 at 6:57 AM, Deepak Gopalakrishnan dgk...@gmail.com
  wrote:

 Hello All,

 I'm trying to process a 3.5GB file on standalone mode using spark. I
 could run my spark job succesfully on a 100MB file and it works as
 expected. But, when I try to run it on the 3.5GB file, I run into the below
 error :


 15/04/26 12:45:50 INFO BlockManagerMaster: Updated info of block 
 taskresult_83
 15/04/26 12:46:46 WARN AkkaUtils: Error sending message [message = 
 Heartbeat(2,[Lscala.Tuple2;@790223d3,BlockManagerId(2, master.spark.com, 
 39143))] in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
 15/04/26 12:47:15 INFO MemoryStore: ensureFreeSpace(26227673) called with 
 curMem=265897, maxMem=5556991426
 15/04/26 

Re: What is difference btw reduce fold?

2015-04-27 Thread keegan
Hi Q,

fold and reduce both aggregate over a collection by applying an operation you
specify; the major difference is the starting point of the aggregation. For
fold(), you have to specify the starting value, and for reduce() the starting
value is the first (or possibly an arbitrary) element in the collection.

Simple examples - we can sum the numbers in a collection using both
functions:
(1 until 10).reduce( (a,b) => a+b )
(1 until 10).fold(0)( (a,b) => a+b )

With fold, we want to start at 0 and cumulatively add each element. In this
case, the operation passed to fold() and reduce() were very similar, but it
is helpful to think about fold in the following way. For the operation we
pass to fold(), imagine its two arguments are (i) the current accumulated
value and (ii) the next value in the collection,

(1 until 10).fold(0)( (accumulated_so_far, next_value) => accumulated_so_far
+ next_value ).

So the result of the operation, accumulated_so_far + next_value, will be
passed to the operation again as the first argument, and so on. 

In this way, we could count the number of elements in a collection using
fold,

(1 until 10).fold(0)( (accumulated_so_far, next_value) => accumulated_so_far
+ 1 ).


When it comes to Spark, here’s another thing to keep in mind. For both
reduce and fold, you need to make sure your operation is both commutative
and associative. For RDDs, reduce and fold are implemented on each partition
separately, and then the results are combined using the operation.  With
fold, this could get you into trouble because an empty partition will emit
fold’s starting value, so the number of partitions might erroneously affect
the result of the calculation, if you’re not careful about the operation.
This would occur with the ( (a,b) => a+1 ) operation from above (see
http://stackoverflow.com/questions/29150202/pyspark-fold-method-output). 
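
A minimal Spark sketch of that pitfall, assuming an RDD built with 3 partitions on a local run:

val rdd = sc.parallelize(1 until 10, 3)
rdd.fold(0)( (a,b) => a+b )   // 45: sum is associative and commutative, so it is safe
rdd.fold(0)( (a,b) => a+1 )   // 3, not 9: fold runs once per partition and then once
                              // more to combine the partial results, so this
                              // non-associative op ends up counting the partitions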

Hope this helps. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-difference-btw-reduce-fold-tp22653p22671.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Getting error running MLlib example with new cluster

2015-04-27 Thread Xiangrui Meng
How did you run the example app? Did you use spark-submit? -Xiangrui

On Thu, Apr 23, 2015 at 2:27 PM, Su She suhsheka...@gmail.com wrote:
 Sorry, accidentally sent the last email before finishing.

 I had asked this question before, but wanted to ask again as I think
 it is now related to my pom file or project setup. Really appreciate the help!

 I have been trying on/off for the past month to try to run this MLlib
 example: 
 https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala

 I am able to build the project successfully. When I run it, it returns:

 features in spam: 8
 features in ham: 7

 and then freezes. According to the UI, the description of the job is
 count at DataValidators.scala.38. This corresponds to this line in
 the code:

 val model = lrLearner.run(trainingData)

 I've tried just about everything I can think of...changed numFeatures
 from 1 - 10,000, set executor memory to 1g, set up a new cluster, at
 this point I think I might have missed dependencies as that has
 usually been the problem in other spark apps I have tried to run. This
 is my pom file, that I have used for other successful spark apps.
 Please let me know if you think I need any additional dependencies or
 there are incompatibility issues, or a pom.xml that is better to use.
 Thank you!

 Cluster information:

 Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0)
 java version 1.7.0_25
 Scala version: 2.10.4
 hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0)



 <project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <groupId>edu.berkely</groupId>
   <artifactId>simple-project</artifactId>
   <modelVersion>4.0.0</modelVersion>
   <name>Simple Project</name>
   <packaging>jar</packaging>
   <version>1.0</version>
   <repositories>
     <repository>
       <id>cloudera</id>
       <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
     </repository>

     <repository>
       <id>scala-tools.org</id>
       <name>Scala-tools Maven2 Repository</name>
       <url>http://scala-tools.org/repo-releases</url>
     </repository>

   </repositories>

   <pluginRepositories>
     <pluginRepository>
       <id>scala-tools.org</id>
       <name>Scala-tools Maven2 Repository</name>
       <url>http://scala-tools.org/repo-releases</url>
     </pluginRepository>
   </pluginRepositories>

   <build>
     <plugins>
       <plugin>
         <groupId>org.scala-tools</groupId>
         <artifactId>maven-scala-plugin</artifactId>
         <executions>

           <execution>
             <id>compile</id>
             <goals>
               <goal>compile</goal>
             </goals>
             <phase>compile</phase>
           </execution>
           <execution>
             <id>test-compile</id>
             <goals>
               <goal>testCompile</goal>
             </goals>
             <phase>test-compile</phase>
           </execution>
           <execution>
             <phase>process-resources</phase>
             <goals>
               <goal>compile</goal>
             </goals>
           </execution>
         </executions>
       </plugin>
       <plugin>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
           <source>1.7</source>
           <target>1.7</target>
         </configuration>
       </plugin>
     </plugins>
   </build>


   <dependencies>
     <dependency> <!-- Spark dependency -->
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
       <version>1.2.0-cdh5.3.0</version>
     </dependency>

     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>2.5.0-mr1-cdh5.3.0</version>
     </dependency>

     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>2.10.4</version>
     </dependency>

     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
       <version>2.10.4</version>
     </dependency>

     <dependency>
       <groupId>com.101tec</groupId>
       <artifactId>zkclient</artifactId>
       <version>0.3</version>
     </dependency>

  

Re: MLlib - Collaborative Filtering - trainImplicit task size

2015-04-27 Thread Xiangrui Meng
Could you try different ranks and see whether the task size changes?
We do use YtY in the closure, which should work the same as broadcast.
If that is the case, it should be safe to ignore this warning.
-Xiangrui

On Thu, Apr 23, 2015 at 4:52 AM, Christian S. Perone
christian.per...@gmail.com wrote:
 All these warnings come from ALS iterations, from flatMap and also from
 aggregate, for instance the origin of the state where the flatMap is showing
 these warnings (w/ Spark 1.3.0, they are also shown in Spark 1.3.1):

 org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
 org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
 org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
 org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
 scala.collection.immutable.Range.foreach(Range.scala:141)
 org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
 org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

 And from the aggregate:

 org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
 org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
 org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
 org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
 org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
 scala.collection.immutable.Range.foreach(Range.scala:141)
 org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
 org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)



 On Thu, Apr 23, 2015 at 2:49 AM, Xiangrui Meng men...@gmail.com wrote:

 This is the size of the serialized task closure. Is stage 246 part of
 ALS iterations, or something before or after it? -Xiangrui

 On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone
 christian.per...@gmail.com wrote:
  Hi Sean, thanks for the answer. I tried to call repartition() on the
  input
  with many different sizes and it still continues to show that warning
  message.
 
  On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote:
 
  I think maybe you need more partitions in your input, which might make
  for smaller tasks?
 
  On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone
  christian.per...@gmail.com wrote:
   I keep seeing these warnings when using trainImplicit:
  
   WARN TaskSetManager: Stage 246 contains a task of very large size
   (208
   KB).
   The maximum recommended task size is 100 KB.
  
   And then the task size starts to increase. Is this a known issue ?
  
   Thanks !
  
   --
   Blog | Github | Twitter
   Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great
   big
   joke on me.
 
 
 
 
  --
  Blog | Github | Twitter
  Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great
  big
  joke on me.




 --
 Blog | Github | Twitter
 Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
 joke on me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?

2015-04-27 Thread Sean Owen
Works fine for me. Make sure you're not downloading the HTML
redirector page and thinking it's the archive.

On Mon, Apr 27, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
 I downloaded 1.3.1 hadoop 2.4 prebuilt package (tar) from multiple mirrors
 and direct link. Each time i untar i get below error

 spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty
 error message)

 tar: Error exit delayed from previous errors


 Is it broken ?


 --
 Deepak


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StandardScaler failing with OOM errors in PySpark

2015-04-27 Thread Xiangrui Meng
You might need to specify driver memory in spark-submit instead of
passing JVM options. spark-submit is designed to handle different
deployments correctly. -Xiangrui
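
A hedged sketch of what that looks like for a PySpark job in yarn-client mode; the script name and the 8g figure are placeholders:

# in client mode the driver JVM is already running by the time code-level
# settings are read, so driver memory has to be supplied at launch
spark-submit --master yarn-client \
  --driver-memory 8g \
  --conf spark.yarn.executor.memoryOverhead=3072 \
  scale_features.py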

On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar rokros...@gmail.com wrote:
 ok yes, I think I have narrowed it down to being a problem with driver
 memory settings. It looks like the application master/driver is not being
 launched with the settings specified:

 For the driver process on the main node I see -XX:MaxPermSize=128m -Xms512m
 -Xmx512m as options used to start the JVM, even though I specified

 'spark.yarn.am.memory', '5g'
 'spark.yarn.am.memoryOverhead', '2000'

 The info shows that these options were read:

 15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120 MB
 memory including 2000 MB overhead

 Is there some reason why these options are being ignored and instead
 starting the driver with just 512Mb of heap?

 On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote:

 the feature dimension is 800k.

 yes, I believe the driver memory is likely the problem since it doesn't
 crash until the very last part of the tree aggregation.

 I'm running it via pyspark through YARN -- I have to run in client mode so
 I can't set spark.driver.memory -- I've tried setting the
 spark.yarn.am.memory and overhead parameters but it doesn't seem to have an
 effect.

 Thanks,

 Rok

 On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:

  What is the feature dimension? Did you set the driver memory? -Xiangrui
 
  On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
  I'm trying to use the StandardScaler in pyspark on a relatively small
  (a few
  hundred Mb) dataset of sparse vectors with 800k features. The fit
  method of
  StandardScaler crashes with Java heap space or Direct buffer memory
  errors.
  There should be plenty of memory around -- 10 executors with 2 cores
  each
  and 8 Gb per core. I'm giving the executors 9g of memory and have also
  tried
  lots of overhead (3g), thinking it might be the array creation in the
  aggregators that's causing issues.
 
  The bizarre thing is that this isn't always reproducible -- sometimes
  it
  actually works without problems. Should I be setting up executors
  differently?
 
  Thanks,
 
  Rok
 
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
  Sent from the Apache Spark User List mailing list archive at
  Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Powered By Spark

2015-04-27 Thread Justin
Hi,

Would you mind adding our company to the Powered By Spark page?

organization name: atp
URL: https://atp.io
a list of which Spark components you are using: SparkSQL, MLLib, Databricks
Cloud
and a short description of your use case: Predictive models and
learning algorithms to improve the relevance of programmatic marketing


Thanks!
Justin




Justin Barton
CTO
+1 (718) 404 9272
+44 203 290 9272
atp.io | jus...@atp.io | find us https://atp.io/find-us


spark sql LEFT OUTER JOIN java.lang.ClassCastException

2015-04-27 Thread kiran mavatoor
Hi there,
I am using a Spark SQL left outer join query.
The SQL query is:
scala> val test = sqlContext.sql("SELECT e.departmentID FROM employee e LEFT OUTER JOIN department d ON d.departmentId = e.departmentId").toDF()
In Spark 1.3.1 it works fine, but the latest pull gives the below error:
15/04/27 23:02:49 ERROR Executor: Exception in task 4.0 in stage 67.0 (TID 118)
java.lang.ClassCastException
15/04/27 23:02:49 INFO TaskSetManager: Lost task 4.0 in stage 67.0 (TID 118) on executor localhost: java.lang.ClassCastException (null) [duplicate 1]
15/04/27 23:02:49 ERROR Executor: Exception in task 2.0 in stage 67.0 (TID 116)
java.lang.ClassCastException
15/04/27 23:02:49 INFO TaskSetManager: Lost task 2.0 in stage 67.0 (TID 116) on executor localhost: java.lang.ClassCastException (null) [duplicate 2]
15/04/27 23:02:49 ERROR Executor: Exception in task 3.0 in stage 67.0 (TID 117)
java.lang.ClassCastException
15/04/27 23:02:49 INFO TaskSetManager: Lost task 3.0 in stage 67.0 (TID 117) on executor localhost: java.lang.ClassCastException (null) [duplicate 3]
15/04/27 23:02:49 ERROR Executor: Exception in task 0.0 in stage 66.0 (TID 112)
java.lang.ClassCastException
15/04/27 23:02:49 INFO TaskSetManager: Lost task 0.0 in stage 66.0 (TID 112) on executor localhost: java.lang.ClassCastException (null) [duplicate 1]
15/04/27 23:02:49 INFO TaskSchedulerImpl: Removed TaskSet 66.0, whose tasks have all completed, from pool
15/04/27 23:02:49 ERROR Executor: Exception in task 5.0 in stage 67.0 (TID 119)
java.lang.ClassCastException
15/04/27 23:02:49 INFO TaskSetManager: Lost task 5.0 in stage 67.0 (TID 119) on executor localhost: java.lang.ClassCastException (null) [duplicate 4]
15/04/27 23:02:49 ERROR Executor: Exception in task 0.0 in stage 67.0 (TID 114)
java.lang.ClassCastException
15/04/27 23:02:49 INFO TaskSetManager: Lost task 0.0 in stage 67.0 (TID 114) on executor localhost: java.lang.ClassCastException (null) [duplicate 5]
15/04/27 23:02:49 INFO TaskSchedulerImpl: Removed TaskSet 67.0, whose tasks have all completed, from pool

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 66.0 failed 1 times, most recent failure: Lost task 1.0 in stage 66.0 (TID 113, localhost): java.lang.ClassCastException
Driver stacktrace: at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1241)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1232)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1231)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1231) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:705)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:705)
 at scala.Option.foreach(Option.scala:236) at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:705)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1424)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1385)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Thanks,
Kiran

Streaming app with windowing and persistence

2015-04-27 Thread Alexander Krasheninnikov

Hello, everyone.
I am developing a streaming application that works with window functions: each
window creates a table and performs some SQL operations on the extracted data.
I ran into the following problem: when using window operations together with
checkpointing, the application does not start the next time it is launched.

Here is the code:



final Duration batchDuration = Durations.seconds(10);
final Duration slideDuration = Durations.seconds(10);
final Duration windowDuration = Durations.seconds(600);

final SparkConf conf = new SparkConf();
conf.setAppName("Streaming");
conf.setMaster("local[4]");


JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, batchDuration);
        streamingContext.checkpoint(CHECKPOINT_DIR);
        return streamingContext;
    }
};

JavaStreamingContext streamingContext =
    JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new Configuration(), contextFactory, true);
JavaDStream<String> lines = streamingContext.textFileStream(SOURCE_DIR);

lines.countByWindow(windowDuration, slideDuration).print();

streamingContext.start();
streamingContext.awaitTermination();



I expect that after an application restart, Spark will merge the old event
counter with the new values (if that is not the case, I am ready to merge the
old data manually).

But, after application restart, I have this error:
Exception in thread main org.apache.spark.SparkException: 
org.apache.spark.streaming.dstream.MappedDStream@49db6f23 has not been 
initialized
at 
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)

at scala.Option.orElse(Option.scala:289)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)

at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
at 
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
at 
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
at 
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:584)

at my.package.FileAggregations.main(FileAggregations.java:76)

The line at FileAggregations.java:76 is:

streamingContext.start();

Spark version is 1.3.0.
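
For reference, a minimal Scala sketch of the pattern the Spark Streaming
checkpointing documentation describes, where all of the DStream setup happens
inside the factory passed to getOrCreate so that it can be rebuilt from the
checkpoint on restart (the directory paths here are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedWindowCount {
  val checkpointDir = "/tmp/checkpoint"   // illustrative paths
  val sourceDir = "/tmp/source"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("Streaming").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    // All DStream operations are declared here, inside the factory,
    // so they can be reconstructed from the checkpoint after a restart.
    val lines = ssc.textFileStream(sourceDir)
    lines.countByWindow(Seconds(600), Seconds(10)).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}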

---
wbr, Alexandr Krasheninnikov


does randomSplit return a copy or a reference to the original rdd? [Python]

2015-04-27 Thread Pagliari, Roberto
Suppose I have something like the code below


for idx in xrange(0, 10):
    train_test_split = training.randomSplit(weights=[0.75, 0.25])
    train_cv = train_test_split[0]
    test_cv = train_test_split[1]
    # scale train_cv and test_cv


by scaling train_cv and test_cv, will the original data be affected?

Thanks,



Re: Getting error running MLlib example with new cluster

2015-04-27 Thread Su She
Hello Xiangrui,

I am using this spark-submit command (as I do for all other jobs):

/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/bin/spark-submit
--class MLlib --master local[2] --jars $(echo
/home/ec2-user/sparkApps/learning-spark/lib/*.jar | tr ' ' ',')
/home/ec2-user/sparkApps/learning-spark/target/simple-project-1.1.jar

Thank you for the help!

Best,

Su


On Mon, Apr 27, 2015 at 9:58 AM, Xiangrui Meng men...@gmail.com wrote:
 How did you run the example app? Did you use spark-submit? -Xiangrui

 On Thu, Apr 23, 2015 at 2:27 PM, Su She suhsheka...@gmail.com wrote:
 Sorry, accidentally sent the last email before finishing.

 I had asked this question before, but wanted to ask again as I think
 it is now related to my pom file or project setup. Really appreciate the 
 help!

 I have been trying on/off for the past month to try to run this MLlib
 example: 
 https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala

 I am able to build the project successfully. When I run it, it returns:

 features in spam: 8
 features in ham: 7

 and then freezes. According to the UI, the description of the job is
 count at DataValidators.scala:38. This corresponds to this line in
 the code:

 val model = lrLearner.run(trainingData)

 I've tried just about everything I can think of: changed numFeatures
 from 1 to 10,000, set executor memory to 1g, and set up a new cluster. At
 this point I think I might have missed dependencies, as that has
 usually been the problem in other Spark apps I have tried to run. This
 is my pom file, which I have used for other successful Spark apps.
 Please let me know if you think I need any additional dependencies,
 whether there are incompatibility issues, or whether there is a better
 pom.xml to use. Thank you!

 Cluster information:

 Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0)
 java version 1.7.0_25
 Scala version: 2.10.4
 hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0)



 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
          http://maven.apache.org/maven-v4_0_0.xsd">
   <groupId>edu.berkely</groupId>
   <artifactId>simple-project</artifactId>
   <modelVersion>4.0.0</modelVersion>
   <name>Simple Project</name>
   <packaging>jar</packaging>
   <version>1.0</version>

   <repositories>
     <repository>
       <id>cloudera</id>
       <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
     </repository>
     <repository>
       <id>scala-tools.org</id>
       <name>Scala-tools Maven2 Repository</name>
       <url>http://scala-tools.org/repo-releases</url>
     </repository>
   </repositories>

   <pluginRepositories>
     <pluginRepository>
       <id>scala-tools.org</id>
       <name>Scala-tools Maven2 Repository</name>
       <url>http://scala-tools.org/repo-releases</url>
     </pluginRepository>
   </pluginRepositories>

   <build>
     <plugins>
       <plugin>
         <groupId>org.scala-tools</groupId>
         <artifactId>maven-scala-plugin</artifactId>
         <executions>
           <execution>
             <id>compile</id>
             <goals>
               <goal>compile</goal>
             </goals>
             <phase>compile</phase>
           </execution>
           <execution>
             <id>test-compile</id>
             <goals>
               <goal>testCompile</goal>
             </goals>
             <phase>test-compile</phase>
           </execution>
           <execution>
             <phase>process-resources</phase>
             <goals>
               <goal>compile</goal>
             </goals>
           </execution>
         </executions>
       </plugin>
       <plugin>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
           <source>1.7</source>
           <target>1.7</target>
         </configuration>
       </plugin>
     </plugins>
   </build>

   <dependencies>
     <dependency> <!-- Spark dependency -->
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
       <version>1.2.0-cdh5.3.0</version>
     </dependency>

     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>2.5.0-mr1-cdh5.3.0</version>
     </dependency>

 

Spark Job fails with 6 executors and succeeds with 8 ?

2015-04-27 Thread ๏̯͡๏
I have this Spark app and it fails when I run it with 6 executors but succeeds
with 8.

Any suggestions?

Command:

 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
--num-executors 6 --driver-memory 12g --driver-java-options
-XX:MaxPermSize=8G --executor-memory 12g --executor-cores 6 --queue
hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-04-6 endDate=2015-04-7
input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=2G

Input Data Sets Size:

-sh-4.1$ hadoop fs -ls XX/YY/part-r-0

 2663019338 bytes

-sh-4.1$ hadoop fs -ls /AA/BB/part-r-0

 2688348022  bytes

-sh-4.1$ hadoop fs -ls /FOO/BAAR/ch196out83-r-0.avro

 1274065689 bytes

-sh-4.1$

Any thoughts?
Exception:
15/04/27 22:12:46 INFO Configuration.deprecation:
mapred.output.compression.codec is deprecated. Instead, use
mapreduce.output.fileoutputformat.compress.codec
15/04/27 22:12:47 ERROR executor.Executor: Exception in task 8.0 in stage
4.0 (TID 36)
scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference
involving object SchemaUtil
at
scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1220)
at
scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1218)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.reflect.internal.Symbols$Symbol.lock(Symbols.scala:482)
at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1218)
at scala.reflect.internal.Symbols$Symbol.initialize(Symbols.scala:1374)
at scala.reflect.internal.Symbols$Symbol.hasFlag(Symbols.scala:607)
at scala.reflect.internal.Symbols$TermSymbol.isTermMacro(Symbols.scala:2453)
at scala.reflect.internal.Symbols$Symbol.symbolKind(Symbols.scala:2263)
at
scala.reflect.internal.Symbols$Symbol.sanitizedKindString(Symbols.scala:2297)
at scala.reflect.internal.Symbols$Symbol.kindString(Symbols.scala:2305)
at scala.reflect.internal.Symbols$Symbol.toString(Symbols.scala:2350)
at java.lang.String.valueOf(String.java:2847)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
at
scala.collection.TraversableOnce$$anonfun$addString$1.apply(TraversableOnce.scala:327)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableOnce$class.addString(TraversableOnce.scala:320)
at scala.collection.AbstractTraversable.addString(Traversable.scala:105)
at
scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:286)
at scala.collection.AbstractTraversable.mkString(Traversable.scala:105)
at
scala.collection.TraversableLike$class.toString(TraversableLike.scala:639)
at scala.collection.SeqLike$class.toString(SeqLike.scala:646)
at scala.collection.AbstractSeq.toString(Seq.scala:40)
at java.lang.String.valueOf(String.java:2847)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678)
at
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:44)
at
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
at
scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:161)
at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:21)
at
com.ebay.ep.poc.spark.reporting.process.util.SchemaUtil$$typecreator3$1.apply(SchemaUtil.scala:108)
at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:335)
at scala.reflect.api.Universe.typeOf(Universe.scala:59)
at
com.ebay.ep.poc.spark.reporting.process.util.SchemaUtil$.com$ebay$ep$poc$spark$reporting$process$util$SchemaUtil$$getField(SchemaUtil.scala:108)
at
com.ebay.ep.poc.spark.reporting.process.util.SchemaUtil$$anonfun$6$$anonfun$apply$4.apply(SchemaUtil.scala:94)
at
com.ebay.ep.poc.spark.reporting.process.util.SchemaUtil$$anonfun$6$$anonfun$apply$4.apply(SchemaUtil.scala:90)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.reflect.internal.Scopes$Scope.foreach(Scopes.scala:315)
at 

Serialization error

2015-04-27 Thread madhvi

Hi,

While connecting to Accumulo through Spark by creating a Spark RDD, I am
getting the following error:

 object not serializable (class: org.apache.accumulo.core.data.Key)

This is due to Accumulo's Key class, which does not implement the
Serializable interface. How can this be solved so that Accumulo can be used
with Spark?
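
One common workaround is to map the non-serializable Key/Value pairs into
plain serializable types right after reading them, before anything needs to
ship them across the cluster. A minimal Scala sketch, assuming the new-API
AccumuloInputFormat and with the connection setup elided (all names here are
illustrative):

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object AccumuloToSpark {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AccumuloRead").setMaster("local[2]"))
    val job = Job.getInstance()
    // ... Accumulo connector, table and authorizations configured on `job` here ...

    val raw = sc.newAPIHadoopRDD(job.getConfiguration,
      classOf[AccumuloInputFormat], classOf[Key], classOf[Value])

    // Convert Key/Value into serializable Strings right away; after this map,
    // no Accumulo class ever needs to be serialized by Spark.
    val rows = raw.map { case (k, v) => (k.getRow.toString, v.toString) }

    println(rows.count())
    sc.stop()
  }
}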


Thanks
Madhvi

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to work with foreachrdd

2015-04-27 Thread drarse
Can you share the code? Maybe the issue is in the foreachRDD body. :)
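
In case it helps while waiting for the code, here is a minimal self-contained
foreachRDD sketch in Scala (the socket source, port, and names are
illustrative, not taken from the original app). The body passed to foreachRDD
runs on the driver once per batch, and only the RDD actions inside it trigger
distributed work:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRddExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("ForeachRddExample").setMaster("local[2]"),
      Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)

    // foreachRDD executes this block once per batch; count() is the action
    // that actually forces the batch to be processed.
    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty()) println(s"batch size = ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}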

On Tuesday, April 28, 2015, CH.KMVPRASAD [via Apache Spark User List] 
ml-node+s1001560n22681...@n3.nabble.com wrote:

  When I run my Spark Streaming application, the print method is printing
  results, but when I use foreachRDD on that DStream object it is not working.
  Why? What is the reason?

 please help me!




-- 
Regards, Sergio Jiménez




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-work-with-foreachrdd-tp22681p22682.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: gridsearch - python

2015-04-27 Thread Xiangrui Meng
We will try to make them available in 1.4, which is coming soon. -Xiangrui

On Thu, Apr 23, 2015 at 10:18 PM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
 I know grid search with cross validation is not supported. However, I was
 wondering if there is something available for the time being.



 Thanks,





 From: Punyashloka Biswal [mailto:punya.bis...@gmail.com]
 Sent: Thursday, April 23, 2015 9:06 PM
 To: Pagliari, Roberto; user@spark.apache.org
 Subject: Re: gridsearch - python



 https://issues.apache.org/jira/browse/SPARK-7022.

 Punya



 On Thu, Apr 23, 2015 at 5:47 PM Pagliari, Roberto rpagli...@appcomsci.com
 wrote:

 Can anybody point me to an example, if available, about gridsearch with
 python?



 Thank you,



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark on Mesos

2015-04-27 Thread Stephen Carman
So I installed Spark 1.3.1 built with Hadoop 2.6 on each of the slaves; I 
basically just got the pre-built package from the Spark website…

I placed those compiled Spark installs on each slave at /opt/spark.

My Spark properties seem to be getting picked up on my side fine…

[inline screenshot of the Spark properties omitted]

The framework is registered in Mesos and shows up just fine; it doesn't matter 
whether I turn off the executor URI or not, but I always get the same error…

org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in 
stage 0.0 failed 4 times, most recent failure: Lost task 6.3 in stage 0.0 (TID 
23, 10.253.1.117): ExecutorLostFailure (executor 
20150424-104711-1375862026-5050-20113-S1 lost)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

These boxes are totally open to one another, so they shouldn't have any firewall 
issues. Everything seems to show up in Mesos and Spark just fine, but actually 
running anything totally blows up.

There is nothing in stderr or stdout; the executor downloads the package and 
untars it but doesn't seem to do much after that. Any insights?

Steve


On Apr 24, 2015, at 5:50 PM, Yang Lei genia...@gmail.com wrote:

SPARK_PUBLIC_DNS, SPARK_LOCAL_IP, SPARK_LOCAL_HOST



Group by order by

2015-04-27 Thread Ulanov, Alexander
Hi,

Could you suggest what is the best way to do "group by x order by y" in Spark?

When I try to perform it with Spark SQL I get the following error (Spark 1.3):

val results = sqlContext.sql("select * from sample group by id order by time")
org.apache.spark.sql.AnalysisException: expression 'time' is neither present in 
the group by, nor is it an aggregate function. Add to group by or wrap in 
first() if you don't care which value you get.;
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)

Is there a way to do it with just RDD?

Best regards, Alexander


Re: Group by order by

2015-04-27 Thread Richard Marscher
Hi,

that error seems to indicate the basic query is not properly expressed. If
you group by just ID, then that means it would need to aggregate all the
time values into one value per ID, so you can't sort by it. Thus it tries
to suggest an aggregate function for time so you can have 1 value per ID
and properly sort it.

On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander alexander.ula...@hp.com
wrote:

  Hi,



 Could you suggest what is the best way to do “group by x order by y” in
 Spark?



 When I try to perform it with Spark SQL I get the following error (Spark
 1.3):



 val results = sqlContext.sql(select * from sample group by id order by
 time)

 org.apache.spark.sql.AnalysisException: expression 'time' is neither
 present in the group by, nor is it an aggregate function. Add to group by
 or wrap in first() if you don't care which value you get.;

 at
 org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)



 Is there a way to do it with just RDD?



 Best regards, Alexander



RE: Group by order by

2015-04-27 Thread Ulanov, Alexander
Hi Richard,

There are several values of time per id. Is there a way to perform group by id 
and sort by time in Spark?
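
A minimal RDD-level sketch of one common approach in Scala (the record type
and field names are illustrative, and note that each group must fit in memory
on a single executor):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical record type standing in for the rows of the "sample" table.
case class Sample(id: String, time: Long, value: Double)

object GroupByIdSortByTime {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GroupSort").setMaster("local[2]"))
    val samples = sc.parallelize(Seq(
      Sample("a", 3L, 1.0), Sample("a", 1L, 2.0), Sample("b", 2L, 3.0)))

    // Group all rows by id, then sort each group's rows by time in memory.
    val grouped = samples
      .groupBy(_.id)
      .mapValues(rows => rows.toSeq.sortBy(_.time))

    grouped.collect().foreach(println)
    sc.stop()
  }
}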

Best regards, Alexander

From: Richard Marscher [mailto:rmarsc...@localytics.com]
Sent: Monday, April 27, 2015 12:20 PM
To: Ulanov, Alexander
Cc: user@spark.apache.org
Subject: Re: Group by order by

Hi,

that error seems to indicate the basic query is not properly expressed. If you 
group by just ID, then that means it would need to aggregate all the time 
values into one value per ID, so you can't sort by it. Thus it tries to suggest 
an aggregate function for time so you can have 1 value per ID and properly sort 
it.

On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander 
alexander.ula...@hp.com wrote:
Hi,

Could you suggest what is the best way to do “group by x order by y” in Spark?

When I try to perform it with Spark SQL I get the following error (Spark 1.3):

val results = sqlContext.sql(select * from sample group by id order by time)
org.apache.spark.sql.AnalysisException: expression 'time' is neither present in 
the group by, nor is it an aggregate function. Add to group by or wrap in 
first() if you don't care which value you get.;
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)

Is there a way to do it with just RDD?

Best regards, Alexander



Understanding Spark's caching

2015-04-27 Thread Eran Medan
Hi Everyone!

I'm trying to understand how Spark's cache works.

Here is my naive understanding, please let me know if I'm missing something:

val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile(...)
rdd3.saveAsTextFile(...)

In the above, rdd1 will be loaded from disk (e.g. HDFS) only once (when
rdd2 is saved, I assume) and then served from the cache (assuming there is
enough RAM) when rdd3 is saved.

Now here is my question. Let's say I want to cache rdd2 and rdd3 as they
will both be used later on, but I don't need rdd1 after creating them.

Basically there is duplication, isn't there? Since once rdd2 and rdd3 are
calculated I don't need rdd1 anymore, I should probably unpersist it,
right? The question is when.

*Will this work? (Option A)*

val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()

Does Spark add the unpersist call to the DAG, or is it applied immediately? If
it is applied immediately, then rdd1 will effectively not be cached when I
compute rdd2 and rdd3, right?

*Should I do it this way instead (Option B)?*

val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)

rdd2.cache()
rdd3.cache()

rdd2.saveAsTextFile(...)
rdd3.saveAsTextFile(...)

rdd1.unpersist()

*So the question is this:* Is Option A good enough? That is, will rdd1 still
read the file only once, or do I need to go with Option B?

(see also
http://stackoverflow.com/questions/29903675/understanding-sparks-caching)
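
For what it's worth, a minimal runnable sketch of Option B, with comments
reflecting the RDD.unpersist API documentation (it marks the RDD as
non-persistent and removes its blocks when called, rather than being recorded
in the lineage); the paths and transformations are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object CachingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CachingSketch").setMaster("local[2]"))
    val rdd1 = sc.textFile("/tmp/some-data.txt")   // illustrative path
    rdd1.cache()

    val rdd2 = rdd1.filter(_.nonEmpty)
    val rdd3 = rdd1.map(_.toUpperCase)
    rdd2.cache()
    rdd3.cache()

    // Run the actions first: the first save materializes and caches rdd1,
    // the second reuses the cached rdd1, and both leave rdd2/rdd3 cached.
    rdd2.saveAsTextFile("/tmp/out-filtered")
    rdd3.saveAsTextFile("/tmp/out-mapped")

    // unpersist takes effect when called (it is not deferred into the DAG),
    // so calling it only after the actions keeps the single-read behaviour.
    rdd1.unpersist()

    sc.stop()
  }
}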

Thanks in advance