Re: SparkSQL 1.4 can't accept registration of UDF?
The same issue (a custom UDF jar added through 'add jar' is not recognized) is observed on Spark 1.4.1. Instead of executing "add jar udf.jar" in beeline, my workaround is either

1) to pass udf.jar with --jars when starting the ThriftServer (this didn't work in AWS EMR's Spark 1.4.0.b), or
2) to add the custom UDF jar to SPARK_CLASSPATH (this works in AWS EMR).

Thanks,

On Tue, Jul 14, 2015 at 9:29 PM, Okehee Goh oke...@gmail.com wrote:

The command 'list jar' doesn't seem to be accepted in beeline with Spark's ThriftServer in either Spark 1.3.1 or Spark 1.4.

0: jdbc:hive2://localhost:1 list jar;
Error: org.apache.spark.sql.AnalysisException: cannot recognize input near 'list' 'jar' 'EOF'; line 1 pos 0 (state=,code=0)

Thanks

On Tue, Jul 14, 2015 at 8:46 PM, prosp4300 prosp4...@163.com wrote:

What's the result of 'list jar' in both 1.3.1 and 1.4.0? Please check if there is any difference.

At 2015-07-15 08:10:44, ogoh oke...@gmail.com wrote:

Hello, I am using SparkSQL along with ThriftServer so that we can access it using Hive queries. With Spark 1.3.1, I can register a UDF. With Spark 1.4.0, that doesn't work. The UDF jar is the same. The logs are below. I appreciate any advice.

== With Spark 1.4
Beeline version 1.4.0 by Apache Hive
0: jdbc:hive2://localhost:1 add jar hdfs:///user/hive/lib/dw-udf-2015.06.06-SNAPSHOT.jar;
0: jdbc:hive2://localhost:1 create temporary function parse_trace as 'com.mycom.dataengine.udf.GenericUDFParseTraceAnnotation';
15/07/14 23:49:43 DEBUG transport.TSaslTransport: writing data length: 206
15/07/14 23:49:43 DEBUG transport.TSaslTransport: CLIENT: reading data length: 201
Error: org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask (state=,code=0)

== With Spark 1.3.1:
Beeline version 1.3.1 by Apache Hive
0: jdbc:hive2://localhost:10001 add jar hdfs:///user/hive/lib/dw-udf-2015.06.06-SNAPSHOT.jar;
+-+
| Result |
+-+
+-+
No rows selected (1.313 seconds)
0: jdbc:hive2://localhost:10001 create temporary function parse_trace as 'com.mycom.dataengine.udf.GenericUDFParseTraceAnnotation';
+-+
| result |
+-+
+-+
No rows selected (0.999 seconds)

=== The logs of ThriftServer of Spark 1.4.0
15/07/14 23:49:43 INFO SparkExecuteStatementOperation: Running query 'create temporary function parse_trace as 'com.quixey.dataengine.udf.GenericUDFParseTraceAnnotation''
15/07/14 23:49:43 INFO ParseDriver: Parsing command: create temporary function parse_trace as 'com.quixey.dataengine.udf.GenericUDFParseTraceAnnotation'
15/07/14 23:49:43 INFO ParseDriver: Parse Completed
15/07/14 23:49:43 INFO PerfLogger: PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver
15/07/14 23:49:43 INFO PerfLogger: PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver
15/07/14 23:49:43 INFO Driver: Concurrency mode is disabled, not creating a lock manager
15/07/14 23:49:43 INFO PerfLogger: PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver
15/07/14 23:49:43 INFO PerfLogger: PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver
15/07/14 23:49:43 INFO ParseDriver: Parsing command: create temporary function parse_trace as 'com.quixey.dataengine.udf.GenericUDFParseTraceAnnotation'
15/07/14 23:49:43 INFO ParseDriver: Parse Completed
15/07/14 23:49:43 INFO PerfLogger: /PERFLOG method=parse start=1436917783106 end=1436917783106 duration=0 from=org.apache.hadoop.hive.ql.Driver
15/07/14 23:49:43 INFO PerfLogger: PERFLOG method=semanticAnalyze from=org.apache.hadoop.hive.ql.Driver
15/07/14 23:49:43 INFO HiveMetaStore: 2: get_database: default
15/07/14 23:49:43 INFO audit: ugi=anonymous ip=unknown-ip-addr cmd=get_database: default
15/07/14 23:49:43 INFO HiveMetaStore: 2: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/07/14 23:49:43 INFO ObjectStore: ObjectStore, initialize called
15/07/14 23:49:43 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : .
15/07/14 23:49:43 INFO Query: Reading in results for query org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is closing
15/07/14 23:49:43 INFO ObjectStore: Initialized ObjectStore
15/07/14 23:49:43 INFO FunctionSemanticAnalyzer: analyze done
15/07/14 23:49:43 INFO Driver: Semantic Analysis Completed
15/07/14 23:49:43 INFO PerfLogger: /PERFLOG method=semanticAnalyze start=1436917783106 end=1436917783114 duration=8 from=org.apache.hadoop.hive.ql.Driver
15/07/14 23:49:43 INFO Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
15/07/14 23:49:43 INFO PerfLogger: /PERFLOG method=compile start=1436917783106 end=1436917783114 duration=8 from=org.apache.hadoop.hive.ql.Driver
15/07/14 23:49:43 INFO PerfLogger: PERFLOG method=Driver.execute
create HiveContext if available, otherwise SQLContext
has anyone tried to create a HiveContext only if the class is available? i tried this:

  implicit lazy val sqlc: SQLContext = try {
    Class.forName("org.apache.spark.sql.hive.HiveContext", true, Thread.currentThread.getContextClassLoader)
      .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext]
  } catch { case e: ClassNotFoundException => new SQLContext(sc) }

it compiles fine, but i get classloader issues when i actually use it on a cluster. for example:

Exception in thread "main" java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:216)
  at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:229)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
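The "only if the class is available" check can be separated from the construction step. The sketch below is plain Scala with no Spark dependency; the object name ContextChooser and its two helpers are hypothetical names used for illustration. It only probes whether a class can be loaded, and it does not address the data source classloader failure reported above.

```scala
import scala.util.Try

object ContextChooser {
  // Returns true if `className` can be loaded by the current thread's context
  // class loader (falling back to this object's loader if none is set).
  // initialize = false avoids running static initializers during the probe.
  // Note: Try only catches non-fatal exceptions, so a NoClassDefFoundError
  // (a LinkageError) would still propagate.
  def isClassAvailable(className: String): Boolean = {
    val loader = Option(Thread.currentThread.getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Try(Class.forName(className, false, loader)).isSuccess
  }

  // Picks the first loadable class name from an ordered list of candidates.
  def firstAvailable(candidates: Seq[String]): Option[String] =
    candidates.find(isClassAvailable)

  def main(args: Array[String]): Unit = {
    val chosen = firstAvailable(Seq(
      "org.apache.spark.sql.hive.HiveContext",
      "org.apache.spark.sql.SQLContext"))
    println(chosen.getOrElse("neither context class is on the classpath"))
  }
}
```

With a probe like this, the reflective constructor call from the message would only run when the probe succeeds, which keeps the fallback logic out of a catch block.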
Re: Running foreach on a list of rdds in parallel
sc.union(rdds).saveAsTextFile()

On Wed, Jul 15, 2015 at 10:37 PM, Brandon White bwwintheho...@gmail.com wrote:

Hello, I have a list of RDDs, List(rdd1, rdd2, rdd3, rdd4), and I would like to save them in parallel. Right now, each operation runs sequentially. I tried using an RDD of RDDs, but that does not work.

  list.foreach { rdd => rdd.saveAsTextFile("/tmp/cache/") }

Any ideas?

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
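The union approach writes everything into a single output directory. If each RDD should keep its own output, another option is to submit the save actions from separate threads, since a Spark application can run jobs concurrently when they are submitted from different threads. The sketch below is plain Scala with no Spark dependency: ParallelSaves and runAll are hypothetical names, and the sleeping jobs are stand-ins for calls such as rdd.saveAsTextFile(path). Each real save would need its own output path, because saveAsTextFile refuses to write into a directory that already exists.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelSaves {
  // Starts every job on the global execution context and blocks until all of
  // them have finished. Results come back in the same order as `jobs`.
  def runAll[A](jobs: Seq[() => A], timeout: Duration = 10.minutes): Seq[A] = {
    val futures = jobs.map(job => Future(job()))
    Await.result(Future.sequence(futures), timeout)
  }

  def main(args: Array[String]): Unit = {
    val finished = new AtomicInteger(0)
    // Stand-ins for rdd.saveAsTextFile(path): each "job" sleeps briefly.
    val jobs: Seq[() => Int] = (1 to 4).map { i =>
      () => {
        Thread.sleep(100)
        finished.incrementAndGet()
        i
      }
    }
    println(runAll(jobs))
    println(s"jobs finished: ${finished.get}")
  }
}
```

How much real parallelism this gives depends on the cluster having free cores for more than one job at a time.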
Re: DataFrame InsertIntoJdbc() Runtime Exception on cluster
Which version of Spark are you using? insertIntoJDBC is deprecated (from 1.4.0); you may use write.jdbc() instead.

Thanks
Best Regards

On Wed, Jul 15, 2015 at 2:43 PM, Manohar753 manohar.re...@happiestminds.com wrote:

Hi All, I am trying to add a few new rows to an existing MySQL table using a DataFrame. The rows are added correctly in my local environment, but on the Spark cluster I get the runtime exception below.

Exception in thread "main" java.lang.RuntimeException: Table msusers_1 already exists.
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:240)
  at org.apache.spark.sql.DataFrame.insertIntoJDBC(DataFrame.scala:1481)
  at com.sparkexpert.UserMigration.main(UserMigration.java:59)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/07/15 08:13:42 INFO spark.SparkContext: Invoking stop() from shutdown hook
15/07/15 08:13:42 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/07/15 08:13:

The code snippet is below:

  System.out.println(Query);
  Map<String, String> options = new HashMap<String, String>();
  options.put("driver", PropertyLoader.getProperty(Constants.msSqlDriver));
  options.put("url", PropertyLoader.getProperty(Constants.msSqlURL));
  options.put("dbtable", Query);
  options.put("numPartitions", "1");
  DataFrame delatUsers = sqlContext.load("jdbc", options);
  delatUsers.show();

  //Load latest users DataFrame
  String mysQuery = "(SELECT * FROM msusers_1) as employees_name";
  Map<String, String> msoptions = new HashMap<String, String>();
  msoptions.put("driver", PropertyLoader.getProperty(Constants.mysqlDriver));
  msoptions.put("url", PropertyLoader.getProperty(Constants.mysqlUrl));
  msoptions.put("dbtable", mysQuery);
  msoptions.put("numPartitions", "1");
  DataFrame latestUsers = sqlContext.load("jdbc", msoptions);

  //Get Update users Data
  DataFrame updatedUsers = delatUsers.as("ms").join(latestUsers.as("lat"), col("lat.uid").equalTo(col("ms.uid")), "inner")
      .select("ms.revision", "ms.uid", "ms.UserType", "ms.FirstName", "ms.LastName", "ms.Email", "ms.smsuser_id", "ms.dev_acct", "ms.lastlogin", "ms.username", "ms.schoolAffiliation", "ms.authsystem_id", "ms.AdminStatus");

  //Insert new users into Mysql DB
  * delatUsers.except(updatedUsers).insertIntoJDBC(PropertyLoader.getProperty(Constants.mysqlUrl), "msusers_1", false); *

The line marked with asterisks is where the exception occurs. Team, please give me some input if anyone has come across this. With overwrite enabled, the same call works fine on the cluster as well.

Thanks,
manoar

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-InsertIntoJdbc-Runtime-Exception-on-cluster-tp23851.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Will multiple filters on the same RDD optimized to one filter?
What if I would use both rdd1 and rdd2 later?

Raghavendra Pandey raghavendra.pan...@gmail.com wrote on Thu, Jul 16, 2015 at 4:08 PM:

If you cache the RDD it will save some operations. But anyway, filter is a lazy operation, and it runs based on what you do later on with rdd1 and rdd2...

Raghavendra

On Jul 16, 2015 1:33 PM, Bin Wang wbi...@gmail.com wrote:

If I write code like this:

  val rdd = input.map(_.value)
  val f1 = rdd.filter(_ == 1)
  val f2 = rdd.filter(_ == 2)
  ...

Then the DAG of the execution may be this:

        -> Filter -> ...
  Map
        -> Filter -> ...

But the two filters operate on the same RDD, which means it could be done by scanning the RDD just once. Does Spark have this kind of optimization for now?
Re: DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items
Hi Nikos,

How many columns and how many distinct values of some_column are there in the DataFrame? The Parquet writer is known to be very memory consuming for wide tables, and lots of distinct partition column values result in many concurrent Parquet writers. One possible workaround is to first repartition the data by the partition columns.

Cheng

On 7/15/15 7:05 PM, Nikos Viorres wrote:

Hi, I am trying to test partitioning for DataFrames with Parquet, so I attempted to do df.write().partitionBy(some_column).parquet(path) on a small dataset of 20,000 records which, when saved as Parquet locally with gzip, takes 4 MB of disk space. However, on my dev machine with -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always fails with an OutOfMemoryError. Does anyone have any ideas?

stack trace:

[Stage 2: (0 + 4) / 8]2015-07-15 13:57:21,021 ERROR Logging$class Exception in task 3.0 in stage 2.0 (TID 8) java.lang.OutOfMemoryError: Java heap space at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65) at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48) at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215) at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67) at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56) at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178) at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369) at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108) at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94) at
parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:111) at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244) at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441) at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436) at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189) at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91) at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org http://org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in thread Thread[Executor task launch worker-2,5,main] java.lang.OutOfMemoryError: Java heap space at 
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65) at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48) at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215) at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67) at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56) at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178) at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369) at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108) at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94) at
Invalid HDFS path exception
Hi. I am trying to use Apache Spark in a RESTful web service in which I query data from Hive tables using Spark SQL. This is my Java class:

  SparkConf sparkConf = new SparkConf().setAppName("Hive").setMaster("local").setSparkHome(Path);
  JavaSparkContext ctx = new JavaSparkContext(sparkConf);
  HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(ctx.sc());
  sqlContext.sql("CREATE TABLE if not exists page (VisitedDate STRING,Platform STRING,VisitCount int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'");
  sqlContext.sql("LOAD DATA INPATH '/page' INTO TABLE page");
  Row[] result = sqlContext.sql("Select * from tablename where Platform='Apache'").collect();
  ctx.close();

When I deploy this service it throws an invalid path exception, because it looks for the file in the local file system. But when I ran the same thing in the Scala shell it worked fine. On analysis I found that it is not picking up the Hive configuration file in Spark home; I have also set Spark home in my code, but that doesn't help. I also found that a metastore_db folder is created inside the GlassFish server that hosts this service. Can anyone tell me how to solve this issue?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Invalid-HDFS-path-exception-tp23875.html
S3 Read / Write makes executors deadlocked
Given the following code, which just reads from S3 and then saves files to S3:

  val inputFileName: String = "s3n://input/file/path"
  val outputFileName: String = "s3n://output/file/path"
  val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
  val sparkContext = new SparkContext(conf)

  // Problems here: executors thread locked
  sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)

  // But this one works
  sparkContext.textFile(inputFileName).count()

It blocks without showing any exceptions or errors. jstack shows that all executors are locked. The thread dump is at the end of this post.

I am using spark-1.4.0 on my PC, which has 4 CPU cores. There are 21 parquet files in the input directory, 500KB per file.

In addition, if we change the last action to one that is not IO bound, for example count(), it works. It seems that S3 read and write in the same stage leaves the executors deadlocked.

I encountered the same problem when using DataFrame load/save operations. JIRA created: https://issues.apache.org/jira/browse/SPARK-8869

Executor task launch worker-3 #69 daemon prio=5 os_prio=0 tid=0x7f7bd4036800 nid=0x1296 in Object.wait() [0x7f7c1099a000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518) - *locked* 0xe56745b8 (a org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool) at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416) at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323) at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342) at
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718) at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599) at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535) at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987) at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332) at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111) at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown Source) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) -- Hao Ren Data Engineer @ leboncoin Paris, France
Re: Sorted Multiple Outputs
Hi Eugene,

thanks for your response! Your recommendation makes sense; that's more or less what I tried. The problem I am facing is that inside foreachPartition() I cannot create a new RDD and use saveAsTextFile. It would probably make sense to write directly to HDFS using the Java API. When I tried that, I was getting errors similar to this:

Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel

Probably it's hitting a race condition. Has anyone else faced this situation? Any suggestions?

Thanks a lot!

On 15 July 2015 at 14:04, Eugene Morozov fathers...@list.ru wrote:

Yiannis,

It looks like you might explore another approach:

  sc.textFile("input/path")
    .map() // your own implementation
    .partitionBy(new HashPartitioner(num))
    .groupBy() // your own implementation, as a result - PairRDD of key vs Iterable of values
    .foreachPartition()

In the last step you could sort all values for the key and store them in a separate file, even in the same directory as the files for all the other keys. HashPartitioner must guarantee that all values for a specific key reside in just one partition, but it might happen that one partition contains more than one key (with values). I'm not sure about this, but it shouldn't be a big deal, as you would iterate over tuple<key, Iterable<value>> and store each key to a specific file.

On 15 Jul 2015, at 03:23, Yiannis Gkoufas johngou...@gmail.com wrote:

Hi there, I have been using the approach described here: http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job In addition to that, I was wondering if there is a way to customize the order of the values contained in each file. Thanks a lot!

Eugene Morozov fathers...@list.ru
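The last step of the suggestion above (sort the values of each key, then write one file per key) can be sketched as a function over an iterator of grouped records, which is the shape of data a foreachPartition callback receives after grouping. This is plain Scala with no Spark or Hadoop dependency: SortedOutputs and writeGroups are hypothetical names, and java.nio on the local file system stands in for the HDFS API mentioned in the thread. It also assumes each key is safe to use as a file name.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object SortedOutputs {
  // Writes one file per key under `dir`, containing that key's values in
  // ascending order, one value per line.
  def writeGroups(dir: Path, groups: Iterator[(String, Iterable[String])]): Unit = {
    Files.createDirectories(dir)
    groups.foreach { case (key, values) =>
      val sorted = values.toSeq.sorted
      val bytes = sorted.mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8)
      Files.write(dir.resolve(key + ".txt"), bytes)
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("sorted-outputs")
    writeGroups(dir, Iterator("a" -> Seq("3", "1", "2"), "b" -> Seq("z", "y")))
    println(new String(Files.readAllBytes(dir.resolve("a.txt")), StandardCharsets.UTF_8))
  }
}
```

Sorting happens per key in memory here, so this only fits cases where the values of a single key fit on one executor; a custom ordering can be passed to sorted if the natural string order is not wanted.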
Spark 1.4.0 org.apache.spark.sql.AnalysisException: cannot resolve 'probability' given input columns
Hi forum,

I am currently using Spark 1.4.0, and started using the ML pipeline framework. I ran the example program ml.JavaSimpleTextClassificationPipeline, which uses LogisticRegression. But I wanted to do multiclass classification, so I used the DecisionTreeClassifier from the org.apache.spark.ml.classification package.

The model was trained properly using the fit method, but when testing the model using the print statement from the above example, I get the following error saying that the 'probability' column is not present. Is this column present only for LogisticRegression? If so, how can I see which columns are present after DecisionTreeClassifier predicts the output?

Also, one more thing: how can I convert the predicted output back to String format if I am using StringIndexer?

*org.apache.spark.sql.AnalysisException: cannot resolve 'probability' given input columns id, prediction, labelStr, data, features, words, label;* at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:63) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:52) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:52) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42) at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:920) at 
org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:611) at org.apache.spark.sql.DataFrame.select(DataFrame.scala:611) at com.xxx.ml.xxx.execute(xxx.java:129)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-org-apache-spark-sql-AnalysisException-cannot-resolve-probability-given-input-columns-tp23874.html
Re: Will multiple filters on the same RDD optimized to one filter?
If you cache the RDD it will save some operations. But anyway, filter is a lazy operation, and it runs based on what you do later on with rdd1 and rdd2...

Raghavendra

On Jul 16, 2015 1:33 PM, Bin Wang wbi...@gmail.com wrote:

If I write code like this:

  val rdd = input.map(_.value)
  val f1 = rdd.filter(_ == 1)
  val f2 = rdd.filter(_ == 2)
  ...

Then the DAG of the execution may be this:

        -> Filter -> ...
  Map
        -> Filter -> ...

But the two filters operate on the same RDD, which means it could be done by scanning the RDD just once. Does Spark have this kind of optimization for now?
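The RDD API does not merge two sibling filters into one scan: each action recomputes the lineage it depends on unless an intermediate RDD has been persisted. The difference can be illustrated without Spark. The sketch below is a plain Scala analogy, not Spark code: RecomputeDemo and mapEvaluations are hypothetical names, a function returning a fresh iterator plays the role of an uncached RDD, and materializing into a Vector plays the role of rdd.cache().

```scala
object RecomputeDemo {
  // Counts how often the "map" step runs when two filters read the same mapped data.
  def mapEvaluations(input: Seq[Int], materialize: Boolean): Int = {
    var evaluations = 0

    // A recipe rather than data: every call re-runs the map over the input.
    def mapped(): Iterator[Int] = input.iterator.map { x => evaluations += 1; x }

    // With materialize = true the mapped values are computed once and reused.
    val source: () => Iterator[Int] =
      if (materialize) {
        val cached = mapped().toVector
        () => cached.iterator
      } else {
        () => mapped()
      }

    // Two independent "actions", one per filter.
    val f1 = source().filter(_ == 1).toList
    val f2 = source().filter(_ == 2).toList
    evaluations
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(1, 2, 3, 1, 2)
    println(s"map evaluations without materializing: ${mapEvaluations(input, materialize = false)}")
    println(s"map evaluations with materializing:    ${mapEvaluations(input, materialize = true)}")
  }
}
```

In Spark terms, calling cache() on the mapped RDD before running actions on f1 and f2 avoids repeating the map, at the cost of holding the mapped data in memory.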
Re: Spark streaming Processing time keeps increasing
Make sure you provide the filter function with the invertible reduceByKeyAndWindow. Otherwise none of the keys will get removed, and the key space will continue to increase. This is what is leading to the lag. So use the filter function to filter out the keys that are not needed any more.

On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

What is your data volume? Do you have checkpointing/WAL enabled? In that case make sure you have SSD disks, as this behavior is mainly due to IO wait.

Thanks
Best Regards

On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote:

Hello,

We have a Spark Streaming application, and the problem we are encountering is that the batch processing time keeps increasing and eventually causes the application to start lagging. I am hoping that someone here can point me to any underlying cause of why this might happen.

The batch interval is 1 minute as of now, and the app does some maps, filters, joins and reduceByKeyAndWindow operations. All the reduces are invertible functions, so we do provide the inverse-reduce functions in all of them. The largest window size we have is 1 hour right now.

When the app is started, the batch processing time is between 20 and 30 seconds. It keeps creeping up slowly, and by the time it hits the 1 hour mark it is somewhere around 35-40 seconds. Somewhat expected and still not bad!

I would expect that, since the largest window we have is 1 hour long, the application should stabilize around the 1 hour mark and process subsequent batches within that 35-40 second zone. However, that is not what is happening. The processing time keeps increasing, and within a few hours it exceeds the 1 minute mark and the app starts lagging. Eventually the lag builds up to minutes, at which point we have to restart the system.

Any pointers on why this could be happening and what we can do to troubleshoot further?

Thanks
Nikunj
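Why the key space grows without a filter function can be seen in a small simulation. With an inverse reduce, a key that has left the window is not dropped; its value is only reduced back (for a count, to 0), and it stays in the state until something filters it out. The sketch below is plain Scala with no Spark dependency: WindowedCounts, slide and simulate are hypothetical names, and the window arithmetic is a simplified model of what the invertible reduceByKeyAndWindow overload does, whose optional last parameter filterFunc takes a (key, value) pair and returns whether to keep it.

```scala
object WindowedCounts {
  type Counts = Map[String, Long]

  // One slide of an invertible windowed count: add the batch entering the
  // window, subtract the batch leaving it, then keep only pairs accepted by `keep`.
  def slide(state: Counts, entering: Counts, leaving: Counts,
            keep: ((String, Long)) => Boolean): Counts = {
    val added = entering.foldLeft(state) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + v) // reduce: _ + _
    }
    val subtracted = leaving.foldLeft(added) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0L) - v) // inverse reduce: _ - _
    }
    subtracted.filter(keep)
  }

  // Feeds `batches` through a window that is `windowLen` batches long and
  // returns the state after the last batch.
  def simulate(batches: Seq[Counts], windowLen: Int,
               keep: ((String, Long)) => Boolean): Counts =
    batches.indices.foldLeft(Map.empty[String, Long]) { (state, i) =>
      val leaving = if (i >= windowLen) batches(i - windowLen) else Map.empty[String, Long]
      slide(state, batches(i), leaving, keep)
    }

  def main(args: Array[String]): Unit = {
    // Every batch introduces one brand-new key.
    val batches = (1 to 5).map(i => Map(s"key$i" -> 1L))
    println(s"keys tracked without filter: ${simulate(batches, 2, _ => true).size}")
    println(s"keys tracked with filter:    ${simulate(batches, 2, _._2 > 0).size}")
  }
}
```

For counts, a filter such as "value > 0" is the natural choice; for other reductions the filter has to encode whatever "no longer needed" means for that value.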
Re: Job aborted due to stage failure: Task not serializable:
Did you try this?

  val out = lines.filter(xx => {
    val y = xx
    val x = broadcastVar.value
    var flag: Boolean = false
    for (a <- x) { if (y.contains(a)) flag = true }
    flag
  })

Thanks
Best Regards

On Wed, Jul 15, 2015 at 8:10 PM, Naveen Dabas naveen.u...@ymail.com wrote:

I am using the code below with the Kryo serializer.

1) When I run this code I get this error at the commented line: Task not serializable.
2) How are broadcast variables treated in executors? Are they local variables, or can they be used in any function as if they were defined as global variables?

  object StreamingLogInput {
    def main(args: Array[String]) {
      val master = args(0)
      val conf = new SparkConf().setAppName("StreamingLogInput")
      // Create a StreamingContext with a 1 second batch size
      val sc = new SparkContext(conf)
      val lines = sc.parallelize(List("eoore is test", "testing is error report"))
      //val ssc = new StreamingContext(sc, Seconds(30))
      //val lines = ssc.socketTextStream("localhost", )
      val filter = sc.textFile("/user/nadabas/filters/fltr")
      val filarr = filter.collect().toArray
      val broadcastVar = sc.broadcast(filarr)
      // val out = lines.transform { rdd => rdd.filter(x => fil(broadcastVar.value, x)) }
      val out = lines.filter(x => fil(broadcastVar.value, x)) // error is coming
      out.collect()
    }

    def fil(x1: Array[String], y1: String) = {
      val y = y1
      // val x = broadcastVar.value
      val x = x1
      var flag: Boolean = false
      for (a <- x) { if (y.contains(a)) flag = true }
      flag
    }
  }
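One way to keep a filter closure easy to serialize is to put the matching logic in a small standalone object and let the closure capture nothing but local values. The sketch below is plain Scala with no Spark dependency: LineFilters, containsAny, matcher and isJavaSerializable are hypothetical names. The check uses Java serialization, which is what Spark uses for closures by default even when Kryo is configured for data. It does not diagnose the specific failure in the code above.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Marked Serializable so that a closure referencing it stays serializable
// even if the compiler captures the enclosing object.
object LineFilters extends Serializable {
  // True if `line` contains at least one of the `patterns`.
  def containsAny(patterns: Array[String], line: String): Boolean =
    patterns.exists(p => line.contains(p))

  // Builds a predicate that closes over the local `patterns` array only.
  def matcher(patterns: Array[String]): String => Boolean =
    line => containsAny(patterns, line)

  // Attempts Java serialization of `obj` and reports whether it succeeded.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val keep = matcher(Array("error", "test"))
    val lines = List("eoore is test", "testing is error report", "nothing here")
    println(lines.filter(keep))
    println(s"closure serializable: ${isJavaSerializable(keep)}")
  }
}
```

In a Spark job the predicate would be used as lines.filter(x => LineFilters.containsAny(broadcastVar.value, x)), where the closure captures the broadcast handle and the array is read on the executor side.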
Re: Spark cluster read local files
Yes you can do that, just make sure you rsync the same file to the same location on every machine. Thanks Best Regards On Thu, Jul 16, 2015 at 5:50 AM, Julien Beaudan jbeau...@stottlerhenke.com wrote: Hi all, Is it possible to use Spark to assign each machine in a cluster the same task, but on files in each machine's local file system, and then have the results sent back to the driver program? Thank you in advance! Julien
Will multiple filters on the same RDD be optimized to one filter?
If I write code like this:

    val rdd = input.map(_.value)
    val f1 = rdd.filter(_ == 1)
    val f2 = rdd.filter(_ == 2)
    ...

then the DAG of the execution may look like this:

    Map -> Filter -> ...
        -> Filter -> ...

But the two filters operate on the same RDD, which means both could be computed by scanning the RDD just once. Does Spark have this kind of optimization yet?
Indexed Store for lookup table
Hello, I have been using IndexedRDD as a large lookup (1 billion records) to join with small tables (1 million rows). The performance of indexedrdd is great until it has to be persisted on disk. Are there any alternatives to IndexedRDD or any changes to how I use it to improve performance with big data volumes? Kindest Regards, Jem
Re: Spark streaming Processing time keeps increasing
Thanks Akhil. For doing reduceByKeyAndWindow, one has to have checkpointing enabled. So, yes we do have it enabled. But not Write Ahead Log because we don't have a need for recovery and we do not recover the process state on restart. I don't know if IO Wait fully explains the increasing processing time. Below is a full minute of 'sar' output every 2 seconds. The iowait values don't seem too bad to me except for a brief small spike in the middle. Also, how does one explain the continued degradation of processing time even beyond the largest window interval? Thanks Nikunj $ sar 2 30 Linux 3.13.0-48-generic (ip-X-X-X-X)07/16/2015 _x86_64_ (16 CPU) 01:11:14 AM CPU %user %nice %system %iowait%steal %idle 01:11:16 AM all 66.70 0.03 11.10 0.03 0.00 22.13 01:11:18 AM all 79.99 0.00 10.81 0.00 0.03 9.17 01:11:20 AM all 62.66 0.03 10.84 0.00 0.03 26.43 01:11:22 AM all 68.59 0.00 10.83 0.00 0.10 20.49 01:11:24 AM all 77.74 0.00 10.83 0.00 0.03 11.40 01:11:26 AM all 65.01 0.00 10.83 0.03 0.07 24.06 01:11:28 AM all 66.33 0.00 10.87 0.00 0.03 22.77 01:11:30 AM all 72.38 0.03 12.48 0.54 0.06 14.50 01:11:32 AM all 68.35 0.00 12.98 7.46 0.03 11.18 01:11:34 AM all 75.94 0.03 14.02 3.27 0.03 6.71 01:11:36 AM all 68.60 0.00 14.34 2.76 0.03 14.27 01:11:38 AM all 61.99 0.03 13.34 0.07 0.07 24.51 01:11:40 AM all 52.21 0.03 12.79 1.04 0.13 33.79 01:11:42 AM all 37.91 0.03 12.43 0.03 0.10 49.48 01:11:44 AM all 26.92 0.00 11.68 0.14 0.10 61.16 01:11:46 AM all 24.86 0.00 12.07 0.00 0.10 62.97 01:11:48 AM all 25.49 0.00 11.96 0.00 0.10 62.45 01:11:50 AM all 21.16 0.00 12.35 0.03 0.14 66.32 01:11:52 AM all 29.89 0.00 12.06 0.03 0.10 57.91 01:11:54 AM all 26.77 0.00 11.81 0.00 0.10 61.32 01:11:56 AM all 25.34 0.03 11.81 0.03 0.14 62.65 01:11:58 AM all 22.42 0.00 12.60 0.00 0.10 64.88 01:12:00 AM all 30.27 0.00 12.10 0.03 0.14 57.46 01:12:02 AM all 80.59 0.00 10.58 0.35 0.03 8.44 01:12:04 AM all 49.05 0.00 12.89 0.66 0.07 37.32 01:12:06 AM all 31.21 0.03 13.54 6.54 0.17 48.50 01:12:08 AM all 
31.66 0.00 13.26 6.30 0.10 48.67 01:12:10 AM all 36.19 0.00 12.87 3.04 0.14 47.76 01:12:12 AM all 82.63 0.03 10.60 0.00 0.03 6.70 01:12:14 AM all 77.72 0.00 10.66 0.00 0.03 11.59 Average:all 52.22 0.01 12.04 1.08 0.08 34.58 On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com wrote: What is your data volume? Are you having checkpointing/WAL enabled? In that case make sure you are having SSD disks as this behavior is mainly due to the IO wait. Thanks Best Regards On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote: Hello, We have a Spark streaming application and the problem that we are encountering is that the batch processing time keeps on increasing and eventually causes the application to start lagging. I am hoping that someone here can point me to any underlying cause of why this might happen. The batch interval is 1 minute as of now and the app does some maps, filters, joins and reduceByKeyAndWindow operations. All the reduces are invertible functions and so we do provide the inverse-reduce functions in all those. The largest window size we have is 1 hour right now. When the app is started, we see that the batch processing time is between 20 and 30 seconds. It keeps creeping up slowly and by the time it hits the 1 hour mark, it somewhere around 35-40 seconds. Somewhat expected and still not bad! I would expect that since the largest window we have is 1 hour long, the application should stabilize around the 1 hour mark and start processing subsequent batches within that 35-40 second zone. However, that is not what is happening. The processing time still keeps increasing and eventually in a few hours it exceeds 1 minute mark and then starts lagging. Eventually the lag builds up and becomes in minutes at which point we have to restart the system. Any pointers on why this could be happening and what we can do to troubleshoot further? Thanks Nikunj
Re: Indexed Store for lookup table
You'll probably have to install it separately. On Thu, Jul 16, 2015 at 2:29 PM Jem Tucker jem.tuc...@gmail.com wrote: Hi Vetle, IndexedRDD is persisted in the same way RDDs are as far as I am aware. Are you aware if Cassandra can be built into my application or has to be a stand alone database which is installed separately? Thanks, Jem On Thu, Jul 16, 2015 at 12:59 PM Vetle Leinonen-Roeim ve...@roeim.net wrote: Hi, Not sure how IndexedRDD is persisted, but perhaps you're better off using a NOSQL database for lookups (perhaps using Cassandra, with the Cassandra connector)? That should give you good performance on lookups, but persisting those billion records sounds like something that will take some time in any case. Regards, Vetle On Thu, Jul 16, 2015 at 10:02 AM Jem Tucker jem.tuc...@gmail.com wrote: Hello, I have been using IndexedRDD as a large lookup (1 billion records) to join with small tables (1 million rows). The performance of indexedrdd is great until it has to be persisted on disk. Are there any alternatives to IndexedRDD or any changes to how I use it to improve performance with big data volumes? Kindest Regards, Jem
Re: DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items
Hi Lian, Thank you for the tip. Indeed, there were a lot of distinct values in my result set (approximately 3000). As you suggested, I decided to partition the data first on a column with much smaller cardinality. Thanks n On Thu, Jul 16, 2015 at 2:09 PM, Cheng Lian lian.cs@gmail.com wrote: Hi Nikos, How many columns and distinct values of some_column are there in the DataFrame? Parquet writer is known to be very memory consuming for wide tables. And lots of distinct partition column values result in many concurrent Parquet writers. One possible workaround is to first repartition the data by partition columns first. Cheng On 7/15/15 7:05 PM, Nikos Viorres wrote: Hi, I am trying to test partitioning for DataFrames with parquet usage so i attempted to do df.write().partitionBy(some_column).parquet(path) on a small dataset of 20.000 records which when saved as parquet locally with gzip take 4mb of disk space. However, on my dev machine with -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always fails with an OutOfMemoryError. Does anyone have any ideas? 
stack trace: [Stage 2: (0 + 4) / 8]2015-07-15 13:57:21,021 ERROR Logging$class Exception in task 3.0 in stage 2.0 (TID 8) java.lang.OutOfMemoryError: Java heap space at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65) at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48) at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215) at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67) at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56) at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178) at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369) at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108) at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94) at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:111) at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244) at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441) at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436) at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189) at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91) at 
org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in thread Thread[Executor task launch worker-2,5,main] java.lang.OutOfMemoryError: Java heap space at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65) at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68) at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48) at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215) at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67) at
Use rank with distribute by in HiveContext
Does spark HiveContext support the rank() ... distribute by syntax (as in the following article- http://www.edwardcapriolo.com/roller/edwardcapriolo/entry/doing_rank_with_hive )? If not, how can it be achieved? Thanks, Lior
Re: Indexed Store for lookup table
By the way - if you're going this route, see https://github.com/datastax/spark-cassandra-connector On Thu, Jul 16, 2015 at 2:40 PM Vetle Leinonen-Roeim ve...@roeim.net wrote: You'll probably have to install it separately. On Thu, Jul 16, 2015 at 2:29 PM Jem Tucker jem.tuc...@gmail.com wrote: Hi Vetle, IndexedRDD is persisted in the same way RDDs are as far as I am aware. Are you aware if Cassandra can be built into my application or has to be a stand alone database which is installed separately? Thanks, Jem On Thu, Jul 16, 2015 at 12:59 PM Vetle Leinonen-Roeim ve...@roeim.net wrote: Hi, Not sure how IndexedRDD is persisted, but perhaps you're better off using a NOSQL database for lookups (perhaps using Cassandra, with the Cassandra connector)? That should give you good performance on lookups, but persisting those billion records sounds like something that will take some time in any case. Regards, Vetle On Thu, Jul 16, 2015 at 10:02 AM Jem Tucker jem.tuc...@gmail.com wrote: Hello, I have been using IndexedRDD as a large lookup (1 billion records) to join with small tables (1 million rows). The performance of indexedrdd is great until it has to be persisted on disk. Are there any alternatives to IndexedRDD or any changes to how I use it to improve performance with big data volumes? Kindest Regards, Jem
RE: Use rank with distribute by in HiveContext
Yes. Hive UDFs and distribute by are both supported by Spark SQL. If you are using Spark 1.4, you can try the Hive windowing and analytics functions (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics), most of which are already supported in Spark 1.4, so you don't need a custom rank UDF. Yong Date: Thu, 16 Jul 2015 15:10:58 +0300 Subject: Use rank with distribute by in HiveContext From: lio...@taboola.com To: user@spark.apache.org Does spark HiveContext support the rank() ... distribute by syntax (as in the following article- http://www.edwardcapriolo.com/roller/edwardcapriolo/entry/doing_rank_with_hive )? If not, how can it be achieved? Thanks,Lior
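A rough sketch of what the window-function route looks like, under stated assumptions: the table `scores` and the columns `category`, `item`, `score` are hypothetical, and the query is kept as a plain string that would be handed to `hiveContext.sql(...)` on Spark 1.4. The `ranked` helper is plain Scala and only illustrates what `rank()` computes per partition; it is not how Spark evaluates the query.

```scala
object RankSketch {
  // Hypothetical row type standing in for a table with three columns.
  case class Score(category: String, item: String, score: Int)

  // Window-function form of the query (table and column names are assumptions).
  val query: String =
    """SELECT category, item, score,
      |       rank() OVER (PARTITION BY category ORDER BY score DESC) AS rnk
      |FROM scores""".stripMargin

  // rank() within a partition ordered by score DESC:
  // 1 + the number of rows in the same partition with a strictly higher score.
  // Ties share a rank, and the next rank skips ahead accordingly.
  def ranked(rows: Seq[Score]): Seq[(Score, Int)] =
    rows.groupBy(_.category).values.flatMap { group =>
      group.map(r => (r, 1 + group.count(_.score > r.score)))
    }.toSeq

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Score("x", "a", 10), Score("x", "b", 10),
      Score("x", "c", 5), Score("y", "d", 1))
    ranked(rows).foreach(println)
  }
}
```

In the sample data the two rows tied at 10 both get rank 1 and the row with 5 gets rank 3, which matches the usual SQL definition of `rank()` (as opposed to `dense_rank()`).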
DataFrame from RDD[Row]
Hi, This is an ugly solution because it requires pulling out a row: val rdd: RDD[Row] = ... ctx.createDataFrame(rdd, rdd.first().schema) Is there a better alternative to get a DataFrame from an RDD[Row] since toDF won't work as Row is not a Product ? Thanks, Marius
Select all columns except some
Hi, In a DataFrame with a hundred columns, I wish to either select all of them except a few, or drop the ones I don't want. I am failing at such a simple task. I tried two ways:

    val clean_cols = df.columns.filterNot(col_name => col_name.startsWith("STATE_")).mkString(", ")
    df.select(clean_cols)

But this throws an exception: org.apache.spark.sql.AnalysisException: cannot resolve 'asd_dt, industry_area,...' at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:63) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:52) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)

The other thing I tried is:

    val cols = df.columns.filter(col_name => col_name.startsWith("STATE_"))
    for (col <- cols) df.drop(col)

But this doesn't do anything, or hangs. Saif
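A minimal sketch of one way around both problems, assuming the column names from the error message. The runnable part is plain Scala on an array of names (`ColumnSelectSketch` and `keepColumns` are hypothetical helpers); the DataFrame calls appear only in comments because they need a live `df`. The first attempt fails because `mkString` turns all names into one string, which `select` then treats as a single column name. The second has no effect because `drop` returns a new DataFrame rather than modifying `df`.

```scala
object ColumnSelectSketch {
  // Keep every column name that does not start with the given prefix.
  def keepColumns(all: Array[String], prefix: String): Array[String] =
    all.filterNot(_.startsWith(prefix))

  def main(args: Array[String]): Unit = {
    // Stand-in for df.columns; the STATE_ names are made up for illustration.
    val columns = Array("asd_dt", "industry_area", "STATE_a", "STATE_b")
    val keep = keepColumns(columns, "STATE_")
    println(keep.mkString(", "))

    // With a DataFrame `df` in scope, the names can be passed as separate
    // arguments instead of one joined string, for example:
    //   df.select(keep.map(c => df.col(c)): _*)
    //   df.select(keep.head, keep.tail: _*)
    //
    // To drop instead, chain the result of each drop call:
    //   val cleaned = columns.filter(_.startsWith("STATE_"))
    //     .foldLeft(df)((d, c) => d.drop(c))
  }
}
```

The `foldLeft` form threads the DataFrame returned by each `drop` into the next call, so the final value has all unwanted columns removed.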
PairRDDFunctions and DataFrames
Hi, could someone point me to the recommended way of using countApproxDistinctByKey with DataFrames? I know I can map to a pair RDD, but I'm wondering if there is a simpler method. If someone knows whether this operation is expressible in SQL, that information would be most appreciated as well.
Re: Indexed Store for lookup table
Thanks! On Thu, Jul 16, 2015 at 1:59 PM Vetle Leinonen-Roeim ve...@roeim.net wrote: By the way - if you're going this route, see https://github.com/datastax/spark-cassandra-connector On Thu, Jul 16, 2015 at 2:40 PM Vetle Leinonen-Roeim ve...@roeim.net wrote: You'll probably have to install it separately. On Thu, Jul 16, 2015 at 2:29 PM Jem Tucker jem.tuc...@gmail.com wrote: Hi Vetle, IndexedRDD is persisted in the same way RDDs are as far as I am aware. Are you aware if Cassandra can be built into my application or has to be a stand alone database which is installed separately? Thanks, Jem On Thu, Jul 16, 2015 at 12:59 PM Vetle Leinonen-Roeim ve...@roeim.net wrote: Hi, Not sure how IndexedRDD is persisted, but perhaps you're better off using a NOSQL database for lookups (perhaps using Cassandra, with the Cassandra connector)? That should give you good performance on lookups, but persisting those billion records sounds like something that will take some time in any case. Regards, Vetle On Thu, Jul 16, 2015 at 10:02 AM Jem Tucker jem.tuc...@gmail.com wrote: Hello, I have been using IndexedRDD as a large lookup (1 billion records) to join with small tables (1 million rows). The performance of indexedrdd is great until it has to be persisted on disk. Are there any alternatives to IndexedRDD or any changes to how I use it to improve performance with big data volumes? Kindest Regards, Jem
Re: Research ideas using spark
Ok… After having some off-line exchanges with Shashidhar Rao came up with an idea… Apply machine learning to either implement or improve autoscaling up or down within a Storm/Akka cluster. While I don’t know what constitutes an acceptable PhD thesis, or senior project for undergrads… this is a real life problem that actually has some real value. First, storm doesn’t scale down. Unless there’s been some improvements in the last year, you really can’t easily scale down the number of workers and transfer state to another worker. Looking at Akka, that would be an easier task because of the actor model. However, I don’t know Akka that well, so I can’t say if this is already implemented. So besides the mechanism to scale (up and down), you then have the issue of machine learning in terms of load and how to properly scale. This could be as simple as a PID function that watches the queues between spout/bolts and bolt/bolt, or something more advanced. This is where the research part of the project comes in. (What do you monitor, and how do you calculate and determine when to scale up or down, weighing in the cost(s) of the action of scaling.) Again its a worthwhile project, something that actually has business value, especially in terms of Lambda and other groovy greek lettered names for cluster designs (Zeta? ;-) ) Where you have both M/R (computational) and subjective real time (including micro batch) occurring either on the same cluster or within the same DC infrastructure. Again I don’t know if this is worthy of a PhD thesis, Masters Thesis, or Senior Project, but it is something that one could sink one’s teeth into and potentially lead to a commercial grade project if done properly. Good luck with it. HTH -Mike On Jul 15, 2015, at 12:40 PM, vaquar khan vaquar.k...@gmail.com wrote: I would suggest study spark ,flink,strom and based on your understanding and finding prepare your research paper. 
May be you will invented new spark ☺ Regards, Vaquar khan On 16 Jul 2015 00:47, Michael Segel msegel_had...@hotmail.com mailto:msegel_had...@hotmail.com wrote: Silly question… When thinking about a PhD thesis… do you want to tie it to a specific technology or do you want to investigate an idea but then use a specific technology. Or is this an outdated way of thinking? I am doing my PHD thesis on large scale machine learning e.g Online learning, batch and mini batch learning.” So before we look at technologies like Spark… could the OP break down a more specific concept or idea that he wants to pursue? Looking at what Jorn said… Using machine learning to better predict workloads in terms of managing clusters… This could be interesting… but is it enough for a PhD thesis, or of interest to the OP? On Jul 15, 2015, at 9:43 AM, Jörn Franke jornfra...@gmail.com mailto:jornfra...@gmail.com wrote: Well one of the strength of spark is standardized general distributed processing allowing many different types of processing, such as graph processing, stream processing etc. The limitation is that it is less performant than one system focusing only on one type of processing (eg graph processing). I miss - and this may not be spark specific - some artificial intelligence to manage a cluster, e.g. Predicting workloads, how long a job may run based on previously executed similar jobs etc. Furthermore, many optimizations you have do to manually, e.g. Bloom filters, partitioning etc - if you find here as well some intelligence that does this automatically based on previously executed jobs taking into account that optimizations themselves change over time would be great... You may also explore feature interaction Le mar. 14 juil. 2015 à 7:19, Shashidhar Rao raoshashidhar...@gmail.com mailto:raoshashidhar...@gmail.com a écrit : Hi, I am doing my PHD thesis on large scale machine learning e.g Online learning, batch and mini batch learning. 
Could somebody help me with ideas especially in the context of Spark and to the above learning methods. Some ideas like improvement to existing algorithms, implementing new features especially the above learning methods and algorithms that have not been implemented etc. If somebody could help me with some ideas it would really accelerate my work. Plus few ideas on research papers regarding Spark or Mahout. Thanks in advance. Regards
Please add our meetup home page in Japan.
Hi folks. We have lots of Spark enthusiasts, and some organizations have held talk events in Tokyo, Japan. Now we're going to unify those events and have created our home page on meetup.com. http://www.meetup.com/Tokyo-Spark-Meetup/ Could you add this to the list? Thanks. - Kousuke Saruta
pyspark 1.4 udf change date values
Hi all, I am having some trouble when using a custom udf in dataframes with pyspark 1.4. I have rewritten the udf to simplify the problem and it gets even weirder. The udfs I am using do absolutely nothing: they just receive some value and output the same value with the same format. My code is below:

    c = a.join(b, a['ID'] == b['ID_new'], 'inner')
    c.filter(c['ID'] == 'XX').show()
    udf_A = UserDefinedFunction(lambda x: x, DateType())
    udf_B = UserDefinedFunction(lambda x: x, DateType())
    udf_C = UserDefinedFunction(lambda x: x, DateType())
    d = c.select(c['ID'], c['t1'].alias('ta'),
                 udf_A(vinc_muestra['t2']).alias('tb'),
                 udf_B(vinc_muestra['t1']).alias('tc'),
                 udf_C(vinc_muestra['t2']).alias('td'))
    d.filter(d['ID'] == 'XX').show()

Here are the results from the two outputs:

    +--------+--------+----------+----------+
    |      ID|  ID_new|        t1|        t2|
    +--------+--------+----------+----------+
    |62698917|62698917|2012-02-28|2014-02-28|
    |62698917|62698917|2012-02-20|2013-02-20|
    |62698917|62698917|2012-02-28|2014-02-28|
    |62698917|62698917|2012-02-20|2013-02-20|
    |62698917|62698917|2012-02-20|2013-02-20|
    |62698917|62698917|2012-02-28|2014-02-28|
    |62698917|62698917|2012-02-28|2014-02-28|
    |62698917|62698917|2012-02-20|2013-02-20|
    +--------+--------+----------+----------+

    +--------+----------+--------+--------+--------+
    |      ID|        ta|      tb|      tc|      td|
    +--------+----------+--------+--------+--------+
    |62698917|2012-02-28|20070305|20030305|20140228|
    |62698917|2012-02-20|20070215|20020215|20130220|
    |62698917|2012-02-28|20070310|20050310|20140228|
    |62698917|2012-02-20|20070305|20030305|20130220|
    |62698917|2012-02-20|20130802|20130102|20130220|
    |62698917|2012-02-28|20070215|20020215|20140228|
    |62698917|2012-02-28|20070215|20020215|20140228|
    |62698917|2012-02-20|20140102|20130102|20130220|
    +--------+----------+--------+--------+--------+

My problem here is that the values in columns 'tb', 'tc' and 'td' in dataframe 'd' are completely different from the values of 't1' and 't2' in dataframe c, even though my udfs are doing nothing. It seems as if the values were somehow taken from other records (or just invented).
Results are different between executions (apparently random). Any insight on this? Thanks in advance
Re: spark streaming job to hbase write
You ask an interesting question… Let's set aside Spark, and look at the overall ingestion pattern. It's really an ingestion pattern where your input into the system is from a queue. Are the events discrete or continuous? (This is kinda important.) If the events are continuous then more than likely you’re going to be ingesting data where the key is somewhat sequential. If you use put(), you end up with hot spotting. And you’ll end up with regions half full. So you would be better off batching up the data and doing bulk imports. If the events are discrete, then you’ll want to use put() because the odds are you will not be using a sequential key. (You could, but I’d suggest that you rethink your primary key.) Depending on the rate of ingestion, you may want to do a manual flush. (It depends on the velocity of data to be ingested and your use case.) (Remember what caching occurs and where when dealing with HBase.) A third option… Depending on how you use the data, you may want to avoid storing the data in HBase, and only use HBase as an index to where you store the data files for quick access. Again it depends on your data ingestion flow and how you intend to use the data. So really this is less a Spark issue than an HBase issue when it comes to design. HTH -Mike On Jul 15, 2015, at 11:46 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi I have a requirement of writing in hbase table from Spark streaming app after some processing. Is Hbase put operation the only way of writing to hbase or is there any specialised connector or rdd of spark for hbase write. Should Bulk load to hbase from streaming app be avoided if output of each batch interval is just few mbs? Thanks
Re: HiveThriftServer2.startWithContext error with registerTempTable
Cheng, Yes, select * from temp_table was working. I was able to perform some transformation+action on the dataframe and print it on console. HiveThriftServer2.startWithContext was being run on the same session. When you say try --jars option, are you asking me to pass spark-csv jar? I'm already doing this with --packages com.databricks:spark-csv_2.10:1.0.3 Not sure if I'm missing your point here. Anyways, I gave it a shot. I downloaded spark-csv_2.10-0.1.jar and started spark-shell with --jars. I still get the same exception. I'm pasting the exception below. scala 15/07/16 11:29:22 ERROR SparkExecuteStatementOperation: Error executing query: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 15/07/16 11:29:22 WARN ThriftCLIService: Error executing statement: org.apache.hive.service.cli.HiveSQLException: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1 at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Srikanth On Thu, Jul 16, 2015 at 12:44 AM, Cheng, Hao hao.ch...@intel.com wrote: Have you ever try query the “select * from temp_table” from the spark shell? Or can you try the option --jars while starting the spark shell? *From:* Srikanth [mailto:srikanth...@gmail.com] *Sent:* Thursday, July 16, 2015 9:36 AM *To:* user *Subject:* Re: HiveThriftServer2.startWithContext error with registerTempTable Hello, Re-sending this to see if I'm second time lucky! I've not managed to move past this error. 
Srikanth On Mon, Jul 13, 2015 at 9:14 PM, Srikanth srikanth...@gmail.com wrote: Hello, I want to expose result of Spark computation to external tools. I plan to do this with Thrift server JDBC interface by registering result Dataframe as temp table. I wrote a sample program in spark-shell to test this.

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import hiveContext.implicits._
    HiveThriftServer2.startWithContext(hiveContext)
    val myDF = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").load("/datafolder/weblog/pages.csv")
    myDF.registerTempTable("temp_table")

I'm able to see the temp table in Beeline

    +-------------+--------------+
    |  tableName  | isTemporary  |
    +-------------+--------------+
    | temp_table  | true         |
    | my_table    | false        |
    +-------------+--------------+

Now when I issue select * from temp_table from Beeline, I see below exception in spark-shell

    15/07/13 17:18:27 WARN ThriftCLIService: Error executing statement:
    org.apache.hive.service.cli.HiveSQLException: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1
      at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
      at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
      at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I'm able to read the other table(my_table) from Beeline though. Any suggestions on how to overcome this? This is with Spark 1.4 pre-built version. Spark-shell was started with --package to pass spark-csv. Srikanth
Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Well, working backwards down the stack trace... at java.nio.Buffer.limit(Buffer.java:275) That exception gets thrown if the limit is negative or greater than the buffer's capacity at kafka.message.Message.sliceDelimited(Message.scala:236) If size had been negative, it would have just returned null, so we know the exception got thrown because the size was greater than the buffer's capacity I haven't seen that before... maybe a corrupted message of some kind? If that problem is reproducible, try providing an explicit argument for messageHandler, with a function that logs the message offset. On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hello, When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this: 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException at java.nio.Buffer.limit(Buffer.java:275) at kafka.message.Message.sliceDelimited(Message.scala:236) at kafka.message.Message.payload(Message.scala:218) at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32) at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395) at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93) at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48) at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) This is happening only when I'm doing a full data processing from Kafka. If there's no load, when you killed the driver and then restart, it resumes the checkpoint as expected without missing data. Did someone encounters something similar ? How did you solve this ? Regards, Nicolas PHUNG
Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Not exactly the same issue, but possibly related: https://issues.apache.org/jira/browse/KAFKA-1196
Re: pyspark 1.4 udf change date values
Thanks for reporting this, could you file a JIRA for it?

On Thu, Jul 16, 2015 at 8:22 AM, Luis Guerra luispelay...@gmail.com wrote:

Hi all,

I am having some trouble when using a custom udf in dataframes with pyspark 1.4. I have rewritten the udf to simplify the problem and it gets even weirder. The udfs I am using do absolutely nothing: they just receive some value and output the same value with the same format. My code is below:

    c = a.join(b, a['ID'] == b['ID_new'], 'inner')
    c.filter(c['ID'] == 'XX').show()

    udf_A = UserDefinedFunction(lambda x: x, DateType())
    udf_B = UserDefinedFunction(lambda x: x, DateType())
    udf_C = UserDefinedFunction(lambda x: x, DateType())

    d = c.select(c['ID'], c['t1'].alias('ta'),
                 udf_A(vinc_muestra['t2']).alias('tb'),
                 udf_B(vinc_muestra['t1']).alias('tc'),
                 udf_C(vinc_muestra['t2']).alias('td'))
    d.filter(d['ID'] == 'XX').show()

Here are the results from the two show() calls:

    +--------+--------+----------+----------+
    |      ID|  ID_new|        t1|        t2|
    +--------+--------+----------+----------+
    |62698917|62698917|2012-02-28|2014-02-28|
    |62698917|62698917|2012-02-20|2013-02-20|
    |62698917|62698917|2012-02-28|2014-02-28|
    |62698917|62698917|2012-02-20|2013-02-20|
    |62698917|62698917|2012-02-20|2013-02-20|
    |62698917|62698917|2012-02-28|2014-02-28|
    |62698917|62698917|2012-02-28|2014-02-28|
    |62698917|62698917|2012-02-20|2013-02-20|
    +--------+--------+----------+----------+

    +--------+----------+--------+--------+--------+
    |      ID|        ta|      tb|      tc|      td|
    +--------+----------+--------+--------+--------+
    |62698917|2012-02-28|20070305|20030305|20140228|
    |62698917|2012-02-20|20070215|20020215|20130220|
    |62698917|2012-02-28|20070310|20050310|20140228|
    |62698917|2012-02-20|20070305|20030305|20130220|
    |62698917|2012-02-20|20130802|20130102|20130220|
    |62698917|2012-02-28|20070215|20020215|20140228|
    |62698917|2012-02-28|20070215|20020215|20140228|
    |62698917|2012-02-20|20140102|20130102|20130220|
    +--------+----------+--------+--------+--------+

My problem here is that the values in columns 'tb', 'tc' and 'td' of dataframe 'd' are completely different from the values of 't1' and 't2' in dataframe 'c', even though my udfs do nothing. It seems as if the values were somehow taken from other records (or just invented). Results differ between executions (apparently at random).

Any insight on this?

Thanks in advance
Re: PairRDDFunctions and DataFrames
Instead of using that RDD operation, just use the native DataFrame function approxCountDistinct:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$

On Thu, Jul 16, 2015 at 6:58 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote:

Hi, could someone point me to the recommended way of using countApproxDistinctByKey with DataFrames? I know I can map to a pair RDD, but I'm wondering if there is a simpler method. If someone knows whether this operation is expressible in SQL, that information would be most appreciated as well.
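As a point of reference, the quantity being asked for is the number of distinct values per key. The plain-Python sketch below computes it exactly on a small in-memory list; it is only a model of the result, since the Spark functions named above return an approximation instead. The function name and the sample pairs are made up for the example.

```python
from collections import defaultdict


def count_distinct_by_key(pairs):
    """Exact number of distinct values seen for each key (small-data model)."""
    seen = defaultdict(set)
    for key, value in pairs:
        seen[key].add(value)
    return {key: len(values) for key, values in seen.items()}


# Hypothetical (key, value) pairs, for illustration only.
pairs = [("a", 1), ("a", 1), ("a", 2), ("b", 5)]
counts = count_distinct_by_key(pairs)
```

With a DataFrame, the equivalent would presumably be a group-by on the key column followed by an approxCountDistinct aggregate on the value column; the exact call is not shown in the thread, so treat that as an assumption to check against the linked API page.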
Re: Indexed Store for lookup table
Hi,

Not sure how IndexedRDD is persisted, but perhaps you're better off using a NoSQL database for lookups (perhaps Cassandra, with the Cassandra connector)? That should give you good performance on lookups, but persisting those billion records sounds like something that will take some time in any case.

Regards,
Vetle

On Thu, Jul 16, 2015 at 10:02 AM Jem Tucker jem.tuc...@gmail.com wrote:

Hello,

I have been using IndexedRDD as a large lookup (1 billion records) to join with small tables (1 million rows). The performance of IndexedRDD is great until it has to be persisted on disk. Are there any alternatives to IndexedRDD, or any changes to how I use it, to improve performance with big data volumes?

Kindest Regards,
Jem
Re: Indexed Store for lookup table
Hi Vetle,

IndexedRDD is persisted in the same way RDDs are, as far as I am aware. Are you aware if Cassandra can be built into my application, or does it have to be a standalone database which is installed separately?

Thanks,
Jem
Re: Use rank with distribute by in HiveContext
Did you take a look at the excellent write-up by Yin Huai and Michael Armbrust? It appears that rank is supported in the 1.4.x release.

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Snippet from the above article for your convenience:

To answer the first question "What are the best-selling and the second best-selling products in every category?", we need to rank products in a category based on their revenue, and to pick the best-selling and the second best-selling products based on the ranking. Below is the SQL query used to answer this question by using window function dense_rank (we will explain the syntax of using window functions in next section).

    SELECT product, category, revenue
    FROM (
      SELECT product, category, revenue,
             dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
      FROM productRevenue) tmp
    WHERE rank <= 2

The result of this query is shown below. Without using window functions, it is very hard to express the query in SQL, and even if a SQL query can be expressed, it is hard for the underlying engine to efficiently evaluate the query.

[image: 1-2]

                         SQL             DataFrame API
    Ranking functions    rank            rank
                         dense_rank      denseRank
                         percent_rank    percentRank
                         ntile           ntile
                         row_number      rowNumber

HTH.
-Todd

On Thu, Jul 16, 2015 at 8:10 AM, Lior Chaga lio...@taboola.com wrote:

Does spark HiveContext support the rank() ... distribute by syntax (as in the following article: http://www.edwardcapriolo.com/roller/edwardcapriolo/entry/doing_rank_with_hive )? If not, how can it be achieved?

Thanks,
Lior
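To make the dense_rank semantics from the quoted article concrete, here is a small plain-Python sketch that ranks rows within each category by revenue and keeps the top two ranks. It only models what the window function computes; it is not how Spark evaluates the query, and the function name and sample rows are invented for the example.

```python
from itertools import groupby


def top_n_per_category(rows, n=2):
    """rows: iterable of (product, category, revenue) tuples.

    Returns (product, category, revenue, rank) for rows whose dense rank
    within their category, ordered by revenue descending, is <= n.
    """
    result = []
    by_category = sorted(rows, key=lambda r: r[1])
    for category, group in groupby(by_category, key=lambda r: r[1]):
        ordered = sorted(group, key=lambda r: r[2], reverse=True)
        rank, previous = 0, None
        for product, _, revenue in ordered:
            # Dense rank: ties share a rank and the next rank is not skipped.
            if revenue != previous:
                rank += 1
                previous = revenue
            if rank <= n:
                result.append((product, category, revenue, rank))
    return result


# Hypothetical data, for illustration only.
rows = [
    ("A", "phone", 100),
    ("B", "phone", 100),
    ("C", "phone", 50),
    ("D", "phone", 10),
    ("E", "tablet", 70),
]
best = top_n_per_category(rows, n=2)
```

Note that because ties share a rank, a category can contribute more than two rows, as the two "phone" products with equal revenue do here.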
Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Hello, When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this: 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException at java.nio.Buffer.limit(Buffer.java:275) at kafka.message.Message.sliceDelimited(Message.scala:236) at kafka.message.Message.payload(Message.scala:218) at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32) at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395) at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93) at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48) at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) This is happening only when I'm doing a full data processing from Kafka. If there's no load, when you killed the driver and then restart, it resumes the checkpoint as expected without missing data. Did someone encounters something similar ? How did you solve this ? Regards, Nicolas PHUNG
Re: Will multiple filters on the same RDD optimized to one filter?
Depending on what you do with them, they will be computed separately, because you may have a long DAG in each branch. So Spark tries to run all the transformation functions in a branch together rather than trying to optimize things across branches.

On Jul 16, 2015 1:40 PM, Bin Wang wbi...@gmail.com wrote:

What if I use both rdd1 and rdd2 later?

On Thu, Jul 16, 2015 at 4:08 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote:

If you cache the RDD it will save some operations. But anyway, filter is a lazy operation, and it runs based on what you do later on with rdd1 and rdd2...

Raghavendra

On Jul 16, 2015 1:33 PM, Bin Wang wbi...@gmail.com wrote:

If I write code like this:

    val rdd = input.map(_.value)
    val f1 = rdd.filter(_ == 1)
    val f2 = rdd.filter(_ == 2)
    ...

Then the DAG of the execution may be this:

           -> Filter -> ...
    Map
           -> Filter -> ...

But the two filters operate on the same RDD, which means it could be done by scanning the RDD just once. Does Spark have this kind of optimization for now?
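The difference between two independent filters and a single fused pass can be seen in a plain-Python model. This is only an analogy for the discussion above, not Spark code: `CountingSource` is a made-up helper that counts how many times its data is read from start to finish.

```python
class CountingSource:
    """Iterable that counts how many full scans are started over its values."""

    def __init__(self, values):
        self.values = list(values)
        self.scans = 0

    def __iter__(self):
        self.scans += 1
        return iter(self.values)


def two_filters(source):
    """Two independent filters: each one reads the source on its own."""
    f1 = [v for v in source if v == 1]
    f2 = [v for v in source if v == 2]
    return f1, f2


def one_pass(source):
    """Hand-fused version: a single scan routes each value to its output."""
    f1, f2 = [], []
    for v in source:
        if v == 1:
            f1.append(v)
        elif v == 2:
            f2.append(v)
    return f1, f2


src_a = CountingSource([1, 2, 3, 1, 2])
res_a = two_filters(src_a)   # reads the source twice

src_b = CountingSource([1, 2, 3, 1, 2])
res_b = one_pass(src_b)      # reads the source once
```

Both versions produce the same two lists; the model only shows that independent branches each trigger their own scan unless the work is fused by hand or the shared input is kept around, which is the role caching plays in the replies above.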
Re: Select all columns except some
Have you tried to examine what clean_cols contains? I'm suspicious of this part: mkString(", "). Try this:

    val clean_cols: Seq[String] = df.columns...

If you get a type error, you need to work on clean_cols (I suspect yours is of type String at the moment and presents itself to Spark as a single column name with commas embedded).

Not sure why the .drop call hangs, but in either case drop returns a new dataframe -- it's not a setter call.

On Thu, Jul 16, 2015 at 10:57 AM, saif.a.ell...@wellsfargo.com wrote:

Hi,

In a hundred-column dataframe, I wish to either *select all of them except* some, or *drop the ones I don't want*. I am failing at such a simple task. I tried two ways:

    val clean_cols = df.columns.filterNot(col_name => col_name.startsWith("STATE_")).mkString(", ")
    df.select(clean_cols)

But this throws an exception:

    org.apache.spark.sql.AnalysisException: cannot resolve 'asd_dt, industry_area,...'
      at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:63)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:52)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108)
      at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)

The other thing I tried is:

    val cols = df.columns.filter(col_name => col_name.startsWith("STATE_"))
    for (col <- cols) df.drop(col)

But this other thing doesn't do anything or hangs up.

Saif
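The point made in the reply above, that joining the names produces one string rather than a list of columns, can be checked without Spark. The sketch below works on plain Python lists; the helper name and the STATE_ column names are hypothetical, and only "asd_dt" and "industry_area" appear in the thread.

```python
def columns_without_prefix(columns, prefix):
    """Return the column names that do not start with `prefix`, as a list."""
    return [name for name in columns if not name.startswith(prefix)]


# Hypothetical column names, for illustration only.
columns = ["asd_dt", "industry_area", "STATE_CA", "STATE_NY"]

# A list of separate names: this is the shape a select call needs.
clean_cols = columns_without_prefix(columns, "STATE_")

# Joining collapses the list into ONE string, which would then be treated
# as a single (non-existent) column name with commas embedded.
joined = ", ".join(clean_cols)
```

In PySpark the list would then be passed as separate arguments, e.g. `df.select(*clean_cols)`; that call is given as an assumption about typical usage, not something shown in the thread.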
BroadCast on Interval ( eg every 10 min )
Hi all,

How can I broadcast a data change to all the executors at a regular interval, e.g. every 10 minutes or every 1 minute?

Ashish
Re: Getting not implemented by the TFS FileSystem implementation
See also https://issues.apache.org/jira/browse/SPARK-8385 (apologies if someone already mentioned that -- just saw this thread) On Thu, Jul 16, 2015 at 7:19 PM, Jerrick Hoang jerrickho...@gmail.com wrote: So, this has to do with the fact that 1.4 has a new way to interact with HiveMetastore, still investigating. Would really appreciate if anybody has any insights :) On Tue, Jul 14, 2015 at 4:28 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I'm upgrading from spark1.3 to spark1.4 and when trying to run spark-sql CLI. It gave an ```ava.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation``` exception. I did not get this error with 1.3 and I don't use any TFS FileSystem. Full stack trace is ```Exception in thread main java.lang.RuntimeException: java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:105) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:408) at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:170) at org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:166) at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:212) at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:175) at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:358) at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:205) at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:204) at 
scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.sql.SQLContext.init(SQLContext.scala:204) at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:71) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:53) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.init(SparkSQLCLIDriver.scala:248) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:136) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214) at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) at 
org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:342) ... 31 more``` Thanks all
Re: S3 Read / Write makes executors deadlocked
I have tested on another PC which has 8 CPU cores, but it hangs when defaultParallelismLevel >= 4, e.g. sparkConf.setMaster("local[*]"). local[1] ~ local[3] work well; 4 is the mysterious boundary.

It seems that I am not the only one who has encountered this problem: https://issues.apache.org/jira/browse/SPARK-8898

Here is Sean's answer on the JIRA above:

this is a jets3t problem. You will have to manage to update it in your build or get EC2 + Hadoop 2 to work, which I think can be done. At least, this is just a subset of EC2 should support Hadoop 2 and/or that the EC2 support should move out of Spark anyway. I don't know there's another action to take in Spark.

But I just use sbt to get the published Spark 1.4, and it does not work on my local PC, not EC2. Seriously, I do think something should be done in Spark, because S3 read/write is quite a common use case.

Any help on this issue is highly appreciated. If you need more info, check out the JIRA I created: https://issues.apache.org/jira/browse/SPARK-8869

On Thu, Jul 16, 2015 at 11:39 AM, Hao Ren inv...@gmail.com wrote:

Given the following code, which just reads from S3 and then saves files to S3:

    val inputFileName: String = "s3n://input/file/path"
    val outputFileName: String = "s3n://output/file/path"
    val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
    val sparkContext = new SparkContext(conf)

    // Problems here: executors thread locked
    sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)

    // But this one works
    sparkContext.textFile(inputFileName).count()

It blocks without showing any exceptions or errors. jstack shows that all executors are locked. The thread dump is at the end of this post.

I am using spark-1.4.0 on my PC, which has 4 CPU cores. There are 21 parquet files in the input directory, 500KB / file.

In addition, if we change the last action to a non-IO-bound one, for example count(), it works. It seems that S3 read and write in the same stage makes executors deadlocked.
I encountered the same problem when using DataFrame load/save operations, jira created: https://issues.apache.org/jira/browse/SPARK-8869 Executor task launch worker-3 #69 daemon prio=5 os_prio=0 tid=0x7f7bd4036800 nid=0x1296 in Object.wait() [0x7f7c1099a000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518) - *locked* 0xe56745b8 (a org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool) at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416) at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323) at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342) at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718) at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599) at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535) at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987) at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332) at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111) at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown Source) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) at
Please add two groups to Community page
Moscow: http://www.meetup.com/Apache-Spark-in-Moscow/
Slovenija (Ljubljana): http://www.meetup.com/Apache-Spark-Ljubljana-Meetup/
Re: Select all columns except some
The snippet at the end worked for me. We run Spark 1.3.x, so DataFrame.drop is not available to us. As pointed out by Yana, DataFrame operations typically return a new DataFrame, so use it as such:

    import com.foo.sparkstuff.DataFrameOps._
    ...
    val df = ...
    val prunedDf = df.dropColumns("one_col", "other_col")

    package com.foo.sparkstuff

    import org.apache.spark.sql.{Column, DataFrame}
    import scala.language.implicitConversions

    class PimpedDataFrame(frame: DataFrame) {
      /**
       * Drop named columns from dataframe. Replace with DataFrame.drop when upgrading to Spark 1.4.0.
       */
      def dropColumns(toDrop: String*): DataFrame = {
        // Fail early if any requested column does not exist.
        val invalid = toDrop filterNot (frame.columns.contains(_))
        if (invalid.nonEmpty) {
          throw new IllegalArgumentException("Columns not found: " + invalid.mkString(","))
        }
        // Keep every column that is not in the drop list.
        val newColumns = frame.columns filter { c => !toDrop.contains(c) } map { new Column(_) }
        frame.select(newColumns: _*)
      }
    }

    object DataFrameOps {
      implicit def pimpDataFrame(df: DataFrame): PimpedDataFrame = new PimpedDataFrame(df)
    }

On Thu, Jul 16, 2015 at 4:57 PM, saif.a.ell...@wellsfargo.com wrote:

Hi, In a hundred columns dataframe, I wish to either select all of them except or drop the ones I dont want.
I am failing in doing such simple task, tried two ways val clean_cols = df.columns.filterNot(col_name = col_name.startWith(“STATE_”).mkString(“, “) df.select(clean_cols) But this throws exception: org.apache.spark.sql.AnalysisException: cannot resolve 'asd_dt, industry_area,...’ at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:63) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:52) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123) The other thing I tried is df.columns.filter(col_name = col_name.startWith(“STATE_”) for (col - cols) df.drop(col) But this other thing doesn’t do anything or hangs up. Saif - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. Sample code in Python is copied below.

The issue I have is that each new model which is trained caches a set of RDDs, and eventually the executors run out of memory. Is there any way in PySpark to unpersist() these RDDs after each iteration? The names of the RDDs, which I gathered from the UI, are:

    itemInBlocks
    itemOutBlocks
    Products
    ratingBlocks
    userInBlocks
    userOutBlocks
    users

I am using Spark 1.3. Thank you for any help!

Regards,
Jonathan

    data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
    functions = [rating]  # defined elsewhere
    ranks = [10,20]
    iterations = [10,20]
    lambdas = [0.01,0.1]
    alphas = [1.0,50.0]

    results = []
    for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product(
            functions, ranks, iterations, lambdas, alphas):
        # train model
        ratings_train = data_train.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                  lambda_=float(m_lambda), alpha=float(m_alpha))

        # test performance on CV data
        ratings_cv = data_cv.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        auc = areaUnderCurve(ratings_cv, model.predictAll)

        # save results
        result = ",".join(str(l) for l in [ratingFunction.__name__, rank, numIterations, m_lambda, m_alpha, auc])
        results.append(result)
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback.

One common reason it won't load is not having libgfortran installed on your OSes, though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts.

The other thing I'd double-check is whether you are really using this assembly you built for your job -- that is, whether it's actually the assembly the executors are using.

On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote:

Is there more documentation on what is needed to set up BLAS/LAPACK native support with Spark? I've built Spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar.

    jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native
      6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class
     21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class
    178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class
      6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemARPACK.class
     21138 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemBLAS.class
    178349 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemLAPACK.class

Also I see the following in /usr/lib64:

    ls /usr/lib64/libblas.
    libblas.a  libblas.so  libblas.so.3  libblas.so.3.2  libblas.so.3.2.1

    ls /usr/lib64/liblapack
    liblapack.a  liblapack_pic.a  liblapack.so  liblapack.so.3  liblapack.so.3.2  liblapack.so.3.2.1

But I still see the following in the Spark logs:

    15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
    15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
    15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
    15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

Anything in this process I missed?

Thanks,
Arun

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
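Since the reply above points at a missing libgfortran on the hosts as a common cause, a quick per-host probe can help narrow things down. The following is only a sketch using Python's standard ctypes.util.find_library; the helper name probe_native_libs is made up for illustration, and the probe only reports whether the system linker can locate each library. It does not check the 4.6+ version requirement or what netlib itself will load.

```python
from ctypes.util import find_library

# Short linker names to probe (assumption: these are the libraries of interest
# for this thread; adjust the tuple for your own hosts).
NATIVE_LIBS = ("gfortran", "blas", "lapack")


def probe_native_libs(names=NATIVE_LIBS):
    """Return {name: located library or None} for each requested library."""
    return {name: find_library(name) for name in names}


if __name__ == "__main__":
    for name, located in probe_native_libs().items():
        print("%-10s %s" % (name, located or "NOT FOUND"))
```

Running this on every worker host (not just the driver) matches the point made above that the libraries have to exist where the executors run.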
Re: create HiveContext if available, otherwise SQLContext
We do this in SparkILoop (https://github.com/apache/spark/blob/master/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala#L1023-L1037). What is the version of Spark you are using? How did you add the spark-csv jar?

On Thu, Jul 16, 2015 at 1:21 PM, Koert Kuipers ko...@tresata.com wrote:

has anyone tried to make HiveContext only if the class is available? i tried this:

    implicit lazy val sqlc: SQLContext = try {
      Class.forName("org.apache.spark.sql.hive.HiveContext", true, Thread.currentThread.getContextClassLoader)
        .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext]
    } catch { case e: ClassNotFoundException => new SQLContext(sc) }

it compiles fine, but i get classloader issues when i actually use it on a cluster. for example:

    Exception in thread "main" java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:216)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:229)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
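The pattern in the quoted snippet (look a class up by name at runtime and fall back when it is missing) is not specific to Scala. Below is a minimal Python sketch of the same idea using only the standard library. The helper first_available and the module names in the usage lines are hypothetical, chosen for illustration; the sketch shows the fallback logic only and does not address the classloader problem reported in this thread.

```python
import collections
import importlib


def first_available(candidates):
    """Return the first class importable from a list of (module, class) names."""
    for module_name, class_name in candidates:
        try:
            module = importlib.import_module(module_name)
            return getattr(module, class_name)
        except (ImportError, AttributeError):
            # This candidate is not installed (or lacks the class); try the next.
            continue
    raise ImportError("none of the candidates could be loaded: %r" % (candidates,))


# Usage: the first module does not exist, so the second candidate is chosen.
chosen = first_available([
    ("no_such_module_for_demo", "Missing"),
    ("collections", "OrderedDict"),
])
```

Ordering the candidates from most to least capable mirrors the "HiveContext if available, otherwise SQLContext" intent of the original question.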
Re: create HiveContext if available, otherwise SQLContext
i am using scala 2.11

spark jars are not in my assembly jar (they are provided), since i launch with spark-submit
Re: create HiveContext if available, otherwise SQLContext
spark 1.4.0

spark-csv is a normal dependency of my project and in the assembly jar that i use

but i also tried adding spark-csv with --packages for spark-submit, and got the same error
Re: Getting not implemented by the TFS FileSystem implementation
So, this has to do with the fact that 1.4 has a new way to interact with HiveMetastore; still investigating. I would really appreciate it if anybody has any insights :)

On Tue, Jul 14, 2015 at 4:28 PM, Jerrick Hoang jerrickho...@gmail.com wrote:

Hi all,

I'm upgrading from spark1.3 to spark1.4, and when I try to run the spark-sql CLI it gives a ```java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation``` exception. I did not get this error with 1.3 and I don't use any TFS FileSystem. Full stack trace is

```
Exception in thread "main" java.lang.RuntimeException: java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
    at org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:105)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:170)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:166)
    at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:212)
    at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:175)
    at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:358)
    at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:205)
    at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:204)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.sql.SQLContext.init(SQLContext.scala:204)
    at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:71)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:53)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.init(SparkSQLCLIDriver.scala:248)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:136)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation
    at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:214)
    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2365)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:342)
    ... 31 more
```

Thanks all
Re: create HiveContext if available, otherwise SQLContext
that solved it, thanks!

On Thu, Jul 16, 2015 at 6:22 PM, Koert Kuipers ko...@tresata.com wrote:

thanks i will try 1.4.1

On Thu, Jul 16, 2015 at 5:24 PM, Yin Huai yh...@databricks.com wrote:

Hi Koert,

For the classloader issue, you probably hit https://issues.apache.org/jira/browse/SPARK-8365, which has been fixed in Spark 1.4.1. Can you try 1.4.1 and see if the exception disappears?

Thanks,
Yin
Re: [Spark Shell] Could the spark shell be reset to the original status?
Hi Ted,

Thanks for the information. The post seems a little different from my requirement: suppose we have defined different functions to do different streaming work (e.g. 50 functions), and I want to test these 50 functions in the spark shell. The shell will always throw an OOM in the middle of the test (yes, it could be solved by increasing the JVM memory size, but if we have more functions, the issue will still happen). The main issue is that the shell keeps track of all the information (classes, objects...) since it started, so the Java memory usage keeps growing as functions are defined and invoked.

Thanks!
- Terry

Ted Yu yuzhih...@gmail.com wrote on Fri, Jul 17, 2015 at 12:02 PM:

See this recent thread: http://search-hadoop.com/m/q3RTtFW7iMDkrj61/Spark+shell+oom+subj=java+lang+OutOfMemoryError+PermGen+space
Re: create HiveContext if available, otherwise SQLContext
No problem :) Glad to hear that!

On Thu, Jul 16, 2015 at 8:22 PM, Koert Kuipers ko...@tresata.com wrote:

that solved it, thanks!
Retrieving Spark Configuration properties
I am using this version of Spark: spark-1.4.0-bin-hadoop2.6. I want to check a few default properties, so I ran the following statement in spark-shell:

    scala> sqlContext.getConf("spark.sql.hive.metastore.version")

I was expecting the call to getConf to return a value of 0.13.1, as described in this link: http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore. But I got the exception below:

    java.util.NoSuchElementException: spark.sql.hive.metastore.version
        at org.apache.spark.sql.SQLConf$$anonfun$getConf$1.apply(SQLConf.scala:283)
        at org.apache.spark.sql.SQLConf$$anonfun$getConf$1.apply(SQLConf.scala:283)

Am I retrieving the properties in the right way?
[Spark Shell] Could the spark shell be reset to the original status?
Hi,

Background: The spark shell gets an out-of-memory error after running lots of spark work.

Is there any method which can reset the spark shell to its startup status? I tried *:reset*, but it does not seem to work: I cannot create a spark context anymore (some compile error as below) after the *:reset*. (I have to restart the shell after the OOM as a workaround.)

    == Expanded type of tree ==
    TypeRef(TypeSymbol(class $read extends Serializable))
    uncaught exception during compilation: java.lang.AssertionError
    java.lang.AssertionError: assertion failed: Tried to find '$line16' in 'C:\Users\jhu\AppData\Local\Temp\spark-2ad09490-c0c6-41e2-addb-63087ce0ae63' but it is not a directory
    That entry seems to have slain the compiler. Shall I replay your session? I can re-run each line except the last one. [y/n]
    Abandoning crashed session.

Thanks!
-Terry
Re: Retrieving Spark Configuration properties
This is because you did not set the parameter spark.sql.hive.metastore.version. If you check other parameters that you have set, getConf will work well. Or you can first set this parameter, and then get it.

2015-07-17 11:53 GMT+08:00 RajG rjk...@gmail.com:

I am using this version of Spark: spark-1.4.0-bin-hadoop2.6. I want to check a few default properties, so I ran the following statement in spark-shell:

    scala> sqlContext.getConf("spark.sql.hive.metastore.version")

I was expecting the call to getConf to return a value of 0.13.1, as described in this link: http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore. But I got the exception below:

    java.util.NoSuchElementException: spark.sql.hive.metastore.version
        at org.apache.spark.sql.SQLConf$$anonfun$getConf$1.apply(SQLConf.scala:283)
        at org.apache.spark.sql.SQLConf$$anonfun$getConf$1.apply(SQLConf.scala:283)

Am I retrieving the properties in the right way?
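The behaviour described in the reply (a key that was never set raises, a key that was set is returned) can be sidestepped by asking for the value together with a default. The sketch below assumes the two-argument getConf(key, defaultValue) form of SQLContext; to keep it runnable without a cluster it uses FakeSQLContext, a hypothetical stand-in written for this example that mimics only getConf and setConf. It is not Spark code, and the "<not set>" placeholder is likewise an arbitrary choice.

```python
class FakeSQLContext(object):
    """Hypothetical stand-in for a SQLContext; mimics only getConf/setConf."""

    def __init__(self):
        self._conf = {}

    def setConf(self, key, value):
        self._conf[key] = value

    def getConf(self, key, defaultValue):
        # Return the stored value, or the caller's default when the key is unset.
        return self._conf.get(key, defaultValue)


def metastore_version(sql_context, default="<not set>"):
    # Passing a default means an unset key yields a value instead of an exception.
    return sql_context.getConf("spark.sql.hive.metastore.version", default)


ctx = FakeSQLContext()
before = metastore_version(ctx)            # key has not been set yet
ctx.setConf("spark.sql.hive.metastore.version", "0.13.1")
after = metastore_version(ctx)             # key was set explicitly
print(before, after)
```

With a real context, metastore_version(sqlContext) would be the only call needed; whether the documented default of 0.13.1 is visible through getConf without setting it first is exactly what the thread above questions.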
Re: Java 8 vs Scala
IMHO only Scala is an option. Once you're familiar with it you just can't even look at Java code.

On Thu, Jul 16, 2015 at 07:20, spark user spark_u...@yahoo.com.invalid wrote:

I struggled a lot with Scala, almost 10 days with no improvement, but when I switched to Java 8, things went smoothly, and I used DataFrames with Redshift and Hive; all are looking good. If you are very good in Scala then go with Scala, otherwise Java is the best fit. This is just my opinion because I am a Java guy.

On Wednesday, July 15, 2015 12:33 PM, vaquar khan vaquar.k...@gmail.com wrote:

My choice is java 8

On 15 Jul 2015 18:03, Alan Burlison alan.burli...@oracle.com wrote:

On 15/07/2015 08:31, Ignacio Blasco wrote:

The main advantage of using scala vs java 8 is being able to use a console

https://bugs.openjdk.java.net/browse/JDK-8043364

-- Alan Burlison --
Setting different amount of cache memory for driver
Hi,

I am using the spark thrift server. In my deployment, I need to have more memory for the driver, to be able to get results back from the executors. Currently a lot of the driver memory is spent on caching, but I would prefer that the driver not use memory for that (only the executors). Is that possible?

Thanks
Re: Spark streaming Processing time keeps increasing
Hi TD,

Yes, we do have the invertible function provided. However, I am not sure I understood how to use the filterFunction. Is there an example somewhere showing its usage? The header comment on the function says:

    * @param filterFunc function to filter expired key-value pairs;
    *                   only pairs that satisfy the function are retained
    *                   set this to null if you do not want to filter

These are the questions I am confused about:

1. The code comment seems to imply that the filterFunc is only used to figure out which key-value pairs are used to form the window, but how does it actually help expire the old data?
2. Shouldn't the values that are falling off of the window period automatically be removed without the need for an additional filter function?
3. Which side of the key-value pairs is passed to this function? The ones that are coming in, the ones that are going out of the window, or both?
4. The key-value pairs in use in a particular reduceByKeyAndWindow operation may not have the requisite info (such as a timestamp or similar, e.g. if it's aggregated data) to help determine whether to return true or false. What is the semantic expected here?

As always, thanks for your help
Nikunj

On Thu, Jul 16, 2015 at 1:16 AM, Tathagata Das t...@databricks.com wrote:

Make sure you provide the filterFunction with the invertible reduceByKeyAndWindow. Otherwise none of the keys will get removed, and the key space will continue to increase. This is what is leading to the lag. So use the filtering function to filter out the keys that are not needed any more.

On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

What is your data volume? Do you have checkpointing/WAL enabled? In that case make sure you have SSD disks, as this behavior is mainly due to IO wait.

Thanks
Best Regards

On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote:

Hello,

We have a Spark streaming application, and the problem we are encountering is that the batch processing time keeps on increasing and eventually causes the application to start lagging. I am hoping that someone here can point me to any underlying cause of why this might happen.

The batch interval is 1 minute as of now and the app does some maps, filters, joins and reduceByKeyAndWindow operations. All the reduces are invertible functions, so we do provide the inverse-reduce functions in all of those. The largest window size we have is 1 hour right now.

When the app is started, we see that the batch processing time is between 20 and 30 seconds. It keeps creeping up slowly, and by the time it hits the 1 hour mark it is somewhere around 35-40 seconds. Somewhat expected and still not bad! I would expect that, since the largest window we have is 1 hour long, the application should stabilize around the 1 hour mark and start processing subsequent batches within that 35-40 second zone. However, that is not what is happening. The processing time still keeps increasing, and eventually, in a few hours, it exceeds the 1 minute mark and the app starts lagging. Eventually the lag builds up to minutes, at which point we have to restart the system.

Any pointers on why this could be happening and what we can do to troubleshoot further?

Thanks
Nikunj
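To make the advice quoted above concrete, here is a small plain-Python simulation (not Spark code) of an invertible windowed count. It is a sketch under stated assumptions: the state holds one aggregated value per key, each slide adds the entering batch and subtracts the leaving batch, and an optional predicate is applied to the resulting (key, value) pairs afterwards. The function name windowed_counts and the keep parameter are invented for illustration; keep plays the role that the thread attributes to filterFunc.

```python
from collections import deque


def windowed_counts(batches, window_len, keep=None):
    """Simulate an invertible windowed count over a sequence of batches.

    batches    -- list of lists of keys, one inner list per batch interval
    window_len -- number of batches covered by the window
    keep       -- optional predicate (key, count) -> bool applied to the
                  state after every slide; pairs that fail it are dropped
    Returns the state (a dict) as it looked after each batch.
    """
    state = {}
    window = deque()
    history = []
    for batch in batches:
        # "reduce": add the counts of the batch entering the window
        for key in batch:
            state[key] = state.get(key, 0) + 1
        window.append(batch)
        # "inverse reduce": subtract the counts of the batch leaving the window
        if len(window) > window_len:
            for key in window.popleft():
                state[key] -= 1
        # optional filter on the aggregated pairs
        if keep is not None:
            state = {k: v for k, v in state.items() if keep(k, v)}
        history.append(dict(state))
    return history


batches = [["a"], ["b"], ["c"], ["d"]]
unfiltered = windowed_counts(batches, window_len=2)
filtered = windowed_counts(batches, window_len=2, keep=lambda k, v: v > 0)
```

In this simulation the unfiltered state keeps every key it has ever seen, with a count of 0 once the key has left the window, while the filtered state holds only keys still inside the window. That growth of dead keys is the mechanism the reply blames for the rising batch time, and it also suggests that a predicate on the aggregated value alone (for example, count greater than zero) can be enough when no timestamp is available.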
spark-streaming failed to bind ip address
Hi all,

I submitted a spark-streaming job in yarn-cluster mode, but the following error happens:

    15/07/16 14:36:36 ERROR receiver.ReceiverSupervisorImpl: Stopped executor with error: org.jboss.netty.channel.ChannelException: Failed to bind to: sparktest/192.168.1.17:3
    15/07/16 14:36:36 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 74)
    org.jboss.netty.channel.ChannelException: Failed to bind to: sparktest/192.168.1.17:3
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:106)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:119)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:74)
        at org.apache.avro.ipc.NettyServer.init(NettyServer.java:68)
        at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
        at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:288)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:280)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
    Caused by: java.net.BindException: Cannot assign requested address
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:344)
        at sun.nio.ch.Net.bind(Net.java:336)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:199)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
        at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
Re: [Spark Shell] Could the spark shell be reset to the original status?
See this recent thread: http://search-hadoop.com/m/q3RTtFW7iMDkrj61/Spark+shell+oom+subj=java+lang+OutOfMemoryError+PermGen+space

On Jul 16, 2015, at 8:51 PM, Terry Hole hujie.ea...@gmail.com wrote:

Hi,

Background: The spark shell gets an out-of-memory error after running lots of spark work.

Is there any method which can reset the spark shell to its startup status? I tried :reset, but it does not seem to work: I cannot create a spark context anymore (some compile error as below) after the :reset. (I have to restart the shell after the OOM as a workaround.)

    == Expanded type of tree ==
    TypeRef(TypeSymbol(class $read extends Serializable))
    uncaught exception during compilation: java.lang.AssertionError
    java.lang.AssertionError: assertion failed: Tried to find '$line16' in 'C:\Users\jhu\AppData\Local\Temp\spark-2ad09490-c0c6-41e2-addb-63087ce0ae63' but it is not a directory
    That entry seems to have slain the compiler. Shall I replay your session? I can re-run each line except the last one. [y/n]
    Abandoning crashed session.

Thanks!
-Terry
what is : ParquetFileReader: reading summary file ?
Hi all,

Our scenario is to generate lots of folders containing parquet files and then use add partition to add these folder locations to a hive table. When trying to read the hive table using Spark, the following logs show up, and reading the summary files takes a lot of time; but this no longer happens after the second or third time of querying this table through sql in HiveContext. Does that mean that parquet has done some caching by itself? Thanks!

    Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
    Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150701/LDSN/_common_metadata
    Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150702/MECC/_common_metadata
    Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150702/MCOX/_common_metadata
    Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150629/LCTE/_common_metadata
    Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150630/MDNS/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150701/VSHM/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150624/LSCB/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150628/MPD8/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150703/VSHM/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150630/LIHI/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150701/LESE/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150626/MPD8/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150624/MDHK/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150628/VEMH/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150626/MDHK/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150628/LSCB/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150627/LESR/_common_metadata
    Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150703/LESE/_common_metadata
Re: Console log file of CoarseGrainedExecutorBackend
By default it is in ${SPARK_HOME}/work/${APP_ID}/${EXECUTOR_ID}

On Thu, Jul 16, 2015 at 3:43 PM, Tao Lu taolu2...@gmail.com wrote:
Hi, Guys, Where can I find the console log file of CoarseGrainedExecutorBackend process? Thanks! Tao

-- Best Regards Jeff Zhang
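The directory layout in the reply above can be spelled out in a few lines of Scala. This is only a sketch: `ExecutorLogs` and its methods are hypothetical names, and the file names `stdout` and `stderr` assume a standalone-mode worker that writes the executor's console output to files with those names inside the executor directory.

```scala
import java.nio.file.{Path, Paths}

object ExecutorLogs {
  // Builds ${SPARK_HOME}/work/${APP_ID}/${EXECUTOR_ID} from its three parts.
  def executorWorkDir(sparkHome: String, appId: String, executorId: String): Path =
    Paths.get(sparkHome, "work", appId, executorId)

  // Assumption: console output is captured in files named stdout and stderr
  // inside the executor's work directory.
  def consoleLogs(dir: Path): Seq[Path] =
    Seq("stdout", "stderr").map(name => dir.resolve(name))
}
```

For example, `ExecutorLogs.consoleLogs(ExecutorLogs.executorWorkDir(sys.env("SPARK_HOME"), appId, "0"))` would give the two candidate files for executor 0, where `appId` is the application ID shown for the job in question.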
Console log file of CoarseGrainedExecutorBackend
Hi, Guys, Where can I find the console log file of CoarseGrainedExecutorBackend process? Thanks! Tao
Re: create HiveContext if available, otherwise SQLContext
thanks i will try 1.4.1

On Thu, Jul 16, 2015 at 5:24 PM, Yin Huai yh...@databricks.com wrote:
Hi Koert, For the classloader issue, you probably hit https://issues.apache.org/jira/browse/SPARK-8365, which has been fixed in Spark 1.4.1. Can you try 1.4.1 and see if the exception disappears? Thanks, Yin

On Thu, Jul 16, 2015 at 2:12 PM, Koert Kuipers ko...@tresata.com wrote:
i am using scala 2.11. spark jars are not in my assembly jar (they are "provided"), since i launch with spark-submit

On Thu, Jul 16, 2015 at 4:34 PM, Koert Kuipers ko...@tresata.com wrote:
spark 1.4.0. spark-csv is a normal dependency of my project and in the assembly jar that i use. but i also tried adding spark-csv with --packages for spark-submit, and got the same error

On Thu, Jul 16, 2015 at 4:31 PM, Yin Huai yh...@databricks.com wrote:
We do this in SparkILoop (https://github.com/apache/spark/blob/master/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala#L1023-L1037). What is the version of Spark you are using? How did you add the spark-csv jar?

On Thu, Jul 16, 2015 at 1:21 PM, Koert Kuipers ko...@tresata.com wrote:
has anyone tried to make HiveContext only if the class is available? i tried this:

  implicit lazy val sqlc: SQLContext = try {
    Class.forName("org.apache.spark.sql.hive.HiveContext", true, Thread.currentThread.getContextClassLoader)
      .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext]
  } catch { case e: ClassNotFoundException => new SQLContext(sc) }

it compiles fine, but i get classloader issues when i actually use it on a cluster. for example:

  Exception in thread "main" java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:216)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:229)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
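The "use the class if it is on the classpath, otherwise fall back" pattern from the snippet in this thread can be tried without Spark. The sketch below is a generic version under stated assumptions: `ReflectiveFallback` and `newInstanceOrElse` are hypothetical names, and `java.lang.StringBuilder` merely stands in for `org.apache.spark.sql.hive.HiveContext` so the example runs on a plain JVM. It illustrates the reflection pattern only and does not address the classloader error reported above.

```scala
object ReflectiveFallback {
  // Hypothetical helper: instantiate `className` through its one-argument
  // constructor taking `argType`, or evaluate `fallback` when the class
  // (or one of its dependencies) is missing from the classpath.
  def newInstanceOrElse[A <: AnyRef, T](className: String, argType: Class[A], arg: A)(
      fallback: => T): T =
    try {
      // Prefer the thread's context classloader, as in the original snippet;
      // fall back to this object's loader if none is set.
      val loader = Option(Thread.currentThread.getContextClassLoader)
        .getOrElse(getClass.getClassLoader)
      Class
        .forName(className, true, loader)
        .getConstructor(argType)
        .newInstance(arg)
        .asInstanceOf[T]
    } catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError => fallback
    }
}
```

With Spark on the classpath, the call would presumably look like `newInstanceOrElse[SparkContext, SQLContext]("org.apache.spark.sql.hive.HiveContext", classOf[SparkContext], sc)(new SQLContext(sc))`. Catching `NoClassDefFoundError` as well is a design choice of this sketch, covering the case where the class itself is present but something it links against is not.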