Re: Fwd: Model weights of linear regression becomes abnormal values
You probably need to scale the values in the data set so that they are all in comparable ranges, and translate them so that their means become 0. You can use a pyspark.mllib.feature.StandardScaler(True, True) object for that. On 28.5.2015. 6:08, Maheshakya Wijewardena wrote: Hi, I'm trying to use Spark's *LinearRegressionWithSGD* in PySpark with the attached dataset. The code is attached. When I check the model weights vector after training, it contains `nan` values. [nan,nan,nan,nan,nan,nan,nan,nan] But for some data sets this problem does not occur. What might be the reason for this? Is this an issue with the data I'm using or a bug? Best regards. -- Pruthuvi Maheshakya Wijewardena Software Engineer WSO2 Lanka (Pvt) Ltd Email: mahesha...@wso2.com Mobile: +94711228855
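For reference, here is a minimal sketch of that scaling step against the Scala MLlib API (pyspark.mllib.feature.StandardScaler mirrors these calls); `points` stands in for the already-parsed RDD[LabeledPoint], and the iteration count and step size are only illustrative:

    import org.apache.spark.mllib.feature.StandardScaler
    import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

    // center every feature at 0 and scale it to unit variance
    // (note: withMean = true requires dense feature vectors)
    val scaler = new StandardScaler(withMean = true, withStd = true)
      .fit(points.map(_.features))

    val scaled = points.map(p => LabeledPoint(p.label, scaler.transform(p.features)))

    val model = LinearRegressionWithSGD.train(scaled, 100, 0.01)  // numIterations, stepSize

Features on wildly different scales are a common reason for SGD to diverge and return nan weights; a smaller step size often helps as well.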
Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields
I had already tested query in Hive CLI and it works fine. Same query shows error in Spark SQL. On May 29, 2015 4:14 AM, ayan guha guha.a...@gmail.com wrote: Probably a naive question: can you try the same in hive CLI and see if your SQL is working? Looks like hive thing to me as spark is faithfully delegating the query to hive. On 29 May 2015 03:22, Abhishek Tripathi trackissue...@gmail.com wrote: Hi , I'm using CDH5.4.0 quick start VM and tried to build Spark with Hive compatibility so that I can run Spark sql and access temp table remotely. I used below command to build Spark, it was build successful but when I tried to access Hive data from Spark sql, I get error. Thanks, Abhi --- mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive -Phive-thriftserver -DskipTests clean package [cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/ [cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql SET spark.sql.hive.version=0.13.1 spark-sql show tables; sample_07 false t1 false Time taken: 3.901 seconds, Fetched 2 row(s) spark-sql select * from t1; 15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1] java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
Re: Adding an indexed column
One way I can see is to:
1. get the rdd from your df
2. call rdd.zipWithIndex to get a new rdd
3. turn your new rdd into a new df

On Fri, May 29, 2015 at 5:43 AM, Cesar Flores ces...@gmail.com wrote: Assuming that I have the next data frame:

    flag | price
    ------------------
    1    | 47.808764653746
    1    | 47.808764653746
    1    | 31.9869279512204
    1    | 47.7907893713564
    1    | 16.7599200038239
    1    | 16.7599200038239
    1    | 20.3916014172137

How can I create a data frame with an extra indexed column as the next one:

    flag | price            | index
    --------------------------------
    1    | 47.808764653746  | 0
    1    | 47.808764653746  | 1
    1    | 31.9869279512204 | 2
    1    | 47.7907893713564 | 3
    1    | 16.7599200038239 | 4
    1    | 16.7599200038239 | 5
    1    | 20.3916014172137 | 6

-- Cesar Flores
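A minimal Scala sketch of those three steps, assuming `df` is the DataFrame from the example above (with flag as an integer and price as a double) and that the SQLContext implicits are imported:

    import sqlContext.implicits._

    // 1. drop to the RDD, 2. zip every Row with its index, 3. build a new DataFrame
    val withIndex = df.rdd
      .zipWithIndex()
      .map { case (row, idx) => (row.getInt(0), row.getDouble(1), idx) }
      .toDF("flag", "price", "index")

zipWithIndex assigns indexes following the order of the underlying RDD's partitions, so the original row order is preserved.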
Re: Spark Streaming and Drools
Hi all, I wrote a simple rule (Drools) and I'm trying to fire it, when I fireAllRules nothing happen neither exceptions. . . do I need to setup configurations? Thanks A G 2015-05-22 12:22 GMT+02:00 Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com: Hi, Sometime back I played with Distributed Rule processing by integrating Drool with HBase Co-Processors ..and invoke Rules on any incoming data .. https://github.com/dibbhatt/hbase-rule-engine You can get some idea how to use Drools rules if you see this RegionObserverCoprocessor .. https://github.com/dibbhatt/hbase-rule-engine/blob/master/src/main/java/hbase/rule/HBaseDroolObserver.java Idea is basically to create a stateless Ruleengine from the drl file and fire the rule on incoming data .. Even though the code is for invoking rules on HBase PUT object , but you can get an idea ..and modify it for Spark.. Dibyendu On Fri, May 22, 2015 at 3:49 PM, Evo Eftimov evo.efti...@isecc.com wrote: I am not aware of existing examples but you can always “ask” Google Basically from Spark Streaming perspective, Drools is a third-party Software Library, you would invoke it in the same way as any other third-party software library from the Tasks (maps, filters etc) within your DAG job *From:* Antonio Giambanco [mailto:antogia...@gmail.com] *Sent:* Friday, May 22, 2015 11:07 AM *To:* Evo Eftimov *Cc:* user@spark.apache.org *Subject:* Re: Spark Streaming and Drools Thanks a lot Evo, do you know where I can find some examples? Have a great one A G 2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com: You can deploy and invoke Drools as a Singleton on every Spark Worker Node / Executor / Worker JVM You can invoke it from e.g. map, filter etc and use the result from the Rule to make decision how to transform/filter an event/message *From:* Antonio Giambanco [mailto:antogia...@gmail.com] *Sent:* Friday, May 22, 2015 9:43 AM *To:* user@spark.apache.org *Subject:* Spark Streaming and Drools Hi All, I'm deploying and architecture that uses flume for sending log information in a sink. Spark streaming read from this sink (pull strategy) e process al this information, during this process I would like to make some event processing. . . for example: Log appender writes information about all transactions in my trading platforms, if a platform user sells more than buy during a week I need to receive an alert on an event dashboard. How can I realize it? Is it possible with drools? Thanks so much
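To make the "Drools as a singleton per executor" suggestion more concrete, here is a rough Scala sketch. The KIE calls are the standard Drools 6 API, but treat the wiring as an assumption: it presumes the rules are packaged on the executor classpath, and `alertRaised` is a hypothetical flag that one of your rules would set on the event object:

    import org.kie.api.KieServices

    object RuleEngine {
      // built lazily, once per executor JVM, then reused for every batch
      lazy val session = KieServices.Factory.get()
        .getKieClasspathContainer
        .newStatelessKieSession()
    }

    // inside the streaming job, fire the rules on every incoming event
    val alerts = events.mapPartitions { iter =>
      iter.filter { event =>
        RuleEngine.session.execute(event)  // stateless "fire all rules" on this fact
        event.alertRaised                  // hypothetical: set by a rule that matched
      }
    }

The resulting alerts stream can then be pushed to the event dashboard, for example from a foreachRDD.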
Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?
When will Spark 1.4 be available exactly? To answer to Model selection can be achieved through high lambda resulting lots of zero in the coefficients : Do you mean that putting a high lambda as a parameter of the logistic regression keeps only a few significant variables and deletes the others with a zero in the coefficients? What is a high lambda for you? Is the lambda a parameter available in Spark 1.4 only or can I see it in Spark 1.3? 2015-05-23 0:04 GMT+02:00 Joseph Bradley jos...@databricks.com: If you want to select specific variable combinations by hand, then you will need to modify the dataset before passing it to the ML algorithm. The DataFrame API should make that easy to do. If you want to have an ML algorithm select variables automatically, then I would recommend using L1 regularization for now and possibly elastic net after 1.4 is release, per DB's suggestion. If you want detailed model statistics similar to what R provides, I've created a JIRA for discussing how we should add that functionality to MLlib. Those types of stats will be added incrementally, but feedback would be great for prioritization: https://issues.apache.org/jira/browse/SPARK-7674 To answer your question: How are the weights calculated: is there a correlation calculation with the variable of interest? -- Weights are calculated as with all logistic regression algorithms, by using convex optimization to minimize a regularized log loss. Good luck! Joseph On Fri, May 22, 2015 at 1:07 PM, DB Tsai dbt...@dbtsai.com wrote: In Spark 1.4, Logistic Regression with elasticNet is implemented in ML pipeline framework. Model selection can be achieved through high lambda resulting lots of zero in the coefficients. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com On Fri, May 22, 2015 at 1:19 AM, SparknewUser melanie.galloi...@gmail.com wrote: I am new in MLlib and in Spark.(I use Scala) I'm trying to understand how LogisticRegressionWithLBFGS and LogisticRegressionWithSGD work. I usually use R to do logistic regressions but now I do it on Spark to be able to analyze Big Data. The model only returns weights and intercept. My problem is that I have no information about which variable is significant and which variable I had better to delete to improve my model. I only have the confusion matrix and the AUC to evaluate the performance. Is there any way to have information about the variables I put in my model? How can I try different variable combinations, do I have to modify the dataset of origin (e.g. delete one or several columns?) How are the weights calculated: is there a correlation calculation with the variable of interest? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-how-to-get-the-best-model-with-only-the-most-significant-explanatory-variables-in-LogisticRegr-tp22993.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Mélanie*
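To illustrate the "high lambda zeroes out coefficients" point with something that already works on Spark 1.3 (Scala), here is a minimal sketch; the regParam value is only an illustrative guess and `training` is assumed to be an RDD[LabeledPoint]:

    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    import org.apache.spark.mllib.optimization.L1Updater

    val lr = new LogisticRegressionWithSGD()
    lr.optimizer
      .setNumIterations(200)
      .setRegParam(0.1)          // the lambda: the larger it is, the more weights end up exactly 0
      .setUpdater(new L1Updater) // L1 regularization instead of the default L2

    val model = lr.run(training)

    // variables whose weight was driven to zero are candidates to drop
    val zeroedIndices = model.weights.toArray.zipWithIndex.collect { case (0.0, i) => i }

What counts as a "high" lambda depends on the scale of your features, so in practice you would sweep regParam over a grid and pick the value by cross-validated AUC rather than fix it up front.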
Spark Executor Memory Usage
Hello! My name is Valerii. I have noticed strange memory behaviour of Spark's executors on my cluster. The cluster works in standalone mode with 3 workers, and the application runs in cluster mode. The topology configuration sets spark.executor.memory 1536m. I checked heap usage via JVisualVM: http://joxi.ru/Q2KqBMdSvYpDrj and via htop: http://joxi.ru/Vm63RWeCvG6L2Z I have 2 questions regarding Spark's executor memory usage: 1. Why does the Max Heap Size change while the executor is working? 2. Why is the memory usage reported by htop greater than the executor's heap size? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Executor-Memory-Usage-tp23083.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Execption writing on two cassandra tables NoHostAvailableException: All host(s) tried for query failed (no host was tried)
Are you able to connect to your cassandra installation via cassandra_home$ ./bin/cqlsh ? This exception generally means that your cassandra instance is not reachable/accessible. On Fri, May 29, 2015 at 6:11 AM, Antonio Giambanco antogia...@gmail.com wrote: Hi all, I have Spark 1.3.1 and Cassandra 2.0.14 installed on a single server. I'm coding a simple Java class for Spark Streaming as follows: - reading header events from a flume sink - based on the header, I write the event body to the navigation or transaction table (cassandra). Unfortunately I get a NoHostAvailableException; if I comment out the code for saving one of the two tables, everything works. *here the code*

    public static void main(String[] args) {
        // Create a local StreamingContext with two working threads and batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");

        conf.set("spark.cassandra.connection.host", "127.0.0.1");
        conf.set("spark.cassandra.connection.native.port", "9042");
        conf.set("spark.cassandra.output.batch.size.rows", "1");
        conf.set("spark.cassandra.output.concurrent.writes", "1");

        final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig =
            FlumeUtils.createPollingStream(jssc, "127.0.0.1", );

        JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent arg0) throws Exception {
                    // TODO Auto-generated method stub
                    Map<CharSequence, CharSequence> headers = arg0.event().getHeaders();
                    ByteBuffer bytePayload = arg0.event().getBody();
                    String s = headers.get("source_log").toString() + "#" + new String(bytePayload.array());
                    System.out.println("RIGA: " + s);
                    return s;
                }
            });

        logRowsNavig.foreachRDD(
            new Function<JavaRDD<String>, Void>() {
                @Override
                public Void call(JavaRDD<String> rows) throws Exception {
                    if (!rows.isEmpty()) {
                        //String header = getHeaderFronRow(rows.collect());
                        List<Navigation> listNavigation = new ArrayList<Navigation>();
                        List<Transaction> listTransaction = new ArrayList<Transaction>();

                        for (String row : rows.collect()) {
                            String header = row.substring(0, row.indexOf("#"));
                            if (header.contains("controller_log")) {
                                listNavigation.add(createNavigation(row));
                                System.out.println("Added Element in Navigation List");
                            } else if (header.contains("business_log")) {
                                listTransaction.add(createTransaction(row));
                                System.out.println("Added Element in Transaction List");
                            }
                        }

                        if (!listNavigation.isEmpty()) {
                            JavaRDD<Navigation> navigationRows = jssc.sparkContext().parallelize(listNavigation);
                            javaFunctions(navigationRows).writerBuilder("cassandrasink", "navigation",
                                mapToRow(Navigation.class)).saveToCassandra();
                        }

                        if (!listTransaction.isEmpty()) {
                            JavaRDD<Transaction> transactionRows = jssc.sparkContext().parallelize(listTransaction);
                            javaFunctions(transactionRows).writerBuilder("cassandrasink", "transaction",
                                mapToRow(Transaction.class)).saveToCassandra();
                        }
                    }
                    return null;
                }
            });

        jssc.start();             // Start the computation
        jssc.awaitTermination();  // Wait for the computation to terminate
    }

*here the exception*

    15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement@ab76b83
    com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
        at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
        at com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
        at com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577)
        at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:119)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at ...
Re: Batch aggregation by sliding window + join
Hi Ayan, thanks for the response I'm using 1.3.1. I'll check window queries(I dont use spark-sql...only core, might be I should?) What do you mean by materialized? I can repartitionAndSort by key daily-aggregation, however I'm not quite understand how it will help with yesterdays block which needs to be loaded from file and it has no connection to this repartition of daily block. On 29 May 2015 at 01:51, ayan guha guha.a...@gmail.com wrote: Which version of spark? In 1.4 window queries will show up for these kind of scenarios. 1 thing I can suggest is keep daily aggregates materialised and partioned by key and sorted by key-day combination using repartitionandsort method. It allows you to use custom partitioner and custom sorter. Best Ayan On 29 May 2015 03:31, igor.berman igor.ber...@gmail.com wrote: Hi, I have a batch daily job that computes daily aggregate of several counters represented by some object. After daily aggregation is done, I want to compute block of 3 days aggregation(3,7,30 etc) To do so I need to add new daily aggregation to the current block and then subtract from current block the daily aggregation of the last day within the current block(sliding window...) I've implemented it with something like: baseBlockRdd.leftjoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition) All rdds are keyed by unique id(long). Each rdd is saved in avro files after the job finishes and loaded when job starts(on next day). baseBlockRdd is much larger than lastDay and newDay rdds(depends on the size of the block) Unfortunately the performance is not satisfactory due to many shuffles(I have parallelism etc) I was looking for the way to improve performance somehow, to make sure that one task joins same local keys without reshuffling baseBlockRdd(which is big) each time the job starts(see https://spark-project.atlassian.net/browse/SPARK-1061 as related issue) so bottom line - how to join big rdd with smaller rdd without reshuffling big rdd over and over again? As soon as I've saved this big rdd and reloaded it from disk I want that every other rdd will be partitioned and collocated by the same partitioner(which is absent for hadooprdd) ... somehow, so that only small rdds will be sent over network. Another idea I had - somehow split baseBlock into 2 parts with filter by keys of small rdds and then join, however I'm not sure it's possible to implement this filter without join. any ideas would be appreciated, thanks in advance Igor -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Batch-aggregation-by-sliding-window-join-tp23074.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
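As a rough Scala sketch of the "share one partitioner across all the RDDs" idea (assuming the three RDDs are pair RDDs keyed by the long id, and that subtractCounters/addCounters stand in for your own aggregation functions):

    import org.apache.spark.HashPartitioner

    val partitioner = new HashPartitioner(200)  // illustrative partition count

    val base    = baseBlockRdd.partitionBy(partitioner).persist()
    val lastDay = lastDayRdd.partitionBy(partitioner)
    val newDay  = newDayRdd.partitionBy(partitioner)

    // joins between co-partitioned RDDs only shuffle the small side;
    // mapValues preserves the partitioner, so the second join is also cheap
    val updated = base
      .leftOuterJoin(lastDay)
      .mapValues(subtractCounters)
      .fullOuterJoin(newDay)
      .mapValues(addCounters)

The catch is the one already raised in this thread: the partitioner is not persisted when the block is written to avro and reloaded the next day, so the big RDD is still shuffled once per run; pre-partitioning only saves the repeated shuffles within a single job.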
dataframe cumulative sum
What would be the most appropriate method to add a cumulative sum column to a data frame? For example, assuming that I have the next data frame:

    flag | price
    ------------------
    1    | 47.808764653746
    1    | 47.808764653746
    1    | 31.9869279512204

How can I create a data frame with an extra cumsum column as the next one:

    flag | price            | cumsum_price
    ---------------------------------------
    1    | 47.808764653746  | 47.808764653746
    1    | 47.808764653746  | 95.6175293075
    1    | 31.9869279512204 | 127.604457259

Thanks -- Cesar Flores
Re: [Streaming] Configure executor logging on Mesos
Hi Tim, Thanks for the info. We (Andy Petrella and myself) have been diving a bit deeper into this log config: The log line I was referring to is this one (sorry, I provided the others just for context) *Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties* That line comes from Logging.scala [1] where a default config is loaded is none is found in the classpath upon the startup of the Spark Mesos executor in the Mesos sandbox. At that point in time, none of the application-specific resources have been shipped yet as the executor JVM is just starting up. To load a custom configuration file we should have it already on the sandbox before the executor JVM starts and add it to the classpath on the startup command. Is that correct? For the classpath customization, It looks like it should be possible to pass a -Dlog4j.configuration property by using the 'spark.executor.extraClassPath' that will be picked up at [2] and that should be added to the command that starts the executor JVM, but the resource must be already on the host before we can do that. Therefore we also need some means of 'shipping' the log4j.configuration file to the allocated executor. This all boils down to your statement on the need of shipping extra files to the sandbox. Bottom line: It's currently not possible to specify a config file for your mesos executor. (ours grows several GB/day). The only workaround I found so far is to open up the Spark assembly, replace the log4j-default.properties and pack it up again. That would work, although kind of rudimentary as we use the same assembly for many jobs. Probably, accessing the log4j API programmatically should also work (I didn't try that yet) Should we open a JIRA for this functionality? -kr, Gerard. [1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Logging.scala#L128 [2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L77 On Thu, May 28, 2015 at 7:50 PM, Tim Chen t...@mesosphere.io wrote: -- Forwarded message -- From: Tim Chen t...@mesosphere.io Date: Thu, May 28, 2015 at 10:49 AM Subject: Re: [Streaming] Configure executor logging on Mesos To: Gerard Maas gerard.m...@gmail.com Hi Gerard, The log line you referred to is not Spark logging but Mesos own logging, which is using glog. Our own executor logs should only contain very few lines though. Most of the log lines you'll see is from Spark, and it can be controled by specifiying a log4j.properties to be downloaded with your Mesos task. Alternatively if you are downloading Spark executor via spark.executor.uri, you can include log4j.properties in that tar ball. I think we probably need some more configurations for Spark scheduler to pick up extra files to be downloaded into the sandbox. Tim On Thu, May 28, 2015 at 6:46 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi, I'm trying to control the verbosity of the logs on the Mesos executors with no luck so far. The default behaviour is INFO on stderr dump with an unbounded growth that gets too big at some point. 
I noticed that when the executor is instantiated, it locates a default log configuration in the Spark assembly:

    I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave 20150528-063307-780930314-5050-8152-S5
    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

So nothing I provide in my job jar files (I also tried spark.executor.extraClassPath=log4j.properties) takes effect in the executor's configuration. How should I configure the log on the executors? thanks, Gerard.
Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields
Regarding the build itself, hadoop-2.6 is not even a valid profile. I got the following WARNING for my build. [WARNING] The requested profile hadoop-2.6 could not be activated because it does not exist. Chen On Fri, May 29, 2015 at 2:38 AM, trackissue121 trackissue...@gmail.com wrote: I had already tested query in Hive CLI and it works fine. Same query shows error in Spark SQL. On May 29, 2015 4:14 AM, ayan guha guha.a...@gmail.com wrote: Probably a naive question: can you try the same in hive CLI and see if your SQL is working? Looks like hive thing to me as spark is faithfully delegating the query to hive. On 29 May 2015 03:22, Abhishek Tripathi trackissue...@gmail.com wrote: Hi , I'm using CDH5.4.0 quick start VM and tried to build Spark with Hive compatibility so that I can run Spark sql and access temp table remotely. I used below command to build Spark, it was build successful but when I tried to access Hive data from Spark sql, I get error. Thanks, Abhi --- *mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive -Phive-thriftserver -DskipTests clean package* [cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/ [cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql SET spark.sql.hive.version=0.13.1 spark-sql show tables; sample_07 false t1 false Time taken: 3.901 seconds, Fetched 2 row(s) spark-sql select * from t1; 15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1] java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto overrides final method *getUnknownFields* .()Lcom/google/protobuf/UnknownFieldSet; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) -- Chen Song
Python implementation of RDD interface
I wanted to share a Python implementation of RDDs: pysparkling. http://trivial.io/post/120179819751/pysparkling-is-a-native-implementation-of-the The benefit is that you can apply the same code that you use in PySpark on large datasets in pysparkling on small datasets or single documents. When running with pysparkling, there is no dependency on the Java Virtual Machine or Hadoop. Sven
Re: Spark SQL v MemSQL/Voltdb
Hi Ashish, Transactions are a big difference between Spark SQL and MemSQL/VoltDB, but there are other differences as well. I'm not an expert on Volt, but another difference between Spark SQL and MemSQL is that DataFrames do not support indexes and MemSQL tables do. This will have implications for scanning and query execution performance. Recently released MemSQL 4 also contains improvements to the distributed optimizer. For large, infrequently changing data sets, you could use the MemSQL column store and only need a single system for storage and query (Spark does not include storage natively, so you would need to use an external data store). You can also use Spark in combination with MemSQL, either row store or column store, using the MemSQL Spark Connector. Thanks, Conor On Thu, May 28, 2015 at 10:36 PM, Ashish Mukherjee ashish.mukher...@gmail.com wrote: Hi Mohit, Thanks for your reply. If my use case is purely querying read-only data (no transaction scenarios), at what scale is one of them a better option than the other? I am aware that for scale which can be supported on a single node, VoltDB is a better choice. However, when the scale grows to a clustered scenario, which is the right engine at various degrees of scale? Regards, Ashish On Fri, May 29, 2015 at 6:57 AM, Mohit Jaggi mohitja...@gmail.com wrote: I have used VoltDB and Spark. The use cases for the two are quite different. VoltDB is intended for transactions and also supports queries on the same(custom to voltdb) store. Spark(SQL) is NOT suitable for transactions; it is designed for querying immutable data (which may exist in several different forms of stores). On May 28, 2015, at 7:48 AM, Ashish Mukherjee ashish.mukher...@gmail.com wrote: Hello, I was wondering if there is any documented comparison of SparkSQL with MemSQL/VoltDB kind of in-memory SQL databases. MemSQL etc. too allow queries to be run in a clustered environment. What is the major differentiation? Regards, Ashish
Re: Spark Executor Memory Usage
For #2, see http://unix.stackexchange.com/questions/65835/htop-reporting-much-higher-memory-usage-than-free-or-top Cheers On Fri, May 29, 2015 at 6:56 AM, Valerii Moisieienko valeramoisee...@gmail.com wrote: Hello! My name is Valerii. I have noticed strange memory behaivour of Spark's executor on my cluster. Cluster works in standalone mode with 3 workers. Application runs in cluster mode. From topology configuration spark.executor.memory 1536m I checked heap usage via JVisualVM: http://joxi.ru/Q2KqBMdSvYpDrj and via htop: http://joxi.ru/Vm63RWeCvG6L2Z I have 2 questions regarding Spark's executors memory usage: 1. Why does Max Heap Size change during executor work? 2. Why does Memory usage via htop greater than executor's heap size? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Executor-Memory-Usage-tp23083.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Batch aggregation by sliding window + join
My point is if you keep daily aggregates already computed then you do not reprocess raw data. But yuh you may decide to recompute last 3 days everyday. On 29 May 2015 23:52, Igor Berman igor.ber...@gmail.com wrote: Hi Ayan, thanks for the response I'm using 1.3.1. I'll check window queries(I dont use spark-sql...only core, might be I should?) What do you mean by materialized? I can repartitionAndSort by key daily-aggregation, however I'm not quite understand how it will help with yesterdays block which needs to be loaded from file and it has no connection to this repartition of daily block. On 29 May 2015 at 01:51, ayan guha guha.a...@gmail.com wrote: Which version of spark? In 1.4 window queries will show up for these kind of scenarios. 1 thing I can suggest is keep daily aggregates materialised and partioned by key and sorted by key-day combination using repartitionandsort method. It allows you to use custom partitioner and custom sorter. Best Ayan On 29 May 2015 03:31, igor.berman igor.ber...@gmail.com wrote: Hi, I have a batch daily job that computes daily aggregate of several counters represented by some object. After daily aggregation is done, I want to compute block of 3 days aggregation(3,7,30 etc) To do so I need to add new daily aggregation to the current block and then subtract from current block the daily aggregation of the last day within the current block(sliding window...) I've implemented it with something like: baseBlockRdd.leftjoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition) All rdds are keyed by unique id(long). Each rdd is saved in avro files after the job finishes and loaded when job starts(on next day). baseBlockRdd is much larger than lastDay and newDay rdds(depends on the size of the block) Unfortunately the performance is not satisfactory due to many shuffles(I have parallelism etc) I was looking for the way to improve performance somehow, to make sure that one task joins same local keys without reshuffling baseBlockRdd(which is big) each time the job starts(see https://spark-project.atlassian.net/browse/SPARK-1061 as related issue) so bottom line - how to join big rdd with smaller rdd without reshuffling big rdd over and over again? As soon as I've saved this big rdd and reloaded it from disk I want that every other rdd will be partitioned and collocated by the same partitioner(which is absent for hadooprdd) ... somehow, so that only small rdds will be sent over network. Another idea I had - somehow split baseBlock into 2 parts with filter by keys of small rdds and then join, however I'm not sure it's possible to implement this filter without join. any ideas would be appreciated, thanks in advance Igor -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Batch-aggregation-by-sliding-window-join-tp23074.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkR Jobs Hanging in collectPartitions
Sure. Looking more closely at the code, I thought I might have had an error in the flow of data structures in the R code. The line that extracts the words from the corpus is now:

    words <- distinct(SparkR:::flatMap(corpus, function(line) {
      strsplit(gsub("^\\s+|[[:punct:]]", "", tolower(line)), "\\s")[[1]]
    }))

(it just removes leading whitespace and all punctuation after having made the whole line lowercase, then splits it into a vector of words, ultimately flattening the whole collection) A count() works on the resultant words list, returning the value expected, so the hang most likely occurs during the subtract. I should mention that the size of the corpus is very small, just a few kB in size. The dictionary I subtract against is also quite modest by Spark standards, just 4.8MB, and I've got 2G of memory for the Worker, which ought to be sufficient for such a small job. The Scala analog runs quite fast, even with the subtract. If we look at the DAG for the SparkR job and compare that against the event timeline for Stage 3, it seems the job is stuck in Scheduler Delay (at 0/2 tasks completed) and never begins the rest of the stage. Unfortunately, the executor log hangs up as well, and doesn't give much info. [screenshot of the DAG and Stage 3 event timeline omitted] Could you describe in a little more detail at what points data is actually held in R's internal process memory? I was under the impression that SparkR:::textFile created an RDD object that would only be realized when a DAG requiring it was executed, and would therefore be part of the memory managed by Spark, and that memory would only be moved to R as an R object following a collect(), take(), etc. Thanks, Alek Eskilson From: Shivaram Venkataraman shiva...@eecs.berkeley.edu Reply-To: shiva...@eecs.berkeley.edu Date: Wednesday, May 27, 2015 at 8:26 PM To: Aleksander Eskilson alek.eskil...@cerner.com Cc: user@spark.apache.org Subject: Re: SparkR Jobs Hanging in collectPartitions Could you try to see which phase is causing the hang? i.e. if you do a count() after flatMap, does that work correctly? My guess is that the hang is somehow related to data not fitting in the R process memory, but it's hard to say without more diagnostic information. Thanks Shivaram On Tue, May 26, 2015 at 7:28 AM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: I've been attempting to run a SparkR translation of a similar Scala job that identifies words from a corpus not existing in a newline-delimited dictionary. The R code is:

    dict <- SparkR:::textFile(sc, src1)
    corpus <- SparkR:::textFile(sc, src2)
    words <- distinct(SparkR:::flatMap(corpus, function(line) {
      gsub("[[:punct:]]", "", tolower(strsplit(line, " |,|-")[[1]]))
    }))
    found <- subtract(words, dict)

(where src1, src2 are locations on HDFS) Then attempting something like take(found, 10) or saveAsTextFile(found, dest) should realize the collection, but that stage of the DAG hangs in Scheduler Delay during the collectPartitions phase. The synonymous Scala code, however,

    val corpus = sc.textFile(src1).flatMap(_.split(" |,|-"))
    val dict = sc.textFile(src2)
    val words = corpus.map(word => word.filter(Character.isLetter(_))).distinct()
    val found = words.subtract(dict)

performs as expected. Any thoughts?
Thanks, Alek Eskilson
Re: spark java.io.FileNotFoundException: /user/spark/applicationHistory/application
In yarn your executors might run on every node in your cluster, so you need to configure the spark history directory to be on hdfs (so it will be accessible to every executor). Probably you've switched from local to yarn mode when submitting. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-java-io-FileNotFoundException-user-spark-applicationHistory-application-tp23077p23084.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
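For example, a minimal sketch of pointing the event log at HDFS from the application side (the path is the one from the subject line; the same values can go into spark-defaults.conf instead, and the history server should read the same directory via spark.history.fs.logDirectory):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("my-app")
      .set("spark.eventLog.enabled", "true")
      // an HDFS path reachable from every node, not a local file: path
      .set("spark.eventLog.dir", "hdfs:///user/spark/applicationHistory")

    val sc = new SparkContext(conf)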
Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields
I've gotten that error when something is trying to use a different version of protobuf than you want. Maybe check out a `mvn dependency:tree` to see if someone is trying to use something other than libproto 2.5.0. (At least, 2.5.0 was current when I was having the problem) On Fri, May 29, 2015 at 10:23 AM, Chen Song chen.song...@gmail.com wrote: Regarding the build itself, hadoop-2.6 is not even a valid profile. I got the following WARNING for my build. [WARNING] The requested profile hadoop-2.6 could not be activated because it does not exist. Chen On Fri, May 29, 2015 at 2:38 AM, trackissue121 trackissue...@gmail.com wrote: I had already tested query in Hive CLI and it works fine. Same query shows error in Spark SQL. On May 29, 2015 4:14 AM, ayan guha guha.a...@gmail.com wrote: Probably a naive question: can you try the same in hive CLI and see if your SQL is working? Looks like hive thing to me as spark is faithfully delegating the query to hive. On 29 May 2015 03:22, Abhishek Tripathi trackissue...@gmail.com wrote: Hi , I'm using CDH5.4.0 quick start VM and tried to build Spark with Hive compatibility so that I can run Spark sql and access temp table remotely. I used below command to build Spark, it was build successful but when I tried to access Hive data from Spark sql, I get error. Thanks, Abhi --- *mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive -Phive-thriftserver -DskipTests clean package* [cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/ [cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql SET spark.sql.hive.version=0.13.1 spark-sql show tables; sample_07 false t1 false Time taken: 3.901 seconds, Fetched 2 row(s) spark-sql select * from t1; 15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1] java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto overrides final method *getUnknownFields* .()Lcom/google/protobuf/UnknownFieldSet; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) -- Chen Song
Format RDD/SchemaRDD contents to screen?
I'm trying to debug query results inside spark-shell, but I find it cumbersome to save to a file and then use file system utils to explore the results, and .foreach(print) tends to interleave the results among the myriad log messages. take() and collect() truncate. Is there a simple way to present the contents of an RDD/SchemaRDD on the screen in a formatted way? For example, say I want to take() the first 30 lines/rows of an *RDD and present them in a readable way on the screen so that I can see what's missing or invalid. Obviously, I'm just trying to sample the results in a readable way, not download everything to the driver. Thank you
Re: Is anyone using Amazon EC2? (second attempt!)
Hi , Any update on this? I am not sure if the issue I am seeing is related .. I have 8 slaves and when I created the cluster I specified ebs volume with 100G. I see on Ec2 8 volumes created and each attached to the corresponding slave. But when I try to copy data on it , it complains that /root/ephemeral-hdfs/bin/hadoop fs -cp /intersection hdfs:// ec2-54-149-112-136.us-west-2.compute.amazonaws.com:9010/ 2015-05-28 23:40:35,447 WARN hdfs.DFSClient (DFSOutputStream.java:run(562)) - DataStreamer Exception org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /intersection/kmer150/commonGoodKmers/_temporary/_attempt_201504010056_0004_m_000428_3948/part-00428._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and no node(s) are excluded in this operation. It shows only 1 datanode , but for ephermal-hdfs it shows 8 datanodes. Any thoughts? Thanks _R On Sat, May 23, 2015 at 7:24 AM, Joe Wass jw...@crossref.org wrote: I used Spark on EC2 a while ago, but recent revisions seem to have broken the functionality. Is anyone actually using Spark on EC2 at the moment? The bug in question is: https://issues.apache.org/jira/browse/SPARK-5008 It makes it impossible to use persistent HDFS without a workround on each slave node. No-one seems to be interested in the bug, so I wonder if other people aren't actually having this problem. If this is the case, any suggestions? Joe
spark-sql errors
https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/6SqGuYemnbc
Re: Python implementation of RDD interface
DPark also can work in localhost without Mesos cluster (single thread or multiple process). I also think that running PySpark without JVM in local mode will help develop, so both pysparkling and DPark are both useful. On Fri, May 29, 2015 at 1:36 PM, Sven Kreiss s...@svenkreiss.com wrote: I have to admit that I never ran DPark. I think the goals are very different. The purpose of pysparkling is not to reproduce Spark on a cluster, but to have a lightweight implementation with the same interface to run locally or on an API server. I still run PySpark on a cluster to preprocess a large number of documents to train a scikit-learn classifier, but use pysparkling to preprocess single documents before applying that classifier in API calls. The only dependencies of pysparkling are boto and requests to access files via s3:// or http://; whereas DPark needs a Mesos cluster. On Fri, May 29, 2015 at 2:46 PM Davies Liu dav...@databricks.com wrote: There is another implementation of RDD interface in Python, called DPark [1], Could you have a few words to compare these two? [1] https://github.com/douban/dpark/ On Fri, May 29, 2015 at 8:29 AM, Sven Kreiss s...@svenkreiss.com wrote: I wanted to share a Python implementation of RDDs: pysparkling. http://trivial.io/post/120179819751/pysparkling-is-a-native-implementation-of-the The benefit is that you can apply the same code that you use in PySpark on large datasets in pysparkling on small datasets or single documents. When running with pysparkling, there is no dependency on the Java Virtual Machine or Hadoop. Sven - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is anyone using Amazon EC2? (second attempt!)
I use spark on EC2 but it's a CDH 5.3.3 distribution (starving developer version) installed thru Cloudera Manager. Spark is configured to run on Yarn. Regards Sanjay Sent from my iPhone On May 29, 2015, at 6:16 PM, roni roni.epi...@gmail.com wrote: Hi , Any update on this? I am not sure if the issue I am seeing is related .. I have 8 slaves and when I created the cluster I specified ebs volume with 100G. I see on Ec2 8 volumes created and each attached to the corresponding slave. But when I try to copy data on it , it complains that /root/ephemeral-hdfs/bin/hadoop fs -cp /intersection hdfs://ec2-54-149-112-136.us-west-2.compute.amazonaws.com:9010/ 2015-05-28 23:40:35,447 WARN hdfs.DFSClient (DFSOutputStream.java:run(562)) - DataStreamer Exception org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /intersection/kmer150/commonGoodKmers/_temporary/_attempt_201504010056_0004_m_000428_3948/part-00428._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and no node(s) are excluded in this operation. It shows only 1 datanode , but for ephermal-hdfs it shows 8 datanodes. Any thoughts? Thanks _R On Sat, May 23, 2015 at 7:24 AM, Joe Wass jw...@crossref.org wrote: I used Spark on EC2 a while ago, but recent revisions seem to have broken the functionality. Is anyone actually using Spark on EC2 at the moment? The bug in question is: https://issues.apache.org/jira/browse/SPARK-5008 It makes it impossible to use persistent HDFS without a workround on each slave node. No-one seems to be interested in the bug, so I wonder if other people aren't actually having this problem. If this is the case, any suggestions? Joe
Re: Format RDD/SchemaRDD contents to screen?
Depending on your spark version, you can convert schemaRDD to a dataframe and then use .show() On 30 May 2015 10:33, Minnow Noir minnown...@gmail.com wrote: Im trying to debug query results inside spark-shell, but finding it cumbersome to save to file and then use file system utils to explore the results, and .foreach(print) tends to interleave the results among the myriad log messages. Take() and collect() truncate. Is there a simple way to present the contents of an RDD/SchemaRDD on the screen in a formatted way? For example, say I want to take() the first 30 lines/rows in an *RDD and present them in a readable way on the screen so that I can see what's missing or invalid. Obviously, I'm just trying to sample the results in a readable way, not download everything to the driver. Thank you
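A small sketch of both cases (assuming `df` is a DataFrame — on Spark 1.3 the SchemaRDD returned by SQLContext is already a DataFrame — and `resultRDD` is a plain RDD):

    // DataFrame / SchemaRDD: formatted, tabular output of the first rows
    df.show()   // prints the first 20 rows as an aligned table; df.show(30) for more

    // plain RDD: pull a small sample to the driver and print it without log interleaving
    resultRDD.take(30).foreach(println)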
RE: Official Docker container for Spark
Thanks all for your reply. I was evaluating which one fits best for me. I picked epahomov/docker-spark from the docker registry and it suffices my need. Thanks Tridib Date: Fri, 22 May 2015 14:15:42 +0530 Subject: Re: Official Docker container for Spark From: riteshoneinamill...@gmail.com To: 917361...@qq.com CC: tridib.sama...@live.com; user@spark.apache.org Use this: sequenceiq/docker Here's a link to their github repo: docker-spark They have repos for other big data tools too which are again really nice. It's being maintained properly by their devs and
Security,authorization and governance
Hi Team, Is there any open-source framework/tool for providing security, authorization and data governance for Spark? Regards Phani Kumar
Re: dataframe cumulative sum
Hi Cesar, We just added it in Spark 1.4. In Spark 1.4, you can use a window function in HiveContext to do it. Assuming you want to calculate the cumulative sum for every flag:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    df.select(
      $"flag",
      $"price",
      sum($"price").over(
        Window.partitionBy("flag").orderBy("price").rowsBetween(Long.MinValue, 0)))

In the code, over lets Spark SQL know that you want to use the window function sum. partitionBy("flag") will partition the table by the value of flag, and the sum's scope is a single partition. orderBy("price") will sort the rows in a partition based on the value of price (this probably does not really matter for your case, but using orderBy makes the result deterministic). Finally, rowsBetween(Long.MinValue, 0) means that the sum value for every row is calculated from the price values of the first row in the partition up to the current row (so you get the cumulative sum). Thanks, Yin On Fri, May 29, 2015 at 8:09 AM, Cesar Flores ces...@gmail.com wrote: What will be the more appropriate method to add a cumulative sum column to a data frame. For example, assuming that I have the next data frame:

    flag | price
    ------------------
    1    | 47.808764653746
    1    | 47.808764653746
    1    | 31.9869279512204

How can I create a data frame with an extra cumsum column as the next one:

    flag | price            | cumsum_price
    ---------------------------------------
    1    | 47.808764653746  | 47.808764653746
    1    | 47.808764653746  | 95.6175293075
    1    | 31.9869279512204 | 127.604457259

Thanks -- Cesar Flores
Re: SparkR Jobs Hanging in collectPartitions
For jobs with R UDFs (i.e. when we use the RDD API from SparkR) we use R on both the driver side and on the worker side. So in this case when the `flatMap` operation is run, the data is sent from the JVM to an R process on the worker which in turn executes the `gsub` function. Could you turn on INFO logging and send a pointer to the log file ? Its pretty clear that the problem is happening in the call to `subtract`, which in turn is doing a shuffle operation, but I am not sure why this should happen. Thanks Shivaram On Fri, May 29, 2015 at 7:56 AM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: Sure. Looking more closely at the code, I thought I might have had an error in the flow of data structures in the R code, the line that extracts the words from the corpus is now, words - distinct(SparkR:::flatMap(corpus function(line) { strsplit( gsub(“^\\s+|[[:punct:]]”, “”, tolower(line)), “\\s”)[[1]] })) (just removes leading whitespace and all punctuation after having made the whole line lowercase, then splits to a vector of words, ultimately flattening the whole collection) Counts works on the resultant words list, returning the value expected, so the hang most likely occurs during the subtract. I should mention, the size of the corpus is very small, just kb in size. The dictionary I subtract against is also quite modest by Spark standards, just 4.8MB, and I’ve got 2G memory for the Worker, which ought to be sufficient for such a small job. The Scala analog runs quite fast, even with the subtract. If we look at the DAG for the SparkR job and compare that against the event timeline for Stage 3, it seems the job is stuck in Scheduler Delay (in 0/2 tasks completed) and never begins the rest of the stage. Unfortunately, the executor log hangs up as well, and doesn’t give much info. Could you describe in a little more detail at what points data is actually held in R’s internal process memory? I was under the impression that SparkR:::textFile created an RDD object that would only be realized when a DAG requiring it was executed, and would therefore be part of the memory managed by Spark, and that memory would only be moved to R as an R object following a collect(), take(), etc. Thanks, Alek Eskilson From: Shivaram Venkataraman shiva...@eecs.berkeley.edu Reply-To: shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edu Date: Wednesday, May 27, 2015 at 8:26 PM To: Aleksander Eskilson alek.eskil...@cerner.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: SparkR Jobs Hanging in collectPartitions Could you try to see which phase is causing the hang ? i.e. If you do a count() after flatMap does that work correctly ? My guess is that the hang is somehow related to data not fitting in the R process memory but its hard to say without more diagnostic information. Thanks Shivaram On Tue, May 26, 2015 at 7:28 AM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: I’ve been attempting to run a SparkR translation of a similar Scala job that identifies words from a corpus not existing in a newline delimited dictionary. The R code is: dict - SparkR:::textFile(sc, src1) corpus - SparkR:::textFile(sc, src2) words - distinct(SparkR:::flatMap(corpus, function(line) { gsub(“[[:punct:]]”, “”, tolower(strsplit(line, “ |,|-“)[[1]]))})) found - subtract(words, dict) (where src1, src2 are locations on HDFS) Then attempting something like take(found, 10) or saveAsTextFile(found, dest) should realize the collection, but that stage of the DAG hangs in Scheduler Delay during the collectPartitions phase. 
The synonymous Scala code, however,

    val corpus = sc.textFile(src1).flatMap(_.split(" |,|-"))
    val dict = sc.textFile(src2)
    val words = corpus.map(word => word.filter(Character.isLetter(_))).distinct()
    val found = words.subtract(dict)

performs as expected. Any thoughts? Thanks, Alek Eskilson
Re: Python implementation of RDD interface
I have to admit that I never ran DPark. I think the goals are very different. The purpose of pysparkling is not to reproduce Spark on a cluster, but to have a lightweight implementation with the same interface to run locally or on an API server. I still run PySpark on a cluster to preprocess a large number of documents to train a scikit-learn classifier, but use pysparkling to preprocess single documents before applying that classifier in API calls. The only dependencies of pysparkling are boto and requests to access files via s3:// or http://; whereas DPark needs a Mesos cluster. On Fri, May 29, 2015 at 2:46 PM Davies Liu dav...@databricks.com wrote: There is another implementation of RDD interface in Python, called DPark [1], Could you have a few words to compare these two? [1] https://github.com/douban/dpark/ On Fri, May 29, 2015 at 8:29 AM, Sven Kreiss s...@svenkreiss.com wrote: I wanted to share a Python implementation of RDDs: pysparkling. http://trivial.io/post/120179819751/pysparkling-is-a-native-implementation-of-the The benefit is that you can apply the same code that you use in PySpark on large datasets in pysparkling on small datasets or single documents. When running with pysparkling, there is no dependency on the Java Virtual Machine or Hadoop. Sven
Re: Python implementation of RDD interface
There is another implementation of RDD interface in Python, called DPark [1], Could you have a few words to compare these two? [1] https://github.com/douban/dpark/ On Fri, May 29, 2015 at 8:29 AM, Sven Kreiss s...@svenkreiss.com wrote: I wanted to share a Python implementation of RDDs: pysparkling. http://trivial.io/post/120179819751/pysparkling-is-a-native-implementation-of-the The benefit is that you can apply the same code that you use in PySpark on large datasets in pysparkling on small datasets or single documents. When running with pysparkling, there is no dependency on the Java Virtual Machine or Hadoop. Sven - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Anybody using Spark SQL JDBC server with DSE Cassandra?
Hi - We have successfully integrated Spark SQL with Cassandra. We have a backend that provides a REST API that allows users to execute SQL queries on data in C*. Now we would like to also support JDBC/ODBC connectivity , so that user can use tools like Tableau to query data in C* through the Spark SQL JDBC server. However, I have been unable to find a driver that would allow the Spark SQL Thrift/JDBC server to connect with Cassandra. DataStax provides a closed-source driver that comes only with the DSE version of Cassandra. I would like to find out how many people are using the Spark SQL JDBC server + DSE Cassandra combination. If you do use Spark SQL JDBC server + DSE, I would appreciate if you could share your experience. For example, what kind of issues you have run into? How is the performance? What reporting tools you are using? Thank you. Mohammed
Re: Execption writing on two cassandra tables NoHostAvailableException: All host(s) tried for query failed (no host was tried)
sure I can, everything is on localhost . . . . it only happens when i want to write in two or more tables in the same schema A G 2015-05-29 16:10 GMT+02:00 Yana Kadiyska yana.kadiy...@gmail.com: are you able to connect to your cassandra installation via cassandra_home$ ./bin/cqlsh This exception generally means that your cassandra instance is not reachable/accessible On Fri, May 29, 2015 at 6:11 AM, Antonio Giambanco antogia...@gmail.com wrote: Hi all, I have in a single server installed spark 1.3.1 and cassandra 2.0.14 I'm coding a simple java class for Spark Streaming as follow: - reading header events from flume sink - based on header I write the event body on navigation or transaction table (cassandra) unfortunatly I get NoHostAvailableException, if I comment the code for saving one of the two tables everything works *here the code* public static void main(String[] args) { // Create a local StreamingContext with two working thread and batch interval of 1 second SparkConf conf = new SparkConf().setMaster(local[2]).setAppName(DWXNavigationApp); conf.set(spark.cassandra.connection.host, 127.0.0.1); conf.set(spark.cassandra.connection.native.port,9042); conf.set(spark.cassandra.output.batch.size.rows, 1); conf.set(spark.cassandra.output.concurrent.writes, 1); final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaReceiverInputDStreamSparkFlumeEvent flumeStreamNavig = FlumeUtils.createPollingStream(jssc, 127.0.0.1, ); JavaDStreamString logRowsNavig = flumeStreamNavig.map( new FunctionSparkFlumeEvent,String(){ @Override public String call(SparkFlumeEvent arg0) throws Exception { // TODO Auto-generated method stub0. MapCharSequence,CharSequence headers = arg0.event().getHeaders(); ByteBuffer bytePayload = arg0.event().getBody(); String s = headers.get(source_log).toString() + # + new String(bytePayload.array()); System.out.println(RIGA: + s); return s; } }); logRowsNavig.foreachRDD( new FunctionJavaRDDString,Void(){ @Override public Void call(JavaRDDString rows) throws Exception { if(!rows.isEmpty()){ //String header = getHeaderFronRow(rows.collect()); ListNavigation listNavigation = new ArrayListNavigation(); ListTransaction listTransaction = new ArrayListTransaction(); for(String row : rows.collect()){ String header = row.substring(0, row.indexOf(#)); if(header.contains(controller_log)){ listNavigation.add(createNavigation(row)); System.out.println(Added Element in Navigation List); }else if(header.contains(business_log)){ listTransaction.add(createTransaction(row)); System.out.println(Added Element in Transaction List); } } if(!listNavigation.isEmpty()){ JavaRDDNavigation navigationRows= jssc.sparkContext().parallelize(listNavigation); javaFunctions(navigationRows).writerBuilder(cassandrasink, navigation, mapToRow(Navigation.class)).saveToCassandra(); } if(!listTransaction.isEmpty()){ JavaRDDTransaction transactionRows= jssc.sparkContext().parallelize(listTransaction); javaFunctions(transactionRows).writerBuilder(cassandrasink, transaction, mapToRow(Transaction.class)).saveToCassandra(); } } return null; } }); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } *here the exception* 15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement@ab76b83 com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried) at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107) at 
com.datastax.driver.core.SessionManager.execute(SessionManager.java:538) at com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577) at