Re: Storing spark processed output to Database asynchronously.

2015-05-21 Thread Gautam Bajaj
That is completely alright, as the system will make sure the work gets done.

My major concern is data drop. Will using async prevent data loss?

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote:

 If you cannot push data as fast as you are generating it, then async isn't
 going to help either. The work is just going to keep piling up as many,
 many async jobs, even though your batch processing times will be low, as that
 processing time is not going to reflect how much of the overall work is pending
 in the system.

 On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Hi,

 From my understanding of Spark Streaming, I created a spark entry point,
 for continuous UDP data, using:

 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
 JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
 JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

 Now, when I process this input stream using:

 JavaDStream hash = lines.flatMap(my-code);
 JavaPairDStream tuple = hash.mapToPair(my-code);
 JavaPairDStream output = tuple.reduceByKey(my-code);
 output.foreachRDD(
     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
         @Override
         public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
             // TODO Auto-generated method stub
             new AsyncRDDActions(arg0.rdd(), null);
             arg0.foreachPartition(
                 new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {

                     @Override
                     public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
                         // TODO Auto-generated method stub
                         GraphDatabaseService graphDb = new GraphDatabaseFactory()
                                 .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
                                 .setConfig("remote_shell_enabled", "true")
                                 .newGraphDatabase();

                         try (Transaction tx = graphDb.beginTx()) {
                             while (arg0.hasNext()) {
                                 Tuple2<String, ArrayList<String>> tuple = arg0.next();
                                 Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                                 boolean oldHMac = false;
                                 if (HMac != null) {
                                     System.out.println("Already in Database: " + tuple._1);
                                     oldHMac = true;
                                 } else {
                                     HMac = Neo4jOperations.createHMac(graphDb, tuple._1);
                                 }

                                 ArrayList<String> zipcodes = tuple._2;
                                 for (String zipcode : zipcodes) {
                                     Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
                                     if (Zipcode != null) {
                                         System.out.println("Already in Database: " + zipcode);
                                         if (oldHMac == true && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
                                             Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
                                         else
                                             Neo4jOperations.travelTo(HMac, Zipcode);
                                     } else {
                                         Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
                                         Neo4jOperations.travelTo(HMac, Zipcode);
                                     }
                                 }
                             }
                             tx.success();
                         }
                         graphDb.shutdown();
                     }
                 });
             return null;
         }
     });

 The part of 

Re: [Streaming] Non-blocking recommendation in custom receiver documentation and KinesisReceiver's worker.run blocking calll

2015-05-21 Thread Tathagata Das
Thanks for the JIRA. I will look into this issue.

TD

On Thu, May 21, 2015 at 1:31 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 I ran into one of the issues that are potentially caused because of this
 and have logged a JIRA bug -
 https://issues.apache.org/jira/browse/SPARK-7788

 Thanks,
 Aniket

 On Wed, Sep 24, 2014 at 12:59 PM Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi all

 Reading through Spark streaming's custom receiver documentation, it is
 recommended that onStart and onStop methods should not block indefinitely.
 However, looking at the source code of KinesisReceiver, the onStart method
 calls worker.run that blocks until worker is shutdown (via a call to
 onStop).

 So, my question is what are the ramifications of making a blocking call
 in onStart and whether this is something that should be addressed
 in KinesisReceiver implementation.

 Thanks,
 Aniket




Re: spark mllib kmeans

2015-05-21 Thread Pa Rö
I want to evaluate some different distance measures for time-space clustering,
so I need an API to implement my own function in Java.

2015-05-19 22:08 GMT+02:00 Xiangrui Meng men...@gmail.com:

 Just curious, what distance measure do you need? -Xiangrui

 On Mon, May 11, 2015 at 8:28 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:
  take a look at this
  https://github.com/derrickburns/generalized-kmeans-clustering
 
  Best,
 
  Jao
 
  On Mon, May 11, 2015 at 3:55 PM, Driesprong, Fokko fo...@driesprong.frl
 
  wrote:
 
  Hi Paul,
 
  I would say that it should be possible, but you'll need a different
  distance measure which conforms to your coordinate system.
 
  2015-05-11 14:59 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:
 
  hi,
 
  Is it possible to use a custom distance measure and another data type as a
  vector?
  I want to cluster temporal geo data.
 
  best regards
  paul
 
 
 



Re: java program Get Stuck at broadcasting

2015-05-21 Thread Akhil Das
Can you share the code? Maybe I or someone else can help you out.

Thanks
Best Regards

On Thu, May 21, 2015 at 1:45 PM, Allan Jie allanmcgr...@gmail.com wrote:

 Hi,

 Just check the logs of datanode, it looks like this:

 2015-05-20 11:42:14,605 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /
 10.9.0.48:50676, dest: /10.9.0.17:50010, bytes: 134217728, op:
 HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_804680172_54, offset: 0, srvID:
 39fb78d5-828a-4319-8303-c704fab526e3, blockid:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, duration:
 16994466261
 2015-05-20 11:42:14,606 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273,
 type=LAST_IN_PIPELINE, downstreams=0:[] terminating
 2015-05-20 11:42:17,788 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /
 10.9.0.17:49046, dest: /10.9.0.17:50010, bytes: 134217728, op:
 HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_102926009_54, offset: 0, srvID:
 39fb78d5-828a-4319-8303-c704fab526e3, blockid:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, duration:
 17829554438
 2015-05-20 11:42:17,788 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276,
 type=HAS_DOWNSTREAM_IN_PIPELINE terminating
 2015-05-20 11:42:17,904 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving
 BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 src: /
 10.9.0.17:49049 dest: /10.9.0.17:50010
 2015-05-20 11:42:17,904 WARN
 org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in
 BlockReceiver constructor. Cause is
 2015-05-20 11:42:17,904 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: opWriteBlock
 BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 received exception
 org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space:
 The volume with the most available space (=114409472 B) is less than the
 block size (=134217728 B).
 2015-05-20 11:42:17,905 ERROR
 org.apache.hadoop.hdfs.server.datanode.DataNode:
 HadoopV26Slave1:50010:DataXceiver error processing WRITE_BLOCK operation
  src: /10.9.0.17:49049 dst: /10.9.0.17:50010
 org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space:
 The volume with the most available space (=114409472 B) is less than the
 block size (=134217728 B).
 at
 org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy.chooseVolume(RoundRobinVolumeChoosingPolicy.java:67)
 at
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList.getNextVolume(FsVolumeList.java:69)
 at
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:1084)
 at
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:114)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.init(BlockReceiver.java:183)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:615)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:137)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:74)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:235)
 at java.lang.Thread.run(Thread.java:745)
 2015-05-20 11:43:59,669 INFO
 org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification
 succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741999_1176
 2015-05-20 11:46:10,214 INFO
 org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification
 succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073742000_1177
 2015-05-20 11:48:35,445 INFO
 org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification
 succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741990_1167
 2015-05-20 11:50:04,043 INFO
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
 Scheduling blk_1073742080_1257 file
 /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742080
 for deletion
 2015-05-20 11:50:04,136 INFO
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
 Scheduling blk_1073742081_1258 file
 /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742081
 for deletion
 2015-05-20 11:50:04,136 INFO
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
 Scheduling blk_1073742082_1259 file
 /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742082
 for deletion
 2015-05-20 11:50:04,136 INFO
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
 Scheduling blk_1073742083_1260 file
 

Re: Storing spark processed output to Database asynchronously.

2015-05-21 Thread Tathagata Das
If you cannot push data as fast as you are generating it, then async isn't
going to help either. The work is just going to keep piling up as many,
many async jobs, even though your batch processing times will be low, as that
processing time is not going to reflect how much of the overall work is pending
in the system.

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote:

 Hi,

 From my understanding of Spark Streaming, I created a spark entry point,
 for continuous UDP data, using:

 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
 JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
 JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

 Now, when I process this input stream using:

 JavaDStream hash = lines.flatMap(my-code);
 JavaPairDStream tuple = hash.mapToPair(my-code);
 JavaPairDStream output = tuple.reduceByKey(my-code);
 output.foreachRDD(
     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
         @Override
         public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
             // TODO Auto-generated method stub
             new AsyncRDDActions(arg0.rdd(), null);
             arg0.foreachPartition(
                 new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {

                     @Override
                     public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
                         // TODO Auto-generated method stub
                         GraphDatabaseService graphDb = new GraphDatabaseFactory()
                                 .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
                                 .setConfig("remote_shell_enabled", "true")
                                 .newGraphDatabase();

                         try (Transaction tx = graphDb.beginTx()) {
                             while (arg0.hasNext()) {
                                 Tuple2<String, ArrayList<String>> tuple = arg0.next();
                                 Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                                 boolean oldHMac = false;
                                 if (HMac != null) {
                                     System.out.println("Already in Database: " + tuple._1);
                                     oldHMac = true;
                                 } else {
                                     HMac = Neo4jOperations.createHMac(graphDb, tuple._1);
                                 }

                                 ArrayList<String> zipcodes = tuple._2;
                                 for (String zipcode : zipcodes) {
                                     Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
                                     if (Zipcode != null) {
                                         System.out.println("Already in Database: " + zipcode);
                                         if (oldHMac == true && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
                                             Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
                                         else
                                             Neo4jOperations.travelTo(HMac, Zipcode);
                                     } else {
                                         Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
                                         Neo4jOperations.travelTo(HMac, Zipcode);
                                     }
                                 }
                             }
                             tx.success();
                         }
                         graphDb.shutdown();
                     }
                 });
             return null;
         }
     });

 The part of code in output.foreachRDD pushes the output of Spark into the
 Neo4j database, checking for duplicate values.
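 A common way to cut this per-batch cost, shown here only as a rough editorial sketch (it is not from the thread), is to reuse a single lazily created client per executor JVM instead of building a new embedded database inside every partition. GraphClient below is a hypothetical stand-in for whatever database client is used; it is not a real Neo4j API.

    // Hypothetical holder: one client per executor JVM, created lazily and
    // reused across batches, so foreachPartition only pays for the writes.
    class SinkClientHolder {
        private static GraphClient client;                     // hypothetical client type
        static synchronized GraphClient get() {
            if (client == null) {
                client = GraphClient.connect("db-endpoint");   // assumed connect call
            }
            return client;
        }
    }

    // Inside foreachRDD, per partition:
    arg0.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
        @Override
        public void call(Iterator<Tuple2<String, ArrayList<String>>> it) throws Exception {
            GraphClient db = SinkClientHolder.get();           // reused, not re-created
            while (it.hasNext()) {
                Tuple2<String, ArrayList<String>> t = it.next();
                db.upsert(t._1, t._2);                         // hypothetical write
            }
        }
    });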

 This part of the code is very time consuming, because of which my processing
 time exceeds the batch time. Because of that, it 

Question about Serialization in Storage Level

2015-05-21 Thread Jiang, Zhipeng
Hi there,

This question may seem to be kind of naïve, but what's the difference between 
MEMORY_AND_DISK and MEMORY_AND_DISK_SER?

If I call rdd.persist(StorageLevel.MEMORY_AND_DISK), the BlockManager won't 
serialize the rdd?

Thanks,
Zhipeng
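For reference, a minimal sketch of the difference, based on the documented StorageLevel semantics rather than a reply in this thread: MEMORY_AND_DISK caches partitions as deserialized JVM objects in memory and spills to disk when they do not fit (anything written to disk is always serialized), while MEMORY_AND_DISK_SER keeps serialized byte buffers in memory too, trading extra CPU on access for a smaller memory footprint. rddA and rddB below are placeholders for any JavaRDDs.

    import org.apache.spark.storage.StorageLevel;

    rddA.persist(StorageLevel.MEMORY_AND_DISK());      // deserialized objects in memory, spill to disk
    rddB.persist(StorageLevel.MEMORY_AND_DISK_SER());  // serialized bytes in memory, smaller but slower to access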


Re: Spark and Flink

2015-05-21 Thread Pa Rö
Thanks a lot for your help. Now I've split my project and it works.

2015-05-19 15:44 GMT+02:00 Alexander Alexandrov 
alexander.s.alexand...@gmail.com:

 Sorry, we're using a forked version which changed groupID.

 2015-05-19 15:15 GMT+02:00 Till Rohrmann trohrm...@apache.org:

 I guess it's a typo: eu.stratosphere should be replaced by
 org.apache.flink

 On Tue, May 19, 2015 at 1:13 PM, Alexander Alexandrov 
 alexander.s.alexand...@gmail.com wrote:

 We managed to do this with the following config:

 // properties
 <!-- Hadoop -->
 <hadoop.version>2.2.0</hadoop.version>
 <!-- Flink -->
 <flink.version>0.9-SNAPSHOT</flink.version>
 <!-- Spark -->
 <spark.version>1.2.1</spark.version>

 // from the dependency management
 <!-- Hadoop -->
 <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-common</artifactId>
     <version>${hadoop.version}</version>
     <scope>provided</scope>
 </dependency>
 <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-hdfs</artifactId>
     <version>${hadoop.version}</version>
     <scope>provided</scope>
 </dependency>

 <!-- Flink -->
 <dependency>
     <groupId>eu.stratosphere</groupId>
     <artifactId>flink-scala</artifactId>
     <version>${flink.version}</version>
     <scope>provided</scope>
 </dependency>
 <dependency>
     <groupId>eu.stratosphere</groupId>
     <artifactId>flink-java</artifactId>
     <version>${flink.version}</version>
     <scope>provided</scope>
 </dependency>
 <dependency>
     <groupId>eu.stratosphere</groupId>
     <artifactId>flink-clients</artifactId>
     <version>${flink.version}</version>
     <scope>provided</scope>
 </dependency>

 <!-- Spark -->
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_${scala.tools.version}</artifactId>
     <version>${spark.version}</version>
     <scope>provided</scope>
 </dependency>

 <!-- Jetty -->
 <dependency>
     <groupId>org.eclipse.jetty</groupId>
     <artifactId>jetty-util</artifactId>
     <version>${jetty.version}</version>
 </dependency>
 <dependency>
     <groupId>org.eclipse.jetty</groupId>
     <artifactId>jetty-servlet</artifactId>
     <version>${jetty.version}</version>
 </dependency>

 // actual dependencies
 <!-- Spark -->
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_${scala.tools.version}</artifactId>
 </dependency>

 <!-- Flink -->
 <dependency>
     <groupId>eu.stratosphere</groupId>
     <artifactId>flink-scala</artifactId>
 </dependency>
 <dependency>
     <groupId>eu.stratosphere</groupId>
     <artifactId>flink-java</artifactId>
 </dependency>
 <dependency>
     <groupId>eu.stratosphere</groupId>
     <artifactId>flink-clients</artifactId>
 </dependency>
 <!-- FIXME: this is a hacky solution for a Flink issue with the Jackson deps -->
 <dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-core</artifactId>
     <version>2.2.1</version>
     <scope>provided</scope>
 </dependency>
 <dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-databind</artifactId>
     <version>2.2.1</version>
     <scope>provided</scope>
 </dependency>
 <dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-annotations</artifactId>
     <version>2.2.1</version>
     <scope>provided</scope>
 </dependency>


 2015-05-19 10:06 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:

  That sounds good. Maybe you can send me a pseudo structure; this is my
  first Maven project.

 best regards,
 paul

 2015-05-18 14:05 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi,
 I would really recommend you to put your Flink and Spark dependencies
 into different maven modules.
 Having them both in the same project will be very hard, if not
 impossible.
 Both projects depend on similar projects with slightly different
 versions.

 I would suggest a maven module structure like this:
 yourproject-parent (a pom module)
 -- yourproject-common
 -- yourproject-flink
 -- yourproject-spark



 On Mon, May 18, 2015 at 10:00 AM, Pa Rö 
 paul.roewer1...@googlemail.com wrote:

  Hi,
  if I add your dependency I get over 100 errors, so now I changed the
  version number:
  <dependencies>
  <dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
   

Re: How to process data in chronological order

2015-05-21 Thread Sonal Goyal
Would partitioning your data based on the key and then running
mapPartitions help?
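A rough sketch of that idea with the Java API, offered only as an illustration (the key type, value type, minTs/maxTs bounds, and partition count are all assumptions): range-partition by timestamp with a custom Partitioner, sort within each partition, and then reduce each contiguous, already-ordered section with mapPartitions.

    import org.apache.spark.Partitioner;
    import org.apache.spark.api.java.JavaPairRDD;

    // Buckets timestamps into contiguous ranges so partition i holds earlier
    // times than partition i+1; minTs/maxTs are assumed known bounds.
    class TimeRangePartitioner extends Partitioner {
        private final int numParts;
        private final long minTs, maxTs;
        TimeRangePartitioner(int numParts, long minTs, long maxTs) {
            this.numParts = numParts; this.minTs = minTs; this.maxTs = maxTs;
        }
        @Override public int numPartitions() { return numParts; }
        @Override public int getPartition(Object key) {
            long span = Math.max(1L, (maxTs - minTs) / numParts + 1);
            return (int) Math.min(numParts - 1, ((Long) key - minTs) / span);
        }
    }

    // events: JavaPairRDD<Long, String> keyed by timestamp (String stands in for the record type)
    JavaPairRDD<Long, String> ordered =
        events.repartitionAndSortWithinPartitions(new TimeRangePartitioner(16, minTs, maxTs));
    // Each partition is now a contiguous, sorted time range that mapPartitions can reduce locally.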

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Thu, May 21, 2015 at 4:33 AM, roy rp...@njit.edu wrote:

 I have a key-value RDD, key is a timestamp (femto-second resolution, so
 grouping buys me nothing) and I want to reduce it in the chronological
 order.

 How do I do that in spark?

 I am fine with reducing contiguous sections of the set separately and then
 aggregating the resulting objects locally.

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-data-in-chronological-order-tp22966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Unable to use hive queries with constants in predicates

2015-05-21 Thread Devarajan Srinivasan
Hi,

   I was testing Spark reading data from Hive using HiveContext. I got the
following error when I used a simple query with constants in predicates.

  I am using Spark 1.3. Has anyone encountered an error like this?
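A possible workaround, offered only as an untested sketch (it is not from this thread): express the constant predicate through the DataFrame API instead of embedding it in the HiveQL string, which avoids the HiveQl parser path that fails below. hiveContext refers to the HiveContext mentioned above.

    import org.apache.spark.sql.DataFrame;
    import static org.apache.spark.sql.functions.col;

    DataFrame result = hiveContext.table("test_table")
            .filter(col("daily_partition").equalTo("20150101"));   // same predicate, no HiveQL parsing
    result.show();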


*Error:*


Exception in thread main org.apache.spark.sql.AnalysisException:
Unsupported language features in query: SELECT * from test_table where
daily_partition='20150101'
TOK_QUERY 1, 0,20, 81
  TOK_FROM 1, 10,14, 81
TOK_TABREF 1, 12,14, 81
  TOK_TABNAME 1, 12,14, 81
everest_marts_test 1, 12,12, 81
voice_cdr 1, 14,14, 100
  TOK_INSERT 0, -1,-1, 0
TOK_DESTINATION 0, -1,-1, 0
  TOK_DIR 0, -1,-1, 0
TOK_TMP_FILE 0, -1,-1, 0
TOK_SELECT 1, 0,8, 7
  TOK_SELEXPR 1, 2,2, 7
TOK_TABLE_OR_COL 1, 2,2, 7
  callingpartynumber 1, 2,2, 7
  TOK_SELEXPR 1, 4,4, 26
TOK_TABLE_OR_COL 1, 4,4, 26
  calledpartynumber 1, 4,4, 26
  TOK_SELEXPR 1, 6,6, 44
TOK_TABLE_OR_COL 1, 6,6, 44
  chargingtime 1, 6,6, 44
  TOK_SELEXPR 1, 8,8, 57
TOK_TABLE_OR_COL 1, 8,8, 57
  call_direction_key 1, 8,8, 57
TOK_WHERE 1, 16,20, 131
  = 1, 18,20, 131
TOK_TABLE_OR_COL 1, 18,18, 116
  daily_partition 1, 18,18, 116
'20150101' 1, 20,20, 132

scala.NotImplementedError: No parse rules for ASTNode type: 294, text:
'20150101' :
'20150101' 1, 20,20, 132
 +
org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261)
  ;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$
hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$
hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(
Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.
scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(
PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(
AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$
SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$
SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(
Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 

Re: How to set the file size for parquet Part

2015-05-21 Thread Akhil Das
How many part files do you have? Did you try re-partitioning to a
smaller number of partitions so that you end up with fewer, bigger files?
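For example, a minimal sketch of the repartition idea (sqlContext, the paths, and the partition count are placeholders, not details from the thread):

    import org.apache.spark.sql.DataFrame;

    DataFrame df = sqlContext.parquetFile("hdfs://namenode:8020/input");   // or however the DataFrame is built
    df.repartition(8).saveAsParquetFile("hdfs://namenode:8020/output");    // fewer partitions, fewer and larger part files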

Thanks
Best Regards

On Wed, May 20, 2015 at 3:06 AM, Richard Grossman richie...@gmail.com
wrote:

 Hi

 I'm using Spark 1.3.1 and I can't set the size of the generated Parquet part
 files.
 The size is only 512 KB, which is really too small; I need to make them bigger.
 How can I set this?
 Thanks



Re: Read multiple files from S3

2015-05-21 Thread Akhil Das
textFile does read all files in a directory.

We have modified the sparkstreaming code base to read nested files from S3,
you can check this function
https://github.com/sigmoidanalytics/spark-modified/blob/8074620414df6bbed81ac855067600573a7b22ca/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L206
which does that and implement something similar for your usecase.

Or, if your job is just a batch job and you don't need to process file by
file, then maybe you can iterate over your list, create a sc.textFile
for each file entry, and do the computing there too. Something like:

for (file <- fileNames) {

 // Create SparkContext
 // do sc.textFile(file)
 // do your computing
 // sc.stop()

}
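A further option (my addition, not something stated above): SparkContext.textFile also accepts a comma-separated list of paths, so the S3 keys collected below could be read as one RDD with a single context. A rough sketch, with the bucket scheme and variable names assumed:

    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    static JavaRDD<String> readAll(JavaSparkContext sc, String s3Bucket, List<String> fileNames) {
        StringBuilder paths = new StringBuilder();
        for (String key : fileNames) {
            if (paths.length() > 0) paths.append(",");
            paths.append("s3n://").append(s3Bucket).append("/").append(key);
        }
        return sc.textFile(paths.toString());   // one RDD over all listed files
    }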



Thanks
Best Regards

On Thu, May 21, 2015 at 1:45 AM, lovelylavs lxn130...@utdallas.edu wrote:

 Hi,

 I am trying to get a collection of files according to LastModifiedDate from
 S3

 List<String> FileNames = new ArrayList<String>();

 ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
         .withBucketName(s3_bucket)
         .withPrefix(logs_dir);

 ObjectListing objectListing;

 do {
     objectListing = s3Client.listObjects(listObjectsRequest);
     for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
         if ((objectSummary.getLastModified().compareTo(dayBefore) > 0) &&
             (objectSummary.getLastModified().compareTo(dayAfter) < 1) &&
             objectSummary.getKey().contains(".log"))
             FileNames.add(objectSummary.getKey());
     }

     listObjectsRequest.setMarker(objectListing.getNextMarker());
 } while (objectListing.isTruncated());

 I would like to process these files using Spark

 I understand that textFile reads a single text file. Is there any way to
 read all these files that are part of the List?

 Thanks for your help.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Read-multiple-files-from-S3-tp22965.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: rdd.saveAsTextFile problem

2015-05-21 Thread Keerthi
Hi ,

I had tried the workaround shared here, but still facing the same issue...

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p22970.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: FP Growth saveAsTextFile

2015-05-21 Thread Xiangrui Meng
+user

If this was in cluster mode, you should provide a path on a shared file
system, e.g., HDFS, instead of a local path. If this is in local mode, I'm
not sure what went wrong.

On Wed, May 20, 2015 at 2:09 PM, Eric Tanner eric.tan...@justenough.com
wrote:

 Here is the stack trace. Thanks for looking at this.

 scala>
 model.freqItemsets.saveAsTextFile("c:///repository/trunk/Scala_210_wspace/fpGrowth/modelText1")
 15/05/20 14:07:47 INFO SparkContext: Starting job: saveAsTextFile at
 console:33
 15/05/20 14:07:47 INFO DAGScheduler: Got job 15 (saveAsTextFile at
 console:33) with 2 output partitions (allowLocal=false)
 15/05/20 14:07:47 INFO DAGScheduler: Final stage: Stage 30(saveAsTextFile
 at console:33)
 15/05/20 14:07:47 INFO DAGScheduler: Parents of final stage: List(Stage 29)
 15/05/20 14:07:47 INFO DAGScheduler: Missing parents: List()
 15/05/20 14:07:47 INFO DAGScheduler: Submitting Stage 30
 (MapPartitionsRDD[21] at saveAsTextFile at console:33), which has no
 missing parents
 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(131288) called with
 curMem=724428, maxMem=278302556
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18 stored as values in
 memory (estimated size 128.2 KB, free 264.6 MB)
 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(78995) called with
 curMem=855716, maxMem=278302556
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18_piece0 stored as
 bytes in memory (estimated size 77.1 KB, free 264.5 MB)
 15/05/20 14:07:47 INFO BlockManagerInfo: Added broadcast_18_piece0 in
 memory on localhost:52396 (size: 77.1 KB, free: 265.1 MB)
 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
 broadcast_18_piece0
 15/05/20 14:07:47 INFO SparkContext: Created broadcast 18 from broadcast
 at DAGScheduler.scala:839
 15/05/20 14:07:47 INFO DAGScheduler: Submitting 2 missing tasks from Stage
 30 (MapPartitionsRDD[21] at saveAsTextFile at console:33)
 15/05/20 14:07:47 INFO TaskSchedulerImpl: Adding task set 30.0 with 2 tasks
 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 17
 15/05/20 14:07:47 INFO TaskSetManager: Starting task 0.0 in stage 30.0
 (TID 33, localhost, PROCESS_LOCAL, 1056 bytes)
 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17_piece0
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17_piece0 of size 4737
 dropped from memory (free 277372582)
 15/05/20 14:07:47 INFO TaskSetManager: Starting task 1.0 in stage 30.0
 (TID 34, localhost, PROCESS_LOCAL, 1056 bytes)
 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_17_piece0 on
 localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
 15/05/20 14:07:47 INFO Executor: Running task 1.0 in stage 30.0 (TID 34)
 15/05/20 14:07:47 INFO Executor: Running task 0.0 in stage 30.0 (TID 33)
 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
 broadcast_17_piece0
 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17 of size 6696
 dropped from memory (free 277379278)
 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 17
 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 16
 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16_piece0
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16_piece0 of size 4737
 dropped from memory (free 277384015)
 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_16_piece0 on
 localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
 broadcast_16_piece0
 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16 of size 6696
 dropped from memory (free 277390711)
 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 16
 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty
 blocks out of 2 blocks
 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote
 fetches in 1 ms
 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty
 blocks out of 2 blocks
 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote
 fetches in 0 ms
 15/05/20 14:07:47 ERROR Executor: Exception in task 1.0 in stage 30.0 (TID
 34)
 java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:656)
 at
 org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:490)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:462)
 at
 

Re: rdd.saveAsTextFile problem

2015-05-21 Thread Akhil Das
This thread happened a year back. Can you please share what issue you are
facing? Which version of Spark are you using? What is your system
environment? What is the exception stack trace?

Thanks
Best Regards

On Thu, May 21, 2015 at 12:19 PM, Keerthi keerthi.reddy1...@gmail.com
wrote:

 Hi ,

 I had tried the workaround shared here, but still facing the same issue...

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p22970.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: java program Get Stuck at broadcasting

2015-05-21 Thread Allan Jie
Sure, the code is very simple. I think you can understand it from the main
function.

public class Test1 {

    public static double[][] createBroadcastPoints(String localPointPath, int row, int col) throws IOException {
        BufferedReader br = RAWF.reader(localPointPath);
        String line = null;
        int rowIndex = 0;
        double[][] pointFeatures = new double[row][col];
        while ((line = br.readLine()) != null) {
            List<String> point = Arrays.asList(line.split(","));
            int colIndex = 0;
            for (String pointFeature : point) {
                pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature);
                colIndex++;
            }
            rowIndex++;
        }
        br.close();
        return pointFeatures;
    }

    public static void main(String[] args) throws IOException {
        /** Parameter Setting ***/
        String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
        String remoteFilePath =
            "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
        // this csv file is only 468MB
        final int row = 133433;
        final int col = 458;
        /**/

        SparkConf conf = new SparkConf()
            .setAppName("distance")
            .setMaster("spark://HadoopV26Master:7077")
            .set("spark.executor.memory", "4g")
            .set("spark.eventLog.enabled", "true")
            .set("spark.eventLog.dir", "/usr/local/spark/logs/spark-events")
            .set("spark.local.dir", "/tmp/spark-temp");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> textFile = sc.textFile(remoteFilePath);
        // Broadcast variable
        // double[][] xx = ...;
        final Broadcast<double[][]> broadcastPoints =
            sc.broadcast(createBroadcastPoints(localPointPath, row, col));
        // final Broadcast<double[][]> broadcastPoints = sc.broadcast(xx);

        /**
         * Compute the distance in terms of each point on each instance.
         * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
         */
        JavaPairRDD<Pair, Double> distance = textFile.flatMapToPair(new PairFlatMapFunction<String, Pair, Double>() {
            public Iterable<Tuple2<Pair, Double>> call(String v1) throws Exception {
                List<String> al = Arrays.asList(v1.split(","));
                double[] featureVals = new double[al.size()];
                for (int j = 0; j < al.size() - 1; j++)
                    featureVals[j] = Double.valueOf(al.get(j + 1));
                int jIndex = Integer.valueOf(al.get(0));
                double[][] allPoints = broadcastPoints.getValue();
                double sum = 0;
                List<Tuple2<Pair, Double>> list = new ArrayList<Tuple2<Pair, Double>>();
                for (int i = 0; i < row; i++) {
                    sum = 0;
                    for (int j = 0; j < al.size() - 1; j++) {
                        sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                    }
                    list.add(new Tuple2<Pair, Double>(new Pair(i, jIndex), Math.sqrt(sum)));
                }
                return list;
            }
        });

        distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/" + args[0]);
    }
}


On 21 May 2015 at 16:44, Akhil Das ak...@sigmoidanalytics.com wrote:

 Can you share the code? Maybe I or someone else can help you out.

 Thanks
 Best Regards

 On Thu, May 21, 2015 at 1:45 PM, Allan Jie allanmcgr...@gmail.com wrote:

 Hi,

 Just check the logs of datanode, it looks like this:

 2015-05-20 11:42:14,605 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /
 10.9.0.48:50676, dest: /10.9.0.17:50010, bytes: 134217728, op:
 HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_804680172_54, offset: 0, srvID:
 39fb78d5-828a-4319-8303-c704fab526e3, blockid:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, duration:
 16994466261
 2015-05-20 11:42:14,606 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273,
 type=LAST_IN_PIPELINE, downstreams=0:[] terminating
 2015-05-20 11:42:17,788 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /
 10.9.0.17:49046, dest: /10.9.0.17:50010, bytes: 134217728, op:
 HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_102926009_54, offset: 0, srvID:
 39fb78d5-828a-4319-8303-c704fab526e3, blockid:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, duration:
 17829554438
 2015-05-20 11:42:17,788 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276,
 type=HAS_DOWNSTREAM_IN_PIPELINE terminating
 2015-05-20 11:42:17,904 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving
 BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 src: /
 10.9.0.17:49049 dest: /10.9.0.17:50010
 2015-05-20 11:42:17,904 WARN
 org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in
 BlockReceiver constructor. Cause is
 2015-05-20 11:42:17,904 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: opWriteBlock
 BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 received exception
 org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space:
 The volume with the most available space (=114409472 B) is less than the
 block size (=134217728 B).
 2015-05-20 11:42:17,905 ERROR
 org.apache.hadoop.hdfs.server.datanode.DataNode:
 HadoopV26Slave1:50010:DataXceiver error processing WRITE_BLOCK operation
  src: /10.9.0.17:49049 dst: /10.9.0.17:50010
 org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space:
 The volume with the most available space (=114409472 B) is less than the
 block size 

Re: java program got Stuck at broadcasting

2015-05-21 Thread allanjie
Sure, the code is very simple. I think you can understand it from the main
function.

public class Test1 {

    public static double[][] createBroadcastPoints(String localPointPath, int row, int col) throws IOException {
        BufferedReader br = RAWF.reader(localPointPath);
        String line = null;
        int rowIndex = 0;
        double[][] pointFeatures = new double[row][col];
        while ((line = br.readLine()) != null) {
            List<String> point = Arrays.asList(line.split(","));
            int colIndex = 0;
            for (String pointFeature : point) {
                pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature);
                colIndex++;
            }
            rowIndex++;
        }
        br.close();
        return pointFeatures;
    }

    public static void main(String[] args) throws IOException {
        /** Parameter Setting ***/
        String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
        String remoteFilePath =
            "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
        // this csv file is only 468MB
        final int row = 133433;
        final int col = 458;
        /**/

        SparkConf conf = new SparkConf()
            .setAppName("distance")
            .setMaster("spark://HadoopV26Master:7077")
            .set("spark.executor.memory", "4g")
            .set("spark.eventLog.enabled", "true")
            .set("spark.eventLog.dir", "/usr/local/spark/logs/spark-events")
            .set("spark.local.dir", "/tmp/spark-temp");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> textFile = sc.textFile(remoteFilePath);
        // Broadcast variable
        // double[][] xx = ...;
        final Broadcast<double[][]> broadcastPoints =
            sc.broadcast(createBroadcastPoints(localPointPath, row, col));
        // final Broadcast<double[][]> broadcastPoints = sc.broadcast(xx);

        /**
         * Compute the distance in terms of each point on each instance.
         * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
         */
        JavaPairRDD<Pair, Double> distance = textFile.flatMapToPair(new PairFlatMapFunction<String, Pair, Double>() {
            public Iterable<Tuple2<Pair, Double>> call(String v1) throws Exception {
                List<String> al = Arrays.asList(v1.split(","));
                double[] featureVals = new double[al.size()];
                for (int j = 0; j < al.size() - 1; j++)
                    featureVals[j] = Double.valueOf(al.get(j + 1));
                int jIndex = Integer.valueOf(al.get(0));
                double[][] allPoints = broadcastPoints.getValue();
                double sum = 0;
                List<Tuple2<Pair, Double>> list = new ArrayList<Tuple2<Pair, Double>>();
                for (int i = 0; i < row; i++) {
                    sum = 0;
                    for (int j = 0; j < al.size() - 1; j++) {
                        sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                    }
                    list.add(new Tuple2<Pair, Double>(new Pair(i, jIndex), Math.sqrt(sum)));
                }
                return list;
            }
        });

        distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/" + args[0]);
    }
}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use spark to access HBase with Security enabled

2015-05-21 Thread donhoff_h
Hi,

Many thanks for the help. My Spark version is 1.3.0 too and I run it on YARN.
According to your advice I have changed the configuration. Now my program can
read the hbase-site.xml correctly, and it can also authenticate with ZooKeeper
successfully.

But I have hit a new problem: my program still cannot pass the
authentication of HBase. Did you or anybody else ever run into this kind of
situation? I used a keytab file to provide the principal. Since it can pass
the authentication of ZooKeeper, I am sure the keytab file is OK. But it
just cannot pass the authentication of HBase. The exception is listed below;
could you or anybody else help me? Many thanks!

Exception***
15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, 
connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 
sessionTimeout=9 watcher=hconnection-0x4e142a710x0, 
quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, 
baseZNode=/hbase
15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as 
SASL mechanism.
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to 
server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using 
Login Context section 'Client'
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to 
bgdt02.dev.hrb/130.1.9.98:2181, initiating session
15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu May 
21 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT expires:  Fri May 
22 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 
11:43:32 CST 2015
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on 
server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, 
negotiated timeout = 4
15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called 
multiple times. Overwriting connection and table reference; 
TableInputFormatBase will not close these old references when done.
15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes for 
table ns_dev1:hd01.
15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while 
connecting to the server : javax.security.sasl.SaslException: GSS initiate 
failed [Caused by GSSException: No valid credentials provided (Mechanism level: 
Failed to find any Kerberos tgt)]
15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed. The 
most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]
at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
at 
org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
at 
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604)
at 
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153)
at 
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730)
at 
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:727)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at 
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:727)
at 
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:880)
at 
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:849)
at 
org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1173)
at 
org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
at 
org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
at 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:31751)
at 
org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:332)
at 
org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:187)
at 
org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:62)

Re: Spark Streaming - Design considerations/Knobs

2015-05-21 Thread Hemant Bhanawat
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks
for reading and replying. However, I have a follow-up question:

I don't think I understand block replication completely. Are the
blocks replicated immediately after they are received by the receiver? Or
are they kept on the receiver node only and moved only on shuffle? Does
the replication have something to do with locality.wait?

Thanks,
Hemant

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com wrote:

 Correcting the ones that are incorrect or incomplete. BUT this is a good
 list of things to remember about Spark Streaming.


 On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Hi,

 I have compiled a list (from online sources) of knobs/design
 considerations that need to be taken care of by applications running on
 spark streaming. Is my understanding correct?  Any other important design
 consideration that I should take care of?


- A DStream is associated with a single receiver. For attaining read
parallelism multiple receivers i.e. multiple DStreams need to be created.
- A receiver is run within an executor. It occupies one core. Ensure
that there are enough cores for processing after receiver slots are booked
i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round robin fashion.
- When data is received from a stream source, receiver creates blocks
of data. A new block of data is generated every blockInterval
milliseconds. N blocks of data are created during the batchInterval, where
N = batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current
executor to the block managers of other executors. After that, the Network
Input Tracker running on the driver is informed about the block locations
for further processing.
- An RDD is created on the driver for the blocks created during the
batchInterval. The blocks generated during the batchInterval are partitions
of the RDD. Each partition is a task in Spark. blockInterval ==
batchInterval would mean that a single partition is created and probably it
is processed locally.

 The map tasks on the blocks are processed in the executors (one that
 received the block, and another where the block was replicated) that have
 the blocks, irrespective of the block interval, unless non-local scheduling
 kicks in (as you observed next).


- Having a bigger blockInterval means bigger blocks. A high value of
spark.locality.wait increases the chance of processing a block on the local
node. A balance needs to be found between these two parameters to
ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can
define the number of partitions by calling dstream.repartition(n). This
reshuffles the data in RDD randomly to create n number of partitions.

 Yes, for greater parallelism, though it comes at the cost of a shuffle.


- An RDD's processing is scheduled by driver's jobscheduler as a job.
At a given point of time only one job is active. So, if one job is
executing the other jobs are queued.


- If you have two dstreams there will be two RDDs formed and there
will be two jobs created which will be scheduled one after the another.


- To avoid this, you can union the two dstreams. This will ensure that a
single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
is then considered as a single job. However, the partitioning of the RDDs is
not impacted.

 To further clarify, the jobs depend on the number of output operations
 (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those
 output operations.

 dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // one Spark
 job per batch

 dstream1.union(dstream2).foreachRDD { rdd => { rdd.count() ; rdd.count() }
 }  // TWO Spark jobs per batch

 dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd =>
 rdd.count }  // TWO Spark jobs per batch






-
- If the batch processing time is more than the batchInterval, then
obviously the receiver's memory will start filling up and will end up
throwing exceptions (most probably BlockNotFoundException). Currently there
is no way to pause the receiver.

 You can limit the rate of receiver using SparkConf config
 spark.streaming.receiver.maxRate
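 As an illustrative sketch (the rate and app name below are only example placeholders), the cap is set on SparkConf before the streaming context is created:

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .setAppName("MyStreamingApp")
        .set("spark.streaming.receiver.maxRate", "1000");  // max records per second, per receiver
    // pass conf to JavaStreamingContext as usual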


-
- For being fully fault tolerant, spark streaming needs to enable
checkpointing. Checkpointing increases the batch processing time.

 Incomplete. There are two types of checkpointing - data and metadata.
 Only data checkpointing, needed by only some operations, increase batch
 processing time. Read -
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
 Furthermore, with checkpointing you can recover the computation, but you 

Re: Query a Dataframe in rdd.map()

2015-05-21 Thread Ram Sriharsha
Never mind... I didn't realize you were referring to the first table as df.
So you want to do a join between the first table and an RDD?
The right way to do it within the DataFrame construct is to think of it as
a join: map the second RDD to a DataFrame and do an inner join on ip.

On Thu, May 21, 2015 at 10:54 AM, Ram Sriharsha sriharsha@gmail.com
wrote:

 Your original code snippet seems incomplete and there isn't enough
 information to figure out what problem you actually ran into

 From your original code snippet there is an rdd variable which is well
 defined and a df variable that is not defined in the snippet of code you
 sent.

 One way to make this work is as below (until the last line is executed you
 are actually not collecting anything on the driver, and if your dataset is
 too big to collect on the driver for inspection, just do a take(n) on the
 result).

 from pyspark.sql import Row,SQLContext
 from pyspark.sql.functions import count

 sqlContext = SQLContext(sc)

 # convert list of ip into a data frame with column ip
 Record = Row("ip")
 df = sc.parallelize(map(lambda x: Record(x), ['208.51.22.18',
 '31.207.6.173', '208.51.22.18'])).toDF()

 # obtain ip - frequency and inspect
 df.groupBy(df.ip).agg(count(df.ip)).show()

 ++-+
 |  ip|COUNT(ip)|
 ++-+
 |208.51.22.18|2|
 |31.207.6.173|1|
 ++-+

 what exactly is the issue you are running into when you say it doesn't get
 through?

 On Thu, May 21, 2015 at 10:47 AM, ping yan sharon...@gmail.com wrote:

 Thanks. I suspected that, but figured that a df query inside a map sounds
 so intuitive that I didn't want to just give up.

 I've tried join and even better with a DStream.transform() and it works!
 freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x,y):
 y[1]))

 Thank you for the help!

 Ping

 On Thu, May 21, 2015 at 10:40 AM, Holden Karau hol...@pigscanfly.ca
 wrote:

 So DataFrames, like RDDs, can only be accessed from the driver. If your
 IP frequency table is small enough you could collect it and distribute it
 as a hashmap with broadcast, or you could also join your rdd with the ip
 frequency table. Hope that helps :)


 On Thursday, May 21, 2015, ping yan sharon...@gmail.com wrote:

 I have a dataframe as a reference table for IP frequencies.
 e.g.,

 ip   freq
 10.226.93.67 1
 10.226.93.69 1
 161.168.251.101   4
 10.236.70.2   1
 161.168.251.105 14


 All I need is to query the df in a map.

 rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])

 freqs = rdd.map(lambda x: df.where(df.ip ==x ).first())

 It doesn't get through.. would appreciate any help.

 Thanks!
 Ping




 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721



 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau




 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721





Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-05-21 Thread Tathagata Das
Looks like somehow the file size reported by the FSInputDStream of
Tachyon's FileSystem interface is returning zero.

On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Just to follow up on this thread further.

 I was doing some fault tolerance testing of Spark Streaming with Tachyon as
 the OFF_HEAP block store. As I said in an earlier email, I was able to solve the
 BlockNotFound exception when I used Hierarchical Storage of Tachyon,
 which is good.

 I continued doing some testing around storing the Spark Streaming WAL and
 checkpoint files in Tachyon as well. Here are a few findings:


 When I store the Spark Streaming checkpoint location in Tachyon, the
 throughput is much higher. I tested the Driver and Receiver failure cases,
 and Spark Streaming is able to recover without any data loss on Driver
 failure.

 *But on Receiver failure, Spark Streaming loses data* as I see an
 exception while reading the WAL file from the Tachyon receivedData location
 for the same Receiver id which just failed.

 If I change the checkpoint location back to HDFS, Spark Streaming can
 recover from both Driver and Receiver failure.

 Here are the log details from when the Spark Streaming receiver failed. I raised a
 JIRA for the same issue: https://issues.apache.org/jira/browse/SPARK-7525



 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch
 1)*
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
 remove executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
 block manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
 successfully in removeExecutor
 INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
 receiver for stream 2 from 10.252.5.62*:47255
 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not
 read data from write ahead log record
 FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
 http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at scala.Option.getOrElse(Option.scala:120)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IllegalArgumentException:* Seek position is past
 EOF: 645603894, fileSize = 0*
 at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
 at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
 ... 15 more

 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
 stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
 INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage
 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException
 (Could not read data from write ahead log record
 FileBasedWriteAheadLogSegment(tachyon-ft://
 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
 [duplicate 1]
 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in
 stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
 INFO : org.apache.spark.deploy.client.AppClient$ClientActor - 

Re: Query a Dataframe in rdd.map()

2015-05-21 Thread ping yan
Thanks. I suspected that, but figured that a df query inside a map sounds so
intuitive that I just didn't want to give up.

I've tried join and even better with a DStream.transform() and it works!
freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x,y):
y[1]))

Thank you for the help!

Ping

On Thu, May 21, 2015 at 10:40 AM, Holden Karau hol...@pigscanfly.ca wrote:

 So DataFrames, like RDDs, can only be accessed from the driver. If your IP
 Frequency table is small enough you could collect it and distribute it as a
 hashmap with broadcast or you could also join your rdd with the ip
 frequency table. Hope that helps :)
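
 For example, a minimal sketch of the broadcast approach in Scala (the PySpark
 API is analogous; ipFreqDF and ips below are placeholder names for the
 frequency DataFrame and the RDD of IPs):

 val freqMap = ipFreqDF.collect().map(r => (r.getString(0), r.get(1))).toMap
 val freqMapBc = sc.broadcast(freqMap)              // ship the small lookup table to every executor
 val freqs = ips.map(ip => freqMapBc.value.get(ip)) // a plain map; no DataFrame access on executors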


 On Thursday, May 21, 2015, ping yan sharon...@gmail.com wrote:

 I have a dataframe as a reference table for IP frequencies.
 e.g.,

 ip   freq
 10.226.93.67 1
 10.226.93.69 1
 161.168.251.101   4
 10.236.70.2   1
 161.168.251.105 14


 All I need is to query the df in a map.

 rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])

 freqs = rdd.map(lambda x: df.where(df.ip ==x ).first())

 It doesn't get through.. would appreciate any help.

 Thanks!
 Ping




 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721



 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau




-- 
Ping Yan
Ph.D. in Management
Dept. of Management Information Systems
University of Arizona
Tucson, AZ 85721


RE: rdd.sample() methods very slow

2015-05-21 Thread Wang, Ningjun (LNG-NPV)
I don't need it to be 100% random. How about randomly picking a few partitions and 
returning all docs in those partitions? Is 
rdd.mapPartitionsWithIndex() the right method to use to process just a small 
portion of the partitions?

Ningjun

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, May 21, 2015 11:30 AM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: rdd.sample() methods very slow

I guess the fundamental issue is that these aren't stored in a way that allows 
random access to a Document.

Underneath, Hadoop has a concept of a MapFile which is like a SequenceFile with 
an index of offsets into the file where records begin. Although Spark doesn't 
use it, you could maybe create some custom RDD that takes advantage of this 
format to grab random elements efficiently.

Other things come to mind but I think they're all slower -- like hashing all 
the docs and taking the smallest n in each of k partitions to get a pretty 
uniform random sample of kn docs.


On Thu, May 21, 2015 at 4:04 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:
 Is there any other way to solve the problem? Let me state the use case



 I have an RDD[Document] contains over 7 millions items. The RDD need 
 to be save on a persistent storage (currently I save it as object file on 
 disk).
 Then I need to get a small random sample of Document objects (e.g. 
 10,000 document). How can I do this quickly? The rdd.sample() methods 
 does not help because it need to read the entire RDD of 7 million 
 Document from disk which take very long time.



 Ningjun



 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Tuesday, May 19, 2015 4:51 PM
 To: Wang, Ningjun (LNG-NPV)
 Cc: user@spark.apache.org
 Subject: Re: rdd.sample() methods very slow



 The way these files are accessed is inherently sequential-access. 
 There isn't a way to in general know where record N is in a file like 
 this and jump to it. So they must be read to be sampled.





 On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

 Hi



 I have an RDD[Document] that contains 7 million objects and it is 
 saved in file system as object file. I want to get a random sample of 
 about 70 objects from it using rdd.sample() method. It is ver slow





 val rdd : RDD[Document] =
 sc.objectFile[Document](C:/temp/docs.obj).sample(false, 0.1D,
 0L).cache()

 val count = rdd.count()



 From Spark UI, I see spark is try to read the entire object files at 
 the folder “C:/temp/docs.obj” which is about 29.7 GB. Of course this 
 is very slow. Why does Spark try to read entire 7 million objects 
 while I only need to return a random sample of 70 objects?



 Is there any efficient way to get a random sample of 70 objects 
 without reading through the entire object files?



 Ningjun





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Pipelining with Spark

2015-05-21 Thread dgoldenberg
From the performance and scalability standpoint, is it better to plug in, say,
a multi-threaded pipeliner into a Spark job, or to implement pipelining via
Spark's own transformation mechanisms such as map or filter?

I'm seeing some reference architectures where things like 'morphlines' are
plugged into Spark, but it would seem that Spark may yield better performance and
scalability if each stage within a pipeline is a function in a Spark job. Is that right?
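
To make the second option concrete, a minimal Scala sketch of expressing pipeline
stages as ordinary Spark transformations (parse, keep and enrich are placeholder
stage functions, and the input path is hypothetical):

val result = sc.textFile("hdfs:///input/events")
  .map(parse)    // stage 1
  .filter(keep)  // stage 2
  .map(enrich)   // stage 3

Spark pipelines such narrow transformations into a single pass over each
partition, so chaining stages this way does not add extra passes over the data.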



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pipelining-with-Spark-tp22976.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to use hive queries with constants in predicates

2015-05-21 Thread Yana Kadiyska
I have not seen this error but have seen another user have weird parser
issues before:

http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccag6lhyed_no6qrutwsxeenrbqjuuzvqtbpxwx4z-gndqoj3...@mail.gmail.com%3E

I would attach a debugger and see what is going on -- if I'm looking at the
right place (
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser)
token 294 is RCURLY... which doesn't make much sense...

On Thu, May 21, 2015 at 2:10 AM, Devarajan Srinivasan 
devathecool1...@gmail.com wrote:

 Hi,

   I was testing Spark to read data from Hive using HiveContext. I got the
 following error when I used a simple query with constants in predicates.

   I am using Spark 1.3. Has anyone encountered an error like this?


 Error:


 Exception in thread main org.apache.spark.sql.AnalysisException:
 Unsupported language features in query: SELECT * from test_table where
 daily_partition='20150101'
 TOK_QUERY 1, 0,20, 81
   TOK_FROM 1, 10,14, 81
 TOK_TABREF 1, 12,14, 81
   TOK_TABNAME 1, 12,14, 81
 everest_marts_test 1, 12,12, 81
 voice_cdr 1, 14,14, 100
   TOK_INSERT 0, -1,-1, 0
 TOK_DESTINATION 0, -1,-1, 0
   TOK_DIR 0, -1,-1, 0
 TOK_TMP_FILE 0, -1,-1, 0
 TOK_SELECT 1, 0,8, 7
   TOK_SELEXPR 1, 2,2, 7
 TOK_TABLE_OR_COL 1, 2,2, 7
   callingpartynumber 1, 2,2, 7
   TOK_SELEXPR 1, 4,4, 26
 TOK_TABLE_OR_COL 1, 4,4, 26
   calledpartynumber 1, 4,4, 26
   TOK_SELEXPR 1, 6,6, 44
 TOK_TABLE_OR_COL 1, 6,6, 44
   chargingtime 1, 6,6, 44
   TOK_SELEXPR 1, 8,8, 57
 TOK_TABLE_OR_COL 1, 8,8, 57
   call_direction_key 1, 8,8, 57
 TOK_WHERE 1, 16,20, 131
   = 1, 18,20, 131
 TOK_TABLE_OR_COL 1, 18,18, 116
   daily_partition 1, 18,18, 116
 '20150101' 1, 20,20, 132

 scala.NotImplementedError: No parse rules for ASTNode type: 294, text:
 '20150101' :
 '20150101' 1, 20,20, 132
  +
 org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261)
   ;
 at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261)
 at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$
 hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
 at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$
 hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.
 scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.
 scala:135)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
 apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
 apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
 scala:222)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
 append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
 append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Failure.append(
 Parsers.scala:202)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
 append$1.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
 append$1.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
 scala:222)
 at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
 apply$14.apply(Parsers.scala:891)
 at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
 apply$14.apply(Parsers.scala:891)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.
 scala:890)
 at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(
 PackratParsers.scala:110)
 at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(
 AbstractSparkSQLParser.scala:38)
 at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
 at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
 at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$
 SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
 at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$
 SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.
 scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.
 scala:135)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
 apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
 apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
 scala:222)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
 append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at 

Re: Question about Serialization in Storage Level

2015-05-21 Thread Todd Nist
From the docs,
https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence:

MEMORY_ONLY - Store RDD as deserialized Java objects in the JVM. If the RDD does
not fit in memory, some partitions will not be cached and will be recomputed on
the fly each time they're needed. This is the default level.

MEMORY_AND_DISK - Store RDD as deserialized Java objects in the JVM. If the RDD
does not fit in memory, store the partitions that don't fit on disk, and read
them from there when they're needed.

MEMORY_ONLY_SER - Store RDD as serialized Java objects (one byte array per
partition). This is generally more space-efficient than deserialized objects,
especially when using a fast serializer
(https://spark.apache.org/docs/latest/tuning.html), but more CPU-intensive to read.

MEMORY_AND_DISK_SER - Similar to MEMORY_ONLY_SER, but spill partitions that don't
fit in memory to disk instead of recomputing them on the fly each time they're needed.
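
A minimal Scala sketch of the practical difference (rdd is a placeholder; in short,
MEMORY_AND_DISK keeps the in-memory part as deserialized objects and only the part
spilled to disk is written in serialized form, while MEMORY_AND_DISK_SER keeps even
the in-memory part as serialized bytes):

import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)        // in-memory blocks stay as Java objects
// rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) // in-memory blocks stored as serialized bytes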

On Thu, May 21, 2015 at 3:52 AM, Jiang, Zhipeng zhipeng.ji...@intel.com
wrote:

  Hi there,



 This question may seem to be kind of naïve, but what’s the difference
 between MEMORY_AND_DISK and MEMORY_AND_DISK_SER?



 If I call rdd.persist(StorageLevel.MEMORY_AND_DISK), the BlockManager
 won’t serialize the rdd?



 Thanks,

 Zhipeng



Re: Spark Streaming on top of Cassandra?

2015-05-21 Thread tshah77
Can someone provide an example of Spark Streaming using Java?

I have Cassandra running but have not configured Spark yet, and I would like to
create a DStream.

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-on-top-of-Cassandra-tp1283p22978.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming on top of Cassandra?

2015-05-21 Thread jay vyas
Hi. I have a Spark Streaming - Cassandra application which you can
probably borrow pretty easily.

You can always rewrite a part of it in Java if you need to, or else you
can just use Scala (see the blog post below if you want a Java-style dev
workflow with Scala using IntelliJ).

This application implements a Spark stream with Twitter and ETLs it into
either a file queue or Cassandra (see the commented-out Cassandra snippet).

https://github.com/jayunit100/SparkStreamingApps/blob/master/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala

The Cassandra sink works really well with the Spark context compile-time
bindings.
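
For a concrete starting point, here is a minimal Scala sketch of a streaming write to
Cassandra with the DataStax spark-cassandra-connector (the socket source, keyspace,
table and column names are placeholders, and the connector must be on the classpath):

import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("StreamingToCassandra")
  .set("spark.cassandra.connection.host", "127.0.0.1") // Cassandra contact point
val ssc = new StreamingContext(conf, Seconds(10))

ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map(w => (w, 1L))
  .reduceByKey(_ + _)
  .saveToCassandra("my_keyspace", "word_counts", SomeColumns("word", "total"))

ssc.start()
ssc.awaitTermination()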

Maybe just clone this repo down and use it as a blueprint :) There is a
blog post here about how to set up your IDE so that the dev workflow is
very similar to that of standard Java:
http://jayunit100.blogspot.com/2014/07/set-up-spark-application-devleopment.html

Good luck!

On Thu, May 21, 2015 at 4:24 PM, tshah77 tejasrs...@gmail.com wrote:

 Can some one provide example of Spark Streaming using Java?

 I have cassandra running but did not configure spark but would like to
 create Dstream.

 Thanks




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-on-top-of-Cassandra-tp1283p22978.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
jay vyas


Re: Connecting to an inmemory database from Spark

2015-05-21 Thread Tathagata Das
Doesn't seem like a Cassandra-specific issue. Could you give us more
information (code, errors, stack traces)?


On Thu, May 21, 2015 at 1:33 PM, tshah77 tejasrs...@gmail.com wrote:

 TD,

 Do you have any example about reading from cassandra using spark streaming
 in java?

 I am trying to connect to cassandra using spark streaming and it is
 throwing
 an error as could not parse master url.

 Thanks
 Tejas



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-tp1343p22979.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




foreach vs foreachPartitions

2015-05-21 Thread ben
I would like to know whether foreachPartition will result in better
performance, due to a higher level of parallelism, compared to the foreach
method, for the case in which I'm running through an RDD in order to
perform some sums into an accumulator variable.
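
For reference, a minimal Scala sketch of the two variants (assuming an RDD[Long]
named rdd). The degree of parallelism is the same in both cases; foreachPartition
mainly saves per-element overhead, for example when a connection or buffer can be
set up once per partition:

val acc = sc.accumulator(0L)
rdd.foreach(x => acc += x)                // the function is invoked once per element
rdd.foreachPartition(it => acc += it.sum) // the function is invoked once per partition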

Thank you,
Beniamino.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/foreach-vs-foreachPartitions-tp22983.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Connecting to an inmemory database from Spark

2015-05-21 Thread tshah77
TD,

Do you have any example of reading from Cassandra using Spark Streaming
in Java?

I am trying to connect to Cassandra using Spark Streaming and it is throwing
an error: could not parse master URL.

Thanks
Tejas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-tp1343p22979.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use spark to access HBase with Security enabled

2015-05-21 Thread Ted Yu
Are the worker nodes colocated with HBase region servers ?

Were you running as hbase super user ?

You may need to login, using code similar to the following:

  if (isSecurityEnabled()) {

SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);

  }

SecurityUtil is a Hadoop class.
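
For reference, a minimal Scala sketch of a keytab-based login using Hadoop's
UserGroupInformation (the principal and keytab path are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

val hadoopConf = new Configuration() // loads core-site.xml etc. from the classpath
UserGroupInformation.setConfiguration(hadoopConf)
UserGroupInformation.loginUserFromKeytab("sparkuser@EXAMPLE.COM", "/etc/security/keytabs/sparkuser.keytab")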


Cheers

On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote:

 Hi,

 Many thanks for the help. My Spark version is 1.3.0 too and I run it on
 Yarn. According to your advice I have changed the configuration. Now my
 program can read the hbase-site.xml correctly. And it can also authenticate
 with zookeeper successfully.

 But I have met a new problem: my program still cannot pass the
 authentication of HBase. Did you or anybody else ever meet this kind of
 situation? I used a keytab file to provide the principal. Since it can
 pass the authentication of Zookeeper, I am sure the keytab file is OK,
 but it just cannot pass the authentication of HBase. The exception is
 listed below; could you or anybody else help me? Many thanks!

 Exception***
 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection,
 connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181
 sessionTimeout=9 watcher=hconnection-0x4e142a710x0,
 quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181,
 baseZNode=/hbase
 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI
 as SASL mechanism.
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to
 server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate
 using Login Context section 'Client'
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established
 to bgdt02.dev.hrb/130.1.9.98:2181, initiating session
 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu
 May 21 16:03:18 CST 2015
 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires:  Fri
 May 22 16:03:18 CST 2015
 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri
 May 22 11:43:32 CST 2015
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment
 complete on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid =
 0x24d46cb0ffd0020, negotiated timeout = 4
 15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable
 called multiple times. Overwriting connection and table reference;
 TableInputFormatBase will not close these old references when done.
 15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes
 for table ns_dev1:hd01.
 15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while
 connecting to the server : javax.security.sasl.SaslException: GSS initiate
 failed [Caused by GSSException: No valid credentials provided (Mechanism
 level: Failed to find any Kerberos tgt)]
 15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed.
 The most likely cause is missing or invalid credentials. Consider 'kinit'.
 javax.security.sasl.SaslException: GSS initiate failed [Caused by
 GSSException: No valid credentials provided (Mechanism level: Failed to
 find any Kerberos tgt)]
 at
 com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
 at
 org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:727)
 at java.security.AccessController.doPrivileged(Native
 Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:727)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:880)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:849)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1173)
 at
 org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
 at
 org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
 

foreach plus accumulator Vs mapPartitions performance

2015-05-21 Thread ben
Hi, everybody.

There are some cases in which I can obtain the same results by using the
mapPartitions and the foreach method. 

For example, in a typical MapReduce approach one would perform a reduceByKey
immediately after a mapPartitions that transforms the original RDD into a
collection of tuples (key, value). I think it is possible to achieve the
same result by using, for instance, an array of accumulators where at each
index an executor sums a value, and the index itself could be a key.

Since the reduceByKey will perform a shuffle to disk, I think that, when it is
possible, the foreach approach should be better, even though the foreach has
the side effect of summing a value into an accumulator.
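
For illustration, a minimal Scala sketch of the two approaches (keyOf and numKeys
are placeholders; keys are assumed to be small integer indices in 0 until numKeys).
Note that accumulator updates are only guaranteed to be applied exactly once inside
actions, and the reduceByKey version remains the more general one:

// MapReduce-style version
val counts = rdd.map(x => (keyOf(x), 1L)).reduceByKey(_ + _)

// accumulator-array version
val accs = Array.fill(numKeys)(sc.accumulator(0L))
rdd.foreach(x => accs(keyOf(x)) += 1L)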

I am making this request to see if my reasoning is correct. I hope I was
clear.
Thank you, Beniamino



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/foreach-plus-accumulator-Vs-mapPartitions-performance-tp22982.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark MOOC - early access

2015-05-21 Thread Marco Shaw
Hi Spark Devs and Users,

BerkeleyX and Databricks are currently developing two Spark-related MOOCs on edX
(intro: https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x,
ml: https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x),
the first of which starts on June 1st. Together these courses have over 75K
enrolled students!

To help students perform the course exercises, we have created a Vagrant box that
contains Spark and IPython (running on Ubuntu 32-bit). This will simplify user
setup and helps us support them. We are writing to give you early access to the
VM environment and the first assignment, and to request your help to test out the
VM/assignment before we unleash it to 75K people (see instructions below). We're
happy to help if you have any difficulties getting the VM set up; please feel free
to contact me (marco.s...@gmail.com) with any issues, comments, or questions.

Sincerely,
Marco Shaw
Spark MOOC TA

(This is being sent as an HTML formatted email. Some of the links have been
duplicated just in case.)

1. Install VirtualBox (https://www.virtualbox.org/wiki/Downloads) on your OS
   (see the Windows tutorial at https://www.youtube.com/watch?v=06Sf-m64fcY).

2. Install Vagrant (https://www.vagrantup.com/downloads.html) on your OS
   (see the Windows tutorial at https://www.youtube.com/watch?v=LZVS23BaA1I).

3. Install the virtual machine using the following steps
   (see the Windows tutorial at https://www.youtube.com/watch?v=ZuJCqHC7IYc):
   a. Create a custom directory (e.g. c:\users\marco\myvagrant or /home/marco/myvagrant).
   b. Download the file
      https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/Vagrantfile
      to the custom directory (NOTE: it must be named exactly Vagrantfile, with no extension).
   c. Open a DOS prompt (Windows) or terminal (Mac/Linux) in the custom directory
      and issue the command "vagrant up".

4. Perform basic commands in the VM as described below
   (see the Windows tutorial at https://www.youtube.com/watch?v=bkteLH77IR0):
   a. To start the VM, from a DOS prompt (Windows) or terminal (Mac/Linux),
      issue the command "vagrant up".
   b. To stop the VM, issue the command "vagrant halt".
   c. To erase or delete the VM, issue the command "vagrant destroy".
   d. Once the VM is running, to access the notebook, open a web browser to
      http://localhost:8001.

5. Use the test notebook as described below
   (see the Windows tutorial at https://www.youtube.com/watch?v=mlfAmyF3Q-s):
   a. To start the VM, from a DOS prompt (Windows) or terminal (Mac/Linux),
      issue the command "vagrant up".
   b. Once the VM is running, open a web browser to http://localhost:8001.
   c. Upload this IPython notebook:
      https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/vm_test_student.ipynb
   d. Run through the notebook.

6. Play around with the first MOOC assignment (email Marco for details when you
   get to this point).

7. Please answer the following questions:
   a. What machine are you using (OS, RAM, CPU, age)?
   b. How long did the entire process take?
   c. How long did the VM download take? Relatedly, where are you located?
   d. Do you have any other comments/suggestions?


Re: How to use spark to access HBase with Security enabled

2015-05-21 Thread Bill Q
What I found with CDH 5.4.1's Spark 1.3 is that the
spark.executor.extraClassPath setting is not working. I had to use
SPARK_CLASSPATH instead.

On Thursday, May 21, 2015, Ted Yu yuzhih...@gmail.com wrote:

 Are the worker nodes colocated with HBase region servers ?

 Were you running as hbase super user ?

 You may need to login, using code similar to the following:

   if (isSecurityEnabled()) {

 SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);

   }

 SecurityUtil is hadoop class.


 Cheers

 On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com
 javascript:_e(%7B%7D,'cvml','165612...@qq.com'); wrote:

 Hi,

 Many thanks for the help. My Spark version is 1.3.0 too and I run it on
 Yarn. According to your advice I have changed the configuration. Now my
 program can read the hbase-site.xml correctly. And it can also authenticate
 with zookeeper successfully.

 But I meet a new problem that is my program still can not pass the
 authentication of HBase. Did you or anybody else ever meet such kind of
 situation ?  I used a keytab file to provide the principal. Since it can
 pass the authentication of the Zookeeper, I am sure the keytab file is OK.
 But it jsut can not pass the authentication of HBase. The exception is
 listed below and could you or anybody else help me ? Still many many thanks!

 Exception***
 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection,
 connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181
 sessionTimeout=9 watcher=hconnection-0x4e142a710x0,
 quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181,
 baseZNode=/hbase
 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI
 as SASL mechanism.
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to
 server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate
 using Login Context section 'Client'
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection
 established to bgdt02.dev.hrb/130.1.9.98:2181, initiating session
 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu
 May 21 16:03:18 CST 2015
 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires:  Fri
 May 22 16:03:18 CST 2015
 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri
 May 22 11:43:32 CST 2015
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment
 complete on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid =
 0x24d46cb0ffd0020, negotiated timeout = 4
 15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable
 called multiple times. Overwriting connection and table reference;
 TableInputFormatBase will not close these old references when done.
 15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region
 sizes for table ns_dev1:hd01.
 15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while
 connecting to the server : javax.security.sasl.SaslException: GSS initiate
 failed [Caused by GSSException: No valid credentials provided (Mechanism
 level: Failed to find any Kerberos tgt)]
 15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication
 failed. The most likely cause is missing or invalid credentials. Consider
 'kinit'.
 javax.security.sasl.SaslException: GSS initiate failed [Caused by
 GSSException: No valid credentials provided (Mechanism level: Failed to
 find any Kerberos tgt)]
 at
 com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
 at
 org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:727)
 at java.security.AccessController.doPrivileged(Native
 Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:727)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:880)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:849)
 at
 org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1173)
 at

Re: Spark HistoryServer not coming up

2015-05-21 Thread roy
This got resolved after cleaning /user/spark/applicationHistory/*



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-HistoryServer-not-coming-up-tp22975p22981.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



running spark on yarn

2015-05-21 Thread Nathan Kronenfeld
Hello, folks.

We just recently switched to using YARN on our cluster (when upgrading to
Cloudera 5.4.1).

I'm trying to run a spark job from within a broader application (a web
service running on Jetty), so I can't just start it using spark-submit.

Does anyone know of an instructions page on how to do that under yarn?
I've managed to get it mostly there by including all spark, yarn, hadoop,
and hdfs config files in my SparkConf (somewhat indirectly, and that is a
bit of a short-hand), but while the job shows up now under yarn, and has
its own applications web ui page, it's not showing up under the main spark
page, and it's still missing some things (like it can't find the native
library for snappy compression), so I still think I'm doing something wrong.
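
For reference, a minimal Scala sketch of creating the context programmatically in
yarn-client mode (the jar path and native-library path are placeholders, and this
assumes HADOOP_CONF_DIR/YARN_CONF_DIR are visible to the web service's JVM):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn-client")         // driver runs inside the Jetty JVM, executors on YARN
  .setAppName("embedded-spark-job")
  .set("spark.yarn.jar", "hdfs:///apps/spark/spark-assembly.jar")
  .set("spark.executor.extraLibraryPath", "/opt/cloudera/parcels/CDH/lib/hadoop/lib/native") // for native snappy
val sc = new SparkContext(conf)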

Any help or hints would be much appreciated.


Thanks,
-Nathan


Re: PySpark Logs location

2015-05-21 Thread Oleg Ruchovets
This doesn't work for me so far. Using the command below I got the following
output. What should I check to fix the issue? Are there any configuration
parameters I need to set?


[root@sdo-hdp-bd-master1 ~]# yarn logs -applicationId
application_1426424283508_0048
15/05/21 13:25:09 INFO impl.TimelineClientImpl: Timeline service address:
http://hdp-bd-node1.development.c4i:8188/ws/v1/timeline/
15/05/21 13:25:09 INFO client.RMProxy: Connecting to ResourceManager at
hdp-bd-node1.development.c4i/12.23.45.253:8050
/app-logs/root/logs/application_1426424283508_0048does not exist.
*Log aggregation has not completed or is not enabled.*
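
If log aggregation is disabled, "yarn logs" cannot find anything; as a hedged
note, aggregation is normally switched on with the following property in
yarn-site.xml (followed by a restart of the YARN daemons):

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>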

Thanks
Oleg.

On Wed, May 20, 2015 at 11:33 PM, Ruslan Dautkhanov dautkha...@gmail.com
wrote:

 Oleg,

 You can see applicationId in your Spark History Server.
 Go to http://historyserver:18088/

 Also check
 https://spark.apache.org/docs/1.1.0/running-on-yarn.html#debugging-your-application

 It should be no different with PySpark.


 --
 Ruslan Dautkhanov

 On Wed, May 20, 2015 at 2:12 PM, Oleg Ruchovets oruchov...@gmail.com
 wrote:

 Hi Ruslan.
   Could you add more details please.
 Where do I get applicationId? In case I have a lot of log files would it
 make sense to view it from single point.
 How actually I can configure / manage log location of PySpark?

 Thanks
 Oleg.

 On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov dautkha...@gmail.com
  wrote:

 You could use

 yarn logs -applicationId application_1383601692319_0008



 --
 Ruslan Dautkhanov

 On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets oruchov...@gmail.com
 wrote:

 Hi ,

   I am executing PySpark job on yarn ( hortonworks distribution).

 Could someone pointing me where is the log locations?

 Thanks
 Oleg.







DataFrame Column Alias problem

2015-05-21 Thread SLiZn Liu
Hi Spark Users Group,

I’m doing a groupBy operation on my DataFrame df as follows, to get the
count for each value of col1:

 df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should 
 write it like this.
col1   COUNT(col1#347)
aaa    2
bbb    4
ccc    4
...
and more...

I’d like to sort by the resulting count with .sort("COUNT(col1#347)"),
but the column name of the count result obviously cannot be known in advance.
Intuitively one might consider acquiring the column name by column index, in
the fashion of R’s data frames, except Spark doesn’t support that. I have
Googled "spark agg alias" and so forth, and checked DataFrame.as in the Spark
API; neither helped with this. Am I the only one who has ever got stuck on this
issue, or is there anything I have missed?

REGARDS,
Todd Leo


Re: rdd.saveAsTextFile problem

2015-05-21 Thread Akhil Das
On Thu, May 21, 2015 at 4:17 PM, Howard Yang howardyang2...@gmail.com
wrote:

 Following
 http://www.srccodes.com/p/article/38/build-install-configure-run-apache-hadoop-2.2.0-microsoft-windows-os
 to build the latest version of Hadoop on my Windows machine, and then adding the
 environment variable HADOOP_HOME and editing the Path variable to
 add the bin directory of HADOOP_HOME (say C:\hadoop\bin),
 fixed this issue in my environment.

 2015-05-21 9:55 GMT+03:00 Akhil Das ak...@sigmoidanalytics.com:

  This thread is from a year back; can you please share what issue you are
  facing? Which version of Spark are you using? What is your system
  environment? Exception stack trace?

 Thanks
 Best Regards

 On Thu, May 21, 2015 at 12:19 PM, Keerthi keerthi.reddy1...@gmail.com
 wrote:

 Hi ,

 I had tried the workaround shared here, but still facing the same
 issue...

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p22970.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Multi user setup and saving a DataFrame / RDD to a network exported file system

2015-05-21 Thread Tomasz Fruboes

Hi,

 thanks for the answer, I'll open a ticket.

 In the meantime - I have found a workaround. The recipe is the following:

1. Create a new account/group on all machines (let's call it sparkuser). 
Run Spark from this account.


2. Add your user to group sparkuser.

3. If you decide to write RDD/parquet file under workdir directory you 
need to execute the following (just once, before running spark-submit):


chgrp sparkuser workdir
chmod g+s workdir
setfacl -d -m g::rwx workdir

(first two steps can be replaced also by newgrp sparkuser, but this 
way all your files will be created with sparkuser group)


 then calls like

rdd.saveAsPickleFile(workdir+"/somename")

 work just fine.

 The above solution has one serious problem - any other user from 
sparkuser group will be able to overwrite your saved data.


 cheers,
   Tomasz






On 20.05.2015 at 23:08, Davies Liu wrote:

Could you file a JIRA for this?

The executor should run under the user who submit a job, I think.

On Wed, May 20, 2015 at 2:40 AM, Tomasz Fruboes
tomasz.frub...@fuw.edu.pl wrote:

Thanks for a suggestion. I have tried playing with it, sc.sparkUser() gives
me the expected user name, but it doesn't solve the problem. From a quick search
through the spark code it seems to me, that this setting is effective only
for yarn and mesos.

  I think the workaround for the problem could be using --deploy-mode
cluster (not 100% convenient, since disallows any interactive work), but
this is not supported for python based programs.

Cheers,
   Tomasz



On 20.05.2015 at 10:57, Iulian Dragoș wrote:


You could try setting `SPARK_USER` to the user under which your workers
are running. I couldn't find many references to this variable, but at
least Yarn and Mesos take it into account when spawning executors.
Chances are that standalone mode also does it.

iulian

On Wed, May 20, 2015 at 9:29 AM, Tomasz Fruboes
tomasz.frub...@fuw.edu.pl mailto:tomasz.frub...@fuw.edu.pl wrote:

 Hi,

   thanks for answer. The rights are

 drwxr-xr-x 3 tfruboes all 5632 05-19 15 tel:5632%2005-19%2015:40
 test19EE/

   I have tried setting the rights to 777 for this directory prior to
 execution. This does not get propagated down the chain, ie the
 directory created as a result of the save call
 (namesAndAges.parquet2 in the path in the dump [1] below) is created
 with the drwxr-xr-x rights (owned by the user submitting the job, ie
 tfruboes). The temp directories created inside

 namesAndAges.parquet2/_temporary/0/

 (e.g. task_201505200920_0009_r_01) are owned by root, again with
 drwxr-xr-x access rights

   Cheers,
Tomasz

  On 19.05.2015 at 23:56, Davies Liu wrote:

 It surprises me, could you list the owner information of
 /mnt/lustre/bigdata/med_home/tmp/test19EE/ ?

 On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes
 tomasz.frub...@fuw.edu.pl mailto:tomasz.frub...@fuw.edu.pl

 wrote:

 Dear Experts,

we have a spark cluster (standalone mode) in which master
 and workers are
 started from root account. Everything runs correctly to the
 point when we
 try doing operations such as

   dataFrame.select(name, age).save(ofile, parquet)

 or

   rdd.saveAsPickleFile(ofile)

 , where ofile is path on a network exported filesystem
 (visible on all
 nodes, in our case this is lustre, I guess on nfs effect
 would be similar).

Unsurprisingly temp files created on workers are owned by
 root, which then
 leads to a crash (see [1] below). Is there a
 solution/workaround for this
 (e.g. controlling file creation mode of the temporary files)?

 Cheers,
Tomasz


 ps I've tried to google this problem, couple of similar
 reports, but no
 clear answer/solution found

 ps2 For completeness - running master/workers as a regular
 user solves the
 problem only for the given user. For other users submitting
 to this master
 the result is given in [2] below


 [0] Cluster details:
 Master/workers: centos 6.5
 Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the
 2.6 build)


 [1]

##
  File

/mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 o27.save.
 : java.io.IOException: Failed to rename


Re: DataFrame Column Alias problem

2015-05-21 Thread Ram Sriharsha
df.groupBy($"col1").agg(count($"col1").as("c")).show
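
To then sort by the aliased count, a small extension of the same snippet (this
assumes import sqlContext.implicits._ and import org.apache.spark.sql.functions._
are in scope):

df.groupBy($"col1")
  .agg(count($"col1").as("c"))
  .sort($"c".desc)
  .show()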

On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu sliznmail...@gmail.com wrote:

 Hi Spark Users Group,

 I’m doing groupby operations on my DataFrame *df* as following, to get
 count for each value of col1:

  df.groupBy(col1).agg(col1 - count).show // I don't know if I should 
  write like this.
 col1   COUNT(col1#347)
 aaa2
 bbb4
 ccc4
 ...
 and more...

 As I ‘d like to sort by the resulting count, with .sort(COUNT(col1#347)),
 but the column name of the count result obviously cannot be retrieved in
 advance. Intuitively one might consider acquire column name by column index
 in a fashion of R’s DataFrame, except Spark doesn’t support. I have Googled 
 *spark
 agg alias* and so forth, and checked DataFrame.as in Spark API, neither
 helped on this. Am I the only one who had ever got stuck on this issue or
 anything I have missed?

 REGARDS,
 Todd Leo



Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception

2015-05-21 Thread Grega Kešpret
Hi,
is this fixed in master?

Grega


On Thu, May 14, 2015 at 7:50 PM, Michael Armbrust mich...@databricks.com
wrote:

 End of the month is the target:
 https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage

 On Thu, May 14, 2015 at 3:45 AM, Ishwardeep Singh 
 ishwardeep.si...@impetus.co.in wrote:

  Hi Michael  Ayan,



 Thank you for your response to my problem.



 Michael do we have a tentative release date for Spark version 1.4?



 Regards,

 Ishwardeep





 *From:* Michael Armbrust [mailto:mich...@databricks.com]
 *Sent:* Wednesday, May 13, 2015 10:54 PM
 *To:* ayan guha
 *Cc:* Ishwardeep Singh; user
 *Subject:* Re: [Spark SQL 1.3.1] data frame saveAsTable returns exception



 I think this is a bug in our date handling that should be fixed in Spark
 1.4.



 On Wed, May 13, 2015 at 8:23 AM, ayan guha guha.a...@gmail.com wrote:

 Your stack trace says it can't convert date to integer. You sure about
 column positions?

 On 13 May 2015 21:32, Ishwardeep Singh ishwardeep.si...@impetus.co.in
 wrote:

 Hi ,

 I am using Spark SQL 1.3.1.

 I have created a dataFrame using jdbc data source and am using
 saveAsTable()
 method but got the following 2 exceptions:

 java.lang.RuntimeException: Unsupported datatype DecimalType()
 at scala.sys.package$.error(package.scala:27)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
 at scala.Option.getOrElse(Option.scala:120)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
 at

 org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
 at
 org.apache.spark.sql.parquet.ParquetRelation2.init(newParquet.scala:391)
 at

 org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
 at

 org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
 at
 org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
 at

 org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:218)
 at

 org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
 at

 org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
 at
 org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
 at

 org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
 at

 org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
 at
 org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1121)
 at
 org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1071)
 at
 org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1037)
 at
 org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1015)

 java.lang.ClassCastException: java.sql.Date cannot be cast to
 java.lang.Integer
 at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:215)
 at

 

saveAsTextFile() part- files are missing

2015-05-21 Thread rroxanaioana
Hello!
I just started with Spark. I have an application which counts words in a
file (1 MB file). 
The file is stored locally. I loaded the file using native code and then
created the RDD from it.
   
JavaRDD<String> rddFromFile = context.parallelize(myFile, 2);
JavaRDD<String> words = rddFromFile.flatMap(...);
JavaPairRDD<String, Integer> pairs = words.mapToPair(...);
JavaPairRDD<String, Integer> counter = pairs.reduceByKey(...);

counter.saveAsTextFile("file:///root/output");
context.close();

I have one master and 2 slaves. I run the program from the master node.
The output directory is created on the master node and on the 2 worker nodes. On
the master node I have only one file, _SUCCESS (empty), and on the worker nodes I
have only a _temporary directory. I printed the counter to the console and the
result seems OK. What am I doing wrong?
Thank you!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-part-files-are-missing-tp22974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: rdd.sample() methods very slow

2015-05-21 Thread Wang, Ningjun (LNG-NPV)
Is there any other way to solve the problem? Let me state the use case

I have an RDD[Document] containing over 7 million items. The RDD needs to be saved 
on persistent storage (currently I save it as an object file on disk). Then I 
need to get a small random sample of Document objects (e.g. 10,000 documents). 
How can I do this quickly? The rdd.sample() method does not help because it 
needs to read the entire RDD of 7 million Documents from disk, which takes a 
very long time.

Ningjun

From: Sean Owen [mailto:so...@cloudera.com]
Sent: Tuesday, May 19, 2015 4:51 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: rdd.sample() methods very slow

The way these files are accessed is inherently sequential-access. There isn't a 
way to in general know where record N is in a file like this and jump to it. So 
they must be read to be sampled.


On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.commailto:ningjun.w...@lexisnexis.com wrote:
Hi

I have an RDD[Document] that contains 7 million objects and it is saved in the file 
system as an object file. I want to get a random sample of about 70 objects from 
it using the rdd.sample() method. It is very slow.


val rdd : RDD[Document] = 
sc.objectFile[Document]("C:/temp/docs.obj").sample(false, 0.1D, 0L).cache()
val count = rdd.count()

From the Spark UI, I see Spark is trying to read the entire set of object files in 
the folder “C:/temp/docs.obj”, which is about 29.7 GB. Of course this is very slow. 
Why does Spark try to read all 7 million objects while I only need to return a 
random sample of 70 objects?

Is there any efficient way to get a random sample of 70 objects without reading 
through the entire object files?

Ningjun




Re: rdd.sample() methods very slow

2015-05-21 Thread Sean Owen
I guess the fundamental issue is that these aren't stored in a way
that allows random access to a Document.

Underneath, Hadoop has a concept of a MapFile which is like a
SequenceFile with an index of offsets into the file where records
begin. Although Spark doesn't use it, you could maybe create some
custom RDD that takes advantage of this format to grab random elements
efficiently.

Other things come to mind but I think they're all slower -- like
hashing all the docs and taking the smallest n in each of k partitions
to get a pretty uniform random sample of kn docs.


On Thu, May 21, 2015 at 4:04 PM, Wang, Ningjun (LNG-NPV)
ningjun.w...@lexisnexis.com wrote:
 Is there any other way to solve the problem? Let me state the use case



 I have an RDD[Document] contains over 7 millions items. The RDD need to be
 save on a persistent storage (currently I save it as object file on disk).
 Then I need to get a small random sample of Document objects (e.g. 10,000
 document). How can I do this quickly? The rdd.sample() methods does not help
 because it need to read the entire RDD of 7 million Document from disk which
 take very long time.



 Ningjun



 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Tuesday, May 19, 2015 4:51 PM
 To: Wang, Ningjun (LNG-NPV)
 Cc: user@spark.apache.org
 Subject: Re: rdd.sample() methods very slow



 The way these files are accessed is inherently sequential-access. There
 isn't a way to in general know where record N is in a file like this and
 jump to it. So they must be read to be sampled.





 On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV)
 ningjun.w...@lexisnexis.com wrote:

 Hi



 I have an RDD[Document] that contains 7 million objects and it is saved in
 file system as object file. I want to get a random sample of about 70
 objects from it using rdd.sample() method. It is ver slow





 val rdd : RDD[Document] =
 sc.objectFile[Document](C:/temp/docs.obj).sample(false, 0.1D,
 0L).cache()

 val count = rdd.count()



 From Spark UI, I see spark is try to read the entire object files at the
 folder “C:/temp/docs.obj” which is about 29.7 GB. Of course this is very
 slow. Why does Spark try to read entire 7 million objects while I only need
 to return a random sample of 70 objects?



 Is there any efficient way to get a random sample of 70 objects without
 reading through the entire object files?



 Ningjun





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



map reduce ?

2015-05-21 Thread Yasemin Kaya
Hi,

I have a JavaPairRDD<String, List<Integer>>, and as an example here is what I want to
get.


user_id  cat1  cat2  cat3  cat4
522      0     1     2     0
62       1     0     3     0
661      1     2     0     1

query: the users who have a non-zero number in both the cat2 and cat3 columns
answer: cat2 -> 522, 661; cat3 -> 522, 62 => user 522

How can I get this solution?
I think that first I should have a JavaRDD<List<String>> of the users who are
in those columns (a rough sketch follows).
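
A rough Scala sketch of that filter (the Java API is analogous; pairs is a
placeholder name for the pair RDD above, typed here as RDD[(String, List[Int])],
with cat1..cat4 at list indices 0..3):

val users = pairs
  .filter { case (_, cats) => cats(1) != 0 && cats(2) != 0 } // non-zero cat2 and cat3
  .keys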

Thank you

Best,
yasemin

-- 
hiç ender hiç


Re: java program Get Stuck at broadcasting

2015-05-21 Thread Allan Jie
Hey, I think I found out the problem. Turns out that the file I saved is
too large.


On 21 May 2015 at 16:44, Akhil Das ak...@sigmoidanalytics.com wrote:

 Can you share the code, may be i/someone can help you out

 Thanks
 Best Regards

 On Thu, May 21, 2015 at 1:45 PM, Allan Jie allanmcgr...@gmail.com wrote:

 Hi,

 Just check the logs of datanode, it looks like this:

 2015-05-20 11:42:14,605 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /
 10.9.0.48:50676, dest: /10.9.0.17:50010, bytes: 134217728, op:
 HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_804680172_54, offset: 0, srvID:
 39fb78d5-828a-4319-8303-c704fab526e3, blockid:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, duration:
 16994466261
 2015-05-20 11:42:14,606 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273,
 type=LAST_IN_PIPELINE, downstreams=0:[] terminating
 2015-05-20 11:42:17,788 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /
 10.9.0.17:49046, dest: /10.9.0.17:50010, bytes: 134217728, op:
 HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_102926009_54, offset: 0, srvID:
 39fb78d5-828a-4319-8303-c704fab526e3, blockid:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, duration:
 17829554438
 2015-05-20 11:42:17,788 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder:
 BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276,
 type=HAS_DOWNSTREAM_IN_PIPELINE terminating
 2015-05-20 11:42:17,904 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving
 BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 src: /
 10.9.0.17:49049 dest: /10.9.0.17:50010
 2015-05-20 11:42:17,904 WARN
 org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in
 BlockReceiver constructor. Cause is
 2015-05-20 11:42:17,904 INFO
 org.apache.hadoop.hdfs.server.datanode.DataNode: opWriteBlock
 BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 received exception
 org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space:
 The volume with the most available space (=114409472 B) is less than the
 block size (=134217728 B).
 2015-05-20 11:42:17,905 ERROR
 org.apache.hadoop.hdfs.server.datanode.DataNode:
 HadoopV26Slave1:50010:DataXceiver error processing WRITE_BLOCK operation
  src: /10.9.0.17:49049 dst: /10.9.0.17:50010
 org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space:
 The volume with the most available space (=114409472 B) is less than the
 block size (=134217728 B).
 at
 org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy.chooseVolume(RoundRobinVolumeChoosingPolicy.java:67)
 at
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList.getNextVolume(FsVolumeList.java:69)
 at
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:1084)
 at
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:114)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.init(BlockReceiver.java:183)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:615)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:137)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:74)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:235)
 at java.lang.Thread.run(Thread.java:745)
 2015-05-20 11:43:59,669 INFO
 org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification
 succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741999_1176
 2015-05-20 11:46:10,214 INFO
 org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification
 succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073742000_1177
 2015-05-20 11:48:35,445 INFO
 org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification
 succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741990_1167
 2015-05-20 11:50:04,043 INFO
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
 Scheduling blk_1073742080_1257 file
 /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742080
 for deletion
 2015-05-20 11:50:04,136 INFO
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
 Scheduling blk_1073742081_1258 file
 /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742081
 for deletion
 2015-05-20 11:50:04,136 INFO
 org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
 Scheduling blk_1073742082_1259 file
 /tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742082
 for deletion
 2015-05-20 11:50:04,136 INFO
 

Pandas timezone problems

2015-05-21 Thread Def_Os
After deserialization, something seems to be wrong with my pandas DataFrames.
It looks like the timezone information is lost, and subsequent errors ensue.

Serializing and deserializing a timezone-aware DataFrame tests just fine, so
it must be Spark that somehow changes the data.

My program runs timezone-unaware data without problems.

Anybody have any ideas on what causes this, or how to solve it?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pandas-timezone-problems-tp22985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Pandas timezone problems

2015-05-21 Thread Xiangrui Meng
These are relevant:

JIRA: https://issues.apache.org/jira/browse/SPARK-6411
PR: https://github.com/apache/spark/pull/6250

On Thu, May 21, 2015 at 3:16 PM, Def_Os njde...@gmail.com wrote:
 After deserialization, something seems to be wrong with my pandas DataFrames.
 It looks like the timezone information is lost, and subsequent errors ensue.

 Serializing and deserializing a timezone-aware DataFrame tests just fine, so
 it must be Spark that somehow changes the data.

 My program runs timezone-unaware data without problems.

 Anybody have any ideas on what causes this, or how to solve it?





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Pandas-timezone-problems-tp22985.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2015-05-21 Thread Peng Cheng
I stumbled upon this thread, and I conjecture that it may affect restoring a
checkpointed RDD as well:

http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-gt-10-hour-between-stage-latency-td22925.html#a22928

In my case I have 1600+ fragmented checkpoint files, and the time to read all
of the metadata takes a staggering 11 hours.

If this is really the cause then it is an obvious handicap, as a checkpointed RDD
already has all of its file partition information available and doesn't need to
read it from S3 into the driver again (which causes a single point of failure
and a bottleneck).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3NativeFileSystem-inefficient-implementation-when-calling-sc-textFile-tp19841p22984.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark MOOC - early access

2015-05-21 Thread Kartik Mehta
Awesome,

Thanks a ton for helping us all, and for planning ahead,

Much appreciate it,

Regards,

Kartik
On May 21, 2015 4:41 PM, Marco Shaw marco.s...@gmail.com wrote:

 Hi Spark Devs and Users,

 BerkeleyX and Databricks are currently developing two Spark-related MOOCs on edX
 (intro: https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x,
 ml: https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x),
 the first of which starts on June 1st. Together these courses have over 75K
 enrolled students!

 To help students work through the course exercises, we have created a Vagrant box
 that contains Spark and IPython (running on Ubuntu 32-bit). This will simplify
 user setup and help us support them. We are writing to give you early access to
 the VM environment and the first assignment, and to request your help to test out
 the VM/assignment before we unleash it to 75K people (instructions below). We're
 happy to help if you have any difficulties getting the VM set up; please feel
 free to contact me (marco.s...@gmail.com) with any issues, comments, or questions.

 Sincerely,
 Marco Shaw
 Spark MOOC TA

 (This is being sent as an HTML formatted email. Some of the links have been
 duplicated just in case.)

 1. Install VirtualBox (https://www.virtualbox.org/wiki/Downloads) on your OS
    (see Windows tutorial: https://www.youtube.com/watch?v=06Sf-m64fcY).

 2. Install Vagrant (https://www.vagrantup.com/downloads.html) on your OS
    (see Windows tutorial: https://www.youtube.com/watch?v=LZVS23BaA1I).

 3. Install the virtual machine using the following steps
    (see Windows tutorial: https://www.youtube.com/watch?v=ZuJCqHC7IYc):
    a. Create a custom directory (e.g. c:\users\marco\myvagrant or /home/marco/myvagrant).
    b. Download the file
       https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/Vagrantfile
       to the custom directory (NOTE: it must be named exactly Vagrantfile, with no extension).
    c. Open a DOS prompt (Windows) or terminal (Mac/Linux) in the custom directory and
       issue the command vagrant up.

 4. Perform basic commands in the VM as described below
    (see Windows tutorial: https://www.youtube.com/watch?v=bkteLH77IR0):
    a. To start the VM, from a DOS prompt (Windows) or terminal (Mac/Linux), issue
       the command vagrant up.
    b. To stop the VM, from a DOS prompt (Windows) or terminal (Mac/Linux), issue
       the command vagrant halt.
    c. To erase or delete the VM, from a DOS prompt (Windows) or terminal (Mac/Linux),
       issue the command vagrant destroy.
    d. Once the VM is running, to access the notebook, open a web browser to
       http://localhost:8001.

 5. Use the test notebook as described below
    (see Windows tutorial: https://www.youtube.com/watch?v=mlfAmyF3Q-s):
    a. To start the VM, from a DOS prompt (Windows) or terminal (Mac/Linux), issue
       the command vagrant up.
    b. Once the VM is running, to access the notebook, open a web browser to
       http://localhost:8001.
    c. Upload this IPython notebook:
       https://raw.githubusercontent.com/spark-mooc/mooc-setup/master/vm_test_student.ipynb
    d. Run through the notebook.

 6. Play around with the first MOOC assignment (email Marco for details when you
    get to this point).

 7. Please answer the following questions:
    a. What machine are you using (OS, RAM, CPU, age)?
    b. How long did the entire process take?
    c. How long did the VM download take? Relatedly, where are you located?
    d. Do you have any other comments/suggestions?



Re: [pyspark] Starting workers in a virtualenv

2015-05-21 Thread Davies Liu
Could you try with specify PYSPARK_PYTHON to the path of python in
your virtual env, for example

PYSPARK_PYTHON=/path/to/env/bin/python bin/spark-submit xx.py
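
A minimal driver-side sketch of the same idea for the local-mode case in the
quoted question (the virtualenv path is a placeholder and, on a cluster, would
have to exist at the same path on every worker; this assumes, as in Spark 1.x,
that PYSPARK_PYTHON is read when the SparkContext is created):

import os

# PYSPARK_PYTHON must be set before the SparkContext is created; pyspark
# picks it up at that point and launches the worker Python processes with it.
os.environ["PYSPARK_PYTHON"] = "/path/to/env/bin/python"

from pyspark import SparkContext

sc = SparkContext("local[2]", "virtualenv-check")
# If this map succeeds, the workers are unpickling with the virtualenv's Python.
print(sc.parallelize(range(4)).map(lambda x: x * x).collect())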

On Mon, Apr 20, 2015 at 12:51 AM, Karlson ksonsp...@siberie.de wrote:
 Hi all,

 I am running the Python process that communicates with Spark in a
 virtualenv. Is there any way I can make sure that the Python processes of
 the workers are also started in a virtualenv? Currently I am getting
 ImportErrors when the worker tries to unpickle stuff that is not installed
 system-wide. For now both the worker and the driver run on the same machine
 in local mode.

 Thanks in advance!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



task all finished, while the stage marked finish long time later problem

2015-05-21 Thread 邓刚 [技术中心]
Hi all,

 We are running Spark Streaming version 1.1.1. Recently we found
an odd problem.

 In stage 44554, all the tasks finished, but the stage was marked finished
a long time later. As you can see in the log below, the last task finished at
15/05/21 21:17:36 and the TaskSet was removed from the pool, but it was only
about 7 seconds later, at 15/05/21 21:17:43, that the stage was marked finished
in the DAGScheduler. So all the stages that depend on 44554 were launched after
15/05/21 21:17:43 (see the image below). I know there is a config,
spark.akka.threads, whose default is 4; we did not change it.
Could this config be a factor?

 Does anyone have any idea about this?




Driver Log:

15/05/21 21:17:36 INFO 
[org.apache.spark.scheduler.TaskSetManager---task-result-getter-3]: Finished 
task 180.0 in stage 44554.0 (TID 1291943) in 3512 ms on 
gd6-mercury-spark-010.idc.vip.com (191/191)
15/05/21 21:17:36 INFO 
[org.apache.spark.scheduler.TaskSchedulerImpl---task-result-getter-3]: Removed 
TaskSet 44554.0, whose tasks have all completed, from pool default
15/05/21 21:17:43 INFO 
[org.apache.spark.scheduler.DAGScheduler---sparkDriver-akka.actor.default-dispatcher-44]:
 Stage 44554 (mapToPair at TraceCalculator.java:88) finished in 14.582 s


Stage image:
[inline image attachment: image001.png]
This e-mail may be confidential. If you are not the addressee indicated in this message, please notify the sender immediately. Please do not use, save, copy, print, or distribute this e-mail or its contents, or use it for any other purpose or disclose it to anyone. Thank you for your cooperation!
 This communication is intended only for the addressee(s) and may contain 
information that is privileged and confidential. You are hereby notified that, 
if you are not an intended recipient listed above, or an authorized employee or 
agent of an addressee of this communication responsible for delivering e-mail 
messages to an intended recipient, any dissemination, distribution or 
reproduction of this communication (including any attachments hereto) is 
strictly prohibited. If you have received this communication in error, please 
notify us immediately by a reply e-mail addressed to the sender and permanently 
delete the original e-mail communication and any attachments from all storage 
devices without making or otherwise retaining a copy.


Kmeans Labeled Point RDD

2015-05-21 Thread anneywarlord
Hello,

New to Spark. I wanted to know if it is possible to use a Labeled Point RDD
in org.apache.spark.mllib.clustering.KMeans. After I cluster my data I would
like to be able to identify which observations were grouped with each
centroid.

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-Labeled-Point-RDD-tp22989.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kmeans Labeled Point RDD

2015-05-21 Thread Krishna Sankar
You can predict and then zip the predictions with the points RDD to get
approximately the same thing as a LabeledPoint RDD.
Cheers
k/
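
A minimal PySpark sketch of that approach (assumes the RDD-based
pyspark.mllib KMeans; the points RDD and k are placeholders):

from pyspark.mllib.clustering import KMeans

# points: RDD of feature vectors (e.g. lists of floats) used for training
model = KMeans.train(points, k=3, maxIterations=20)

# Pair each observation with the id of the centroid it was assigned to,
# which gives roughly the (label, features) shape of a LabeledPoint.
clustered = points.map(lambda p: (model.predict(p), p))
clustered.take(5)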

On Thu, May 21, 2015 at 6:19 PM, anneywarlord anneywarl...@gmail.com
wrote:

 Hello,

 New to Spark. I wanted to know if it is possible to use a Labeled Point RDD
 in org.apache.spark.mllib.clustering.KMeans. After I cluster my data I
 would
 like to be able to identify which observations were grouped with each
 centroid.

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-Labeled-Point-RDD-tp22989.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: foreach plus accumulator Vs mapPartitions performance

2015-05-21 Thread Burak Yavuz
Or you can simply use `reduceByKeyLocally` if you don't want to worry about
implementing accumulators and such, and assuming that the reduced values
will fit in memory of the driver (which you are assuming by using
accumulators).

Best,
Burak
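
A minimal PySpark sketch of that suggestion (toy data; assumes an existing
SparkContext sc and that the reduced map fits in driver memory):

from operator import add

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("c", 1)])

# reduceByKeyLocally merges the values for each key and returns the result
# to the driver as a plain dict -- no accumulators or foreach needed.
counts = pairs.reduceByKeyLocally(add)
print(counts)   # e.g. {'a': 2, 'b': 1, 'c': 1}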

On Thu, May 21, 2015 at 2:46 PM, ben delpizz...@gmail.com wrote:

 Hi, everybody.

 There are some cases in which I can obtain the same results by using the
 mapPartitions and the foreach method.

 For example, in a typical MapReduce approach one would perform a reduceByKey
 immediately after a mapPartitions that transforms the original RDD into a
 collection of (key, value) tuples. I think it is possible to achieve the
 same result by using, for instance, an array of accumulators where each
 executor sums a value at a given index and the index itself acts as the key.

 Since reduceByKey will perform a shuffle to disk, I think that when it is
 possible the foreach approach should be better, even though the foreach has
 the side effect of adding a value to an accumulator.

 I am asking this to see whether my reasoning is correct. I hope I was
 clear.
 Thank you, Beniamino



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/foreach-plus-accumulator-Vs-mapPartitions-performance-tp22982.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: PySpark Logs location

2015-05-21 Thread Ruslan Dautkhanov
https://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application

When log aggregation isn’t turned on, logs are retained locally on each
machine under YARN_APP_LOGS_DIR, which is usually configured to /tmp/logs or
$HADOOP_HOME/logs/userlogs depending on the Hadoop version and
installation. Viewing logs for a container requires going to the host that
contains them and looking in this directory. Subdirectories organize log
files by application ID and container ID.

You can enable log aggregation by changing yarn.log-aggregation-enable to
true so it'll be easier to see yarn application logs.

-- 
Ruslan Dautkhanov

On Thu, May 21, 2015 at 5:08 AM, Oleg Ruchovets oruchov...@gmail.com
wrote:

 Doesn't work for me so far ,
using command but got such output. What should I check to fix the
 issue? Any configuration parameters  ...


 [root@sdo-hdp-bd-master1 ~]# yarn logs -applicationId
 application_1426424283508_0048
 15/05/21 13:25:09 INFO impl.TimelineClientImpl: Timeline service address:
 http://hdp-bd-node1.development.c4i:8188/ws/v1/timeline/
 15/05/21 13:25:09 INFO client.RMProxy: Connecting to ResourceManager at
 hdp-bd-node1.development.c4i/12.23.45.253:8050
 /app-logs/root/logs/application_1426424283508_0048does not exist.
 *Log aggregation has not completed or is not enabled.*

 Thanks
 Oleg.

 On Wed, May 20, 2015 at 11:33 PM, Ruslan Dautkhanov dautkha...@gmail.com
 wrote:

 Oleg,

 You can see applicationId in your Spark History Server.
 Go to http://historyserver:18088/

 Also check
 https://spark.apache.org/docs/1.1.0/running-on-yarn.html#debugging-your-application

 It should be no different with PySpark.


 --
 Ruslan Dautkhanov

 On Wed, May 20, 2015 at 2:12 PM, Oleg Ruchovets oruchov...@gmail.com
 wrote:

 Hi Ruslan.
   Could you add more details please?
 Where do I get the applicationId? In case I have a lot of log files, would it
 make sense to view them from a single place?
 How can I actually configure / manage the log location for PySpark?

 Thanks
 Oleg.

 On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov 
 dautkha...@gmail.com wrote:

 You could use

 yarn logs -applicationId application_1383601692319_0008



 --
 Ruslan Dautkhanov

 On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets oruchov...@gmail.com
 wrote:

 Hi ,

   I am executing PySpark job on yarn ( hortonworks distribution).

 Could someone pointing me where is the log locations?

 Thanks
 Oleg.








Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-05-21 Thread Dibyendu Bhattacharya
Hi Tathagata,

Thanks for looking into this. Investigating further, I found that the issue
is that Tachyon does not support file append. When the streaming receiver that
writes to the WAL fails and is restarted, it is not able to append to the same
WAL file after the restart.

I raised this with the Tachyon user group, and Haoyuan said that Tachyon file
append should be ready within three months. I will revisit this issue
then.

Regards,
Dibyendu


On Fri, May 22, 2015 at 12:24 AM, Tathagata Das t...@databricks.com wrote:

 Looks like somehow the file size reported by the FSInputDStream of
 Tachyon's FileSystem interface, is returning zero.

 On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Just to follow up this thread further .

 I was doing some fault tolerant testing of Spark Streaming with Tachyon
 as OFF_HEAP block store. As I said in earlier email, I could able to solve
 the BlockNotFound exception when I used Hierarchical Storage of Tachyon
 ,  which is good.

 I continue doing some testing around storing the Spark Streaming WAL and
 CheckPoint files also in Tachyon . Here is few finding ..


 When I store the Spark Streaming Checkpoint location in Tachyon , the
 throughput is much higher . I tested the Driver and Receiver failure cases
 , and Spark Streaming is able to recover without any Data Loss on Driver
 failure.

 *But on Receiver failure , Spark Streaming looses data* as I see
 Exception while reading the WAL file from Tachyon receivedData location
  for the same Receiver id which just failed.

 If I change the Checkpoint location back to HDFS , Spark Streaming can
 recover from both Driver and Receiver failure .

 Here is the Log details when Spark Streaming receiver failed ...I raised
 a JIRA for the same issue :
 https://issues.apache.org/jira/browse/SPARK-7525



 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
 (epoch 1)*
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
 remove executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
 block manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
 successfully in removeExecutor
 INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
 receiver for stream 2 from 10.252.5.62*:47255
 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
 not read data from write ahead log record
 FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
 http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at scala.Option.getOrElse(Option.scala:120)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IllegalArgumentException:* Seek position is past
 EOF: 645603894, fileSize = 0*
 at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
 at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
 ... 15 more

 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
 stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
 INFO : 

LDA prediction on new document

2015-05-21 Thread Dani Qiu
Hi guys, I'm pretty new to LDA. I notice Spark 1.3.0 MLlib provides an
EM-based LDA implementation. It returns both the topics and the topic
distributions.

My question is: how can I use these parameters to predict on a new document?

And I notice there is an online LDA implementation in the Spark master branch
that only returns topics; how can I use it to do prediction on a new
document (and on the trained documents)?


thanks


Re: Query a Dataframe in rdd.map()

2015-05-21 Thread Ram Sriharsha
Your original code snippet seems incomplete, and there isn't enough
information to figure out what problem you actually ran into.

From your original code snippet there is an rdd variable which is well
defined and a df variable that is not defined in the snippet of code you
sent.

One way to make this work is as below. Until the last line is executed you
are actually not collecting anything on the driver, and if your dataset is
too big to collect on the driver for inspection, just do a take(n) on the
result.

from pyspark.sql import Row,SQLContext
from pyspark.sql.functions import count

sqlContext = SQLContext(sc)

# convert list of ip into a data frame with column ip
Record = Row("ip")
df = sc.parallelize(map(lambda x: Record(x), ['208.51.22.18',
'31.207.6.173', '208.51.22.18'])).toDF()

# obtain ip - frequency and inspect
df.groupBy(df.ip).agg(count(df.ip)).show()

+------------+---------+
|          ip|COUNT(ip)|
+------------+---------+
|208.51.22.18|        2|
|31.207.6.173|        1|
+------------+---------+

What exactly is the issue you are running into when you say it doesn't get
through?

On Thu, May 21, 2015 at 10:47 AM, ping yan sharon...@gmail.com wrote:

 Thanks. I suspected that, but figured that a df query inside a map sounds so
 intuitive that I didn't want to just give up.

 I've tried join and even better with a DStream.transform() and it works!
 freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x,y):
 y[1]))

 Thank you for the help!

 Ping

 On Thu, May 21, 2015 at 10:40 AM, Holden Karau hol...@pigscanfly.ca
 wrote:

 So DataFrames, like RDDs, can only be accessed from the driver. If your
 IP frequency table is small enough you could collect it and distribute it
 as a hashmap with broadcast, or you could also join your rdd with the ip
 frequency table. Hope that helps :)


 On Thursday, May 21, 2015, ping yan sharon...@gmail.com wrote:

 I have a dataframe as a reference table for IP frequencies.
 e.g.,

 ip   freq
 10.226.93.67 1
 10.226.93.69 1
 161.168.251.101   4
 10.236.70.2   1
 161.168.251.105 14


 All I need is to query the df in a map.

 rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])

 freqs = rdd.map(lambda x: df.where(df.ip ==x ).first())

 It doesn't get through.. would appreciate any help.

 Thanks!
 Ping




 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721



 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau




 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721




Re: rdd.sample() methods very slow

2015-05-21 Thread Sean Owen
If sampling whole partitions is sufficient (or a part of a partition),
sure you could mapPartitionsWithIndex and decide whether to process a
partition at all based on its # and skip the rest. That's much faster.
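
A minimal PySpark sketch of that partition-level sampling (fraction and seed
are arbitrary; assumes an existing SparkContext sc):

import random

def sample_whole_partitions(rdd, fraction, seed=42):
    # Pick a random subset of partition indexes up front, then keep only the
    # elements of those partitions; all other partitions are skipped entirely.
    rng = random.Random(seed)
    keep = set(i for i in range(rdd.getNumPartitions()) if rng.random() < fraction)
    return rdd.mapPartitionsWithIndex(
        lambda index, it: it if index in keep else iter([]))

sample = sample_whole_partitions(sc.parallelize(range(1000), 20), 0.1)
print(sample.count())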

On Thu, May 21, 2015 at 7:07 PM, Wang, Ningjun (LNG-NPV)
ningjun.w...@lexisnexis.com wrote:
 I don't need it to be 100% random. How about randomly picking a few partitions and
 returning all docs in those partitions? Is
 rdd.mapPartitionsWithIndex() the right method to use to process just a small
 portion of the partitions?

 Ningjun

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Official Docker container for Spark

2015-05-21 Thread tridib
Hi,

I am using Spark 1.2.0. Can you suggest Docker containers which can be
deployed in production? I found a lot of Spark images on
https://registry.hub.docker.com/ but could not figure out which one to
use. None of them seems to be an official image.

Does anybody have any recommendations?

Thanks
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Official-Docker-container-for-Spark-tp22977.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java program got Stuck at broadcasting

2015-05-21 Thread Akhil Das
Can you try commenting the saveAsTextFile and do a simple count()? If its a
broadcast issue, then it would throw up the same error.
On 21 May 2015 14:21, allanjie allanmcgr...@gmail.com wrote:

 Sure, the code is very simple. I think you guys can understand it from the main
 function.

 public class Test1 {

     public static double[][] createBroadcastPoints(String localPointPath, int row,
             int col) throws IOException {
         BufferedReader br = RAWF.reader(localPointPath);
         String line = null;
         int rowIndex = 0;
         double[][] pointFeatures = new double[row][col];
         while ((line = br.readLine()) != null) {
             List<String> point = Arrays.asList(line.split(","));
             int colIndex = 0;
             for (String pointFeature : point) {
                 pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature);
                 colIndex++;
             }
             rowIndex++;
         }
         br.close();
         return pointFeatures;
     }

     public static void main(String[] args) throws IOException {
         /*** Parameter setting ***/
         String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
         String remoteFilePath =
                 "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
         // this csv file is only 468MB
         final int row = 133433;
         final int col = 458;
         /*************************/

         SparkConf conf = new SparkConf()
                 .setAppName("distance")
                 .setMaster("spark://HadoopV26Master:7077")
                 .set("spark.executor.memory", "4g")
                 .set("spark.eventLog.enabled", "true")
                 .set("spark.eventLog.dir", "/usr/local/spark/logs/spark-events")
                 .set("spark.local.dir", "/tmp/spark-temp");
         JavaSparkContext sc = new JavaSparkContext(conf);

         JavaRDD<String> textFile = sc.textFile(remoteFilePath);
         // Broadcast variable
         // double[][] xx = ...;

         final Broadcast<double[][]> broadcastPoints =
                 sc.broadcast(createBroadcastPoints(localPointPath, row, col));
         // final Broadcast<double[][]> broadcastPoints = sc.broadcast(xx);

         /**
          * Compute the distance in terms of each point on each instance.
          * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
          */
         JavaPairRDD<Pair, Double> distance = textFile.flatMapToPair(
                 new PairFlatMapFunction<String, Pair, Double>() {
                     public Iterable<Tuple2<Pair, Double>> call(String v1) throws Exception {
                         List<String> al = Arrays.asList(v1.split(","));
                         double[] featureVals = new double[al.size()];
                         for (int j = 0; j < al.size() - 1; j++)
                             featureVals[j] = Double.valueOf(al.get(j + 1));
                         int jIndex = Integer.valueOf(al.get(0));
                         double[][] allPoints = broadcastPoints.getValue();
                         double sum = 0;
                         List<Tuple2<Pair, Double>> list = new ArrayList<Tuple2<Pair, Double>>();
                         for (int i = 0; i < row; i++) {
                             sum = 0;
                             for (int j = 0; j < al.size() - 1; j++) {
                                 sum += (allPoints[i][j] - featureVals[j])
                                         * (allPoints[i][j] - featureVals[j]);
                             }
                             list.add(new Tuple2<Pair, Double>(new Pair(i, jIndex), Math.sqrt(sum)));
                         }
                         return list;
                     }
                 });

         distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/" + args[0]);
     }
 }




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Spark HistoryServer not coming up

2015-05-21 Thread roy
Hi,

   After restarting the Spark HistoryServer, it failed to come up. I checked
the Spark HistoryServer logs and found the following messages:

 2015-05-21 11:38:03,790 WARN org.apache.spark.scheduler.ReplayListenerBus:
Log path provided contains no log files.
2015-05-21 11:38:52,319 INFO org.apache.spark.deploy.history.HistoryServer:
Registered signal handlers for [TERM, HUP, INT]
2015-05-21 11:38:52,328 WARN
org.apache.spark.deploy.history.HistoryServerArguments: Setting log
directory through the command line is deprecated as of Spark 1.1.0. Please
set this through spark.history.fs.logDirectory instead.
2015-05-21 11:38:52,461 INFO org.apache.spark.SecurityManager: Changing view
acls to: spark
2015-05-21 11:38:52,462 INFO org.apache.spark.SecurityManager: Changing
modify acls to: spark
2015-05-21 11:38:52,463 INFO org.apache.spark.SecurityManager:
SecurityManager: authentication disabled; ui acls disabled; users with view
permissions: Set(spark); users with modify permissions: Set(spark)
2015-05-21 11:41:24,893 ERROR org.apache.spark.deploy.history.HistoryServer:
RECEIVED SIGNAL 15: SIGTERM
2015-05-21 11:41:33,439 INFO org.apache.spark.deploy.history.HistoryServer:
Registered signal handlers for [TERM, HUP, INT]
2015-05-21 11:41:33,447 WARN
org.apache.spark.deploy.history.HistoryServerArguments: Setting log
directory through the command line is deprecated as of Spark 1.1.0. Please
set this through spark.history.fs.logDirectory instead.
2015-05-21 11:41:33,578 INFO org.apache.spark.SecurityManager: Changing view
acls to: spark
2015-05-21 11:41:33,579 INFO org.apache.spark.SecurityManager: Changing
modify acls to: spark
2015-05-21 11:41:33,579 INFO org.apache.spark.SecurityManager:
SecurityManager: authentication disabled; ui acls disabled; users with view
permissions: Set(spark); users with modify permissions: Set(spark)
2015-05-21 11:44:07,147 WARN org.apache.hadoop.hdfs.BlockReaderFactory: I/O
error constructing remote block reader.
java.io.EOFException: Premature EOF: no length prefix available
at
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2109)
at
org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:408)
at
org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:785)
at
org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:663)
at
org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:327)
at
org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:574)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:797)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844)
at java.io.DataInputStream.read(DataInputStream.java:149)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:154)
at java.io.BufferedReader.readLine(BufferedReader.java:317)
at java.io.BufferedReader.readLine(BufferedReader.java:382)
at
scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
at
org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$5.apply(FsHistoryProvider.scala:175)
at
org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$5.apply(FsHistoryProvider.scala:172)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at

Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-21 Thread Shixiong Zhu
My 2 cents:

As per javadoc:
https://docs.oracle.com/javase/7/docs/api/java/lang/Runtime.html#addShutdownHook(java.lang.Thread)

Shutdown hooks should also finish their work quickly. When a program
invokes exit the expectation is that the virtual machine will promptly shut
down and exit. When the virtual machine is terminated due to user logoff or
system shutdown the underlying operating system may only allow a fixed
amount of time in which to shut down and exit. It is therefore inadvisable
to attempt any user interaction or to perform a long-running computation in
a shutdown hook.

The shutdown hook should not do any long-running work and may exit before
stop returns. This means we cannot implement the stopGracefully = true
semantics correctly, where the user expects the context to stop gracefully by
waiting for the processing of all received data to complete. So I
agree that we can add `ssc.stop` as the shutdown hook, but stopGracefully
should be false.



Best Regards,
Shixiong Zhu

2015-05-20 21:59 GMT-07:00 Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com:

 Thanks Tathagata for making this change..

 Dibyendu

 On Thu, May 21, 2015 at 8:24 AM, Tathagata Das t...@databricks.com
 wrote:

 If you are talking about handling driver crash failures, then all bets
 are off anyways! Adding a shutdown hook in the hope of handling driver
 process failure, handles only a some cases (Ctrl-C), but does not handle
 cases like SIGKILL (does not run JVM shutdown hooks) or driver machine
 crash. So its not a good idea to rely on that.

 Nonetheless I have opened a PR to handle the shutdown of the
 StreamigntContext in the same way as SparkContext.
 https://github.com/apache/spark/pull/6307


 On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Thanks Sean, you are right. If the driver program is running then I can
 handle shutdown in the main exit path. But if the driver machine crashes (or if
 you just stop the application, for example by killing the driver process),
 then a shutdown hook is the only option, isn't it? What I am trying to say is that just
 doing ssc.stop in sys.ShutdownHookThread or
 Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need
 to use Utils.addShutdownHook with a priority. So I am just checking whether
 Spark Streaming can make graceful shutdown the default shutdown mechanism.

 Dibyendu

 On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote:

 I don't think you should rely on a shutdown hook. Ideally you try to
 stop it in the main exit path of your program, even in case of an
 exception.

 On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  You mean to say within Runtime.getRuntime().addShutdownHook I call
  ssc.stop(stopSparkContext  = true, stopGracefully  = true) ?
 
  This won't work anymore in 1.4.
 
  The SparkContext got stopped before Receiver processed all received
 blocks
  and I see below exception in logs. But if I add the
 Utils.addShutdownHook
  with the priority as I mentioned , then only graceful shutdown works
 . In
  that case shutdown-hook run in priority order.
 







Re: saveAsTextFile() part- files are missing

2015-05-21 Thread Tomasz Fruboes

Hi,

  it looks like you are writing to a local filesystem. Could you try writing
to a location visible to all nodes (master and workers), e.g. an NFS share?


 HTH,
  Tomasz
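
For illustration, the same word count written to a path every node can reach,
sketched in PySpark (the HDFS URIs are placeholders; an NFS mount visible to
all nodes works the same way):

# Write to shared storage instead of file:///root/output, which only
# exists locally on whichever machine happens to run a task.
counts = (sc.textFile("hdfs://master:9000/input/words.txt")
            .flatMap(lambda line: line.split())
            .map(lambda w: (w, 1))
            .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile("hdfs://master:9000/output/word-counts")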

On 21.05.2015 at 17:16, rroxanaioana wrote:

Hello!
I just started with Spark. I have an application which counts words in a
file (1 MB file).
The file is stored locally. I loaded the file using native code and then
created the RDD from it.

 JavaRDD<String> rddFromFile = context.parallelize(myFile, 2);
JavaRDD<String> words = rddFromFile.flatMap(...);
JavaPairRDD<String, Integer> pairs = words.mapToPair(...);
JavaPairRDD<String, Integer> counter = pairs.reduceByKey(...);

counter.saveAsTextFile("file:///root/output");
context.close();

I have one master and 2 slaves, and I run the program from the master node.
The output directory is created on the master node and on the 2 worker nodes. On
the master node I have only one (empty) _SUCCESS file, and on the worker nodes I have
a _temporary file. I printed the counter to the console and the result seems ok.
What am I doing wrong?
Thank you!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-part-files-are-missing-tp22974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark HistoryServer not coming up

2015-05-21 Thread Marcelo Vanzin
Seems like there might be a mismatch between your Spark jars and your
cluster's HDFS version. Make sure you're using the Spark jar that matches
the hadoop version of your cluster.

On Thu, May 21, 2015 at 8:48 AM, roy rp...@njit.edu wrote:

 Hi,

 After restarting the Spark HistoryServer, it failed to come up. I checked
 the Spark HistoryServer logs and found the following messages:

  2015-05-21 11:38:03,790 WARN org.apache.spark.scheduler.ReplayListenerBus:
 Log path provided contains no log files.
 2015-05-21 11:38:52,319 INFO org.apache.spark.deploy.history.HistoryServer:
 Registered signal handlers for [TERM, HUP, INT]
 2015-05-21 11:38:52,328 WARN
 org.apache.spark.deploy.history.HistoryServerArguments: Setting log
 directory through the command line is deprecated as of Spark 1.1.0. Please
 set this through spark.history.fs.logDirectory instead.
 2015-05-21 11:38:52,461 INFO org.apache.spark.SecurityManager: Changing
 view
 acls to: spark
 2015-05-21 11:38:52,462 INFO org.apache.spark.SecurityManager: Changing
 modify acls to: spark
 2015-05-21 11:38:52,463 INFO org.apache.spark.SecurityManager:
 SecurityManager: authentication disabled; ui acls disabled; users with view
 permissions: Set(spark); users with modify permissions: Set(spark)
 2015-05-21 11:41:24,893 ERROR
 org.apache.spark.deploy.history.HistoryServer:
 RECEIVED SIGNAL 15: SIGTERM
 2015-05-21 11:41:33,439 INFO org.apache.spark.deploy.history.HistoryServer:
 Registered signal handlers for [TERM, HUP, INT]
 2015-05-21 11:41:33,447 WARN
 org.apache.spark.deploy.history.HistoryServerArguments: Setting log
 directory through the command line is deprecated as of Spark 1.1.0. Please
 set this through spark.history.fs.logDirectory instead.
 2015-05-21 11:41:33,578 INFO org.apache.spark.SecurityManager: Changing
 view
 acls to: spark
 2015-05-21 11:41:33,579 INFO org.apache.spark.SecurityManager: Changing
 modify acls to: spark
 2015-05-21 11:41:33,579 INFO org.apache.spark.SecurityManager:
 SecurityManager: authentication disabled; ui acls disabled; users with view
 permissions: Set(spark); users with modify permissions: Set(spark)
 2015-05-21 11:44:07,147 WARN org.apache.hadoop.hdfs.BlockReaderFactory: I/O
 error constructing remote block reader.
 java.io.EOFException: Premature EOF: no length prefix available
 at
 org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2109)
 at

 org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:408)
 at

 org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:785)
 at

 org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:663)
 at

 org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:327)
 at
 org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:574)
 at

 org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:797)
 at
 org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844)
 at java.io.DataInputStream.read(DataInputStream.java:149)
 at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
 at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
 at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
 at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
 at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
 at java.io.InputStreamReader.read(InputStreamReader.java:184)
 at java.io.BufferedReader.fill(BufferedReader.java:154)
 at java.io.BufferedReader.readLine(BufferedReader.java:317)
 at java.io.BufferedReader.readLine(BufferedReader.java:382)
 at

 scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at

 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
 at

 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
 at

 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at

 org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
 at

 org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$5.apply(FsHistoryProvider.scala:175)
 at

 org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$5.apply(FsHistoryProvider.scala:172)
 at

 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at

 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at

 

Query a Dataframe in rdd.map()

2015-05-21 Thread ping yan
I have a dataframe as a reference table for IP frequencies.
e.g.,

ip   freq
10.226.93.67 1
10.226.93.69 1
161.168.251.101   4
10.236.70.2   1
161.168.251.105 14


All I need is to query the df in a map.

rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])

freqs = rdd.map(lambda x: df.where(df.ip ==x ).first())

It doesn't get through.. would appreciate any help.

Thanks!
Ping




-- 
Ping Yan
Ph.D. in Management
Dept. of Management Information Systems
University of Arizona
Tucson, AZ 85721


Re: Spark Streaming + Kafka failure recovery

2015-05-21 Thread Bill Jay
Hi Cody,

That is clear. Thanks!

Bill

On Tue, May 19, 2015 at 1:27 PM, Cody Koeninger c...@koeninger.org wrote:

 If you checkpoint, the job will start from the successfully consumed
 offsets.  If you don't checkpoint, by default it will start from the
 highest available offset, and you will potentially lose data.

 Is the link I posted, or for that matter the scaladoc, really not clear on
 that point?

 The scaladoc says:

  To recover from driver failures, you have to enable checkpointing in the
 StreamingContext
 http://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/StreamingContext.html.
 The information on consumed offset can be recovered from the checkpoint.

 On Tue, May 19, 2015 at 2:38 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 If a Spark streaming job stops at 12:01 and I resume the job at 12:02.
 Will it still start to consume the data that were produced to Kafka at
 12:01? Or it will just start consuming from the current time?


 On Tue, May 19, 2015 at 10:58 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Have you read
 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
 ?

 1.  There's nothing preventing that.

 2. Checkpointing will give you at-least-once semantics, provided you
 have sufficient kafka retention.  Be aware that checkpoints aren't
 recoverable if you upgrade code.

 On Tue, May 19, 2015 at 12:42 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently using Spark streaming to consume and save logs every
 hour in our production pipeline. The current setting is to run a crontab
 job to check every minute whether the job is still there and if not
 resubmit a Spark streaming job. I am currently using the direct approach
 for Kafka consumer. I have two questions:

 1. In the direct approach, no offset is stored in zookeeper and no
 group id is specified. Can two consumers (one is Spark streaming and the
 other is a Kafak console consumer in Kafka package) read from the same
 topic from the brokers together (I would like both of them to get all
 messages, i.e. publish-subscribe mode)? What about two Spark streaming jobs
 read from the same topic?

 2. How to avoid data loss if a Spark job is killed? Does checkpointing
 serve this purpose? The default behavior of Spark streaming is to read the
 latest logs. However, if a job is killed, can the new job resume from what
 was left to avoid losing logs?

 Thanks!

 Bill







Re: Query a Dataframe in rdd.map()

2015-05-21 Thread Holden Karau
So DataFrames, like RDDs, can only be accessed from the driver. If your IP
frequency table is small enough you could collect it and distribute it as a
hashmap with broadcast, or you could also join your rdd with the ip
frequency table. Hope that helps :)
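
A minimal sketch of the broadcast-hashmap variant (assuming the small
frequency DataFrame df from the question can be collected to the driver):

# Collect the small reference table and broadcast it as a dict.
ip_freq = sc.broadcast({row.ip: row.freq for row in df.collect()})

rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])

# Look up each IP in the broadcast dict on the executors (0 if unseen).
freqs = rdd.map(lambda ip: (ip, ip_freq.value.get(ip, 0)))
print(freqs.collect())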

On Thursday, May 21, 2015, ping yan sharon...@gmail.com wrote:

 I have a dataframe as a reference table for IP frequencies.
 e.g.,

 ip   freq
 10.226.93.67 1
 10.226.93.69 1
 161.168.251.101   4
 10.236.70.2   1
 161.168.251.105 14


 All I need is to query the df in a map.

 rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])

 freqs = rdd.map(lambda x: df.where(df.ip ==x ).first())

 It doesn't get through.. would appreciate any help.

 Thanks!
 Ping




 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau