Re: --jars works in yarn-client but not yarn-cluster mode, why?

2015-05-20 Thread Fengyun RAO
. (You can also add those configs to your spark-defaults.conf to avoid having to type them all the time; and don't forget to include any other jars that might be needed.) On Mon, May 18, 2015 at 11:14 PM, Fengyun RAO raofeng...@gmail.com wrote: Thanks, Marcelo! Below is the full log
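The truncated message does not say exactly which configs are meant; if they are the extraClassPath settings commonly used to put node-local jars (such as the htrace jar from the original post) on the driver and executor class paths, the spark-defaults.conf entries might look roughly like this sketch (paths are illustrative for this CDH layout):

    spark.driver.extraClassPath    /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
    spark.executor.extraClassPath  /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar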

Re: --jars works in yarn-client but not yarn-cluster mode, why?

2015-05-19 Thread Fengyun RAO
Thanks, Marcelo! Below is the full log, SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in

Re: --jars works in yarn-client but not yarn-cluster mode, why?

2015-05-14 Thread Fengyun RAO
situation as described in SPARK-5377. Wilfred On 14/05/2015 13:47, Fengyun RAO wrote: I look into the Environment in both modes. yarn-client: spark.jars local:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar,file:/home/xxx/my-app.jar yarn-cluster

Re: --jars works in yarn-client but not yarn-cluster mode, why?

2015-05-13 Thread Fengyun RAO
wonder why htrace exists in spark.yarn.secondary.jars but is still not found in the URLClassLoader. I tried both local and file modes for the jar, still the same error. 2015-05-14 11:37 GMT+08:00 Fengyun RAO raofeng...@gmail.com: Hadoop version: CDH 5.4. We need to connect to HBase, thus need the extra /opt
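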

--jars works in yarn-client but not yarn-cluster mode, why?

2015-05-13 Thread Fengyun RAO
Hadoop version: CDH 5.4. We need to connect to HBase, thus need extra /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar dependency. It works in yarn-client mode: spark-submit --class xxx.xxx.MyApp --master yarn-client --num-executors 10 --executor-memory 10g --jars
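The command above is cut off by the archive; a plausible completion, sketched from the jar and application paths mentioned elsewhere in the thread (the application jar path is taken from the spark.jars value shown in a later reply):

    spark-submit --class xxx.xxx.MyApp \
      --master yarn-client \
      --num-executors 10 \
      --executor-memory 10g \
      --jars /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
      /home/xxx/my-app.jar

According to the rest of the thread, the same command with --master yarn-cluster is what fails to find the htrace classes.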

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-24 Thread Fengyun RAO
by the cache in LogParser, so I mocked a class to avoid the cache, unfortunately it’s still slower. 2015-01-22 4:33 GMT+08:00 Davies Liu dav...@databricks.com: On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote: the LogParser instance is not serializable, and thus cannot

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
made more executors per machine. But from your additional info it does not sound like this is the case. I think you need more debugging to pinpoint what is slower. On Jan 21, 2015 9:30 AM, Fengyun RAO raofeng...@gmail.com wrote: thanks, Sean. I don't quite understand you have *more* partitions

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
the singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
btw: Shuffle Write (11 GB) means 11 GB per executor; for each task, it's ~40 MB. 2015-01-21 17:53 GMT+08:00 Fengyun RAO raofeng...@gmail.com: I don't know how to debug a distributed application, any tools or suggestions? But from the Spark web UI, the GC time (~0.1 s), Shuffle Write (11 GB

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
-serializable native code / objects. FWIW RDD#flatMap() does not appear to have changed 1.1 - 1.2 (tho master has a slight refactor). Agree it's worth checking the number of partitions in your 1.1 vs 1.2 test. On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-20 Thread Fengyun RAO
at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote: Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. Note: our program in spark 1.1 has successfully processed a whole year's data, quite stable. The main script is as below

spark 1.2 three times slower than spark 1.1, why?

2015-01-20 Thread Fengyun RAO
Currently we are migrating from spark 1.1 to spark 1.2, but found the program 3x slower, with nothing else changed. Note: our program in spark 1.1 has successfully processed a whole year's data, quite stable. The main script is as below: sc.textFile(inputPath) .flatMap(line =

Re: How to share a NonSerializable variable among tasks in the same worker node?

2015-01-20 Thread Fengyun RAO
currently we are migrating from 1.1 to 1.2, and found our program 3x slower, maybe due to the singleton hack? Could you explain in detail why or how the singleton hack works differently in spark 1.2.0? Thanks! 2015-01-18 20:56 GMT+08:00 octavian.ganea octavian.ga...@inf.ethz.ch: The singleton

Re: what classes are needed to register in KryoRegistrator, e.g. Row?

2014-10-28 Thread Fengyun RAO
Although nobody answered, as I tested, Row, MutableValue and their subclasses are not registered by default, which I think they should be, since they are certain to show up in Spark SQL. 2014-10-26 23:43 GMT+08:00 Fengyun RAO raofeng...@gmail.com: In Tuning Spark https://spark.apache.org/docs
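A minimal sketch of how such extra registrations could be wired in, assuming a custom KryoRegistrator; the choice of GenericRow is illustrative, and the exact Spark SQL classes (and their packages) to register depend on the Spark version:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Register Spark SQL row classes that the AllScalaRegistrar does not cover;
    // MutableValue subclasses would be added the same way if needed.
    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
      }
    }

The registrator is then enabled with spark.serializer=org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator set to the fully qualified class name.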

what classes are needed to register in KryoRegistrator, e.g. Row?

2014-10-26 Thread Fengyun RAO
In Tuning Spark https://spark.apache.org/docs/latest/tuning.html, it says, Spark automatically includes Kryo serializers for the *many commonly-used core Scala classes* covered in the AllScalaRegistrar from the Twitter chill https://github.com/twitter/chill library. I looked into the

Re: What does KryoException: java.lang.NegativeArraySizeException mean?

2014-10-21 Thread Fengyun RAO
Thanks, Guilaume, Below is when the exception happens, nothing has spilled to disk yet. And there isn't a join, but a partitionBy and groupBy action. Actually if numPartitions is small, it succeeds, while if it's large, it fails. Partition was simply done by override def getPartition(key:

What does KryoException: java.lang.NegativeArraySizeException mean?

2014-10-20 Thread Fengyun RAO
The exception drives me crazy, because it occurs randomly. I don't know which line of my code causes this exception. I don't even understand what KryoException: java.lang.NegativeArraySizeException means, or even implies. 14/10/20 15:59:01 WARN scheduler.TaskSetManager: Lost task 32.2 in stage

Re: What does KryoException: java.lang.NegativeArraySizeException mean?

2014-10-20 Thread Fengyun RAO
Thank you, Guillaume, my dataset is not that large, it's ~2 GB in total. 2014-10-20 16:58 GMT+08:00 Guillaume Pitel guillaume.pi...@exensa.com: Hi, It happened to me with blocks which take more than 1 or 2 GB once serialized. I think the problem was that during serialization, a Byte Array is

Re: How to close resources shared in executor?

2014-10-17 Thread Fengyun RAO
): * HTableInterface table = connection.getTable(table1); * try { * // Use the table as needed, for a single operation and a single thread * } finally { * table.close(); * connection.close(); * } Cheers On Thu, Oct 16, 2014 at 9:03 PM, Fengyun RAO raofeng...@gmail.com wrote

Re: How to close resources shared in executor?

2014-10-16 Thread Fengyun RAO
(user) ... Util.Connection.close() } On Wed, Oct 15, 2014 at 6:09 AM, Fengyun RAO raofeng...@gmail.com wrote: In order to share an HBase connection pool, we create an object: object Util { val HBaseConf = HBaseConfiguration.create; val Connection = HConnectionManager.createConnection

Re: How to close resources shared in executor?

2014-10-16 Thread Fengyun RAO
that states HConnectionManager would release the underlying connection automatically? If that's true, maybe the Javadoc which recommends a shutdown hook needs an update. 2014-10-16 14:20 GMT+08:00 Fengyun RAO raofeng...@gmail.com: Thanks, Ted. Util.Connection.close() should be called only once, so it can

Re: How to add HBase dependencies and conf with spark-submit?

2014-10-16 Thread Fengyun RAO
/CDH/lib/hbase/hbase-common.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar \ - Original Message - From: Fengyun RAO raofeng...@gmail.com To: user@spark.apache.org, u...@hbase.apache.org Sent: Wednesday, October 15, 2014 6

Re: How to close resources shared in executor?

2014-10-16 Thread Fengyun RAO
on shutdown hook is in HConnectionManager.java of 0.94. You don't need to use a shutdown hook for 0.94+. Cheers On Wed, Oct 15, 2014 at 11:41 PM, Fengyun RAO raofeng...@gmail.com wrote: I may have misunderstood your point. val result = rdd.map(line => { val table = Util.Connection.getTable(user

How to close resources shared in executor?

2014-10-15 Thread Fengyun RAO
In order to share an HBase connection pool, we create an object: object Util { val HBaseConf = HBaseConfiguration.create; val Connection = HConnectionManager.createConnection(HBaseConf) } which would be shared among tasks on the same executor, e.g. val result = rdd.map(line => { val table
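Pieced together from this thread, a sketch of the shared-connection object and its use inside a task might look as follows; the explicit shutdown hook is an assumption (as noted later in the thread, HBase 0.94+ registers its own), and the "user" table name comes from the original post:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.HConnectionManager

    object Util {
      val HBaseConf = HBaseConfiguration.create
      val Connection = HConnectionManager.createConnection(HBaseConf)
      // Close the shared connection once per JVM; HBase 0.94+ does this itself.
      sys.addShutdownHook(Connection.close())
    }

    val result = rdd.map { line =>
      val table = Util.Connection.getTable("user")
      try {
        // use the table for this record and build the result value
        line
      } finally {
        table.close()   // close only the lightweight table handle, not the connection
      }
    }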

Re: How to add HBase dependencies and conf with spark-submit?

2014-10-15 Thread Fengyun RAO
+user@hbase 2014-10-15 20:48 GMT+08:00 Fengyun RAO raofeng...@gmail.com: We use Spark 1.1, and HBase 0.98.1-cdh5.1.0, and need to read and write an HBase table in a Spark program. I notice there are spark.driver.extraClassPath and spark.executor.extraClassPath properties to manage extra

Re: [Spark SQL] How to select first row in each GROUP BY group?

2014-08-21 Thread Fengyun RAO
on the SchemaRDD, then for each group just take the first record. From: Fengyun RAO raofeng...@gmail.com Date: Thursday, August 21, 2014 at 8:26 AM To: user@spark.apache.org user@spark.apache.org Subject: Re: [Spark SQL] How to select first row in each GROUP BY group? Could anybody help? I googled

[Spark SQL] How to select first row in each GROUP BY group?

2014-08-20 Thread Fengyun RAO
I have a table with 4 columns: a, b, c, time. What I need is something like: SELECT a, b, GroupFirst(c) FROM t GROUP BY a, b. GroupFirst means the first item of column c in each group, and by first I mean the one with the minimal time in that group. In Oracle/SQL Server, we could write: WITH summary AS (
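The approach suggested in the reply above (key the rows by (a, b) and take the earliest record per group) could be sketched like this; column names and types are assumptions, and reduceByKey is used instead of a full groupBy so that only one record per key is kept:

    // requires import org.apache.spark.SparkContext._ on Spark < 1.3 for pair-RDD methods
    case class Record(a: String, b: String, c: String, time: Long)

    // records: RDD[Record] built from table t
    val firstPerGroup = records
      .map(r => ((r.a, r.b), r))
      .reduceByKey((x, y) => if (x.time <= y.time) x else y)   // keep the row with minimal time
      .map { case ((a, b), r) => (a, b, r.c) }                 // SELECT a, b, GroupFirst(c)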

sqlContext.parquetFile(path) fails if path is a file but succeeds if a directory

2014-08-18 Thread Fengyun RAO
I'm using CDH 5.1 with spark 1.0. When I try to run Spark SQL following the Programming Guide val parquetFile = sqlContext.parquetFile(path) If the path is a file, it throws an exception: Exception in thread main java.lang.IllegalArgumentException: Expected hdfs://*/file.parquet for be a

Fwd: how to split RDD by key and save to different path

2014-08-12 Thread Fengyun RAO
partition } } 2014-08-12 21:34 GMT+08:00 Fengyun RAO raofeng...@gmail.com: 1. be careful, HDFS are better for large files, not bunches of small files. 2. if that's really what you want, roll it your own. def writeAvro(iterator: Iterator[(String, String)]) = { val writers = new
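A sketch of the roll-your-own routine described above: inside each partition, keep one writer per key and route records to it. Plain java.io writers are used here for brevity, whereas the original used Avro writers; in practice the file name should also carry a partition index so partitions do not collide:

    import java.io.{File, FileWriter}
    import scala.collection.mutable

    def writeByKey(iterator: Iterator[(String, String)], baseDir: String): Unit = {
      val writers = mutable.Map.empty[String, FileWriter]   // one writer per key
      try {
        iterator.foreach { case (key, value) =>
          val w = writers.getOrElseUpdate(key, new FileWriter(new File(baseDir, key + ".txt"), true))
          w.write(value + "\n")
        }
      } finally {
        writers.values.foreach(_.close())
      }
    }

    // rdd.foreachPartition(it => writeByKey(it, "/path/on/each/worker"))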

Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-09 Thread Fengyun RAO
Although nobody has answered the two questions, in my practice it seems the answer to both is yes. 2014-08-04 19:50 GMT+08:00 Fengyun RAO raofeng...@gmail.com: object LogParserWrapper { private val logParser = { val settings = new ... val builders = new new LogParser

Re: Is there a way to write spark RDD to Avro files

2014-08-02 Thread Fengyun RAO
Below works for me: val job = Job.getInstance val schema = Schema.create(Schema.Type.STRING) AvroJob.setOutputKeySchema(job, schema) records.map(item = (new AvroKey[String](item.getGridsumId), NullWritable.get())) .saveAsNewAPIHadoopFile(args(1),
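For completeness, the truncated snippet above could be filled out along these lines (the import locations are the usual avro-mapred/avro-mapreduce ones; records, item and getGridsumId come from the original post):

    import org.apache.avro.Schema
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapreduce.Job
    // requires import org.apache.spark.SparkContext._ on Spark < 1.3 for saveAsNewAPIHadoopFile

    val job = Job.getInstance
    val schema = Schema.create(Schema.Type.STRING)
    AvroJob.setOutputKeySchema(job, schema)

    records
      .map(item => (new AvroKey[String](item.getGridsumId), NullWritable.get()))
      .saveAsNewAPIHadoopFile(
        args(1),
        classOf[AvroKey[String]],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[String]],
        job.getConfiguration)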

How to share a NonSerializable variable among tasks in the same worker node?

2014-07-31 Thread Fengyun RAO
As shown here: 2 - Why Is My Spark Job so Slow and Only Using a Single Thread? http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/ object JSONParser { def parse(raw: String): String = ... } object MyFirstSparkJob { def
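The blog post being referenced shows the singleton pattern in roughly this shape; a sketch, assuming a hypothetical non-serializable parser class:

    // The parser lives in a singleton object, so it is created once per JVM
    // (i.e. once per executor) instead of being serialized with each closure.
    object JSONParser {
      private lazy val parser = new SomeNonSerializableParser()   // hypothetical parser class
      def parse(raw: String): String = parser.parse(raw)
    }

    object MyFirstSparkJob {
      def main(args: Array[String]): Unit = {
        // Tasks call JSONParser.parse; only the resulting Strings move between nodes.
        // sc.textFile(args(0)).map(JSONParser.parse).saveAsTextFile(args(1))
      }
    }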

Re: Is it possible to read file head in each partition?

2014-07-30 Thread Fengyun RAO
to filter them out by prefix string matching or regex? On Wed, Jul 30, 2014 at 1:39 PM, Fengyun RAO raofeng...@gmail.com wrote: It will certainly cause bad performance, since it reads the whole content of a large file into one value, instead of splitting it into partitions. Typically one file

Is it possible to read file head in each partition?

2014-07-29 Thread Fengyun RAO
Hi, all We are migrating from mapreduce to spark, and encountered a problem. Our input files are IIS logs with file head. It's easy to get the file head if we process only one file, e.g. val lines = sc.textFile('hdfs://*/u_ex14073011.log') val head = lines.take(4) Then we can write our map
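As the replies above suggest, one option is to separate header lines by a prefix match; a minimal sketch, assuming W3C/IIS-style header lines that start with '#':

    val lines = sc.textFile("hdfs://*/u_ex14073011.log")
    val headerLines = lines.filter(_.startsWith("#"))     // e.g. #Software, #Version, #Fields
    val dataLines   = lines.filter(!_.startsWith("#"))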

Re: Is it possible to read file head in each partition?

2014-07-29 Thread Fengyun RAO
RDD.mapPartitions(). Nick ​ On Wed, Jul 30, 2014 at 12:02 AM, Fengyun RAO raofeng...@gmail.com wrote: Hi, all We are migrating from mapreduce to spark, and encountered a problem. Our input files are IIS logs with file head. It's easy to get the file head if we process only one file, e.g