Spark + Mesos + HDFS resource split
Hi, I am building a mesos cluster for the purposes of using it to run spark workloads (in addition to other frameworks). I am under the impression that it is preferable/recommended to run the hdfs datanode process and the spark slave on the same physical node (or EC2 instance or VM). My question is: What is the recommended resource splitting? How much memory and CPU should I preallocate for HDFS and how much should I set aside as allocatable by mesos? In addition, is there some rule-of-thumb recommendation around this? -- Ankur Chauhan
Re: Cassandra Connection Issue with Spark-jobserver
Are you using DSE spark, and if so are you pointing spark job server to use DSE spark? Thanks and Regards Noorul

Anand anand.vi...@monotype.com writes: *I am new to the Spark world and Job Server. My Code:*

package spark.jobserver

import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.Map
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try

object CassandraCQLTest extends SparkJob {

  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "CassandraCQLTest")
    sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar")
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    val cHost: String = "localhost"
    val cPort: String = "9160"
    val KeySpace = "retail"
    val InputColumnFamily = "ordercf"
    val OutputColumnFamily = "salecount"

    val job = new Job()
    job.setInputFormatClass(classOf[CqlPagingInputFormat])
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
    /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */

    /** An UPDATE writes one or more columns to a record in a Cassandra column family */
    val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
    CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
    job.setOutputFormatClass(classOf[CqlOutputFormat])
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
      classOf[CqlPagingInputFormat],
      classOf[java.util.Map[String, ByteBuffer]],
      classOf[java.util.Map[String, ByteBuffer]])

    val productSaleRDD = casRdd.map {
      case (key, value) =>
        (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
    }
    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
    aggregatedRDD.collect().foreach {
      case (productId, saleCount) => println(productId + ":" + saleCount)
    }

    val casoutputCF = aggregatedRDD.map {
      case (productId, saleCount) =>
        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
        var outColFamVal = new ListBuffer[ByteBuffer]
        outColFamVal += ByteBufferUtil.bytes(saleCount)
        val outVal: java.util.List[ByteBuffer] = outColFamVal
        (outKey, outVal)
    }

    casoutputCF.saveAsNewAPIHadoopFile(
      KeySpace,
      classOf[java.util.Map[String, ByteBuffer]],
      classOf[java.util.List[ByteBuffer]],
      classOf[CqlOutputFormat],
      job.getConfiguration()
    )
    casRdd.count
  }
}

*When I push the Jar using spark-jobserver and execute it I get this on the spark-jobserver terminal:*

job-server[ERROR] Exception in thread "pool-1-thread-1" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46)
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21)
job-server[ERROR] at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
spark-defaults.conf
I renamed spark-defaults.conf.template to spark-defaults.conf and invoked spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh, but I still get "failed to launch org.apache.spark.deploy.worker.Worker: --properties-file FILE Path to a custom Spark properties file. Default is conf/spark-defaults.conf." I'm thinking it should pick up the default spark-defaults.conf from the conf dir. Am I expecting or doing something wrong? Regards jk
Disable partition discovery
How can one disable *Partition discovery* in *Spark 1.3.0 *when using *sqlContext.parquetFile*? Alternatively, is there a way to load *.parquet* files without *Partition discovery*? Cosmin
Re: Parquet error reading data that contains array of structs
FYI, Parquet schema output: message pig_schema { optional binary cust_id (UTF8); optional int32 part_num; optional group ip_list (LIST) { repeated group ip_t { optional binary ip (UTF8); } } optional group vid_list (LIST) { repeated group vid_t { optional binary vid (UTF8); } } optional group fso_list (LIST) { repeated group fso_t { optional binary fso (UTF8); } } } And Parquet meta output: creator: [parquet-mr (build ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr version 1.6.0rc7 (build ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr] extra: pig.schema = cust_id: chararray,part_num: int,ip_list: {ip_t: (ip: chararray)},vid_list: {vid_t: (vid: chararray)},fso_list: {fso_t: (fso: chararray)} file schema: pig_schema cust_id: OPTIONAL BINARY O:UTF8 R:0 D:1 part_num:OPTIONAL INT32 R:0 D:1 ip_list: OPTIONAL F:1 .ip_t: REPEATED F:1 ..ip:OPTIONAL BINARY O:UTF8 R:1 D:3 vid_list:OPTIONAL F:1 .vid_t: REPEATED F:1 ..vid: OPTIONAL BINARY O:UTF8 R:1 D:3 fso_list:OPTIONAL F:1 .fso_t: REPEATED F:1 ..fso: OPTIONAL BINARY O:UTF8 R:1 D:3 row group 1: RC:1201092 TS:537930256 OFFSET:4 cust_id: BINARY GZIP DO:0 FPO:4 SZ:10629422/27627221/2.60 VC:1201092 ENC:PLAIN,RLE,BIT_PACKED part_num: INT32 GZIP DO:0 FPO:10629426 SZ:358/252/0.70 VC:1201092 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED ip_list: .ip_t: ..ip: BINARY GZIP DO:0 FPO:10629784 SZ:41331065/180501686/4.37 VC:10540378 ENC:PLAIN,RLE vid_list: .vid_t: ..vid:BINARY GZIP DO:0 FPO:51960849 SZ:58820404/254819721/4.33 VC:11011894 ENC:PLAIN,RLE fso_list: .fso_t: ..fso:BINARY GZIP DO:0 FPO:110781253 SZ:21363255/74981376/3.51 VC:5612655 ENC:PLAIN,RLE row group 2: RC:1830769 TS:1045506907 OFFSET:132144508 cust_id: BINARY GZIP DO:0 FPO:132144508 SZ:17720131/42110882/2.38 VC:1830769 ENC:PLAIN,RLE,BIT_PACKED part_num: INT32 GZIP DO:0 FPO:149864639 SZ:486/346/0.71 VC:1830769 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED ip_list: .ip_t: ..ip: BINARY GZIP DO:0 FPO:149865125 SZ:37687630/342050955/9.08 VC:20061916 ENC:PLAIN,PLAIN_DICTIONARY,RLE vid_list: .vid_t: ..vid:BINARY GZIP DO:0 FPO:187552755 SZ:56498124/516700215/9.15 VC:22410351 ENC:PLAIN,PLAIN_DICTIONARY,RLE fso_list: .fso_t: ..fso:BINARY GZIP DO:0 FPO:244050879 SZ:20110276/144644509/7.19 VC:10739272 ENC:PLAIN,PLAIN_DICTIONARY,RLE row group 3: RC:22445 TS:4304290 OFFSET:264161155 cust_id: BINARY GZIP DO:0 FPO:264161155 SZ:221527/516312/2.33 VC:22445 ENC:PLAIN,RLE,BIT_PACKED part_num: INT32 GZIP DO:0 FPO:264382682 SZ:102/64/0.63 VC:22445 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED ip_list: .ip_t: ..ip: BINARY GZIP DO:0 FPO:264382784 SZ:483962/1204312/2.49 VC:123097 ENC:PLAIN_DICTIONARY,RLE vid_list: .vid_t: ..vid:BINARY GZIP DO:0 FPO:264866746 SZ:622977/2122080/3.41 VC:133136 ENC:PLAIN,PLAIN_DICTIONARY,RLE fso_list: .fso_t: ..fso:BINARY GZIP DO:0 FPO:265489723 SZ:240588/461522/1.92 VC:62173 ENC:PLAIN_DICTIONARY,RLE Jianshi On Mon, Apr 27, 2015 at 12:40 PM, Cheng Lian lian.cs@gmail.com wrote: Had an offline discussion with Jianshi, the dataset was generated by Pig. Jianshi - Could you please attach the output of parquet-schema path-to-parquet-file? I guess this is a Parquet format backwards-compatibility issue. Parquet hadn't standardized representation of LIST and MAP until recently, thus many systems made their own choice and are not easily inter-operatable. In earlier days, Spark SQL used LIST and MAP formats similar to Avro, which was unfortunately not chosen as the current standard format. 
Details can be found here: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md This document also defines backwards-compatibility rules to handle legacy Parquet data written by old Parquet implementations in various systems. So ideally, now Spark SQL should always write data following the standard, and implement all backwards-compatibility rules to read legacy data. JIRA issue for this is https://issues.apache.org/jira/browse/SPARK-6774 I'm working on a PR https://github.com/apache/spark/pull/5422 for this. To fix SPARK-6774, we need to implement backwards-compatibility rules in both record converter and schema converter together. This PR has fixed the former, but I still need some time to finish the latter part and add tests. Cheng On 4/25/15 2:22 AM, Yin Huai wrote: oh, I missed that. It is fixed in 1.3.0. Also, Jianshi, the dataset was not generated by Spark SQL, right? On Fri, Apr 24, 2015 at 11:09 AM, Ted Yu yuzhih...@gmail.com wrote: Yin: Fix
How to distribute Spark computation recipes
Hi everyone, I know that any RDD is related to its SparkContext and the associated variables (broadcast, accumulators), but I'm looking for a way to serialize/deserialize full RDD computations ? @rxin Spark SQL is, in a way, already doing this but the parsers are private[sql], is there any way to reuse this work to get Logical/Physical Plans in out of Spark ? Regards, Olivier.
Re: Spark streaming action running the same work in parallel
I was able to get it working. Instead of using customers.flatMap to return alerts, I had to use the following:

customers.foreachRDD(new Function<JavaPairRDD<String, Iterable<QueueEvent>>, Void>() {
  @Override
  public Void call(final JavaPairRDD<String, Iterable<QueueEvent>> rdd) throws Exception {
    rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Iterable<QueueEvent>>>>() {
      @Override
      public void call(final Iterator<Tuple2<String, Iterable<QueueEvent>>> i) throws Exception {
      }
    });
    return null;
  }
});

This made sure that we only sent one alert per event for a customer. My unit test showed that there was one RDD that had both customers with their events as partitions. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-action-running-the-same-work-in-parallel-tp22613p22665.html
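For reference, the same foreachRDD/foreachPartition pattern reads a little more compactly in the Scala API; a minimal sketch, assuming customers is a DStream[(String, Iterable[QueueEvent])] and that the alerting call is a stand-in:

customers.foreachRDD { rdd =>
  rdd.foreachPartition { events =>
    // events is an Iterator[(String, Iterable[QueueEvent])] for one partition;
    // whatever runs here runs once per partition, so each customer is handled exactly once
    events.foreach { case (customerId, customerEvents) =>
      println(s"alerting customer $customerId for ${customerEvents.size} events") // stand-in for real alerting
    }
  }
}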
Re: spark-defaults.conf
You should distribute your configuration file to workers and set the appropriate environment variables, like HADOOP_HOME, SPARK_HOME, HADOOP_CONF_DIR, SPARK_CONF_DIR. On Mon, Apr 27, 2015 at 12:56 PM James King jakwebin...@gmail.com wrote: I renamed spark-defaults.conf.template to spark-defaults.conf and invoked spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh But I still get failed to launch org.apache.spark.deploy.worker.Worker: --properties-file FILE Path to a custom Spark properties file. Default is conf/spark-defaults.conf. But I'm thinking it should pick up the default spark-defaults.conf from conf dir Am I expecting or doing something wrong? Regards jk
Spark 1.2.1: How to convert SchemaRDD to CassandraRDD?
Hi all, given the following imports and code:

import com.datastax.spark.connector.SelectableColumnRef;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import org.apache.spark.sql.SchemaRDD;
import static com.datastax.spark.connector.util.JavaApiHelper.toScalaSeq;
import scala.collection.Seq;

SchemaRDD schemaRDD = cassandraSQLContext.sql("select user.id as user_id from user");
schemaRDD.cache();
schemaRDD.collect();

I want to do a SELECT on schemaRDD as

Seq<SelectableColumnRef> columnRefs = toScalaSeq(CassandraJavaUtil.toSelectableColumnRefs("user_id"));
CassandraRDD<UUID> rdd = schemaRDD.select(columnRefs);

but I am getting a type mismatch at schemaRDD.select(): incompatible types: Seq<SelectableColumnRef> cannot be converted to Seq<Expression>, so obviously I need to convert SchemaRDD to CassandraRDD. How can it be done?
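For comparison, a CassandraRDD normally comes straight from the connector rather than from a SchemaRDD. A minimal Scala sketch of reading the same column directly (the keyspace name here is made up, and this uses the connector's Scala API rather than the Java helpers above):

import com.datastax.spark.connector._   // adds cassandraTable to SparkContext

// Read the id column of the user table as a CassandraRDD, then map rows to UUIDs
val userIds = sc.cassandraTable("my_keyspace", "user")   // "my_keyspace" is a placeholder
  .select("id")
  .map(row => row.getUUID("id"))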
Re: Cassandra Connection Issue with Spark-jobserver
I was able to fix the issues by providing right version of cassandra-all and thrift libraries -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-Connection-Issue-with-Spark-jobserver-tp22587p22664.html
ReduceByKey and sorting within partitions
Hi, I'm trying, after reducing by key, to get data ordered among partitions (like RangePartitioner) and within partitions (like sortByKey or repartitionAndSortWithinPartitions), pushing the sorting down into the shuffle machinery of the reduce phase. I think, but maybe I'm wrong, that the correct way to do that would be for combineByKey to call setKeyOrdering on the ShuffledRDD that it returns. Am I wrong? Can it be done by a combination of other transformations with the same efficiency? Thanks, Marco
Re: Submitting to a cluster behind a VPN, configuring different IP address
Hi, and what can I do when I am on Windows? It does not allow me to set the hostname to some IP. Thanks, Tim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-to-a-cluster-behind-a-VPN-configuring-different-IP-address-tp9360p22674.html
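One thing that sometimes helps on Windows, where overriding the machine's hostname is awkward, is to tell Spark explicitly which address the driver should advertise. A hedged sketch (the master URL and the 10.8.0.5 VPN address are made-up examples):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("vpn-test")
  .setMaster("spark://master-behind-vpn:7077")   // example master URL
  .set("spark.driver.host", "10.8.0.5")          // address the cluster can reach over the VPN
val sc = new SparkContext(conf)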
Re: hive-thriftserver maven artifact
This is available for 1.3.1: http://mvnrepository.com/artifact/org.apache.spark/spark-hive-thriftserver_2.10 FYI On Mon, Feb 16, 2015 at 7:24 AM, Marco marco@gmail.com wrote: Ok, so will it be only available for the next version (1.3.0)? 2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com: I searched for 'spark-hive-thriftserver_2.10' on this page: http://mvnrepository.com/artifact/org.apache.spark Looks like it is not published. On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote: Hi, I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact in a public repository? I have not found it at Maven Central. Thanks, Marco -- Best regards, Marco
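For anyone wiring this into a build, an sbt line along these lines should pull the published artifact (version taken from the link above; adjust for your Scala version):

libraryDependencies += "org.apache.spark" %% "spark-hive-thriftserver" % "1.3.1"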
Re: Group by order by
It's not related to Spark, but the concept of what you are trying to do with the data. Grouping by ID means consolidating data for each ID down to 1 row per ID. You can sort by time after this point yes, but you would need to either take each ID and time value pair OR do some aggregate operation on the time. That's what the error message is explaining. Maybe you can describe what you want your results to look like? Here is some detail about the underlying operations here:

Example Data:
ID | Time     | SomeVal
1  | 02-02-15 | 4
1  | 02-03-15 | 5
2  | 02-02-15 | 4
2  | 02-02-15 | 5
2  | 02-05-15 | 2

A. So if you do Group By ID this means 1 row per ID like below:
ID
1
2

To include Time in this projection you need to aggregate it with a function to a single value. Then and only then can you use it in the projection and sort on it.

SELECT id, max(time) FROM sample GROUP BY id SORT BY max(time) desc;

ID | max(time)
2  | 02-05-15
1  | 02-03-15

B. Or if you do Group by ID, time then you get 1 row per ID and time pair:

ID | Time
1  | 02-02-15
1  | 02-03-15
2  | 02-02-15
2  | 02-05-15

Notice both rows with ID `2` and time `02-02-15` group down to 1 row in the results here. In this case you can sort the results by time without using an aggregate function.

SELECT id, time FROM sample GROUP BY id, time SORT BY time desc;

ID | Time
2  | 02-05-15
1  | 02-03-15
1  | 02-02-15
2  | 02-02-15

On Mon, Apr 27, 2015 at 3:28 PM, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi Richard, There are several values of time per id. Is there a way to perform group by id and sort by time in Spark? Best regards, Alexander

*From:* Richard Marscher [mailto:rmarsc...@localytics.com] *Sent:* Monday, April 27, 2015 12:20 PM *To:* Ulanov, Alexander *Cc:* user@spark.apache.org *Subject:* Re: Group by order by

Hi, that error seems to indicate the basic query is not properly expressed. If you group by just ID, then that means it would need to aggregate all the time values into one value per ID, so you can't sort by it. Thus it tries to suggest an aggregate function for time so you can have 1 value per ID and properly sort it.

On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi, Could you suggest what is the best way to do “group by x order by y” in Spark? When I try to perform it with Spark SQL I get the following error (Spark 1.3):

val results = sqlContext.sql("select * from sample group by id order by time")

org.apache.spark.sql.AnalysisException: expression 'time' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)

Is there a way to do it with just RDD? Best regards, Alexander
bug: numClasses is not a valid argument of LogisticRegressionWithSGD
With the Python APIs, the available arguments I got (using inspect module) are the following: ['cls', 'data', 'iterations', 'step', 'miniBatchFraction', 'initialWeights', 'regParam', 'regType', 'intercept'] numClasses is not available. Can someone comment on this? Thanks,
Automatic Cache in SparkSQL
Hi, I am trying to answer a simple query with SparkSQL over a Parquet file. When I execute the query several times, the first run takes about 2s while the later runs take 0.1s. Looking at the log file, it seems the later runs don't load the data from disk. However, I didn't enable any cache explicitly. Is there any automatic cache used by SparkSQL? Is there any way to check this? Thank you! Best, Wenlei
Fwd: Change ivy cache for spark on Windows
+user
-- Forwarded message --
From: Burak Yavuz brk...@gmail.com
Date: Mon, Apr 27, 2015 at 1:59 PM
Subject: Re: Change ivy cache for spark on Windows
To: mj jone...@gmail.com

Hi, In your conf file (SPARK_HOME\conf\spark-defaults.conf) you can set: `spark.jars.ivy \your\path` Best, Burak

On Mon, Apr 27, 2015 at 1:49 PM, mj jone...@gmail.com wrote: Hi, I'm having trouble using the --packages option for spark-shell.cmd - I have to use Windows at work and have been issued a username with a space in it that means when I use the --packages option it fails with this message:

Exception in thread "main" java.net.URISyntaxException: Illegal character in path at index 13: C:/Users/My Name/.ivy2/jars/spark-csv_2.10.jar

The command I'm trying to run is: .\spark-shell.cmd --packages com.databricks:spark-csv_2.10:1.0.3

I've tried creating an ivysettings.xml file with the content below in my .ivy2 directory, but spark doesn't seem to pick it up. Does anyone have any ideas of how to get around this issue?

<ivysettings>
  <caches defaultCacheDir="c:\ivy_cache"/>
</ivysettings>

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Change-ivy-cache-for-spark-on-Windows-tp22675.html
Re: Super slow caching in 1.3?
Michael, There is only one schema: both versions have 200 string columns in one file. On Mon, Apr 20, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote: Now this is very important: “Normal RDDs” refers to “batch RDDs”. However the default in-memory Serialization of RDDs which are part of DSTream is “Srialized” rather than actual (hydrated) Objects. The Spark documentation states that “Serialization” is required for space and garbage collection efficiency (but creates higher CPU load) – which makes sense consider the large number of RDDs which get discarded in a streaming app So what does Data Bricks actually recommend as Object Oriented model for RDD elements used in Spark Streaming apps – flat or not and can you provide a detailed description / spec of both From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Thursday, April 16, 2015 7:23 PM To: Evo Eftimov Cc: Christian Perez; user Subject: Re: Super slow caching in 1.3? Here are the types that we specialize, other types will be much slower. This is only for Spark SQL, normal RDDs do not serialize data that is cached. I'll also not that until yesterday we were missing FloatType https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154 Christian, can you provide the schema of the fast and slow datasets? On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov evo.efti...@isecc.com wrote: Michael what exactly do you mean by flattened version/structure here e.g.: 1. An Object with only primitive data types as attributes 2. An Object with no more than one level of other Objects as attributes 3. An Array/List of primitive types 4. An Array/List of Objects This question is in general about RDDs not necessarily RDDs in the context of SparkSQL When answering can you also score how bad the performance of each of the above options is -Original Message- From: Christian Perez [mailto:christ...@svds.com] Sent: Thursday, April 16, 2015 6:09 PM To: Michael Armbrust Cc: user Subject: Re: Super slow caching in 1.3? Hi Michael, Good question! We checked 1.2 and found that it is also slow cacheing the same flat parquet file. Caching other file formats of the same data were faster by up to a factor of ~2. Note that the parquet file was created in Impala but the other formats were written by Spark SQL. Cheers, Christian On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote: Do you think you are seeing a regression from 1.2? Also, are you caching nested data or flat rows? The in-memory caching is not really designed for nested data and so performs pretty slowly here (its just falling back to kryo and even then there are some locking issues). If so, would it be possible to try caching a flattened version? CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote: Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... 
Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
Change ivy cache for spark on Windows
Hi, I'm having trouble using the --packages option for spark-shell.cmd - I have to use Windows at work and have been issued a username with a space in it that means when I use the --packages option it fails with this message:

Exception in thread "main" java.net.URISyntaxException: Illegal character in path at index 13: C:/Users/My Name/.ivy2/jars/spark-csv_2.10.jar

The command I'm trying to run is: .\spark-shell.cmd --packages com.databricks:spark-csv_2.10:1.0.3

I've tried creating an ivysettings.xml file with the content below in my .ivy2 directory, but spark doesn't seem to pick it up. Does anyone have any ideas of how to get around this issue?

<ivysettings>
  <caches defaultCacheDir="c:\ivy_cache"/>
</ivysettings>

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Change-ivy-cache-for-spark-on-Windows-tp22675.html
[SQL][1.3.1][JAVA]Use UDF in DataFrame join
Hi All, I want to ask how to use a UDF when I use the join function on a DataFrame. It always gives me a "cannot resolve column name" error. Can anyone give me an example of how to run this in Java? My code is like:

edmData.join(yb_lookup, edmData.col("year(YEARBUILT)").equalTo(yb_lookup.col("yb_class_vdm")));

But it won't work in Java. I understand the col function should only take a column name, but is there a way to specify some simple expression in the join statement? Regards, Shuai
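One possible workaround, sketched here in Scala rather than Java and assuming a UDF named year has already been registered on the SQLContext (as in the follow-up thread below), is to compute the expression as its own column first and then join on that plain column:

// Precompute year(YEARBUILT) under a new name, then join on the resulting column
val edmWithYear = edmData.selectExpr("*", "year(YEARBUILT) as yb_year")
val joined = edmWithYear.join(
  yb_lookup,
  edmWithYear.col("yb_year").equalTo(yb_lookup.col("yb_class_vdm")))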
Re: How to distribute Spark computation recipes
The code themselves are the recipies, no? On Mon, Apr 27, 2015 at 2:49 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, I know that any RDD is related to its SparkContext and the associated variables (broadcast, accumulators), but I'm looking for a way to serialize/deserialize full RDD computations ? @rxin Spark SQL is, in a way, already doing this but the parsers are private[sql], is there any way to reuse this work to get Logical/Physical Plans in out of Spark ? Regards, Olivier.
[SQL][1.3.1][JAVA] UDF in java cause Task not serializable
Hi All, Basically I try to define a simple UDF and use it in the query, but it gives me "Task not serializable":

public void test() {
  RiskGroupModelDefinition model = registeredRiskGroupMap.get(this.modelId);
  RiskGroupModelDefinition edm = this.createEdm();
  JavaSparkContext ctx = this.createSparkContext();
  SQLContext sql = new SQLContext(ctx);

  sql.udf().register("year", new UDF1<Date, Integer>() {
    @Override
    public Integer call(Date d) throws Exception {
      return d.getYear();
    }
  }, new org.apache.spark.sql.types.IntegerType());

  /** Retrieve all tables for EDM */
  DataFrame property = sql.parquetFile(edm.toS3nPath("property")).filter("ISVALID = 1");
  property.registerTempTable("p");
  DataFrame yb_lookup = sql.parquetFile(model.toS3nPath("yb_lookup")).as("yb");
  yb_lookup.registerTempTable("yb");

  sql.sql("select * from p left join yb on year(p.YEARBUILT)=yb.yb_class_vdm").count();

  ctx.stop();
}

If I remove the UDF and just use p.YEARBUILT=yb.yb_class_vdm, the sql runs without any problem. But after I add the UDF to the query (just as in the above code), the exception is as below:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate false, [], [Coalesce(SUM(PartialCount#43L),0) AS count#41L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#43L]
   Project []
    HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
     Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
      Project [YEARBUILT#14]
       Filter ISVALID#18
        PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
     Exchange (HashPartitioning [yb_class_vdm#40L], 200)
      PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
at org.apache.spark.sql.DataFrame.count(DataFrame.scala:899)
at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.test(VulnabilityEncodeExecutor.java:137)
at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.main(VulnabilityEncodeExecutor.java:488)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
 Aggregate true, [], [COUNT(1) AS PartialCount#43L]
  Project []
   HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
    Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
     Project [YEARBUILT#14]
      Filter ISVALID#18
       PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
    Exchange (HashPartitioning [yb_class_vdm#40L], 200)
     PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:126)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:125)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
... 6 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate true, [], [COUNT(1) AS PartialCount#43L]
 Project []
  HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
   Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
    Project [YEARBUILT#14]
     Filter ISVALID#18
      PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
   Exchange (HashPartitioning [yb_class_vdm#40L], 200)
    PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:101)
at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:49)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
... 10 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange
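A frequent cause of "Task not serializable" with anonymous inner-class UDFs is that the instance captures the enclosing, non-serializable object; whether that is what happens here is an assumption, not something the thread confirms. A Scala sketch that registers the UDF as a standalone function value, so nothing else gets captured, would look roughly like:

// Register the UDF as a plain function so it does not drag the enclosing class along
sqlContext.udf.register("year", (d: java.sql.Date) => 1900 + d.getYear)  // getYear returns years since 1900

val counted = sqlContext
  .sql("select * from p left join yb on year(p.YEARBUILT) = yb.yb_class_vdm")
  .count()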
RE: Group by order by
Thanks, it should be “select id, time, min(x1), min(x2), … from data group by id, time order by time” (“min” or other aggregate function to pick other fields). Forgot to mention that (id, time) is my primary key and I took for granted that it worked in my MySQL example. Best regards, Alexander
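The same result can also be expressed with the DataFrame API; a sketch in Scala, where df stands for the table above and x1, x2 stand in for whatever other columns it has:

import org.apache.spark.sql.functions.{col, min}

val results = df.groupBy("id", "time")
  .agg(min(col("x1")), min(col("x2")))   // pick a representative value for the other fields
  .orderBy("time")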
Re: Spark timeout issue
You need to look more deep into your worker logs, you may find GC error, IO exceptions etc if you look closely which is triggering the timeout. Thanks Best Regards On Mon, Apr 27, 2015 at 3:18 AM, Deepak Gopalakrishnan dgk...@gmail.com wrote: Hello Patrick, Sure. I've posted this on user as well. Will be cool to get a response. Thanks Deepak On Mon, Apr 27, 2015 at 2:58 AM, Patrick Wendell pwend...@gmail.com wrote: Hi Deepak - please direct this to the user@ list. This list is for development of Spark itself. On Sun, Apr 26, 2015 at 12:42 PM, Deepak Gopalakrishnan dgk...@gmail.com wrote: Hello All, I'm trying to process a 3.5GB file on standalone mode using spark. I could run my spark job succesfully on a 100MB file and it works as expected. But, when I try to run it on the 3.5GB file, I run into the below error : 15/04/26 12:45:50 INFO BlockManagerMaster: Updated info of block taskresult_83 15/04/26 12:46:46 WARN AkkaUtils: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@790223d3,BlockManagerId(2, master.spark.com, 39143))] in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 15/04/26 12:47:15 INFO MemoryStore: ensureFreeSpace(26227673) called with curMem=265897, maxMem=5556991426 15/04/26 12:47:15 INFO MemoryStore: Block taskresult_92 stored as bytes in memory (estimated size 25.0 MB, free 5.2 GB) 15/04/26 12:47:16 INFO MemoryStore: ensureFreeSpace(26272879) called with curMem=26493570, maxMem=5556991426 15/04/26 12:47:16 INFO MemoryStore: Block taskresult_94 stored as bytes in memory (estimated size 25.1 MB, free 5.1 GB) 15/04/26 12:47:18 INFO MemoryStore: ensureFreeSpace(26285327) called with curMem=52766449, maxMem=5556991426 and the job fails. I'm on AWS and have opened all ports. Also, since the 100MB file works, it should not be a connection issue. I've a r3 xlarge and 2 m3 large. Can anyone suggest a way to fix this? -- Regards, *Deepak Gopalakrishnan* *Mobile*:+918891509774 *Skype* : deepakgk87 http://myexps.blogspot.com
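If the logs point at plain slowness (for example long GC pauses while the 3.5GB file is processed) rather than a hard failure, one knob worth knowing about is the ask timeout behind that "Futures timed out after [30 seconds]" message. Treating the exact property name as an assumption for this Spark version, a sketch would be:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("large-file-job")
  .set("spark.akka.askTimeout", "120")   // seconds; the 30s default is what appears in the error

Raising the timeout only hides the symptom, though; the memory pressure itself still needs to be addressed.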
Re: How to debug Spark on Yarn?
Spark 1.3 1. View stderr/stdout from executor from Web UI: when the job is running I figured out the executor that I'm supposed to see, but those two links show 4 special characters in the browser. 2. Tail on Yarn logs: /apache/hadoop/bin/yarn logs -applicationId application_1429087638744_151059 | less threw me: Application has not completed. Logs are only available after an application completes. Any other ideas that I can try? On Sat, Apr 25, 2015 at 12:07 AM, Sven Krasser kras...@gmail.com wrote: On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin van...@cloudera.com wrote: Spark 1.3 should have links to the executor logs in the UI while the application is running. Not yet in the history server, though. You're absolutely correct -- didn't notice it until now. This is a great addition! -- www.skrasser.com http://www.skrasser.com/?utm_source=sig -- Deepak
Re: Understand the running time of SparkSQL queries
Isn't it already available on the driver UI (that runs on 4040)? Thanks Best Regards On Mon, Apr 27, 2015 at 9:55 AM, Wenlei Xie wenlei@gmail.com wrote: Hi, I am wondering how should we understand the running time of SparkSQL queries? For example the physical query plan and the running time on each stage? Is there any guide talking about this? Thank you! Best, Wenlei
Re: Question on Spark SQL performance of Range Queries on Large Datasets
The answer is it depends :) The fact that query runtime increases indicates more shuffle. You may want to construct rdds based on keys you use. You may want to specify what kind of node you are using and how many executors you are using. You may also want to play around with executor memory allocation s. Best Ayan On 27 Apr 2015 17:59, Mani man...@vt.edu wrote: Hi, I am a graduate student from Virginia Tech (USA) pursuing my Masters in Computer Science. I’ve been researching on parallel and distributed databases and their performance for running some Range queries involving simple joins and group by on large datasets. As part of my research, I tried evaluating query performance of Spark SQL on the data set that I have. It would be really great if you could please confirm on the numbers that I get from Spark SQL? Following is the type of query that am running, Table 1 - 22,000,483 records Table 2 - 10,173,311 records Query : SELECT b.x, count(b.y) FROM Table1 a, Table2 b WHERE a.y=b.y AND a.z=‘' GROUP BY b.x ORDER BY b.x Total Running Time 4 Worker Nodes:177.68s 8 Worker Nodes: 186.72s I am using Apache Spark 1.3.0 with the default configuration. Is the query running time reasonable? Is it because of non-availability of indexes increasing the query run time? Can you please clarify? Thanks Mani Graduate Student, Department of Computer Science Virginia Tech
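As a concrete version of the "construct RDDs based on the keys you use" suggestion, pre-partitioning both sides on the join key means repeated queries do not re-shuffle the large tables each time. A rough Scala sketch (table1, table2 and the field y are assumed names for rows parsed into case classes):

import org.apache.spark.HashPartitioner

// Key both datasets by the join column y and co-partition them once, then cache
val partitioner = new HashPartitioner(64)
val t1ByY = table1.map(r => (r.y, r)).partitionBy(partitioner).cache()
val t2ByY = table2.map(r => (r.y, r)).partitionBy(partitioner).cache()

// Joins on y now reuse the existing partitioning instead of shuffling both tables per query
val joined = t1ByY.join(t2ByY)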
A problem of using spark streaming to capture network packets
Hi Everyone, We use pcap4j to capture network packets and then use spark streaming to analyze captured packets. However, we met a strange problem. If we run our application on spark locally (for example, spark-submit --master local[2]), then the program runs successfully. If we run our application on a spark standalone cluster, then the program will tell us that NO NIFs found. I also attach two test files for clarification. Can anyone help on this? Thanks in advance! (See attached file: PcapReceiver.java) (See attached file: TestPcapSpark.java) Best regards, - Haishan Haishan Wu (吴海珊) IBM Research - China Tel: 86-10-58748508 Fax: 86-10-58748330 Email: wuh...@cn.ibm.com Lotus Notes: Hai Shan Wu/China/IBM
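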
Re: How to debug Spark on Yarn?
1) Application container logs from Web RM UI never load on browser. I eventually have to kill the browser. 2) /apache/hadoop/bin/yarn logs -applicationId application_1429087638744_151059 | less emits logs only after the application has completed. Are there no better ways to see the logs as they are emitted. Something similar to hadoop world ? On Mon, Apr 27, 2015 at 1:58 PM, Zoltán Zvara zoltan.zv...@gmail.com wrote: You can check container logs from RM web UI or when log-aggregation is enabled with the yarn command. There are other, but less convenient options. On Mon, Apr 27, 2015 at 8:53 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Spark 1.3 1. View stderr/stdout from executor from Web UI: when the job is running i figured out the executor that am suppose to see, and those two links show 4 special characters on browser. 2. Tail on Yarn logs: /apache/hadoop/bin/yarn logs -applicationId application_1429087638744_151059 | less Threw me: Application has not completed. Logs are only available after an application completes Any other ideas that i can try ? On Sat, Apr 25, 2015 at 12:07 AM, Sven Krasser kras...@gmail.com wrote: On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin van...@cloudera.com wrote: Spark 1.3 should have links to the executor logs in the UI while the application is running. Not yet in the history server, though. You're absolutely correct -- didn't notice it until now. This is a great addition! -- www.skrasser.com http://www.skrasser.com/?utm_source=sig -- Deepak -- Deepak
Question on Spark SQL performance of Range Queries on Large Datasets
Hi, I am a graduate student from Virginia Tech (USA) pursuing my Masters in Computer Science. I’ve been researching on parallel and distributed databases and their performance for running some Range queries involving simple joins and group by on large datasets. As part of my research, I tried evaluating query performance of Spark SQL on the data set that I have. It would be really great if you could please confirm on the numbers that I get from Spark SQL? Following is the type of query that am running, Table 1 - 22,000,483 records Table 2 - 10,173,311 records Query : SELECT b.x, count(b.y) FROM Table1 a, Table2 b WHERE a.y=b.y AND a.z=‘' GROUP BY b.x ORDER BY b.x Total Running Time 4 Worker Nodes:177.68s 8 Worker Nodes: 186.72s I am using Apache Spark 1.3.0 with the default configuration. Is the query running time reasonable? Is it because of non-availability of indexes increasing the query run time? Can you please clarify? Thanks Mani Graduate Student, Department of Computer Science Virginia Tech
Re: How to debug Spark on Yarn?
You can check container logs from RM web UI or when log-aggregation is enabled with the yarn command. There are other, but less convenient options. On Mon, Apr 27, 2015 at 8:53 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Spark 1.3 1. View stderr/stdout from executor from Web UI: when the job is running i figured out the executor that am suppose to see, and those two links show 4 special characters on browser. 2. Tail on Yarn logs: /apache/hadoop/bin/yarn logs -applicationId application_1429087638744_151059 | less Threw me: Application has not completed. Logs are only available after an application completes Any other ideas that i can try ? On Sat, Apr 25, 2015 at 12:07 AM, Sven Krasser kras...@gmail.com wrote: On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin van...@cloudera.com wrote: Spark 1.3 should have links to the executor logs in the UI while the application is running. Not yet in the history server, though. You're absolutely correct -- didn't notice it until now. This is a great addition! -- www.skrasser.com http://www.skrasser.com/?utm_source=sig -- Deepak
Re: ReduceByKey and sorting within partitions
Hi Marco, As I know, current combineByKey() does not expose the related argument where you could set keyOrdering on the ShuffledRDD, since ShuffledRDD is package private, if you can get the ShuffledRDD through reflection or other way, the keyOrdering you set will be pushed down to shuffle. If you use a combination of transformations to do it, the result will be same but the efficiency may be different, some transformations will separate into different stages, which will introduce additional shuffle. Thanks Jerry 2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com: Hi, I'm trying, after reducing by key, to get data ordered among partitions (like RangePartitioner) and within partitions (like sortByKey or repartitionAndSortWithinPartition) pushing the sorting down to the shuffles machinery of the reducing phase. I think, but maybe I'm wrong, that the correct way to do that is that combineByKey call setKeyOrdering function on the ShuflleRDD that it returns. Am I wrong? Can be done by a combination of other transformations with the same efficiency? Thanks, Marco - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
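A combination that does give the reduced data ordered across and within partitions, at the cost of the extra shuffle Jerry mentions, is to follow the reduce with repartitionAndSortWithinPartitions and a RangePartitioner. A sketch, where pairs stands for the keyed RDD:

import org.apache.spark.RangePartitioner

val reduced = pairs.reduceByKey(_ + _)   // pairs: an RDD[(K, V)] with an ordered key type
// Second shuffle: range-partition so partitions are ordered relative to each other,
// and sort keys within each partition as part of that same shuffle
val globallySorted = reduced.repartitionAndSortWithinPartitions(
  new RangePartitioner(reduced.partitions.length, reduced))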
Re: spark-defaults.conf
Thanks. I've set SPARK_HOME and SPARK_CONF_DIR appropriately in .bash_profile But when I start worker like this spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh I still get failed to launch org.apache.spark.deploy.worker.Worker: Default is conf/spark-defaults.conf. 15/04/27 11:51:33 DEBUG Utils: Shutdown hook called On Mon, Apr 27, 2015 at 1:15 PM, Zoltán Zvara zoltan.zv...@gmail.com wrote: You should distribute your configuration file to workers and set the appropriate environment variables, like HADOOP_HOME, SPARK_HOME, HADOOP_CONF_DIR, SPARK_CONF_DIR. On Mon, Apr 27, 2015 at 12:56 PM James King jakwebin...@gmail.com wrote: I renamed spark-defaults.conf.template to spark-defaults.conf and invoked spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh But I still get failed to launch org.apache.spark.deploy.worker.Worker: --properties-file FILE Path to a custom Spark properties file. Default is conf/spark-defaults.conf. But I'm thinking it should pick up the default spark-defaults.conf from conf dir Am I expecting or doing something wrong? Regards jk
Bigints in pyspark
hi all, I have just come across a problem where I have a table that has a few bigint columns. It seems if I read that table into a dataframe and then collect it in pyspark, the bigints are stored as integers in python. (The problem is that if I write it back to another table, I detect the hive type programmatically from the python type, so it turns those columns into integers.) Is that intended this way or a bug? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bigints-in-pyspark-tp22668.html
Re: SQL UDF returning object of case class; regression from 1.2.0
A short update: eventually we manually upgraded to 1.3.1 and the problem was fixed. On Apr 26, 2015 2:26 PM, Ophir Cohen oph...@gmail.com wrote: I happened to hit the following issue that prevents me from using UDFs with case classes: https://issues.apache.org/jira/browse/SPARK-6054. The issue is already fixed for 1.3.1, but we are working on Amazon and it looks like Amazon doesn't yet provide deployment of Spark 1.3.1 using their scripts. Did someone encounter the issue? Any suggestion will be happily taken, either for a workaround or for a way to deploy Spark 1.3.1 on EMR. Thanks, Ophir
Exception in using updateStateByKey
Hi, all: I use function updateStateByKey in Spark Streaming, I need to store the states for one minite, I set spark.cleaner.ttl to 120, the duration is 2 seconds, but it throws Exception Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443 at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042) at org.apache.hadoop.ipc.Client.call(Client.java:1347) at org.apache.hadoop.ipc.Client.call(Client.java:1300) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188) at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) Why? my code is ssc = StreamingContext(sc,2) kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1}) kvs.window(60,2).map(lambda x: analyzeMessage(x[1]))\ .filter(lambda x: x[1] != None).updateStateByKey(updateStateFunc) \ .filter(lambda x: x[1]['isExisted'] != 1) \ .foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))
Re: Exception in using updateStateByKey
Which hadoop release are you using ? Can you check hdfs audit log to see who / when deleted spark/ck/hdfsaudit/ receivedData/0/log-1430139541443-1430139601443 ? Cheers On Mon, Apr 27, 2015 at 6:21 AM, Sea 261810...@qq.com wrote: Hi, all: I use function updateStateByKey in Spark Streaming, I need to store the states for one minite, I set spark.cleaner.ttl to 120, the duration is 2 seconds, but it throws Exception Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443 at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042) at org.apache.hadoop.ipc.Client.call(Client.java:1347) at org.apache.hadoop.ipc.Client.call(Client.java:1300) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188) at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) Why? my code is ssc = StreamingContext(sc,2) kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1}) kvs.window(60,2).map(lambda x: analyzeMessage(x[1]))\ .filter(lambda x: x[1] != None).updateStateByKey(updateStateFunc) \ .filter(lambda x: x[1]['isExisted'] != 1) \ .foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))
Re: directory loader in windows
This a hadoop-side stack trace it looks like the code is trying to get the filesystem permissions by running %HADOOP_HOME%\bin\WINUTILS.EXE ls -F and something is triggering a null pointer exception. There isn't any HADOOP- JIRA with this specific stack trace in it, so it's not a known/fixed problem. At a guess, your environment HADOOP_HOME environment variable isn't point to the right place. If that's the case there should have been a warning in the logs Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : java.lang.NullPointerException at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.hadoop.util.Shell.runCommand(Shell.java:482) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) at org.apache.hadoop.util.Shell.execCommand(Shell.java:808) at org.apache.hadoop.util.Shell.execCommand(Shell.java:791) at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097) at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:582) at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:557) at org.apache.hadoop.fs.LocatedFileStatus.init(LocatedFileStatus.java:42) at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1699) at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1681) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:268) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512) at org.apache.spark.rdd.RDD.collect(RDD.scala:813) at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:374) at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at 
java.lang.Thread.run(Unknown Source) -- Best Regards, Ayan Guha
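For reference, a minimal pre-flight check you could run before starting PySpark on Windows; this is only a sketch, and the HADOOP_HOME\bin\winutils.exe layout it assumes is the conventional one, not something the stack trace above proves:

import java.nio.file.{Files, Paths}

object WinutilsCheck {
  def main(args: Array[String]): Unit = {
    sys.env.get("HADOOP_HOME") match {
      case None =>
        println("HADOOP_HOME is not set; RawLocalFileSystem will fail when it shells out to winutils")
      case Some(home) =>
        val winutils = Paths.get(home, "bin", "winutils.exe") // assumed location
        if (Files.exists(winutils)) println(s"Found $winutils")
        else println(s"HADOOP_HOME=$home, but $winutils does not exist")
    }
  }
}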
?????? Exception in using updateStateByKey
I make it to 240, it happens again when 240 seconds is reached. -- -- ??: 261810726;261810...@qq.com; : 2015??4??27??(??) 10:24 ??: Ted Yuyuzhih...@gmail.com; : ?? Exception in using updateStateByKey Yes??I can make it larger, but I also want to know whether there is a formula to estimate it -- -- ??: Ted Yu;yuzhih...@gmail.com; : 2015??4??27??(??) 10:20 ??: Sea261810...@qq.com; : Re: Exception in using updateStateByKey Can you make the value for spark.cleaner.ttl larger ?Cheers On Mon, Apr 27, 2015 at 7:13 AM, Sea 261810...@qq.com wrote: my hadoop version is 2.2.0?? the hdfs-audit.log is too large?? The problem is that?? when the checkpoint info is deleted(it depends on ??spark.cleaner.ttl??)??it will throw this exception?? - -- ??: Ted Yu;yuzhih...@gmail.com; : 2015??4??27??(??) 9:55 ??: Sea261810...@qq.com; : useruser@spark.apache.org; : Re: Exception in using updateStateByKey Which hadoop release are you using ? Can you check hdfs audit log to see who / when deleted spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443 ? Cheers On Mon, Apr 27, 2015 at 6:21 AM, Sea 261810...@qq.com wrote: Hi, all: I use function updateStateByKey in Spark Streaming, I need to store the states for one minite, I set spark.cleaner.ttl to 120, the duration is 2 seconds, but it throws Exception Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443 at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042) at org.apache.hadoop.ipc.Client.call(Client.java:1347) at org.apache.hadoop.ipc.Client.call(Client.java:1300) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188) at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) Why? 
my code is ssc = StreamingContext(sc,2) kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1}) kvs.window(60,2).map(lambda x: analyzeMessage(x[1]))\ .filter(lambda x: x[1] != None).updateStateByKey(updateStateFunc) \ .filter(lambda x: x[1]['isExisted'] != 1) \ .foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))
RE: Slower performance when bigger memory?
Thanks. May I ask what your configuration was for more/smaller executors on r3.8xlarge, and how much memory you eventually decided to give one executor without hurting performance (for example: 64g?). From: Sven Krasser [mailto:kras...@gmail.com] Sent: Friday, April 24, 2015 1:59 PM To: Dean Wampler Cc: Shuai Zheng; user@spark.apache.org Subject: Re: Slower performance when bigger memory? FWIW, I ran into a similar issue on r3.8xlarge nodes and opted for more/smaller executors. Another observation was that one large executor results in less overall read throughput from S3 (using Amazon's EMRFS implementation) in case that matters to your application. -Sven On Thu, Apr 23, 2015 at 10:18 AM, Dean Wampler deanwamp...@gmail.com wrote: JVMs often have significant GC overhead with heaps bigger than 64GB. You might try your experiments with configurations below this threshold. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 23, 2015 at 12:14 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am running some benchmarks on an r3.8xlarge instance. I have a cluster with one master (no executor on it) and one slave (r3.8xlarge). My job has 1000 tasks in stage 0. An r3.8xlarge has 244G memory and 32 cores. If I create 4 executors, each with 8 cores + 50G memory, each task takes around 320s-380s. If I instead use one big executor with 32 cores and 200G memory, each task takes 760s-900s. Checking the log, it looks like minor GC takes much longer when using 200G memory: 285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)] 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95 sys=120.65, real=11.25 secs] And when it uses 50G memory, the minor GC takes less than 1s. I am trying to work out the best way to configure Spark. For some special reasons, I am tempted to use bigger memory on a single executor if there is no significant performance penalty. But now it looks like there is? Anyone have any idea? Regards, Shuai -- www.skrasser.com http://www.skrasser.com/?utm_source=sig
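For what it's worth, a hedged sketch of the more/smaller-executors configuration being discussed; the values (50g, 8 cores) simply mirror the 4-executor experiment above and are illustrative, not a recommendation, and the exact mechanics of getting several executors per node depend on the cluster manager:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("SmallerExecutors")
  .set("spark.executor.memory", "50g")  // stay well below the ~64GB heap where GC pauses grow
  .set("spark.executor.cores", "8")     // e.g. 4 executors x 8 cores on a 32-core r3.8xlarge
val sc = new SparkContext(conf)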
Re: Super slow caching in 1.3?
I face a similar issue in Spark 1.2. Caching the schema RDD takes about 50s for 400MB of data. The schema is similar to the TPC-H LineItem. Here is the code I used to try the cache. I am wondering if there is any setting missing? Thank you so much! lineitemSchemaRDD.registerTempTable("lineitem"); sqlContext.sqlContext().cacheTable("lineitem"); System.out.println(lineitemSchemaRDD.count()); On Mon, Apr 6, 2015 at 8:00 PM, Christian Perez christ...@svds.com wrote: Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Wenlei Xie (谢文磊) Ph.D. Candidate Department of Computer Science 456 Gates Hall, Cornell University Ithaca, NY 14853, USA Email: wenlei@gmail.com
Why Spark is much faster than Hadoop MapReduce even on disk
Hi, I am frequently asked why Spark is also much faster than Hadoop MapReduce on disk (without the use of memory cache). I have no convincing answer for this question, could you guys elaborate on this? Thanks!
Re: Why Spark is much faster than Hadoop MapReduce even on disk
I believe the typical answer is that Spark is actually a bit slower. On Mon, Apr 27, 2015 at 7:34 PM bit1...@163.com bit1...@163.com wrote: Hi, I am frequently asked why Spark is also much faster than Hadoop MapReduce on disk (without the use of memory cache). I have no convincing answer for this question, could you guys elaborate on this? Thanks! --
New JIRA - [SQL] Can't remove columns from DataFrame or save DataFrame from a join due to duplicate columns
https://issues.apache.org/jira/browse/SPARK-7182 Can anyone suggest a workaround for the above issue? Thanks. -Don -- Donald Drake Drake Consulting http://www.drakeconsulting.com/ http://www.MailLaunder.com/
RE: Scalability of group by
It works on a smaller dataset of 100 rows. Probably I could find the size when it fails using binary search. However, it would not help me because I need to work with 2B rows. From: ayan guha [mailto:guha.a...@gmail.com] Sent: Monday, April 27, 2015 6:58 PM To: Ulanov, Alexander Cc: user@spark.apache.org Subject: Re: Scalability of group by Hi Can you test on a smaller dataset to identify if it is cluster issue or scaling issue in spark On 28 Apr 2015 11:30, Ulanov, Alexander alexander.ula...@hp.commailto:alexander.ula...@hp.com wrote: Hi, I am running a group by on a dataset of 2B of RDD[Row [id, time, value]] in Spark 1.3 as follows: “select id, time, first(value) from data group by id, time” My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor is allocated with 5GB of memory. However, all executors are being lost during the query execution and I get “ExecutorLostFailure”. Could you suggest what might be the reason for it? Could it be that “group by” is implemented as RDD.groupBy so it holds the group by result in memory? What is the workaround? Best regards, Alexander
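A sketch of one possible workaround (under the assumptions that the registered table is backed by an RDD[Row] named `data`, that the columns are Long/Long/Double, and that keeping any single value per (id, time) group is acceptable): aggregate with reduceByKey instead of a wide group-by, so no whole group is ever materialized in executor memory:

import org.apache.spark.sql.Row

// ((id, time), value) pairs, then keep the first value seen per key.
val firstPerKey = data
  .map { case Row(id: Long, time: Long, value: Double) => ((id, time), value) }
  .reduceByKey((a, _) => a)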
Re: Automatic Cache in SparkSQL
Spark keeps job data in memory by default for the kind of performance gains you are seeing. Additionally, depending on your query, Spark runs stages, and at any point in time Spark's code behind the scenes may issue an explicit cache. If you hit any such scenario you will find those cached objects in the UI under Storage. Note that if caching is done by Spark it may be transient. On 28 Apr 2015 08:00, Wenlei Xie wenlei@gmail.com wrote: Hi, I am trying to answer a simple query with SparkSQL over a Parquet file. When I execute the query several times, the first run takes about 2s while the later runs take 0.1s. By looking at the log file it seems the later runs don't load the data from disk. However, I didn't enable any cache explicitly. Is there any automatic cache used by SparkSQL? Is there any way to check this? Thank you! Best, Wenlei
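One way to check from code whether SparkSQL itself is holding the table in its in-memory columnar cache (a small sketch; "lineitem" is just the table name from the earlier message in this thread): if isCached returns false and you still see the speed-up, the data is most likely coming from the OS file cache rather than Spark.

sqlContext.cacheTable("lineitem")          // explicit cache, if you want one
println(sqlContext.isCached("lineitem"))   // true when the in-memory columnar cache holds it
sqlContext.uncacheTable("lineitem")        // drop it again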
Re: Re: Why Spark is much faster than Hadoop MapReduce even on disk
Is it? I learned somewhere else that Spark is 5 to 10 times faster than Hadoop MapReduce. bit1...@163.com From: Ilya Ganelin Date: 2015-04-28 10:55 To: bit1...@163.com; user Subject: Re: Why Spark is much faster than Hadoop MapReduce even on disk I believe the typical answer is that Spark is actually a bit slower. On Mon, Apr 27, 2015 at 7:34 PM bit1...@163.com bit1...@163.com wrote: Hi, I am frequently asked why Spark is also much faster than Hadoop MapReduce on disk (without the use of memory cache). I have no convincing answer for this question, could you guys elaborate on this? Thanks!
Spark 1.3.1 JavaStreamingContext - fileStream compile error
Hi Forum I am facing below compile error when using the fileStream method of the JavaStreamingContext class. I have copied the code from JavaAPISuite.java test class of spark test code. Please help me to find a solution for this. http://apache-spark-user-list.1001560.n3.nabble.com/file/n22679/47.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-JavaStreamingContext-fileStream-compile-error-tp22679.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Scalability of group by
Hi Can you test on a smaller dataset to identify if it is cluster issue or scaling issue in spark On 28 Apr 2015 11:30, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi, I am running a group by on a dataset of 2B of RDD[Row [id, time, value]] in Spark 1.3 as follows: “select id, time, first(value) from data group by id, time” My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor is allocated with 5GB of memory. However, all executors are being lost during the query execution and I get “ExecutorLostFailure”. Could you suggest what might be the reason for it? Could it be that “group by” is implemented as RDD.groupBy so it holds the group by result in memory? What is the workaround? Best regards, Alexander
java.lang.UnsupportedOperationException: empty collection
I am running following code on Spark 1.3.0. It is from https://spark.apache.org/docs/1.3.0/ml-guide.html On running val model1 = lr.fit(training.toDF) I get java.lang.UnsupportedOperationException: empty collection what could be the reason? import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.param.ParamMap import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.sql.{Row, SQLContext} val conf = new SparkConf().setAppName(SimpleParamsExample) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // Prepare training data. // We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of case classes // into DataFrames, where it uses the case class metadata to infer the schema. val training = sc.parallelize(Seq( LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)), LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)), LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)), LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5 // Create a LogisticRegression instance. This instance is an Estimator. val lr = new LogisticRegression() // Print out the parameters, documentation, and any default values. println(LogisticRegression parameters:\n + lr.explainParams() + \n) // We may set parameters using setter methods. lr.setMaxIter(10) .setRegParam(0.01) // Learn a LogisticRegression model. This uses the parameters stored in lr. *val model1 = lr.fit(training.toDF)* *Some more information:* scala training.toDF res26: org.apache.spark.sql.DataFrame = [label: double, features: vecto] scala training.toDF.collect() res27: Array[org.apache.spark.sql.Row] = Array([1.0,[0.0,1.1,0.1]], [0.0,[2.0,1.0,-1.0]], [0.0,[2.0,1.3,1.0]], [1.0,[0.0,1.2,-0.5]]) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-UnsupportedOperationException-empty-collection-tp22677.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
1.3.1: Persisting RDD in parquet - Conflicting partition column names
Hi I am getting the following error when persisting an RDD in parquet format to an S3 location. This is code that was working in the 1.2 version. The version that it is failing to work is 1.3.1. Any help is appreciated. Caused by: java.lang.AssertionError: assertion failed: Conflicting partition column names detected: ArrayBuffer(batch_id) ArrayBuffer() at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.sql.parquet.ParquetRelation2$.resolvePartitions(newParquet.scala:933) at org.apache.spark.sql.parquet.ParquetRelation2$.parsePartitions(newParquet.scala:851) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$7.apply(newParquet.scala:311) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$7.apply(newParquet.scala:303) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:303) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:692) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196) at org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:995) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/1-3-1-Persisting-RDD-in-parquet-Conflicting-partition-column-names-tp22678.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why Spark is much faster than Hadoop MapReduce even on disk
http://www.datascienceassn.org/content/making-sense-making-sense-performance-data-analytics-frameworks From: bit1...@163.com bit1...@163.com To: user user@spark.apache.org Sent: Monday, April 27, 2015 8:33 PM Subject: Why Spark is much faster than Hadoop MapReduce even on disk Hi, I am frequently asked why Spark is also much faster than Hadoop MapReduce on disk (without the use of memory cache). I have no convincing answer for this question, could you guys elaborate on this? Thanks!
Re: Spark Cluster Setup
Similar to what Dean called out, we build Puppet manifests so we could do the automation - its a bit of work to setup, but well worth the effort. On Fri, Apr 24, 2015 at 11:27 AM Dean Wampler deanwamp...@gmail.com wrote: It's mostly manual. You could try automating with something like Chef, of course, but there's nothing already available in terms of automation. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Apr 24, 2015 at 10:33 AM, James King jakwebin...@gmail.com wrote: Thanks Dean, Sure I have that setup locally and testing it with ZK. But to start my multiple Masters do I need to go to each host and start there or is there a better way to do this. Regards jk On Fri, Apr 24, 2015 at 5:23 PM, Dean Wampler deanwamp...@gmail.com wrote: The convention for standalone cluster is to use Zookeeper to manage master failover. http://spark.apache.org/docs/latest/spark-standalone.html Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Apr 24, 2015 at 5:01 AM, James King jakwebin...@gmail.com wrote: I'm trying to find out how to setup a resilient Spark cluster. Things I'm thinking about include: - How to start multiple masters on different hosts? - there isn't a conf/masters file from what I can see Thank you.
Driver ID from spark-submit
Hello, I am trying to use the default Spark cluster manager in a production environment. I will be submitting jobs with spark-submit. I wonder if the following is possible: 1. Get the Driver ID from spark-submit. We will use this ID to keep track of the job and kill it if necessary. 2. Whether it is possible to run spark-submit in a mode where it ends and returns control to the user immediately after the job is submitted. Thanks! Rares
?????? Exception in using updateStateByKey
Maybe I found the solution??do not set 'spark.cleaner.ttl', just use function 'remember' in StreamingContext to set the rememberDuration. -- -- ??: Ted Yu;yuzhih...@gmail.com; : 2015??4??27??(??) 10:20 ??: Sea261810...@qq.com; : Re: Exception in using updateStateByKey Can you make the value for spark.cleaner.ttl larger ?Cheers On Mon, Apr 27, 2015 at 7:13 AM, Sea 261810...@qq.com wrote: my hadoop version is 2.2.0?? the hdfs-audit.log is too large?? The problem is that?? when the checkpoint info is deleted(it depends on ??spark.cleaner.ttl??)??it will throw this exception?? - -- ??: Ted Yu;yuzhih...@gmail.com; : 2015??4??27??(??) 9:55 ??: Sea261810...@qq.com; : useruser@spark.apache.org; : Re: Exception in using updateStateByKey Which hadoop release are you using ? Can you check hdfs audit log to see who / when deleted spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443 ? Cheers On Mon, Apr 27, 2015 at 6:21 AM, Sea 261810...@qq.com wrote: Hi, all: I use function updateStateByKey in Spark Streaming, I need to store the states for one minite, I set spark.cleaner.ttl to 120, the duration is 2 seconds, but it throws Exception Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443 at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042) at org.apache.hadoop.ipc.Client.call(Client.java:1347) at org.apache.hadoop.ipc.Client.call(Client.java:1300) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188) at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) Why? 
my code is ssc = StreamingContext(sc,2) kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1}) kvs.window(60,2).map(lambda x: analyzeMessage(x[1]))\ .filter(lambda x: x[1] != None).updateStateByKey(updateStateFunc) \ .filter(lambda x: x[1]['isExisted'] != 1) \ .foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))
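For reference, a minimal Scala sketch of that approach (paths and durations below are placeholders), letting the StreamingContext decide how long to keep generated RDDs instead of relying on spark.cleaner.ttl:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("UpdateStateExample")
val ssc = new StreamingContext(conf, Seconds(2))   // 2-second batches, as in the question
ssc.checkpoint("hdfs:///spark/ck")                 // checkpointing is required by updateStateByKey
ssc.remember(Seconds(120))                         // keep generated RDDs for at least two minutes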
Spark JDBC data source API issue with mysql
Hi, I have been trying out the Spark data source API with JDBC. The following is the code to get a DataFrame: Try(hc.load("org.apache.spark.sql.jdbc", Map("url" -> dbUrl, "dbtable" -> s"($query)"))) By looking at the test cases, I found that the query has to be inside brackets, otherwise it's treated as a table name. But when used with MySQL, the query inside the ( ) is treated as a derived table, which throws an exception. Is this the right way to pass queries to the JDBC source, or am I missing something? Regards, Madhukara Phatak http://datamantra.io/
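If it helps, a hedged sketch of the alias workaround: MySQL requires every derived table to have its own alias, so wrapping the query as "( ... ) AS t" is one way to pass it through the dbtable option. The url and query below are placeholders, not values from the original message.

val df = sqlContext.load("org.apache.spark.sql.jdbc", Map(
  "url"     -> "jdbc:mysql://host:3306/db?user=u&password=p",
  "dbtable" -> "(SELECT id, name FROM people WHERE id > 100) AS t"   // alias avoids the MySQL error
))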
Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?
I downloaded the 1.3.1 Hadoop 2.4 prebuilt package (tar) from multiple mirrors and the direct link. Each time I untar it I get the below error: spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty error message) tar: Error exit delayed from previous errors Is it broken ? -- Deepak
RE: ReduceByKey and sorting within partitions
Marco - why do you want data sorted both within and across partitions? If you need to take an ordered sequence across all your data you need to either aggregate your RDD on the driver and sort it, or use zipWithIndex to apply an ordered index to your data that matches the order it was stored on HDFS. You can then get the data in order by filtering based on that index. Let me know if that's not what you need - thanks! Sent with Good (www.good.com) -Original Message- From: Marco [marcope...@gmail.commailto:marcope...@gmail.com] Sent: Monday, April 27, 2015 07:01 AM Eastern Standard Time To: user@spark.apache.org Subject: ReduceByKey and sorting within partitions Hi, I'm trying, after reducing by key, to get data ordered among partitions (like RangePartitioner) and within partitions (like sortByKey or repartitionAndSortWithinPartition) pushing the sorting down to the shuffles machinery of the reducing phase. I think, but maybe I'm wrong, that the correct way to do that is that combineByKey call setKeyOrdering function on the ShuflleRDD that it returns. Am I wrong? Can be done by a combination of other transformations with the same efficiency? Thanks, Marco - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
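For completeness, a sketch of the usual building blocks for getting data ordered both across and within partitions after a reduce (assuming an RDD[(K, V)] named `counts` whose key type has an Ordering); whether the sort can be fused into the reduce shuffle itself, as the original question asks, is a separate matter:

import org.apache.spark.RangePartitioner

val reduced = counts.reduceByKey(_ + _)
val partitioner = new RangePartitioner(8, reduced)   // range partitioning gives a global order across partitions
val sorted = reduced.repartitionAndSortWithinPartitions(partitioner)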
data locality in spark
Hi guys, I am running some SQL queries, but all my tasks are reported as either NODE_LOCAL or PROCESS_LOCAL. In the Hadoop world, the reduce tasks are RACK or NON_RACK LOCAL because they have to aggregate data from multiple hosts. However, in Spark even the aggregation stages are reported as NODE/PROCESS LOCAL. Am I missing something, or why are the reduce-like tasks still NODE/PROCESS LOCAL? Thanks, Robert
RE: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?
What command are you using to untar? Are you running out of disk space? Sent with Good (www.good.com) -Original Message- From: ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.commailto:deepuj...@gmail.com] Sent: Monday, April 27, 2015 11:44 AM Eastern Standard Time To: user Subject: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ? I downloaded 1.3.1 hadoop 2.4 prebuilt package (tar) from multiple mirrors and direct link. Each time i untar i get below error spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty error message) tar: Error exit delayed from previous errors Is it broken ? -- Deepak The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Re: Timeout Error
Hello All, I dug a little deeper and found this error : 15/04/27 16:05:39 WARN TransportChannelHandler: Exception in connection from /10.1.0.90:40590 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) 15/04/27 16:05:39 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=45314884029, chunkIndex=0}, buffer=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=26227673 cap=26227673]}} to /10.1.0.90:40590; closing connection java.nio.channels.ClosedChannelException 15/04/27 16:05:39 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=8439869725098873668, response=[B@1bdcdf63} to /10.1.0.90:40590; closing connection java.nio.channels.ClosedChannelException 15/04/27 16:05:39 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@master.spark.com:60802] - [akka.tcp://sparkdri...@master.spark.com:37195] disassociated! Shutting down. 15/04/27 16:05:39 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkdri...@master.spark.com:37195] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. On Mon, Apr 27, 2015 at 8:35 AM, Shixiong Zhu zsxw...@gmail.com wrote: The configuration key should be spark.akka.askTimeout for this timeout. The time unit is seconds. Best Regards, Shixiong(Ryan) Zhu 2015-04-26 15:15 GMT-07:00 Deepak Gopalakrishnan dgk...@gmail.com: Hello, Just to add a bit more context : I have done that in the code, but I cannot see it change from 30 seconds in the log. .set(spark.executor.memory, 10g) .set(spark.driver.memory, 20g) .set(spark.akka.timeout,6000) PS : I understand that 6000 is quite large, but I'm just trying to see if it actually changes Here is the command that I'm running sudo MASTER=spark://master.spark.com:7077 /opt/spark/spark-1.3.0-bin-hadoop2.4/bin/spark-submit --class class-name --executor-memory 20G --driver-memory 10G --deploy-mode client --conf spark.akka.timeout=6000 --conf spark.akka.askTimeout=6000 jar file path and here is how I load the file JavaPairRDDString, String learningRdd=sc.wholeTextFiles(filePath,10); Thanks On Mon, Apr 27, 2015 at 3:36 AM, Bryan Cutler cutl...@gmail.com wrote: I'm not sure what the expected performance should be for this amount of data, but you could try to increase the timeout with the property spark.akka.timeout to see if that helps. 
Bryan On Sun, Apr 26, 2015 at 6:57 AM, Deepak Gopalakrishnan dgk...@gmail.com wrote: Hello All, I'm trying to process a 3.5GB file on standalone mode using spark. I could run my spark job succesfully on a 100MB file and it works as expected. But, when I try to run it on the 3.5GB file, I run into the below error : 15/04/26 12:45:50 INFO BlockManagerMaster: Updated info of block taskresult_83 15/04/26 12:46:46 WARN AkkaUtils: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@790223d3,BlockManagerId(2, master.spark.com, 39143))] in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 15/04/26 12:47:15 INFO MemoryStore: ensureFreeSpace(26227673) called with curMem=265897, maxMem=5556991426 15/04/26
Re: What is difference btw reduce fold?
Hi Q, fold and reduce both aggregate over a collection by implementing an operation you specify; the major difference is the starting point of the aggregation. For fold(), you have to specify the starting value, and for reduce() the starting value is the first (or possibly an arbitrary) element in the collection. Simple examples - we can sum the numbers in a collection using both functions: (1 until 10).reduce( (a,b) => a+b ) (1 until 10).fold(0)( (a,b) => a+b ) With fold, we want to start at 0 and cumulatively add each element. In this case, the operations passed to fold() and reduce() were very similar, but it is helpful to think about fold in the following way. For the operation we pass to fold(), imagine its two arguments are (i) the current accumulated value and (ii) the next value in the collection, (1 until 10).fold(0)( (accumulated_so_far, next_value) => accumulated_so_far + next_value ). So the result of the operation, accumulated_so_far + next_value, will be passed to the operation again as the first argument, and so on. In this way, we could count the number of elements in a collection using fold, (1 until 10).fold(0)( (accumulated_so_far, next_value) => accumulated_so_far + 1 ). When it comes to Spark, here’s another thing to keep in mind. For both reduce and fold, you need to make sure your operation is both commutative and associative. For RDDs, reduce and fold are implemented on each partition separately, and then the results are combined using the operation. With fold, this could get you into trouble because an empty partition will emit fold’s starting value, so the number of partitions might erroneously affect the result of the calculation, if you’re not careful about the operation. This would occur with the ( (a,b) => a+1 ) operation from above (see http://stackoverflow.com/questions/29150202/pyspark-fold-method-output). Hope this helps. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-difference-btw-reduce-fold-tp22653p22671.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
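To make that last point concrete, a small RDD sketch (Scala; the behaviour is the same from PySpark): fold applies its zero value once per partition and once more when the per-partition results are combined, so a non-associative operation gives a partition-dependent result.

val rdd = sc.parallelize(1 to 9, numSlices = 3)
rdd.reduce(_ + _)                    // 45
rdd.fold(0)(_ + _)                   // 45: the zero value is an identity for +
rdd.fold(0)((acc, _) => acc + 1)     // NOT 9: the number of partitions leaks into the result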
Re: Getting error running MLlib example with new cluster
How did you run the example app? Did you use spark-submit? -Xiangrui On Thu, Apr 23, 2015 at 2:27 PM, Su She suhsheka...@gmail.com wrote: Sorry, accidentally sent the last email before finishing. I had asked this question before, but wanted to ask again as I think it is now related to my pom file or project setup. Really appreciate the help! I have been trying on/off for the past month to try to run this MLlib example: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala I am able to build the project successfully. When I run it, it returns: features in spam: 8 features in ham: 7 and then freezes. According to the UI, the description of the job is count at DataValidators.scala.38. This corresponds to this line in the code: val model = lrLearner.run(trainingData) I've tried just about everything I can think of...changed numFeatures from 1 - 10,000, set executor memory to 1g, set up a new cluster, at this point I think I might have missed dependencies as that has usually been the problem in other spark apps I have tried to run. This is my pom file, that I have used for other successful spark apps. Please let me know if you think I need any additional dependencies or there are incompatibility issues, or a pom.xml that is better to use. Thank you! Cluster information: Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0) java version 1.7.0_25 Scala version: 2.10.4 hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0) project xmlns = http://maven.apache.org/POM/4.0.0; xmlns:xsi=http://w3.org/2001/XMLSchema-instance; xsi:schemaLocation =http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd; groupId edu.berkely/groupId artifactId simple-project /artifactId modelVersion 4.0.0/modelVersion name Simple Project /name packaging jar /packaging version 1.0 /version repositories repository idcloudera/id url http://repository.cloudera.com/artifactory/cloudera-repos//url /repository repository idscala-tools.org/id nameScala-tools Maven2 Repository/name urlhttp://scala-tools.org/repo-releases/url /repository /repositories pluginRepositories pluginRepository idscala-tools.org/id nameScala-tools Maven2 Repository/name urlhttp://scala-tools.org/repo-releases/url /pluginRepository /pluginRepositories build plugins plugin groupIdorg.scala-tools/groupId artifactIdmaven-scala-plugin/artifactId executions execution idcompile/id goals goalcompile/goal /goals phasecompile/phase /execution execution idtest-compile/id goals goaltestCompile/goal /goals phasetest-compile/phase /execution execution phaseprocess-resources/phase goals goalcompile/goal /goals /execution /executions /plugin plugin artifactIdmaven-compiler-plugin/artifactId configuration source1.7/source target1.7/target /configuration /plugin /plugins /build dependencies dependency !--Spark dependency -- groupId org.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0-cdh5.3.0/version /dependency dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId version2.5.0-mr1-cdh5.3.0/version /dependency dependency groupIdorg.scala-lang/groupId artifactIdscala-library/artifactId version2.10.4/version /dependency dependency groupIdorg.scala-lang/groupId artifactIdscala-compiler/artifactId version2.10.4/version /dependency dependency groupIdcom.101tec/groupId artifactIdzkclient/artifactId version0.3/version /dependency
Re: MLlib - Collaborative Filtering - trainImplicit task size
Could you try different ranks and see whether the task size changes? We do use YtY in the closure, which should work the same as broadcast. If that is the case, it should be safe to ignore this warning. -Xiangrui On Thu, Apr 23, 2015 at 4:52 AM, Christian S. Perone christian.per...@gmail.com wrote: All these warnings come from ALS iterations, from flatMap and also from aggregate, for instance the origin of the state where the flatMap is showing these warnings (w/ Spark 1.3.0, they are also shown in Spark 1.3.1): org.apache.spark.rdd.RDD.flatMap(RDD.scala:296) org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065) org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530) org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527) scala.collection.immutable.Range.foreach(Range.scala:141) org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527) org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203) And from the aggregate: org.apache.spark.rdd.RDD.aggregate(RDD.scala:968) org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112) org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064) org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538) org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527) scala.collection.immutable.Range.foreach(Range.scala:141) org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527) org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203) On Thu, Apr 23, 2015 at 2:49 AM, Xiangrui Meng men...@gmail.com wrote: This is the size of the serialized task closure. Is stage 246 part of ALS iterations, or something before or after it? -Xiangrui On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone christian.per...@gmail.com wrote: Hi Sean, thanks for the answer. I tried to call repartition() on the input with many different sizes and it still continues to show that warning message. On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote: I think maybe you need more partitions in your input, which might make for smaller tasks? On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone christian.per...@gmail.com wrote: I keep seeing these warnings when using trainImplicit: WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. And then the task size starts to increase. Is this a known issue ? Thanks ! -- Blog | Github | Twitter Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me. -- Blog | Github | Twitter Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me. -- Blog | Github | Twitter Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
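A hedged sketch of that experiment (`ratings` stands for your RDD[Rating]; the iteration, lambda and alpha values are placeholders): train with a few ranks and watch whether the "very large task size" warning grows with the rank, which would point at the YtY matrix captured in the closure.

import org.apache.spark.mllib.recommendation.ALS

for (rank <- Seq(10, 50, 100)) {
  // arguments: ratings, rank, iterations, lambda, alpha
  val model = ALS.trainImplicit(ratings, rank, 10, 0.01, 1.0)
  println(s"trained rank=$rank; check the driver log for task-size warnings")
}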
Re: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?
Works fine for me. Make sure you're not downloading the HTML redirector page and thinking it's the archive. On Mon, Apr 27, 2015 at 11:43 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I downloaded 1.3.1 hadoop 2.4 prebuilt package (tar) from multiple mirrors and direct link. Each time i untar i get below error spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty error message) tar: Error exit delayed from previous errors Is it broken ? -- Deepak - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: StandardScaler failing with OOM errors in PySpark
You might need to specify driver memory in spark-submit instead of passing JVM options. spark-submit is designed to handle different deployments correctly. -Xiangrui On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar rokros...@gmail.com wrote: ok yes, I think I have narrowed it down to being a problem with driver memory settings. It looks like the application master/driver is not being launched with the settings specified: For the driver process on the main node I see -XX:MaxPermSize=128m -Xms512m -Xmx512m as options used to start the JVM, even though I specified 'spark.yarn.am.memory', '5g' 'spark.yarn.am.memoryOverhead', '2000' The info shows that these options were read: 15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120 MB memory including 2000 MB overhead Is there some reason why these options are being ignored and instead starting the driver with just 512Mb of heap? On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote: the feature dimension is 800k. yes, I believe the driver memory is likely the problem since it doesn't crash until the very last part of the tree aggregation. I'm running it via pyspark through YARN -- I have to run in client mode so I can't set spark.driver.memory -- I've tried setting the spark.yarn.am.memory and overhead parameters but it doesn't seem to have an effect. Thanks, Rok On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote: What is the feature dimension? Did you set the driver memory? -Xiangrui On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote: I'm trying to use the StandardScaler in pyspark on a relatively small (a few hundred Mb) dataset of sparse vectors with 800k features. The fit method of StandardScaler crashes with Java heap space or Direct buffer memory errors. There should be plenty of memory around -- 10 executors with 2 cores each and 8 Gb per core. I'm giving the executors 9g of memory and have also tried lots of overhead (3g), thinking it might be the array creation in the aggregators that's causing issues. The bizarre thing is that this isn't always reproducible -- sometimes it actually works without problems. Should I be setting up executors differently? Thanks, Rok -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Powered By Spark
Hi, Would you mind adding our company to the Powered By Spark page? organization name: atp URL: https://atp.io a list of which Spark components you are using: SparkSQL, MLLib, Databricks Cloud and a short description of your use case: Predictive models and learning algorithms to improve the relevance of programmatic marketing Thanks! Justin Justin Barton CTO +1 (718) 404 9272 +44 203 290 9272 atp.io | jus...@atp.io | find us https://atp.io/find-us
spark sql LEFT OUTER JOIN java.lang.ClassCastException
Hi There, I am using spark sql left out join query. The sql query is scala val test = sqlContext.sql(SELECT e.departmentID FROM employee e LEFT OUTER JOIN department d ON d.departmentId = e.departmentId).toDF() In the spark 1.3.1 its working fine, but the latest pull is give the below error 15/04/27 23:02:49 ERROR Executor: Exception in task 4.0 in stage 67.0 (TID 118)java.lang.ClassCastException15/04/27 23:02:49 INFO TaskSetManager: Lost task 4.0 in stage 67.0 (TID 118) on executor localhost: java.lang.ClassCastException (null) [duplicate 1]15/04/27 23:02:49 ERROR Executor: Exception in task 2.0 in stage 67.0 (TID 116)java.lang.ClassCastException15/04/27 23:02:49 INFO TaskSetManager: Lost task 2.0 in stage 67.0 (TID 116) on executor localhost: java.lang.ClassCastException (null) [duplicate 2]15/04/27 23:02:49 ERROR Executor: Exception in task 3.0 in stage 67.0 (TID 117)java.lang.ClassCastException15/04/27 23:02:49 INFO TaskSetManager: Lost task 3.0 in stage 67.0 (TID 117) on executor localhost: java.lang.ClassCastException (null) [duplicate 3]15/04/27 23:02:49 ERROR Executor: Exception in task 0.0 in stage 66.0 (TID 112)java.lang.ClassCastException15/04/27 23:02:49 INFO TaskSetManager: Lost task 0.0 in stage 66.0 (TID 112) on executor localhost: java.lang.ClassCastException (null) [duplicate 1]15/04/27 23:02:49 INFO TaskSchedulerImpl: Removed TaskSet 66.0, whose tasks have all completed, from pool 15/04/27 23:02:49 ERROR Executor: Exception in task 5.0 in stage 67.0 (TID 119)java.lang.ClassCastException15/04/27 23:02:49 INFO TaskSetManager: Lost task 5.0 in stage 67.0 (TID 119) on executor localhost: java.lang.ClassCastException (null) [duplicate 4]15/04/27 23:02:49 ERROR Executor: Exception in task 0.0 in stage 67.0 (TID 114)java.lang.ClassCastException15/04/27 23:02:49 INFO TaskSetManager: Lost task 0.0 in stage 67.0 (TID 114) on executor localhost: java.lang.ClassCastException (null) [duplicate 5]15/04/27 23:02:49 INFO TaskSchedulerImpl: Removed TaskSet 67.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 66.0 failed 1 times, most recent failure: Lost task 1.0 in stage 66.0 (TID 113, localhost): java.lang.ClassCastException Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1241) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1232) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1231) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1231) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:705) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:705) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:705) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1424) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1385) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) ThanksKiran.
Streaming app with windowing and persistence
Hello, everyone. I develop stream application, working with window functions - each window create table and perform some SQL-operations on extracted data. I met such problem: when using window operations and checkpointing, application does not start next time. Here is the code: finalDuration batchDuration = Durations.seconds(10); finalDuration slideDuration = Durations.seconds(10); finalDuration windowDuration = Durations.seconds(600); finalSparkConf conf =newSparkConf(); conf.setAppName(Streaming); conf.setMaster(local[4]); JavaStreamingContextFactory contextFactory =newJavaStreamingContextFactory() { @Override publicJavaStreamingContext create() { JavaStreamingContext streamingContext =newJavaStreamingContext(conf,batchDuration); streamingContext.checkpoint(CHECKPOINT_DIR); returnstreamingContext; } }; JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR,newConfiguration(), contextFactory,true); JavaDStreamString lines = streamingContext.textFileStream(SOURCE_DIR); lines.countByWindow(windowDuration,slideDuration).print(); streamingContext.start(); streamingContext.awaitTermination(); I expect, that after application restart, Spark will merge old event counter with new values (if it is not so, I am ready to merge old data manually). But, after application restart, I have this error: Exception in thread main org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@49db6f23 has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:584) at my.package.FileAggregations.main(FileAggregations.java:76) At 
FileAggregations.java:76 is streamingContext.start(); Spark version is 1.3.0. --- wbr, Alexandr Krasheninnikov
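In case it helps others hitting the same trace, a hedged Scala sketch of the pattern that usually avoids the "has not been initialized" error on restart: build the entire DStream graph inside the factory passed to getOrCreate, so it can be reconstructed from the checkpoint (paths and durations below are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("Streaming")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("/tmp/checkpoint")
  val lines = ssc.textFileStream("/tmp/source")
  lines.countByWindow(Seconds(600), Seconds(10)).print()   // define all streams here, not after getOrCreate
  ssc
}

val ssc = StreamingContext.getOrCreate("/tmp/checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()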
does randomSplit return a copy or a reference to the original rdd? [Python]
Suppose I have something like the code below:

for idx in xrange(0, 10):
    train_test_split = training.randomSplit(weights=[0.75, 0.25])
    train_cv = train_test_split[0]
    test_cv = train_test_split[1]
    # scale train_cv and test_cv

By scaling train_cv and test_cv, will the original data be affected? Thanks,
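For what it's worth, here is the same idea sketched with the Scala API (the Python API behaves the same way): randomSplit returns new RDDs derived from the parent, and because RDDs are immutable, scaling the splits only produces further new RDDs and never modifies the original data.

val training = sc.parallelize(1 to 100)
val Array(trainCv, testCv) = training.randomSplit(Array(0.75, 0.25))
val scaledTrain = trainCv.map(_ / 100.0)   // a new RDD; training and trainCv are unchanged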
Re: Getting error running MLlib example with new cluster
Hello Xiangrui, I am using this spark-submit command (as I do for all other jobs): /opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/bin/spark-submit --class MLlib --master local[2] --jars $(echo /home/ec2-user/sparkApps/learning-spark/lib/*.jar | tr ' ' ',') /home/ec2-user/sparkApps/learning-spark/target/simple-project-1.1.jar Thank you for the help! Best, Su On Mon, Apr 27, 2015 at 9:58 AM, Xiangrui Meng men...@gmail.com wrote: How did you run the example app? Did you use spark-submit? -Xiangrui On Thu, Apr 23, 2015 at 2:27 PM, Su She suhsheka...@gmail.com wrote: Sorry, accidentally sent the last email before finishing. I had asked this question before, but wanted to ask again as I think it is now related to my pom file or project setup. Really appreciate the help! I have been trying on/off for the past month to try to run this MLlib example: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala I am able to build the project successfully. When I run it, it returns: features in spam: 8 features in ham: 7 and then freezes. According to the UI, the description of the job is count at DataValidators.scala.38. This corresponds to this line in the code: val model = lrLearner.run(trainingData) I've tried just about everything I can think of...changed numFeatures from 1 - 10,000, set executor memory to 1g, set up a new cluster, at this point I think I might have missed dependencies as that has usually been the problem in other spark apps I have tried to run. This is my pom file, that I have used for other successful spark apps. Please let me know if you think I need any additional dependencies or there are incompatibility issues, or a pom.xml that is better to use. Thank you! Cluster information: Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0) java version 1.7.0_25 Scala version: 2.10.4 hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0) project xmlns = http://maven.apache.org/POM/4.0.0; xmlns:xsi=http://w3.org/2001/XMLSchema-instance; xsi:schemaLocation =http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd; groupId edu.berkely/groupId artifactId simple-project /artifactId modelVersion 4.0.0/modelVersion name Simple Project /name packaging jar /packaging version 1.0 /version repositories repository idcloudera/id url http://repository.cloudera.com/artifactory/cloudera-repos//url /repository repository idscala-tools.org/id nameScala-tools Maven2 Repository/name urlhttp://scala-tools.org/repo-releases/url /repository /repositories pluginRepositories pluginRepository idscala-tools.org/id nameScala-tools Maven2 Repository/name urlhttp://scala-tools.org/repo-releases/url /pluginRepository /pluginRepositories build plugins plugin groupIdorg.scala-tools/groupId artifactIdmaven-scala-plugin/artifactId executions execution idcompile/id goals goalcompile/goal /goals phasecompile/phase /execution execution idtest-compile/id goals goaltestCompile/goal /goals phasetest-compile/phase /execution execution phaseprocess-resources/phase goals goalcompile/goal /goals /execution /executions /plugin plugin artifactIdmaven-compiler-plugin/artifactId configuration source1.7/source target1.7/target /configuration /plugin /plugins /build dependencies dependency !--Spark dependency -- groupId org.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0-cdh5.3.0/version /dependency dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId 
version2.5.0-mr1-cdh5.3.0/version /dependency
Spark Job fails with 6 executors and succeeds with 8 ?
I have this Spark App and it fails when i run with 6 executors but succeeds with 8. Any suggestions ? Command: ./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar --num-executors 6 --driver-memory 12g --driver-java-options -XX:MaxPermSize=8G --executor-memory 12g --executor-cores 6 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem buffersize=128 maxbuffersize=1068 maxResultSize=2G Input Data Sets Size: -sh-4.1$ hadoop fs -ls XX/YY/part-r-0 2663019338 bytes -sh-4.1$ hadoop fs -ls /AA/BB/part-r-0 2688348022 bytes -sh-4.1$ hadoop fs -ls /FOO/BAAR/ch196out83-r-0.avro 1274065689 bytes -sh-4.1$ Any thouhgts ? Exception: 15/04/27 22:12:46 INFO Configuration.deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec 15/04/27 22:12:47 ERROR executor.Executor: Exception in task 8.0 in stage 4.0 (TID 36) scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object SchemaUtil at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1220) at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1218) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.reflect.internal.Symbols$Symbol.lock(Symbols.scala:482) at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1218) at scala.reflect.internal.Symbols$Symbol.initialize(Symbols.scala:1374) at scala.reflect.internal.Symbols$Symbol.hasFlag(Symbols.scala:607) at scala.reflect.internal.Symbols$TermSymbol.isTermMacro(Symbols.scala:2453) at scala.reflect.internal.Symbols$Symbol.symbolKind(Symbols.scala:2263) at scala.reflect.internal.Symbols$Symbol.sanitizedKindString(Symbols.scala:2297) at scala.reflect.internal.Symbols$Symbol.kindString(Symbols.scala:2305) at scala.reflect.internal.Symbols$Symbol.toString(Symbols.scala:2350) at java.lang.String.valueOf(String.java:2847) at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197) at scala.collection.TraversableOnce$$anonfun$addString$1.apply(TraversableOnce.scala:327) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableOnce$class.addString(TraversableOnce.scala:320) at scala.collection.AbstractTraversable.addString(Traversable.scala:105) at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:286) at scala.collection.AbstractTraversable.mkString(Traversable.scala:105) at scala.collection.TraversableLike$class.toString(TraversableLike.scala:639) at scala.collection.SeqLike$class.toString(SeqLike.scala:646) at scala.collection.AbstractSeq.toString(Seq.scala:40) at java.lang.String.valueOf(String.java:2847) at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197) at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678) at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:44) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61) at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72) at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:161) at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:21) at com.ebay.ep.poc.spark.reporting.process.util.SchemaUtil$$typecreator3$1.apply(SchemaUtil.scala:108) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231) at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:335) at scala.reflect.api.Universe.typeOf(Universe.scala:59) at com.ebay.ep.poc.spark.reporting.process.util.SchemaUtil$.com$ebay$ep$poc$spark$reporting$process$util$SchemaUtil$$getField(SchemaUtil.scala:108) at com.ebay.ep.poc.spark.reporting.process.util.SchemaUtil$$anonfun$6$$anonfun$apply$4.apply(SchemaUtil.scala:94) at com.ebay.ep.poc.spark.reporting.process.util.SchemaUtil$$anonfun$6$$anonfun$apply$4.apply(SchemaUtil.scala:90) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.reflect.internal.Scopes$Scope.foreach(Scopes.scala:315) at
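The failing frames sit inside scala.reflect calls made from SchemaUtil's typeOf lookups, and Scala 2.10 runtime reflection is not thread-safe, so one possible explanation is several task threads in the same executor (--executor-cores 6 runs six tasks at once per JVM) hitting runtime reflection concurrently and surfacing it as a CyclicReference. A minimal sketch of the usual workaround, with ReflectionLock and SafeSchemaLookup as hypothetical stand-ins for the reflective part of SchemaUtil rather than the poster's actual code, is to serialize access to the reflective lookup:

import scala.reflect.runtime.universe._

// Hypothetical stand-in for the reflective helper: a single global lock so
// that only one task thread performs runtime reflection at a time, which is
// the common workaround for Scala 2.10's non-thread-safe reflection.
object ReflectionLock

object SafeSchemaLookup {
  def fieldType[T: TypeTag]: Type = ReflectionLock.synchronized {
    typeOf[T]
  }
}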
Serialization error
Hi, While connecting to Accumulo through Spark by creating a Spark RDD, I am getting the following error: object not serializable (class: org.apache.accumulo.core.data.Key). This is due to Accumulo's Key class, which does not implement the Serializable interface. How can this be solved so that Accumulo can be used with Spark? Thanks, Madhvi
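One common way around this, sketched below under assumed connector settings and with an existing SparkContext sc, is to convert the non-serializable Key/Value objects into plain Scala types in the very first map after reading, so they never need to be serialized for a shuffle or collect (registering custom Kryo serializers is the other usual route):

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.hadoop.mapreduce.Job

val job = Job.getInstance()
// ... AccumuloInputFormat configuration (instance, connector, table, auths)
//     would go here; omitted because it depends on the cluster ...

val raw = sc.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[AccumuloInputFormat],
  classOf[Key],
  classOf[Value])

// Convert to serializable (String, String) pairs inside the same stage that
// reads the records, so Key and Value never have to cross the wire.
val rows = raw.map { case (k, v) => (k.getRow.toString, v.toString) }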
Re: Unable to work with foreachrdd
Can you post the code? Maybe the problem is in the foreachRDD body. :) On Tuesday, April 28, 2015, CH.KMVPRASAD [via Apache Spark User List] ml-node+s1001560n22681...@n3.nabble.com wrote: When I run my Spark Streaming application, the print method is printing the result, but when I used foreachRDD on that DStream object it is not working. Why? What is the reason? Please help me! -- Regards, Sergio Jiménez
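Without the code it is hard to say, but the most common cause is that the body of foreachRDD runs on the driver for every batch, so it still has to invoke an action on each RDD before any work happens. A minimal sketch, with a hypothetical socket source and an assumed existing SparkContext sc:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999) // hypothetical source

lines.foreachRDD { rdd =>
  // Merely referencing rdd here does nothing; an action such as
  // take/collect/foreach is what actually triggers the computation.
  rdd.take(10).foreach(println)
}

ssc.start()
ssc.awaitTermination()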
Re: gridsearch - python
We will try to make them available in 1.4, which is coming soon. -Xiangrui On Thu, Apr 23, 2015 at 10:18 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: I know grid search with cross validation is not supported. However, I was wondering if there is something available for the time being. Thanks, From: Punyashloka Biswal [mailto:punya.bis...@gmail.com] Sent: Thursday, April 23, 2015 9:06 PM To: Pagliari, Roberto; user@spark.apache.org Subject: Re: gridsearch - python https://issues.apache.org/jira/browse/SPARK-7022. Punya On Thu, Apr 23, 2015 at 5:47 PM Pagliari, Roberto rpagli...@appcomsci.com wrote: Can anybody point me to an example, if available, about gridsearch with python? Thank you,
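Until that lands, one stop-gap is a hand-rolled grid search: loop over candidate parameter values, train on a training split, and keep the settings with the best validation score. A minimal sketch (written in Scala like the rest of the code in this digest, with an assumed model, metric, and parameter grid):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Returns (numIterations, stepSize, AUC) for the best-scoring combination.
def gridSearch(train: RDD[LabeledPoint], valid: RDD[LabeledPoint]): (Int, Double, Double) = {
  val results = for {
    numIterations <- Seq(50, 100)   // hypothetical parameter grid
    stepSize      <- Seq(0.1, 1.0)
  } yield {
    val model = LogisticRegressionWithSGD.train(train, numIterations, stepSize, 1.0)
    model.clearThreshold()          // emit raw scores so AUC is meaningful
    val scoreAndLabel = valid.map(p => (model.predict(p.features), p.label))
    val auc = new BinaryClassificationMetrics(scoreAndLabel).areaUnderROC()
    (numIterations, stepSize, auc)
  }
  results.maxBy(_._3)
}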
Re: Spark on Mesos
So I installed Spark 1.3.1 built with Hadoop 2.6 on each of the slaves; I basically just grabbed the pre-built package from the Spark website… I placed those compiled Spark installs on each slave at /opt/spark. My Spark properties seem to be getting picked up on my side fine… [inline screenshot of the Spark properties omitted] The framework is registered in Mesos and shows up just fine; it doesn't matter whether I turn off the executor URI or not, I always get the same error… org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 0.0 failed 4 times, most recent failure: Lost task 6.3 in stage 0.0 (TID 23, 10.253.1.117): ExecutorLostFailure (executor 20150424-104711-1375862026-5050-20113-S1 lost) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) These boxes are totally open to one another, so they shouldn't have any firewall issues; everything seems to show up in Mesos and Spark just fine, but actually running anything totally blows up. There is nothing in stderr or stdout: it downloads the package and untars it but doesn't seem to do much after that. Any insights? Steve On Apr 24, 2015, at 5:50 PM, Yang Lei genia...@gmail.com wrote: SPARK_PUBLIC_DNS, SPARK_LOCAL_IP, SPARK_LOCAL_HOST
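Lost executors with empty stderr/stdout on Mesos are often a networking or executor-bootstrap problem rather than an application bug; the variables Yang Lei lists pin down which address the driver advertises to the slaves. A minimal sketch of the related driver-side settings follows, where every address and path is a placeholder rather than a known-good value:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181,zk2:2181,zk3:2181/mesos") // hypothetical ZooKeeper URL
  .setAppName("mesos-smoke-test")
  // Where every slave fetches the Spark build from; must be reachable by all of them.
  .set("spark.executor.uri", "hdfs:///dist/spark-1.3.1-bin-hadoop2.6.tgz")
  // Same effect as exporting SPARK_PUBLIC_DNS / SPARK_LOCAL_IP in spark-env.sh:
  // make sure executors can connect back to the driver on this address.
  .set("spark.driver.host", "10.253.1.10")                    // hypothetical driver address

val sc = new SparkContext(conf)

If spark.executor.uri is left unset, each slave generally needs a local Spark install the Mesos executor can find (for example the /opt/spark path mentioned above), and the version there has to match the driver's.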
Group by order by
Hi, Could you suggest what is the best way to do group by x order by y in Spark? When I try to perform it with Spark SQL I get the following error (Spark 1.3): val results = sqlContext.sql("select * from sample group by id order by time") org.apache.spark.sql.AnalysisException: expression 'time' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37) Is there a way to do it with just RDD? Best regards, Alexander
Re: Group by order by
Hi, that error seems to indicate the basic query is not properly expressed. If you group by just ID, then that means it would need to aggregate all the time values into one value per ID, so you can't sort by it. Thus it tries to suggest an aggregate function for time so you can have 1 value per ID and properly sort it.
RE: Group by order by
Hi Richard, There are several values of time per id. Is there a way to perform group by id and sort by time in Spark? Best regards, Alexander
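With plain RDDs this is usually expressed as a groupBy on the id followed by sorting each group's values by time. A minimal sketch with assumed field names and sample rows, using an existing SparkContext sc (note that each id's whole group must fit in memory on a single executor):

case class Event(id: String, time: Long, payload: String)

// Illustrative input only; in practice this would come from the real dataset.
val events = sc.parallelize(Seq(
  Event("a", 3L, "x"), Event("a", 1L, "y"), Event("b", 2L, "z")))

val sortedWithinId = events
  .groupBy(_.id)                       // one (id, Iterable[Event]) pair per id
  .mapValues(_.toSeq.sortBy(_.time))   // order each id's events by time

sortedWithinId.collect().foreach(println)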
Understanding Spark's caching
Hi Everyone! I'm trying to understand how Spark's cache works. Here is my naive understanding, please let me know if I'm missing something: val rdd1 = sc.textFile("some data") rdd1.cache() //marks rdd1 as cached val rdd2 = rdd1.filter(...) val rdd3 = rdd1.map(...) rdd2.saveAsTextFile(...) rdd3.saveAsTextFile(...) In the above, rdd1 will be loaded from disk (e.g. HDFS) only once (when rdd2 is saved, I assume) and then read from cache (assuming there is enough RAM) when rdd3 is saved. Now here is my question. Let's say I want to cache rdd2 and rdd3 as they will both be used later on, but I don't need rdd1 after creating them. Basically there is duplication, isn't there? Since once rdd2 and rdd3 are calculated, I don't need rdd1 anymore, I should probably unpersist it, right? The question is: when? *Will this work? (Option A)* val rdd1 = sc.textFile("some data") rdd1.cache() //marks rdd1 as cached val rdd2 = rdd1.filter(...) val rdd3 = rdd1.map(...) rdd2.cache() rdd3.cache() rdd1.unpersist() Does Spark add the unpersist call to the DAG? Or is it done immediately? If it's done immediately, then basically rdd1 will not be cached when I read from rdd2 and rdd3, right? *Should I do it this way instead (Option B)?* val rdd1 = sc.textFile("some data") rdd1.cache() //marks rdd1 as cached val rdd2 = rdd1.filter(...) val rdd3 = rdd1.map(...) rdd2.cache() rdd3.cache() rdd2.saveAsTextFile(...) rdd3.saveAsTextFile(...) rdd1.unpersist() *So the question is this:* Is Option A good enough? e.g. will rdd1 still access the file only once? Or do I need to go with Option B? (see also http://stackoverflow.com/questions/29903675/understanding-sparks-caching) Thanks in advance
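For what it's worth, unpersist() is applied when the call runs rather than being queued into the DAG, which is why the Option B ordering is the safer one: materialize the children before releasing the parent's cache. A minimal sketch of that ordering, with hypothetical paths and an assumed existing SparkContext sc:

val rdd1 = sc.textFile("hdfs:///some/data")    // hypothetical input path
rdd1.cache()                                   // mark the parent for caching

val rdd2 = rdd1.filter(_.nonEmpty)             // stand-in transformations
val rdd3 = rdd1.map(_.toUpperCase)
rdd2.cache()
rdd3.cache()

// Materialize the children first: the first action loads rdd1 from HDFS and
// caches it, and the second should read it back from the cache.
rdd2.saveAsTextFile("hdfs:///out/filtered")    // hypothetical output paths
rdd3.saveAsTextFile("hdfs:///out/mapped")

// Only now is it safe to release the parent's cached blocks.
rdd1.unpersist()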