Spark SQL driver memory keeps rising

2016-06-14 Thread Khaled Hammouda
I'm having trouble with a Spark SQL job in which I run a series of SQL
transformations on data loaded from HDFS.

The first two stages load data from the HDFS input without issues, but later
stages that require shuffles cause the driver memory to keep rising until it
is exhausted. At that point the driver stalls, the Spark UI stops responding,
and I can't even kill the driver with ^C; I have to forcibly kill the
process.

I think I'm allocating enough memory to the driver: driver memory is 44 GB,
and spark.driver.memoryOverhead is 4.5 GB. When I look at the memory usage,
the driver memory before the shuffle starts is at about 2.4 GB (virtual mem
size for the driver process is about 50 GB), and then once the stages that
require shuffle start I can see the driver memory rising fast to about 47
GB, then everything stops responding.

I'm not invoking any output operation that collects data at the driver. I
just call .cache() on a couple of dataframes since they get used more than
once in the SQL transformations, but those should be cached on the workers.
Then I write the final result to a parquet file, but the job doesn't get to
this final stage.

What could possibly be causing the driver memory to rise that fast when no
data is being collected at the driver?

Thanks,
Khaled


Re: sqlcontext - not able to connect to database

2016-06-14 Thread Jeff Zhang
The JDBC driver jar is not on the classpath; please add it using --jars
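For example, a launch along these lines (the path and version of the MySQL connector
jar are assumptions; point it at wherever the jar actually lives):

./bin/pyspark --jars /path/to/mysql-connector-java-5.1.38.jar

or, when submitting an application:

./bin/spark-submit --jars /path/to/mysql-connector-java-5.1.38.jar my_script.py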

On Wed, Jun 15, 2016 at 12:45 PM, Tejaswini Buche <
tejaswini.buche0...@gmail.com> wrote:

> hi,
>
> I am trying to connect to a mysql database on my machine.
> But, I am getting some error
>
> dataframe_mysql = sqlContext.read.format("jdbc").options(
>     url="jdbc:mysql://localhost:3306/my_db",
>     driver="com.mysql.jdbc.Driver",
>     dbtable="data1",
>     user="root",
>     password="123").load()
>
>
> below is the full trace -
>
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input> in <module>()
>       4     dbtable = "data1",
>       5     user="root",
> ----> 6     password="123").load()
>       7
>
> /Users/tejaV/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/readwriter.pyc in load(self, path, format, schema, **options)
>     137             return self._df(self._jreader.load(path))
>     138         else:
> --> 139             return self._df(self._jreader.load())
>     140
>     141     @since(1.4)
>
> /Users/tejaV/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     811         answer = self.gateway_client.send_command(command)
>     812         return_value = get_return_value(
> --> 813             answer, self.gateway_client, self.target_id, self.name)
>     814
>     815         for temp_arg in temp_args:
>
> /Users/tejaV/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/utils.pyc in deco(*a, **kw)
>      43     def deco(*a, **kw):
>      44         try:
> ---> 45             return f(*a, **kw)
>      46         except py4j.protocol.Py4JJavaError as e:
>      47             s = e.java_exception.toString()
>
> /Users/tejaV/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     306                 raise Py4JJavaError(
>     307                     "An error occurred while calling {0}{1}{2}.\n".
> --> 308                     format(target_id, ".", name), value)
>     309             else:
>     310                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o98.load.
> : java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:45)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:45)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:45)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:120)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
>   at 
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>   at py4j.Gateway.invoke(Gateway.java:259)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:209)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>


-- 
Best Regards

Jeff Zhang


sqlcontext - not able to connect to database

2016-06-14 Thread Tejaswini Buche
hi,

I am trying to connect to a MySQL database on my machine,
but I am getting an error:

dataframe_mysql = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/my_db",
    driver="com.mysql.jdbc.Driver",
    dbtable="data1",
    user="root",
    password="123").load()


below is the full trace -

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
      4     dbtable = "data1",
      5     user="root",
----> 6     password="123").load()
      7

/Users/tejaV/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/readwriter.pyc in load(self, path, format, schema, **options)
    137             return self._df(self._jreader.load(path))
    138         else:
--> 139             return self._df(self._jreader.load())
    140
    141     @since(1.4)

/Users/tejaV/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814
    815         for temp_arg in temp_args:

/Users/tejaV/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/Users/tejaV/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling o98.load.
: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:45)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:45)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:45)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:120)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
at 
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)


Re: can not show all data for this table

2016-06-14 Thread Mich Talebzadeh
There may be an issue with the data in your CSV file, e.g. a blank header
line.

Sounds like you have an issue there. I normally get rid of blank lines
before putting the CSV file in HDFS.

Can you actually select from that temp table? For example:

sql("select TransactionDate, TransactionType, Description, Value, Balance,
AccountName, AccountNumber from tmp").take(2)

Replace those with your column names; they are mapped using a case class.
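If the file is not comma-separated (the single fused column a0a1a2...a9 in the
schema from your session suggests a different separator), a minimal sketch of
setting the delimiter explicitly -- the tab character and the path are assumptions:

// spark-csv lets you override the separator; "\t" here is only a guess at the real one
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", "\t")
  .load("/home/martin/result002.csv")
df.printSchema()   // should now show a0 ... a9 as separate columns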


HTH




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 15 June 2016 at 03:02, Lee Ho Yeung  wrote:

> filter also has error
>
> 16/06/14 19:00:27 WARN Utils: Service 'SparkUI' could not bind on port
> 4040. Attempting port 4041.
> Spark context available as sc.
> SQL context available as sqlContext.
>
> scala> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.SQLContext
>
> scala> val sqlContext = new SQLContext(sc)
> sqlContext: org.apache.spark.sql.SQLContext =
> org.apache.spark.sql.SQLContext@3114ea
>
> scala> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").option("inferSchema", "true").load("/home/martin/result002.csv")
> 16/06/14 19:00:32 WARN SizeEstimator: Failed to check whether
> UseCompressedOops is set; assuming yes
> Java HotSpot(TM) Client VM warning: You have loaded library
> /tmp/libnetty-transport-native-epoll7823347435914767500.so which might have
> disabled stack guard. The VM will try to fix the stack guard now.
> It's highly recommended that you fix the library with 'execstack -c
> ', or link it with '-z noexecstack'.
> df: org.apache.spark.sql.DataFrame = [a0a1a2a3a4a5
> a6a7a8a9: string]
>
> scala> df.printSchema()
> root
>  |-- a0a1a2a3a4a5a6a7a8a9: string
> (nullable = true)
>
>
> scala> df.registerTempTable("sales")
>
> scala> df.filter($"a0".contains("found
> deep=1")).filter($"a1".contains("found
> deep=1")).filter($"a2".contains("found deep=1"))
> org.apache.spark.sql.AnalysisException: cannot resolve 'a0' given input
> columns: [a0a1a2a3a4a5a6a7a8a9];
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>
>
>
>
>
> On Tue, Jun 14, 2016 at 6:19 PM, Lee Ho Yeung 
> wrote:
>
>> after tried following commands, can not show data
>>
>>
>> https://drive.google.com/file/d/0Bxs_ao6uuBDUVkJYVmNaUGx2ZUE/view?usp=sharing
>>
>> https://drive.google.com/file/d/0Bxs_ao6uuBDUc3ltMVZqNlBUYVk/view?usp=sharing
>>
>> /home/martin/Downloads/spark-1.6.1/bin/spark-shell --packages
>> com.databricks:spark-csv_2.11:1.4.0
>>
>> import org.apache.spark.sql.SQLContext
>>
>> val sqlContext = new SQLContext(sc)
>> val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").option("inferSchema", "true").load("/home/martin/result002.csv")
>> df.printSchema()
>> df.registerTempTable("sales")
>> val aggDF = sqlContext.sql("select * from sales where a0 like
>> \"%deep=3%\"")
>> df.collect.foreach(println)
>> aggDF.collect.foreach(println)
>>
>>
>>
>> val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").load("/home/martin/result002.csv")
>> df.printSchema()
>> df.registerTempTable("sales")
>> sqlContext.sql("select * from sales").take(30).foreach(println)
>>
>
>


Re: can not show all data for this table

2016-06-14 Thread Lee Ho Yeung
The filter also gives an error:

16/06/14 19:00:27 WARN Utils: Service 'SparkUI' could not bind on port
4040. Attempting port 4041.
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext =
org.apache.spark.sql.SQLContext@3114ea

scala> val df =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").option("inferSchema", "true").load("/home/martin/result002.csv")
16/06/14 19:00:32 WARN SizeEstimator: Failed to check whether
UseCompressedOops is set; assuming yes
Java HotSpot(TM) Client VM warning: You have loaded library
/tmp/libnetty-transport-native-epoll7823347435914767500.so which might have
disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c
', or link it with '-z noexecstack'.
df: org.apache.spark.sql.DataFrame = [a0a1a2a3a4a5
a6a7a8a9: string]

scala> df.printSchema()
root
 |-- a0a1a2a3a4a5a6a7a8a9: string
(nullable = true)


scala> df.registerTempTable("sales")

scala> df.filter($"a0".contains("found
deep=1")).filter($"a1".contains("found
deep=1")).filter($"a2".contains("found deep=1"))
org.apache.spark.sql.AnalysisException: cannot resolve 'a0' given input
columns: [a0a1a2a3a4a5a6a7a8a9];
at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)




On Tue, Jun 14, 2016 at 6:19 PM, Lee Ho Yeung  wrote:

> after tried following commands, can not show data
>
>
> https://drive.google.com/file/d/0Bxs_ao6uuBDUVkJYVmNaUGx2ZUE/view?usp=sharing
>
> https://drive.google.com/file/d/0Bxs_ao6uuBDUc3ltMVZqNlBUYVk/view?usp=sharing
>
> /home/martin/Downloads/spark-1.6.1/bin/spark-shell --packages
> com.databricks:spark-csv_2.11:1.4.0
>
> import org.apache.spark.sql.SQLContext
>
> val sqlContext = new SQLContext(sc)
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").option("inferSchema", "true").load("/home/martin/result002.csv")
> df.printSchema()
> df.registerTempTable("sales")
> val aggDF = sqlContext.sql("select * from sales where a0 like
> \"%deep=3%\"")
> df.collect.foreach(println)
> aggDF.collect.foreach(println)
>
>
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").load("/home/martin/result002.csv")
> df.printSchema()
> df.registerTempTable("sales")
> sqlContext.sql("select * from sales").take(30).foreach(println)
>


Re: hivecontext error

2016-06-14 Thread Ted Yu
Which release of Spark are you using ?

Can you show the full error trace ?

Thanks

On Tue, Jun 14, 2016 at 6:33 PM, Tejaswini Buche <
tejaswini.buche0...@gmail.com> wrote:

> I am trying to use hivecontext in spark. The following statements are
> running fine :
>
> from pyspark.sql import HiveContext
> sqlContext = HiveContext(sc)
>
> But, when i run the below statement,
>
> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
>
> I get the following error :
>
> Java Package object not callable
>
> what could be the problem?
> thnx
>


streaming example has error

2016-06-14 Thread Lee Ho Yeung
When simulating streaming with nc -lk , I got the error below.

Then I tried the example:

martin@ubuntu:~/Downloads$
/home/martin/Downloads/spark-1.6.1/bin/run-example
streaming.NetworkWordCount localhost 
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
16/06/14 18:33:06 INFO StreamingExamples: Setting log level to [WARN] for
streaming example. To override add a custom log4j.properties to the
classpath.
16/06/14 18:33:06 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
16/06/14 18:33:06 WARN Utils: Your hostname, ubuntu resolves to a loopback
address: 127.0.1.1; using 192.168.157.134 instead (on interface eth0)
16/06/14 18:33:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
16/06/14 18:33:13 WARN SizeEstimator: Failed to check whether
UseCompressedOops is set; assuming yes


That got an error too. Here is the code I ran in spark-shell:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
ssc.start()
ssc.awaitTermination()
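(Note: inside spark-shell a SparkContext named sc already exists, so constructing a
second one from a fresh SparkConf tries to bind another web UI on port 4040, which is
a likely source of the BindException in the transcript below. A minimal sketch that
reuses the existing shell context instead; the port is illustrative:)

import org.apache.spark.streaming.{Seconds, StreamingContext}

// reuse the spark-shell's existing SparkContext instead of creating a new one
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)   // port is illustrative
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()   // an output operation is required before start()
ssc.start()
ssc.awaitTermination()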



scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._

scala> val conf = new
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@67bcaf

scala> val ssc = new StreamingContext(conf, Seconds(1))
16/06/14 18:28:44 WARN AbstractLifeCycle: FAILED
SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address
already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at
org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:252)
at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262)
at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1988)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1979)
at
org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:262)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
at
org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481)
at
org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:481)
at
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874)
at
org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81)
at
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:61)
at $line37.$read$$iwC$$iwC.<init>(<console>:63)
at $line37.$read$$iwC.<init>(<console>:65)
at $line37.$read.<init>(<console>:67)
at $line37.$read$.<init>(<console>:71)
at $line37.$read$.<clinit>(<console>)
at $line37.$eval$.<init>(<console>:7)
at $line37.$eval$.<clinit>(<console>)
at $line37.$eval.$print(<console>)
at 

hivecontext error

2016-06-14 Thread Tejaswini Buche
I am trying to use hivecontext in spark. The following statements are
running fine :

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

But when I run the below statement,

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

I get the following error :

Java Package object not callable

what could be the problem?
thnx


Re: Writing empty Dataframes doesn't save any _metadata files in Spark 1.5.1 and 1.6

2016-06-14 Thread Hyukjin Kwon
Oops, I just saw the link. It is not actually only for Spark 2.0.


To be clear, https://issues.apache.org/jira/browse/SPARK-15393 was a bit
different from your case (it was about writing an empty data frame with empty
partitions).

This was caused by https://github.com/apache/spark/pull/12855 and reverted.



I wrote your case in the comments in that JIRA.



2016-06-15 10:26 GMT+09:00 Hyukjin Kwon :

> Yea, I met this case before. I guess this is related with
> https://issues.apache.org/jira/browse/SPARK-15393.
>
> 2016-06-15 8:46 GMT+09:00 antoniosi :
>
>> I tried the following code in both Spark 1.5.1 and Spark 1.6.0:
>>
>> import org.apache.spark.sql.types.{
>> StructType, StructField, StringType, IntegerType}
>> import org.apache.spark.sql.Row
>>
>> val schema = StructType(
>> StructField("k", StringType, true) ::
>> StructField("v", IntegerType, false) :: Nil)
>>
>> val df = sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
>> df.write.save("hdfs://xxx")
>>
>> Both 1.5.1 and 1.6.0 only save _SUCCESS file. It does not save any
>> _metadata
>> files. Also, in 1.6.0, it also gives the following error:
>>
>> 16/06/14 16:29:27 WARN ParquetOutputCommitter: could not write summary
>> file
>> for hdfs://xxx
>> java.lang.NullPointerException
>> at
>>
>> org.apache.parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:456)
>> at
>>
>> org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:420)
>> at
>>
>> org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
>> at
>>
>> org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
>> at
>>
>> org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
>> at
>>
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
>> at
>>
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
>> at
>>
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
>>
>> I do not get this exception in 1.5.1 version though.
>>
>> I see this bug https://issues.apache.org/jira/browse/SPARK-15393, but
>> this
>> is for Spark 2.0. Is there a same bug in Spark 1.5.1 and 1.6?
>>
>> Is there a way we could save an empty dataframe properly?
>>
>> Thanks.
>>
>> Antonio.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Writing-empty-Dataframes-doesn-t-save-any-metadata-files-in-Spark-1-5-1-and-1-6-tp27169.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Writing empty Dataframes doesn't save any _metadata files in Spark 1.5.1 and 1.6

2016-06-14 Thread Hyukjin Kwon
Yea, I met this case before. I guess this is related with
https://issues.apache.org/jira/browse/SPARK-15393.

2016-06-15 8:46 GMT+09:00 antoniosi :

> I tried the following code in both Spark 1.5.1 and Spark 1.6.0:
>
> import org.apache.spark.sql.types.{
> StructType, StructField, StringType, IntegerType}
> import org.apache.spark.sql.Row
>
> val schema = StructType(
> StructField("k", StringType, true) ::
> StructField("v", IntegerType, false) :: Nil)
>
> val df = sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
> df.write.save("hdfs://xxx")
>
> Both 1.5.1 and 1.6.0 only save _SUCCESS file. It does not save any
> _metadata
> files. Also, in 1.6.0, it also gives the following error:
>
> 16/06/14 16:29:27 WARN ParquetOutputCommitter: could not write summary file
> for hdfs://xxx
> java.lang.NullPointerException
> at
>
> org.apache.parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:456)
> at
>
> org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:420)
> at
>
> org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
> at
>
> org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
> at
>
> org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
> at
>
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
> at
>
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
> at
>
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
>
> I do not get this exception in 1.5.1 version though.
>
> I see this bug https://issues.apache.org/jira/browse/SPARK-15393, but this
> is for Spark 2.0. Is there a same bug in Spark 1.5.1 and 1.6?
>
> Is there a way we could save an empty dataframe properly?
>
> Thanks.
>
> Antonio.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Writing-empty-Dataframes-doesn-t-save-any-metadata-files-in-Spark-1-5-1-and-1-6-tp27169.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


can not show all data for this table

2016-06-14 Thread Lee Ho Yeung
After trying the following commands, I cannot show the data.

https://drive.google.com/file/d/0Bxs_ao6uuBDUVkJYVmNaUGx2ZUE/view?usp=sharing
https://drive.google.com/file/d/0Bxs_ao6uuBDUc3ltMVZqNlBUYVk/view?usp=sharing

/home/martin/Downloads/spark-1.6.1/bin/spark-shell --packages
com.databricks:spark-csv_2.11:1.4.0

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").option("inferSchema", "true").load("/home/martin/result002.csv")
df.printSchema()
df.registerTempTable("sales")
val aggDF = sqlContext.sql("select * from sales where a0 like \"%deep=3%\"")
df.collect.foreach(println)
aggDF.collect.foreach(println)



val df =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("/home/martin/result002.csv")
df.printSchema()
df.registerTempTable("sales")
sqlContext.sql("select * from sales").take(30).foreach(println)


Writing empty Dataframes doesn't save any _metadata files in Spark 1.5.1 and 1.6

2016-06-14 Thread antoniosi
I tried the following code in both Spark 1.5.1 and Spark 1.6.0:

import org.apache.spark.sql.types.{
StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val schema = StructType(
StructField("k", StringType, true) ::
StructField("v", IntegerType, false) :: Nil)

val df = sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
df.write.save("hdfs://xxx")

Both 1.5.1 and 1.6.0 only save _SUCCESS file. It does not save any _metadata
files. Also, in 1.6.0, it also gives the following error:

16/06/14 16:29:27 WARN ParquetOutputCommitter: could not write summary file
for hdfs://xxx
java.lang.NullPointerException
at
org.apache.parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:456)
at
org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:420)
at
org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
at
org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at
org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)

I do not get this exception in 1.5.1 version though.

I see this bug https://issues.apache.org/jira/browse/SPARK-15393, but this
is for Spark 2.0. Is there a same bug in Spark 1.5.1 and 1.6?

Is there a way we could save an empty dataframe properly?

Thanks.

Antonio.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-empty-Dataframes-doesn-t-save-any-metadata-files-in-Spark-1-5-1-and-1-6-tp27169.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Memory Error - Not enough space to cache broadcast

2016-06-14 Thread Cassa L
Hi,
I would appreciate any clue on this. It has become a bottleneck for our
spark job.

On Mon, Jun 13, 2016 at 2:56 PM, Cassa L  wrote:

> Hi,
>
> I'm using Spark version 1.5.1. I am reading data from Kafka into Spark and
> writing it into Cassandra after processing it. The Spark job starts fine and runs
> all good for some time until I start getting the errors below. Once these errors
> come, the job starts to lag behind and I see that it has scheduling and
> processing delays in the streaming UI.
>
> Worker memory is 6GB, executor-memory is 5GB, I also tried to tweak 
> memoryFraction parameters. Nothing works.
>
>
> 16/06/13 21:26:02 INFO MemoryStore: ensureFreeSpace(4044) called with 
> curMem=565394, maxMem=2778495713
> 16/06/13 21:26:02 INFO MemoryStore: Block broadcast_69652_piece0 stored as 
> bytes in memory (estimated size 3.9 KB, free 2.6 GB)
> 16/06/13 21:26:02 INFO TorrentBroadcast: Reading broadcast variable 69652 
> took 2 ms
> 16/06/13 21:26:02 WARN MemoryStore: Failed to reserve initial memory 
> threshold of 1024.0 KB for computing block broadcast_69652 in memory.
> 16/06/13 21:26:02 WARN MemoryStore: Not enough space to cache broadcast_69652 
> in memory! (computed 496.0 B so far)
> 16/06/13 21:26:02 INFO MemoryStore: Memory use = 556.1 KB (blocks) + 2.6 GB 
> (scratch space shared across 0 tasks(s)) = 2.6 GB. Storage limit = 2.6 GB.
> 16/06/13 21:26:02 WARN MemoryStore: Persisting block broadcast_69652 to disk 
> instead.
> 16/06/13 21:26:02 INFO BlockManager: Found block rdd_100761_1 locally
> 16/06/13 21:26:02 INFO Executor: Finished task 0.0 in stage 71577.0 (TID 
> 452316). 2043 bytes result sent to driver
>
>
> Thanks,
>
> L
>
>


spark standalone high availability issues

2016-06-14 Thread Darshan Singh
Hi,

I am using a standalone Spark cluster with a ZooKeeper cluster for high
availability. I sometimes get an error when I start the master. The error is
related to leader election in Curator and says that no method was found
(getProcess), and the master does not get started.

Just wondering what could be causing the issue.

I am using same zookeeper cluster for HDFS High availability and it is
working just fine.


Thanks


choice of RDD function

2016-06-14 Thread Sivakumaran S
Dear friends,

I have set up Kafka 0.9.0.0, Spark 1.6.1 and Scala 2.10. My source is sending a 
json string periodically to a topic in kafka. I am able to consume this topic 
using Spark Streaming and print it. The schema of the source json is as 
follows: 

{ "id": 121156, "ht": 42, "rotor_rpm": 180, "temp": 14.2, "time": 146593512}
{ "id": 121157, "ht": 18, "rotor_rpm": 110, "temp": 12.2, "time": 146593512}
{ "id": 121156, "ht": 36, "rotor_rpm": 160, "temp": 14.4, "time": 146593513}
{ "id": 121157, "ht": 19, "rotor_rpm": 120, "temp": 12.0, "time": 146593513}
and so on.


In Spark streaming, I want to find the average of “ht” (height), “rotor_rpm” 
and “temp” for each “id". I also want to find the max and min of the same 
fields in the time window (300 seconds in this case). 

Q1. Can this be done using plain RDD and streaming functions or does it 
require Dataframes/SQL? There may be more fields added to the json at a later 
stage. There will be a lot of “id”s at a later stage.

Q2. If it can be done using either, which one would render to be more 
efficient and fast?

As of now, the entire set up is in a single laptop. 
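For concreteness, a minimal sketch of the aggregation being described, assuming each
window of JSON records has already been turned into a DataFrame with the fields above
(all names are illustrative):

import org.apache.spark.sql.functions._

// df: DataFrame with columns id, ht, rotor_rpm, temp, time for one window of data,
// e.g. built inside foreachRDD via sqlContext.read.json(rdd)
val stats = df.groupBy("id").agg(
  avg("ht"), min("ht"), max("ht"),
  avg("rotor_rpm"), min("rotor_rpm"), max("rotor_rpm"),
  avg("temp"), min("temp"), max("temp"))
stats.show()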

Thanks in advance.

Regards,

Siva

Re: spark-ec2 scripts with spark-2.0.0-preview

2016-06-14 Thread Shivaram Venkataraman
Can you open an issue on https://github.com/amplab/spark-ec2 ?  I
think we should be able to escape the version string and pass the
2.0.0-preview through the scripts

Shivaram

On Tue, Jun 14, 2016 at 12:07 PM, Sunil Kumar
 wrote:
> Hi,
>
> The spark-ec2 scripts are missing from spark-2.0.0-preview. Is there a
> workaround available ? I tried to change the ec2 scripts to accomodate
> spark-2.0.0...If I call the release spark-2.0.0-preview, then it barfs
> because the command line argument : --spark-version=spark-2.0.0-preview
> gets translated to spark-2.0.0-preiew (-v is taken as a switch)...If I call
> the release spark-2.0.0, then it cant find it in aws, since it looks for
> http://s3.amazonaws.com/spark-related-packages/spark-2.0.0-bin-hadoop2.4.tgz
> instead of
> http://s3.amazonaws.com/spark-related-packages/spark-2.0.0-preview-bin-hadoop2.4.tgz
>
> Any ideas on how to make this work ? How can I tweak/hack the code to look
> for spark-2.0.0-preview in spark-related-packages ?
>
> thanks
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkContext#cancelJobGroup : is it safe? Who got burn? Who is alive?

2016-06-14 Thread Bertrand Dechoux
Hi,

I am wondering about the safety of the *SparkContext#cancelJobGroup* method
that should allow to stop specific (ie not all) jobs inside a spark context.

There is a big disclaimer in the javadoc (
https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/SparkContext.html#setJobGroup(java.lang.String,%20java.lang.String,%20boolean)
):

If interruptOnCancel is set to true for the job group, then job
> cancellation will result in Thread.interrupt() being called on the job's
> executor threads. This is useful to help ensure that the tasks are actually
> stopped in a timely manner, but is off by default due to HDFS-1208, where
> HDFS may respond to Thread.interrupt() by marking nodes as dead.
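
For reference, a minimal usage sketch of the API in question (the group id, the dummy
job and the threading are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("cancel-demo").setMaster("local[2]"))

// setJobGroup is thread-local: it tags the jobs submitted from the thread that set it;
// interruptOnCancel = true opts in to Thread.interrupt() on the executor threads.
val worker = new Thread {
  override def run(): Unit = {
    sc.setJobGroup("slow-group", "long-running demo job", interruptOnCancel = true)
    try {
      sc.parallelize(1 to 1000000, 100).map { i => Thread.sleep(1); i }.count()
    } catch {
      case e: Exception => println(s"job ended: ${e.getMessage}")
    }
  }
}
worker.start()

Thread.sleep(5000)
sc.cancelJobGroup("slow-group")   // cancels only the jobs tagged with this group id
worker.join()
sc.stop()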


I have two main questions :

   1. What is the expected behavior if it is not interrupted on cancel? I
   am especially curious about the YARN case with HDFS, but any info is welcome.
   2. Who is or was using *interruptOnCancel*? Did you get burned? Is it
   still working without any incident?

Thanks in advance for any info, feedbacks and war stories.

Bertrand Dechoux


spark-ec2 scripts with spark-2.0.0-preview

2016-06-14 Thread Sunil Kumar
Hi,
The spark-ec2 scripts are missing from spark-2.0.0-preview. Is there a 
workaround available? I tried to change the ec2 scripts to accommodate
spark-2.0.0... If I call the release spark-2.0.0-preview, then it barfs
because the command line argument --spark-version=spark-2.0.0-preview gets
translated to spark-2.0.0-preiew (-v is taken as a switch)... If I call the
release spark-2.0.0, then it can't find it in AWS, since it looks for
http://s3.amazonaws.com/spark-related-packages/spark-2.0.0-bin-hadoop2.4.tgz 
instead of  
http://s3.amazonaws.com/spark-related-packages/spark-2.0.0-preview-bin-hadoop2.4.tgz
Any ideas on how to make this work ? How can I tweak/hack the code to look for 
spark-2.0.0-preview in spark-related-packages ?
thanks


Re: Spark 2.0: Unify DataFrames and Datasets question

2016-06-14 Thread Xinh Huynh
Hi Arun,

This documentation may be helpful:

The 2.0-preview Scala doc for Dataset class:
http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Dataset
Note that the Dataset API has completely changed from 1.6.

In 2.0, there is no separate DataFrame class. Rather, it is a type alias
defined here:
http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.package@DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
"type DataFrame = Dataset

[Row

]"
Unlike in 1.6, a DataFrame is a specific Dataset[T], where T=Row, so
DataFrame shares the same methods as Dataset.

As mentioned earlier, this unification is only available in Scala and Java.
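
A small sketch of what the unification looks like in 2.0 Scala code (the case class
and values are illustrative):

import org.apache.spark.sql.{Dataset, Row, SparkSession}

case class Reading(id: Long, ht: Double)

val spark = SparkSession.builder().appName("unified-api").master("local[*]").getOrCreate()
import spark.implicits._

val ds: Dataset[Reading] = Seq(Reading(1L, 42.0), Reading(2L, 18.0)).toDS()

// In 2.0 DataFrame is just a type alias for Dataset[Row], so both views share the same methods.
val df: Dataset[Row] = ds.toDF()              // untyped view (a DataFrame)
val typed: Dataset[Reading] = df.as[Reading]  // typed view over the same data

df.filter($"ht" > 20).show()
typed.filter(_.ht > 20).show()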

Xinh

On Tue, Jun 14, 2016 at 10:45 AM, Michael Armbrust 
wrote:

> 1) What does this really mean to an Application developer?
>>
>
> It means there are less concepts to learn.
>
>
>> 2) Why this unification was needed in Spark 2.0?
>>
>
> To simplify the API and reduce the number of concepts that needed to be
> learned.  We only didn't do it in 1.6 because we didn't want to break
> binary compatibility in a minor release.
>
>
>> 3) What changes can be observed in Spark 2.0 vs Spark 1.6?
>>
>
> There is no DataFrame class, all methods are still available, except those
> that returned an RDD (now you can call df.rdd.map if that is still what you
> want)
>
>
>> 4) Compile time safety will be there for DataFrames too?
>>
>
> Slide 7
>
>
>> 5) Python API is supported for Datasets in 2.0?
>>
>
> Slide 10
>


Re: Spark-SQL with Oozie

2016-06-14 Thread nsalian
Hi,

Thanks for the question.
This would be a good starting point for your Oozie workflow application with
a Spark action.




-
Neelesh S. Salian
Cloudera
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-with-Oozie-tp27167p27168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 2.0: Unify DataFrames and Datasets question

2016-06-14 Thread Michael Armbrust
>
> 1) What does this really mean to an Application developer?
>

It means there are fewer concepts to learn.


> 2) Why this unification was needed in Spark 2.0?
>

To simplify the API and reduce the number of concepts that needed to be
learned.  We only didn't do it in 1.6 because we didn't want to break
binary compatibility in a minor release.


> 3) What changes can be observed in Spark 2.0 vs Spark 1.6?
>

There is no DataFrame class, all methods are still available, except those
that returned an RDD (now you can call df.rdd.map if that is still what you
want)


> 4) Compile time safety will be there for DataFrames too?
>

Slide 7


> 5) Python API is supported for Datasets in 2.0?
>

Slide 10


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-14 Thread Mich Talebzadeh
Hi Swetha,

Have you actually tried doing this in Hive using Hive CLI or beeline?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 14 June 2016 at 18:43, Mich Talebzadeh  wrote:

> In all probability there is no user database created in Hive
>
> Create a database yourself
>
> sql("create if not exists database test")
>
> It would be helpful if you grasp some concept of Hive databases etc?
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 14 June 2016 at 15:40, Sree Eedupuganti  wrote:
>
>> Hi Spark users, i am new to spark. I am trying to connect hive using
>> SparkJavaContext. Unable to connect to the database. By executing the below
>> code i can see only "default" database. Can anyone help me out. What i need
>> is a sample program for Querying Hive results using SparkJavaContext. Need
>> to pass any values like this.
>>
>> userDF.registerTempTable("userRecordsTemp")
>>
>> sqlContext.sql("SET hive.default.fileformat=Orc  ")
>> sqlContext.sql("set hive.enforce.bucketing = true; ")
>> sqlContext.sql("set hive.enforce.sorting = true; ")
>>
>>  public static void  main(String[] args ) throws Exception {
>>   SparkConf sparkConf = new
>> SparkConf().setAppName("SparkSQL").setMaster("local");
>>   SparkContext  ctx=new SparkContext(sparkConf);
>>   HiveContext  hiveql=new
>> org.apache.spark.sql.hive.HiveContext(ctx);
>>   DataFrame df=hiveql.sql("show databases");
>>   df.show();
>>   }
>>
>> Any suggestions pleaseThanks.
>>
>
>


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-14 Thread Mich Talebzadeh
In all probability there is no user database created in Hive

Create a database yourself

sql("create if not exists database test")

It would be helpful if you grasp some concept of Hive databases etc?

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 14 June 2016 at 15:40, Sree Eedupuganti  wrote:

> Hi Spark users, i am new to spark. I am trying to connect hive using
> SparkJavaContext. Unable to connect to the database. By executing the below
> code i can see only "default" database. Can anyone help me out. What i need
> is a sample program for Querying Hive results using SparkJavaContext. Need
> to pass any values like this.
>
> userDF.registerTempTable("userRecordsTemp")
>
> sqlContext.sql("SET hive.default.fileformat=Orc  ")
> sqlContext.sql("set hive.enforce.bucketing = true; ")
> sqlContext.sql("set hive.enforce.sorting = true; ")
>
>  public static void  main(String[] args ) throws Exception {
>   SparkConf sparkConf = new
> SparkConf().setAppName("SparkSQL").setMaster("local");
>   SparkContext  ctx=new SparkContext(sparkConf);
>   HiveContext  hiveql=new
> org.apache.spark.sql.hive.HiveContext(ctx);
>   DataFrame df=hiveql.sql("show databases");
>   df.show();
>   }
>
> Any suggestions pleaseThanks.
>


Re: Spark 2.0: Unify DataFrames and Datasets question

2016-06-14 Thread Arun Patel
Can anyone answer these questions please.



On Mon, Jun 13, 2016 at 6:51 PM, Arun Patel  wrote:

> Thanks Michael.
>
> I went thru these slides already and could not find answers for these
> specific questions.
>
> I created a Dataset and converted it to DataFrame in 1.6 and 2.0.  I don't
> see any difference in 1.6 vs 2.0.  So, I really got confused and asked
> these questions about unification.
>
> Appreciate if you can answer these specific questions.  Thank you very
> much!
>
> On Mon, Jun 13, 2016 at 2:55 PM, Michael Armbrust 
> wrote:
>
>> Here's a talk I gave on the topic:
>>
>> https://www.youtube.com/watch?v=i7l3JQRx7Qw
>>
>> http://www.slideshare.net/SparkSummit/structuring-spark-dataframes-datasets-and-streaming-by-michael-armbrust
>>
>> On Mon, Jun 13, 2016 at 4:01 AM, Arun Patel 
>> wrote:
>>
>>> In Spark 2.0, DataFrames and Datasets are unified. DataFrame is simply
>>> an alias for a Dataset of type row.   I have few questions.
>>>
>>> 1) What does this really mean to an Application developer?
>>> 2) Why this unification was needed in Spark 2.0?
>>> 3) What changes can be observed in Spark 2.0 vs Spark 1.6?
>>> 4) Compile time safety will be there for DataFrames too?
>>> 5) Python API is supported for Datasets in 2.0?
>>>
>>> Thanks
>>> Arun
>>>
>>
>>
>


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-14 Thread swetha kasireddy
Hi Bijay,

This approach might not work for me as I have to do partial
inserts/overwrites in a given table and data_frame.write.partitionBy will
overwrite the entire table.

Thanks,
Swetha

On Mon, Jun 13, 2016 at 9:25 PM, Bijay Pathak 
wrote:

> Hi Swetha,
>
> One option is to use a Hive release with the above issue fixed, which is Hive 2.0
> or Cloudera CDH Hive 1.2. One thing to remember is that it's not the Hive you have
> installed but the Hive that Spark is using, which in Spark 1.6 is Hive version 1.2
> as of now.
>
> The workaround I did for this issue was to write the dataframe directly using the
> dataframe write method and to create the Hive table on top of that; doing this
> brought my processing time down from 4+ hrs to just under 1 hr.
>
>
>
> data_frame.write.partitionBy('idPartitioner','dtPartitoner').orc("path/to/final/location")
>
> And ORC format is supported with HiveContext only.
>
> Thanks,
> Bijay
>
>
> On Mon, Jun 13, 2016 at 11:41 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Mich,
>>
>> Following is  a sample code snippet:
>>
>>
>> *val *userDF =
>> userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId", 
>> "userRecord").persist()
>> System.*out*.println(" userRecsDF.partitions.size"+
>> userRecsDF.partitions.size)
>>
>> userDF.registerTempTable("userRecordsTemp")
>>
>> sqlContext.sql("SET hive.default.fileformat=Orc  ")
>> sqlContext.sql("set hive.enforce.bucketing = true; ")
>> sqlContext.sql("set hive.enforce.sorting = true; ")
>> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
>> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
>> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' ")
>> sqlContext.sql(
>>   """ from userRecordsTemp ps   insert overwrite table users
>> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
>> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
>> """.stripMargin)
>>
>>
>> On Mon, Jun 13, 2016 at 10:57 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi Bijay,
>>>
>>> If I am hitting this issue,
>>> https://issues.apache.org/jira/browse/HIVE-11940. What needs to be
>>> done? Incrementing to higher version of hive is the only solution?
>>>
>>> Thanks!
>>>
>>> On Mon, Jun 13, 2016 at 10:47 AM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
 Hi,

 Following is  a sample code snippet:


 *val *userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner",
 "userId", "userRecord").persist()
 System.*out*.println(" userRecsDF.partitions.size"+
 userRecsDF.partitions.size)

 userDF.registerTempTable("userRecordsTemp")

 sqlContext.sql("SET hive.default.fileformat=Orc  ")
 sqlContext.sql("set hive.enforce.bucketing = true; ")
 sqlContext.sql("set hive.enforce.sorting = true; ")
 sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
 STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
 dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' "
 )
 sqlContext.sql(
   """ from userRecordsTemp ps   insert overwrite table users
 partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
 ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
 """.stripMargin)




 On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak <
 bijay.pat...@cloudwick.com> wrote:

> Hello,
>
> Looks like you are hitting this:
> https://issues.apache.org/jira/browse/HIVE-11940.
>
> Thanks,
> Bijay
>
>
>
> On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> cam you provide a code snippet of how you are populating the target
>> table from temp table.
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 9 June 2016 at 23:43, swetha kasireddy 
>> wrote:
>>
>>> No, I am reading the data from hdfs, transforming it , registering
>>> the data in a temp table using registerTempTable and then doing insert
>>> overwrite using Spark SQl' hiveContext.
>>>
>>> On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 how are you doing the insert? from an existing table?

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 

restarting of spark streaming

2016-06-14 Thread Chen, Yan I
Hi,

I notice that in the process of restarting, Spark Streaming will try to
recover/replay all the batches it missed. But in this process, will streams be
checkpointed the way they are checkpointed during normal processing?

Does anyone know?

Sometimes our cluster goes into maintenance, and our streaming process is shut down
for e.g. 1 day and then restarted. If the batches from this period are replayed
without checkpointing, the RDD chain will be very long, and memory usage will
keep going up until all missing batches are replayed.

[memory usage will keep going up until all missing batches are replayed]: this 
is what we observe now.
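
For context, a minimal sketch of the checkpoint-based restart pattern in question
(the path, batch interval and pipeline body are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint"   // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpointed-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... define sources, transformations and output operations here ...
  ssc
}

// On restart this rebuilds the context from the checkpoint and replays the missed
// batches; the question above is whether those replayed batches are themselves
// checkpointed along the way.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()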

Thanks,
Yan Chen



Spark-SQL with Oozie

2016-06-14 Thread chandana
Hello,
I would like to configure a spark oozie action to execute spark-sql from a
file on AWS EMR.

spark-sql -f 

Has anybody tried this with oozie spark action? If so, please post your
spark action xml.

Thanks in advance!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-with-Oozie-tp27167.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL NoSuchMethodException...DriverWrapper.<init>()

2016-06-14 Thread Mirko Bernardoni
Hi All,

I'm using Spark 1.6.1 and I'm getting the error below. This also appears with
the current 1.6 branch.
The code that is generating the error is loading a table from an MS SQL server.
I've also checked whether the Microsoft JDBC driver is loaded correctly, and it is
(I'm using an uber jar with all the dependencies inside).
With version 1.6.0 and the same source the error does not appear.
I would really appreciate any help or suggestions.

Many thanks,
Mirko




Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at 
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.InstantiationException: 
org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper
at java.lang.Class.newInstance(Class.java:427)
at 
org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:53)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:52)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:120)
at 
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
at 
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at 
com.ixxus.analytic.batch.AlfrescoToRDD.loadDataFrameAsRdd(AlfrescoToRDD.scala:250)
at 
com.ixxus.analytic.batch.AlfrescoToRDD.loadTags(AlfrescoToRDD.scala:189)
at 
com.ixxus.analytic.phase.extraction.ExtractionJob$.execute(ExtractionJob.scala:29)
at com.ixxus.analytic.program.AllProgram$.main(AllProgram.scala:36)
at com.ixxus.analytic.program.AllProgram.main(AllProgram.scala)
... 6 more
Caused by: java.lang.NoSuchMethodException: 
org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 19 more




Re: MatchError: StringType

2016-06-14 Thread Ted Yu
Can you give a bit more detail ?

version of Spark

complete error trace

code snippet which reproduces the error

On Tue, Jun 14, 2016 at 9:54 AM, pseudo oduesp 
wrote:

> hello
>
> why i get this error
>
> when  using
>
> assembler = VectorAssembler(inputCols=l_CDMVT,
> outputCol="aev" + "CODEM")
> output = assembler.transform(df_aev)
>
> L_CDMTV list of columns
>
>
> thanks  ?
>


MatchError: StringType

2016-06-14 Thread pseudo oduesp
hello

Why do I get this error when using:

assembler = VectorAssembler(inputCols=l_CDMVT,
outputCol="aev" + "CODEM")
output = assembler.transform(df_aev)

l_CDMVT is a list of column names.


thanks  ?


Re: [Spark 2.0.0] Structured Stream on Kafka

2016-06-14 Thread Chris Fregly
+1 vote, +1 watch

this would be huge.

On Tue, Jun 14, 2016 at 10:47 AM, andy petrella 
wrote:

> kool, voted and watched!
> tx
>
> On Tue, Jun 14, 2016 at 4:44 PM Cody Koeninger  wrote:
>
>> I haven't done any significant work on using structured streaming with
>> kafka, there's a jira ticket for tracking purposes
>>
>> https://issues.apache.org/jira/browse/SPARK-15406
>>
>>
>>
>> On Tue, Jun 14, 2016 at 9:21 AM, andy petrella 
>> wrote:
>> > Heya folks,
>> >
>> > Just wondering if there are some doc regarding using kafka directly
>> from the
>> > reader.stream?
>> > Has it been integrated already (I mean the source)?
>> >
>> > Sorry if the answer is RTFM (but then I'd appreciate a pointer anyway^^)
>> >
>> > thanks,
>> > cheers
>> > andy
>> > --
>> > andy
>>
> --
> andy
>



-- 
*Chris Fregly*
Research Scientist @ PipelineIO
San Francisco, CA
http://pipeline.io


Re: how to investigate skew and DataFrames and RangePartitioner

2016-06-14 Thread Takeshi Yamamuro
Hi,

I'm afraid there is currently no API to define a RangePartitioner on a DataFrame.

// maropu

On Tue, Jun 14, 2016 at 5:04 AM, Peter Halliday  wrote:

> I have two questions
>
> First, I have a failure when I write Parquet from Spark 1.6.1 on Amazon EMR
> to S3.  This is a full batch, which is over 200GB of source data.  The
> partitioning is based on a geographic identifier we use, and also the date we
> got the data.  However, because of geographical density we could certainly
> be hitting tiles that are too dense.  I'm trying to figure out the size of
> the file it's trying to write out.
>
> Second, We use to use RDDs and RangePartitioner for task partitioning.
> However, I don’t see this available in DataFrames.  How does one achieve
> this now.
>
> Peter Halliday
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Is there a limit on the number of tasks in one job?

2016-06-14 Thread Khaled Hammouda
Yes, I check Spark UI to follow what’s going on. It seems to start several 
tasks fine (8 tasks in my case) out of ~70k tasks, and then stalls.

I actually was able to get things to work by disabling dynamic allocation. 
Basically I set the number of executors manually, which disables dynamic 
allocation. This seems to fix the problem.
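
For reference, a sketch of what that looks like on YARN (all numbers and the script
name are illustrative). Setting the executor count explicitly via --num-executors
(spark.executor.instances) causes Spark to disable dynamic allocation; it can also be
turned off directly with --conf spark.dynamicAllocation.enabled=false:

spark-submit \
  --master yarn \
  --num-executors 200 \
  --executor-memory 40g \
  --executor-cores 8 \
  my_job.py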

My guess is that when faced with too many backlogged tasks, the dynamic
allocator could be having trouble launching executors, or something similar.
I'm not sure whether this is a bug, but maybe someone familiar with the
internals of dynamic allocation can tell if it is worth filing.

I’m using YARN as resource manager.

Khaled 

> On Jun 13, 2016, at 6:24 PM, Mich Talebzadeh  
> wrote:
> 
> Have you looked at spark GUI to see what it is waiting for. is that available 
> memory. What is the resource manager you are using?
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 13 June 2016 at 20:45, Khaled Hammouda  > wrote:
> Hi Michael,
> 
> Thanks for the suggestion to use Spark 2.0 preview. I just downloaded the 
> preview and tried using it, but I’m running into the exact same issue.
> 
> Khaled
> 
>> On Jun 13, 2016, at 2:58 PM, Michael Armbrust > > wrote:
>> 
>> You might try with the Spark 2.0 preview.  We spent a bunch of time 
>> improving the handling of many small files.
>> 
>> On Mon, Jun 13, 2016 at 11:19 AM, khaled.hammouda > > wrote:
>> I'm trying to use Spark SQL to load json data that are split across about 70k
>> files across 24 directories in hdfs, using
>> sqlContext.read.json("hdfs:///user/hadoop/data/*/*").
>> 
>> This doesn't seem to work for some reason, I get timeout errors like the
>> following:
>> 
>> ---
>> 6/06/13 15:46:31 ERROR TransportChannelHandler: Connection to
>> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 
>>  has been quiet for 12
>> ms while there are outstanding requests. Assuming connection is dead; please
>> adjust spark.network.timeout if this is wrong.
>> 16/06/13 15:46:31 ERROR TransportResponseHandler: Still have 1 requests
>> outstanding when connection from
>> ip-172-31-31-114.ec2.internal/172.31.31.114:46028 
>>  is closed
>> ...
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
>> seconds]. This timeout is controlled by spark.rpc.askTimeout
>> ...
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [120 seconds]
>> --
>> 
>> I don't want to start tinkering with increasing timeouts yet. I tried to
>> load just one sub-directory, which contains around 4k files, and this seems
>> to work fine. So I thought of writing a loop where I load the json files
>> from each sub-dir and then unionAll the current dataframe with the previous
>> dataframe. However, this also fails because apparently the json files don't
>> have the exact same schema, causing this error:
>> 
>> ---
>> Traceback (most recent call last):
>>   File "/home/hadoop/load_json.py", line 65, in 
>> df = df.unionAll(hrdf)
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
>> line 998, in unionAll
>>   File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
>> line 813, in __call__
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
>> 51, in deco
>> pyspark.sql.utils.AnalysisException: u"unresolved operator 'Union;"
>> ---
>> 
>> I'd like to know what's preventing Spark from loading 70k files the same way
>> it's loading 4k files?
>> 
>> To give you some idea about my setup and data:
>> - ~70k files across 24 directories in HDFS
>> - Each directory contains 3k files on average
>> - Cluster: 200 nodes EMR cluster, each node has 53 GB memory and 8 cores
>> available to YARN
>> - Spark 1.6.1
>> 
>> Thanks.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-limit-on-the-number-of-tasks-in-one-job-tp27158.html
>>  
>> 
>> Sent from the Apache Spark User List mailing list archive at Nabble.com 
>> .
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
>> 
> 
> 



Re: [Spark 2.0.0] Structured Stream on Kafka

2016-06-14 Thread andy petrella
kool, voted and watched!
tx

On Tue, Jun 14, 2016 at 4:44 PM Cody Koeninger  wrote:

> I haven't done any significant work on using structured streaming with
> kafka, there's a jira ticket for tracking purposes
>
> https://issues.apache.org/jira/browse/SPARK-15406
>
>
>
> On Tue, Jun 14, 2016 at 9:21 AM, andy petrella 
> wrote:
> > Heya folks,
> >
> > Just wondering if there are some doc regarding using kafka directly from
> the
> > reader.stream?
> > Has it been integrated already (I mean the source)?
> >
> > Sorry if the answer is RTFM (but then I'd appreciate a pointer anyway^^)
> >
> > thanks,
> > cheers
> > andy
> > --
> > andy
>
-- 
andy


Re: [Spark 2.0.0] Structured Stream on Kafka

2016-06-14 Thread Cody Koeninger
I haven't done any significant work on using structured streaming with
kafka, there's a jira ticket for tracking purposes

https://issues.apache.org/jira/browse/SPARK-15406



On Tue, Jun 14, 2016 at 9:21 AM, andy petrella  wrote:
> Heya folks,
>
> Just wondering if there are some doc regarding using kafka directly from the
> reader.stream?
> Has it been integrated already (I mean the source)?
>
> Sorry if the answer is RTFM (but then I'd appreciate a pointer anyway^^)
>
> thanks,
> cheers
> andy
> --
> andy

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark corrupts text lines

2016-06-14 Thread Kristoffer Sjögren
I managed to get remote debugging up and running and can in fact
reproduce the error and get a breakpoint triggered as it happens.

But it seems like the code does not go through TextInputFormat, or at
least the breakpoint in this class is not triggered. I don't know what
other class to look at to find where the actual split occurs.

Any pointers?



On Tue, Jun 14, 2016 at 4:03 PM, Kristoffer Sjögren  wrote:
> I'm pretty confident the lines are encoded correctly since I can read
> them both locally and on Spark (by ignoring the faulty line and
> proceed to next). I also get the correct number of lines through
> Spark, again by ignoring the faulty line.
>
> I get the same error by reading the original file using Spark, save as
> new text file, then try decoding again.
>
> context.textFile("/orgfile").saveAsTextFile("/newfile");
>
> Ok, not much left than to do some remote debugging.
>
>
> On Tue, Jun 14, 2016 at 3:38 PM, Kristoffer Sjögren  wrote:
>> Thanks for you help. Really appreciate it!
>>
>> Give me some time i'll come back after I've tried your suggestions.
>>
>> On Tue, Jun 14, 2016 at 3:28 PM, Kristoffer Sjögren  wrote:
>>> I cannot reproduce it by running the file through Spark in local mode
>>> on my machine. So it does indeed seems to be something related to
>>> split across partitions.
>>>
>>> On Tue, Jun 14, 2016 at 3:04 PM, Kristoffer Sjögren  
>>> wrote:
 Can you do remote debugging in Spark? Didn't know that. Do you have a link?

 Also noticed isSplittable in
 org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for
 org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there
 are some way to tell it not to split?

 On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen  wrote:
> It really sounds like the line is being split across partitions. This
> is what TextInputFormat does but should be perfectly capable of
> putting together lines that break across files (partitions). If you're
> into debugging, that's where I would start if you can. Breakpoints
> around how TextInputFormat is parsing lines. See if you can catch it
> when it returns a line that doesn't contain what you expect.
>
> On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren  
> wrote:
>> That's funny. The line after is the rest of the whole line that got
>> split in half. Every following lines after that are fine.
>>
>> I managed to reproduce without gzip also so maybe it's no gzip's fault
>> after all..
>>
>> I'm clueless...
>>
>> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren  
>> wrote:
>>> Seems like it's the gzip. It works if download the file, gunzip and
>>> put it back to another directory and read it the same way.
>>>
>>> Hm.. I wonder what happens with the lines after it..
>>>
>>>
>>>
>>> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen  wrote:
 What if you read it uncompressed from HDFS?
 gzip compression is unfriendly to MR in that it can't split the file.
 It still should just work, certainly if the line is in one file. But,
 a data point worth having.

 On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren 
  wrote:
> The line is in one file. I did download the file manually from HDFS,
> read and decoded it line-by-line successfully without Spark.
>
>
>
> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen  
> wrote:
>> The only thing I can think of is that a line is being broken across 
>> two files?
>> Hadoop easily puts things back together in this case, or should. 
>> There
>> could be some weird factor preventing that. One first place to look:
>> are you using a weird line separator? or at least different from the
>> host OS?
>>
>> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren 
>>  wrote:
>>> I should mention that we're in the end want to store the input from
>>> Protobuf binary to Parquet using the following code. But this comes
>>> after the lines has been decoded from base64 into binary.
>>>
>>>
>>> public static  void save(JavaRDD rdd, Class
>>> clazz, String path) {
>>>   try {
>>> Job job = Job.getInstance();
>>> ParquetOutputFormat.setWriteSupportClass(job, 
>>> ProtoWriteSupport.class);
>>> ProtoParquetOutputFormat.setProtobufClass(job, clazz);
>>> rdd.mapToPair(order -> new Tuple2<>(null, order))
>>>   .saveAsNewAPIHadoopFile(path, Void.class, clazz,
>>> ParquetOutputFormat.class, job.getConfiguration());
>>>   } catch 

Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-14 Thread Sree Eedupuganti
Hi Spark users, I am new to Spark. I am trying to connect to Hive using a
SparkContext/HiveContext from Java, but I am unable to connect to the database.
By executing the code below I can see only the "default" database. Can anyone
help me out? What I need is a sample program for querying Hive from the Java
API. I need to pass values like these:

userDF.registerTempTable("userRecordsTemp")

sqlContext.sql("SET hive.default.fileformat=Orc")
sqlContext.sql("set hive.enforce.bucketing = true")
sqlContext.sql("set hive.enforce.sorting = true")

public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("SparkSQL").setMaster("local");
    SparkContext ctx = new SparkContext(sparkConf);
    HiveContext hiveql = new org.apache.spark.sql.hive.HiveContext(ctx);
    DataFrame df = hiveql.sql("show databases");
    df.show();
}

Any suggestions please? Thanks.
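
[Editor's note: seeing only the "default" database usually means Spark could not find
hive-site.xml and silently created a local Derby metastore instead of talking to the real one.
A hedged sketch of the fix, shown in Scala (the metastore host below is a placeholder): either
put hive-site.xml in $SPARK_HOME/conf, or point the HiveContext at the metastore explicitly.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("SparkSQL").setMaster("local"))
val hiveContext = new HiveContext(sc)
// Unnecessary if hive-site.xml is on the classpath; "metastore-host" is hypothetical.
hiveContext.setConf("hive.metastore.uris", "thrift://metastore-host:9083")
hiveContext.sql("show databases").show()
]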


Re: RBM in mllib

2016-06-14 Thread Krishna Kalyan
Hi Robert,
According to the JIRA, the resolution is "Won't Fix". The pull request was
closed as it did not merge cleanly with master.
(https://github.com/apache/spark/pull/3222)

On Tue, Jun 14, 2016 at 4:23 PM, Roberto Pagliari  wrote:

> Is RBM being developed?
>
> This one is marked as resolved, but it is not
>
> https://issues.apache.org/jira/browse/SPARK-4251
> 
>
>
>


RBM in mllib

2016-06-14 Thread Roberto Pagliari
Is RBM being developed?

This one is marked as resolved, but it is not

https://issues.apache.org/jira/browse/SPARK-4251




[Spark 2.0.0] Structured Stream on Kafka

2016-06-14 Thread andy petrella
Heya folks,

Just wondering if there is any doc on using Kafka directly from
the reader.stream?
Has it been integrated already (I mean the source)?

Sorry if the answer is RTFM (but then I'd appreciate a pointer anyway^^)

thanks,
cheers
andy
-- 
andy


Re: Spark corrupts text lines

2016-06-14 Thread Kristoffer Sjögren
I'm pretty confident the lines are encoded correctly since I can read
them both locally and on Spark (by ignoring the faulty line and
proceed to next). I also get the correct number of lines through
Spark, again by ignoring the faulty line.

I get the same error by reading the original file using Spark, saving it as a
new text file, and then trying to decode it again.

context.textFile("/orgfile").saveAsTextFile("/newfile");

OK, not much left to do but some remote debugging.


On Tue, Jun 14, 2016 at 3:38 PM, Kristoffer Sjögren  wrote:
> Thanks for you help. Really appreciate it!
>
> Give me some time i'll come back after I've tried your suggestions.
>
> On Tue, Jun 14, 2016 at 3:28 PM, Kristoffer Sjögren  wrote:
>> I cannot reproduce it by running the file through Spark in local mode
>> on my machine. So it does indeed seems to be something related to
>> split across partitions.
>>
>> On Tue, Jun 14, 2016 at 3:04 PM, Kristoffer Sjögren  wrote:
>>> Can you do remote debugging in Spark? Didn't know that. Do you have a link?
>>>
>>> Also noticed isSplittable in
>>> org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for
>>> org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there
>>> are some way to tell it not to split?
>>>
>>> On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen  wrote:
 It really sounds like the line is being split across partitions. This
 is what TextInputFormat does but should be perfectly capable of
 putting together lines that break across files (partitions). If you're
 into debugging, that's where I would start if you can. Breakpoints
 around how TextInputFormat is parsing lines. See if you can catch it
 when it returns a line that doesn't contain what you expect.

 On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren  
 wrote:
> That's funny. The line after is the rest of the whole line that got
> split in half. Every following lines after that are fine.
>
> I managed to reproduce without gzip also so maybe it's no gzip's fault
> after all..
>
> I'm clueless...
>
> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren  
> wrote:
>> Seems like it's the gzip. It works if download the file, gunzip and
>> put it back to another directory and read it the same way.
>>
>> Hm.. I wonder what happens with the lines after it..
>>
>>
>>
>> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen  wrote:
>>> What if you read it uncompressed from HDFS?
>>> gzip compression is unfriendly to MR in that it can't split the file.
>>> It still should just work, certainly if the line is in one file. But,
>>> a data point worth having.
>>>
>>> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren  
>>> wrote:
 The line is in one file. I did download the file manually from HDFS,
 read and decoded it line-by-line successfully without Spark.



 On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen  wrote:
> The only thing I can think of is that a line is being broken across 
> two files?
> Hadoop easily puts things back together in this case, or should. There
> could be some weird factor preventing that. One first place to look:
> are you using a weird line separator? or at least different from the
> host OS?
>
> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren 
>  wrote:
>> I should mention that we're in the end want to store the input from
>> Protobuf binary to Parquet using the following code. But this comes
>> after the lines has been decoded from base64 into binary.
>>
>>
>> public static  void save(JavaRDD rdd, Class
>> clazz, String path) {
>>   try {
>> Job job = Job.getInstance();
>> ParquetOutputFormat.setWriteSupportClass(job, 
>> ProtoWriteSupport.class);
>> ProtoParquetOutputFormat.setProtobufClass(job, clazz);
>> rdd.mapToPair(order -> new Tuple2<>(null, order))
>>   .saveAsNewAPIHadoopFile(path, Void.class, clazz,
>> ParquetOutputFormat.class, job.getConfiguration());
>>   } catch (IOException e) {
>> throw new RuntimeException(e);
>>   }
>> }
>>
>>
>>
>> 
>>   org.apache.parquet
>>   parquet-protobuf
>>   1.8.1
>> 
>>
>> On Tue, Jun 14, 2016 at 11:37 AM, Kristoffer Sjögren 
>>  wrote:
>>> I'm trying to figure out exactly what information could be useful 
>>> but
>>> it's all as straight forward.
>>>
>>> - It's text files
>>> - Lines ends with a new 

Re: Spark corrupts text lines

2016-06-14 Thread Kristoffer Sjögren
Thanks for your help. Really appreciate it!

Give me some time; I'll come back after I've tried your suggestions.

On Tue, Jun 14, 2016 at 3:28 PM, Kristoffer Sjögren  wrote:
> I cannot reproduce it by running the file through Spark in local mode
> on my machine. So it does indeed seems to be something related to
> split across partitions.
>
> On Tue, Jun 14, 2016 at 3:04 PM, Kristoffer Sjögren  wrote:
>> Can you do remote debugging in Spark? Didn't know that. Do you have a link?
>>
>> Also noticed isSplittable in
>> org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for
>> org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there
>> are some way to tell it not to split?
>>
>> On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen  wrote:
>>> It really sounds like the line is being split across partitions. This
>>> is what TextInputFormat does but should be perfectly capable of
>>> putting together lines that break across files (partitions). If you're
>>> into debugging, that's where I would start if you can. Breakpoints
>>> around how TextInputFormat is parsing lines. See if you can catch it
>>> when it returns a line that doesn't contain what you expect.
>>>
>>> On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren  
>>> wrote:
 That's funny. The line after is the rest of the whole line that got
 split in half. Every following lines after that are fine.

 I managed to reproduce without gzip also so maybe it's no gzip's fault
 after all..

 I'm clueless...

 On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren  
 wrote:
> Seems like it's the gzip. It works if download the file, gunzip and
> put it back to another directory and read it the same way.
>
> Hm.. I wonder what happens with the lines after it..
>
>
>
> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen  wrote:
>> What if you read it uncompressed from HDFS?
>> gzip compression is unfriendly to MR in that it can't split the file.
>> It still should just work, certainly if the line is in one file. But,
>> a data point worth having.
>>
>> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren  
>> wrote:
>>> The line is in one file. I did download the file manually from HDFS,
>>> read and decoded it line-by-line successfully without Spark.
>>>
>>>
>>>
>>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen  wrote:
 The only thing I can think of is that a line is being broken across 
 two files?
 Hadoop easily puts things back together in this case, or should. There
 could be some weird factor preventing that. One first place to look:
 are you using a weird line separator? or at least different from the
 host OS?

 On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren 
  wrote:
> I should mention that we're in the end want to store the input from
> Protobuf binary to Parquet using the following code. But this comes
> after the lines has been decoded from base64 into binary.
>
>
> public static  void save(JavaRDD rdd, Class
> clazz, String path) {
>   try {
> Job job = Job.getInstance();
> ParquetOutputFormat.setWriteSupportClass(job, 
> ProtoWriteSupport.class);
> ProtoParquetOutputFormat.setProtobufClass(job, clazz);
> rdd.mapToPair(order -> new Tuple2<>(null, order))
>   .saveAsNewAPIHadoopFile(path, Void.class, clazz,
> ParquetOutputFormat.class, job.getConfiguration());
>   } catch (IOException e) {
> throw new RuntimeException(e);
>   }
> }
>
>
>
> 
>   org.apache.parquet
>   parquet-protobuf
>   1.8.1
> 
>
> On Tue, Jun 14, 2016 at 11:37 AM, Kristoffer Sjögren 
>  wrote:
>> I'm trying to figure out exactly what information could be useful but
>> it's all as straight forward.
>>
>> - It's text files
>> - Lines ends with a new line character.
>> - Files are gzipped before added to HDFS
>> - Files are read as gzipped files from HDFS by Spark
>> - There are some extra configuration
>>
>> conf.set("spark.files.overwrite", "true");
>> conf.set("spark.hadoop.validateOutputSpecs", "false");
>>
>> Here's the code using Java 8 Base64 class.
>>
>> context.textFile("/log.gz")
>> .map(line -> line.split("="))
>> .map(split -> Base64.getDecoder().decode(split[0]));
>>
>>
>> On Tue, Jun 14, 2016 at 11:26 AM, Sean Owen  
>> wrote:
>>> It's 

Re: Spark corrupts text lines

2016-06-14 Thread Sean Owen
It takes a little setup, but you can do remote debugging:
http://danosipov.com/?p=779  ... and then use similar config to
connect your IDE to a running executor.

Before that you might strip your program down to only a call to
textFile that then checks the lines according to whatever logic would
decide whether it is valid.
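
[Editor's note: a concrete sketch of both suggestions. The JDWP string is the standard JVM debug
agent rather than anything Spark-specific, and the decode check simply mirrors the poster's
Base64 parsing (written in Scala here, path as in the thread):

// Pass the debug agent to the executors when submitting, e.g.
//   --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
// then run a stripped-down job that only counts lines the decoder rejects:
val bad = sc.textFile("/log.gz").filter { line =>
  try { java.util.Base64.getDecoder.decode(line.split("=")(0)); false }
  catch { case _: IllegalArgumentException => true }
}
println(s"lines that fail to decode: ${bad.count()}")
]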

gzip isn't splittable, so you should already have one partition per
file instead of potentially several per file. If the line is entirely
in one file then, hm, it really shouldn't be that issue.

Are you sure the lines before and after are parsed correctly? I'm wondering if
somehow you are parsing a huge amount of text as one line before it, and
this is just where it happens to finally hit some buffer limit. Any
weird Hadoop settings, like a small block size?

I suspect there is something more basic going on here. For example, are you
sure that the line you get in your program is truly not a line in the
input? You have another line here that has it as a prefix, but ... is
that really the same line of input?

On Tue, Jun 14, 2016 at 2:04 PM, Kristoffer Sjögren  wrote:
> Can you do remote debugging in Spark? Didn't know that. Do you have a link?
>
> Also noticed isSplittable in
> org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for
> org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there
> are some way to tell it not to split?
>
> On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen  wrote:
>> It really sounds like the line is being split across partitions. This
>> is what TextInputFormat does but should be perfectly capable of
>> putting together lines that break across files (partitions). If you're
>> into debugging, that's where I would start if you can. Breakpoints
>> around how TextInputFormat is parsing lines. See if you can catch it
>> when it returns a line that doesn't contain what you expect.
>>
>> On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren  wrote:
>>> That's funny. The line after is the rest of the whole line that got
>>> split in half. Every following lines after that are fine.
>>>
>>> I managed to reproduce without gzip also so maybe it's no gzip's fault
>>> after all..
>>>
>>> I'm clueless...
>>>
>>> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren  
>>> wrote:
 Seems like it's the gzip. It works if download the file, gunzip and
 put it back to another directory and read it the same way.

 Hm.. I wonder what happens with the lines after it..



 On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen  wrote:
> What if you read it uncompressed from HDFS?
> gzip compression is unfriendly to MR in that it can't split the file.
> It still should just work, certainly if the line is in one file. But,
> a data point worth having.
>
> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren  
> wrote:
>> The line is in one file. I did download the file manually from HDFS,
>> read and decoded it line-by-line successfully without Spark.
>>
>>
>>
>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen  wrote:
>>> The only thing I can think of is that a line is being broken across two 
>>> files?
>>> Hadoop easily puts things back together in this case, or should. There
>>> could be some weird factor preventing that. One first place to look:
>>> are you using a weird line separator? or at least different from the
>>> host OS?
>>>
>>> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren  
>>> wrote:
 I should mention that we're in the end want to store the input from
 Protobuf binary to Parquet using the following code. But this comes
 after the lines has been decoded from base64 into binary.


 public static  void save(JavaRDD rdd, Class
 clazz, String path) {
   try {
 Job job = Job.getInstance();
 ParquetOutputFormat.setWriteSupportClass(job, 
 ProtoWriteSupport.class);
 ProtoParquetOutputFormat.setProtobufClass(job, clazz);
 rdd.mapToPair(order -> new Tuple2<>(null, order))
   .saveAsNewAPIHadoopFile(path, Void.class, clazz,
 ParquetOutputFormat.class, job.getConfiguration());
   } catch (IOException e) {
 throw new RuntimeException(e);
   }
 }



 
   org.apache.parquet
   parquet-protobuf
   1.8.1
 

 On Tue, Jun 14, 2016 at 11:37 AM, Kristoffer Sjögren 
  wrote:
> I'm trying to figure out exactly what information could be useful but
> it's all as straight forward.
>
> - It's text files
> - Lines ends with a new line character.
> - Files are gzipped before added to HDFS
> - Files are read as 

Re: Spark corrupts text lines

2016-06-14 Thread Kristoffer Sjögren
I cannot reproduce it by running the file through Spark in local mode
on my machine. So it does indeed seem to be something related to the
split across partitions.

On Tue, Jun 14, 2016 at 3:04 PM, Kristoffer Sjögren  wrote:
> Can you do remote debugging in Spark? Didn't know that. Do you have a link?
>
> Also noticed isSplittable in
> org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for
> org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there
> are some way to tell it not to split?
>
> On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen  wrote:
>> It really sounds like the line is being split across partitions. This
>> is what TextInputFormat does but should be perfectly capable of
>> putting together lines that break across files (partitions). If you're
>> into debugging, that's where I would start if you can. Breakpoints
>> around how TextInputFormat is parsing lines. See if you can catch it
>> when it returns a line that doesn't contain what you expect.
>>
>> On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren  wrote:
>>> That's funny. The line after is the rest of the whole line that got
>>> split in half. Every following lines after that are fine.
>>>
>>> I managed to reproduce without gzip also so maybe it's no gzip's fault
>>> after all..
>>>
>>> I'm clueless...
>>>
>>> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren  
>>> wrote:
 Seems like it's the gzip. It works if download the file, gunzip and
 put it back to another directory and read it the same way.

 Hm.. I wonder what happens with the lines after it..



 On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen  wrote:
> What if you read it uncompressed from HDFS?
> gzip compression is unfriendly to MR in that it can't split the file.
> It still should just work, certainly if the line is in one file. But,
> a data point worth having.
>
> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren  
> wrote:
>> The line is in one file. I did download the file manually from HDFS,
>> read and decoded it line-by-line successfully without Spark.
>>
>>
>>
>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen  wrote:
>>> The only thing I can think of is that a line is being broken across two 
>>> files?
>>> Hadoop easily puts things back together in this case, or should. There
>>> could be some weird factor preventing that. One first place to look:
>>> are you using a weird line separator? or at least different from the
>>> host OS?
>>>
>>> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren  
>>> wrote:
 I should mention that we're in the end want to store the input from
 Protobuf binary to Parquet using the following code. But this comes
 after the lines has been decoded from base64 into binary.


 public static  void save(JavaRDD rdd, Class
 clazz, String path) {
   try {
 Job job = Job.getInstance();
 ParquetOutputFormat.setWriteSupportClass(job, 
 ProtoWriteSupport.class);
 ProtoParquetOutputFormat.setProtobufClass(job, clazz);
 rdd.mapToPair(order -> new Tuple2<>(null, order))
   .saveAsNewAPIHadoopFile(path, Void.class, clazz,
 ParquetOutputFormat.class, job.getConfiguration());
   } catch (IOException e) {
 throw new RuntimeException(e);
   }
 }



 
   org.apache.parquet
   parquet-protobuf
   1.8.1
 

 On Tue, Jun 14, 2016 at 11:37 AM, Kristoffer Sjögren 
  wrote:
> I'm trying to figure out exactly what information could be useful but
> it's all as straight forward.
>
> - It's text files
> - Lines ends with a new line character.
> - Files are gzipped before added to HDFS
> - Files are read as gzipped files from HDFS by Spark
> - There are some extra configuration
>
> conf.set("spark.files.overwrite", "true");
> conf.set("spark.hadoop.validateOutputSpecs", "false");
>
> Here's the code using Java 8 Base64 class.
>
> context.textFile("/log.gz")
> .map(line -> line.split("="))
> .map(split -> Base64.getDecoder().decode(split[0]));
>
>
> On Tue, Jun 14, 2016 at 11:26 AM, Sean Owen  
> wrote:
>> It's really the MR InputSplit code that splits files into records.
>> Nothing particularly interesting happens in that process, except for
>> breaking on newlines.
>>
>> Do you have one huge line in the file? are you reading as a text 
>> file?
>> can you give any more 

Re: Spark corrupts text lines

2016-06-14 Thread Kristoffer Sjögren
Can you do remote debugging in Spark? Didn't know that. Do you have a link?

I also noticed isSplittable in
org.apache.hadoop.mapreduce.lib.input.TextInputFormat, which checks for
org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there
is some way to tell it not to split?

On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen  wrote:
> It really sounds like the line is being split across partitions. This
> is what TextInputFormat does but should be perfectly capable of
> putting together lines that break across files (partitions). If you're
> into debugging, that's where I would start if you can. Breakpoints
> around how TextInputFormat is parsing lines. See if you can catch it
> when it returns a line that doesn't contain what you expect.
>
> On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren  wrote:
>> That's funny. The line after is the rest of the whole line that got
>> split in half. Every following lines after that are fine.
>>
>> I managed to reproduce without gzip also so maybe it's no gzip's fault
>> after all..
>>
>> I'm clueless...
>>
>> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren  
>> wrote:
>>> Seems like it's the gzip. It works if download the file, gunzip and
>>> put it back to another directory and read it the same way.
>>>
>>> Hm.. I wonder what happens with the lines after it..
>>>
>>>
>>>
>>> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen  wrote:
 What if you read it uncompressed from HDFS?
 gzip compression is unfriendly to MR in that it can't split the file.
 It still should just work, certainly if the line is in one file. But,
 a data point worth having.

 On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren  
 wrote:
> The line is in one file. I did download the file manually from HDFS,
> read and decoded it line-by-line successfully without Spark.
>
>
>
> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen  wrote:
>> The only thing I can think of is that a line is being broken across two 
>> files?
>> Hadoop easily puts things back together in this case, or should. There
>> could be some weird factor preventing that. One first place to look:
>> are you using a weird line separator? or at least different from the
>> host OS?
>>
>> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren  
>> wrote:
>>> I should mention that we're in the end want to store the input from
>>> Protobuf binary to Parquet using the following code. But this comes
>>> after the lines has been decoded from base64 into binary.
>>>
>>>
>>> public static  void save(JavaRDD rdd, Class
>>> clazz, String path) {
>>>   try {
>>> Job job = Job.getInstance();
>>> ParquetOutputFormat.setWriteSupportClass(job, 
>>> ProtoWriteSupport.class);
>>> ProtoParquetOutputFormat.setProtobufClass(job, clazz);
>>> rdd.mapToPair(order -> new Tuple2<>(null, order))
>>>   .saveAsNewAPIHadoopFile(path, Void.class, clazz,
>>> ParquetOutputFormat.class, job.getConfiguration());
>>>   } catch (IOException e) {
>>> throw new RuntimeException(e);
>>>   }
>>> }
>>>
>>>
>>>
>>> 
>>>   org.apache.parquet
>>>   parquet-protobuf
>>>   1.8.1
>>> 
>>>
>>> On Tue, Jun 14, 2016 at 11:37 AM, Kristoffer Sjögren  
>>> wrote:
 I'm trying to figure out exactly what information could be useful but
 it's all as straight forward.

 - It's text files
 - Lines ends with a new line character.
 - Files are gzipped before added to HDFS
 - Files are read as gzipped files from HDFS by Spark
 - There are some extra configuration

 conf.set("spark.files.overwrite", "true");
 conf.set("spark.hadoop.validateOutputSpecs", "false");

 Here's the code using Java 8 Base64 class.

 context.textFile("/log.gz")
 .map(line -> line.split("="))
 .map(split -> Base64.getDecoder().decode(split[0]));


 On Tue, Jun 14, 2016 at 11:26 AM, Sean Owen  wrote:
> It's really the MR InputSplit code that splits files into records.
> Nothing particularly interesting happens in that process, except for
> breaking on newlines.
>
> Do you have one huge line in the file? are you reading as a text file?
> can you give any more detail about exactly how you parse it? it could
> be something else in your code.
>
> On Tue, Jun 14, 2016 at 10:24 AM, Kristoffer Sjögren 
>  wrote:
>> Hi
>>
>> We have log files that are written in base64 encoded text files
>> (gzipped) where each line is ended with a new line character.

Re: Spark corrupts text lines

2016-06-14 Thread Sean Owen
It really sounds like the line is being split across partitions. This
is what TextInputFormat does but should be perfectly capable of
putting together lines that break across files (partitions). If you're
into debugging, that's where I would start if you can. Breakpoints
around how TextInputFormat is parsing lines. See if you can catch it
when it returns a line that doesn't contain what you expect.

On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren  wrote:
> That's funny. The line after is the rest of the whole line that got
> split in half. Every following lines after that are fine.
>
> I managed to reproduce without gzip also so maybe it's no gzip's fault
> after all..
>
> I'm clueless...
>
> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren  wrote:
>> Seems like it's the gzip. It works if download the file, gunzip and
>> put it back to another directory and read it the same way.
>>
>> Hm.. I wonder what happens with the lines after it..
>>
>>
>>
>> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen  wrote:
>>> What if you read it uncompressed from HDFS?
>>> gzip compression is unfriendly to MR in that it can't split the file.
>>> It still should just work, certainly if the line is in one file. But,
>>> a data point worth having.
>>>
>>> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren  
>>> wrote:
 The line is in one file. I did download the file manually from HDFS,
 read and decoded it line-by-line successfully without Spark.



 On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen  wrote:
> The only thing I can think of is that a line is being broken across two 
> files?
> Hadoop easily puts things back together in this case, or should. There
> could be some weird factor preventing that. One first place to look:
> are you using a weird line separator? or at least different from the
> host OS?
>
> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren  
> wrote:
>> I should mention that we're in the end want to store the input from
>> Protobuf binary to Parquet using the following code. But this comes
>> after the lines has been decoded from base64 into binary.
>>
>>
>> public static  void save(JavaRDD rdd, Class
>> clazz, String path) {
>>   try {
>> Job job = Job.getInstance();
>> ParquetOutputFormat.setWriteSupportClass(job, 
>> ProtoWriteSupport.class);
>> ProtoParquetOutputFormat.setProtobufClass(job, clazz);
>> rdd.mapToPair(order -> new Tuple2<>(null, order))
>>   .saveAsNewAPIHadoopFile(path, Void.class, clazz,
>> ParquetOutputFormat.class, job.getConfiguration());
>>   } catch (IOException e) {
>> throw new RuntimeException(e);
>>   }
>> }
>>
>>
>>
>> 
>>   org.apache.parquet
>>   parquet-protobuf
>>   1.8.1
>> 
>>
>> On Tue, Jun 14, 2016 at 11:37 AM, Kristoffer Sjögren  
>> wrote:
>>> I'm trying to figure out exactly what information could be useful but
>>> it's all as straight forward.
>>>
>>> - It's text files
>>> - Lines ends with a new line character.
>>> - Files are gzipped before added to HDFS
>>> - Files are read as gzipped files from HDFS by Spark
>>> - There are some extra configuration
>>>
>>> conf.set("spark.files.overwrite", "true");
>>> conf.set("spark.hadoop.validateOutputSpecs", "false");
>>>
>>> Here's the code using Java 8 Base64 class.
>>>
>>> context.textFile("/log.gz")
>>> .map(line -> line.split("="))
>>> .map(split -> Base64.getDecoder().decode(split[0]));
>>>
>>>
>>> On Tue, Jun 14, 2016 at 11:26 AM, Sean Owen  wrote:
 It's really the MR InputSplit code that splits files into records.
 Nothing particularly interesting happens in that process, except for
 breaking on newlines.

 Do you have one huge line in the file? are you reading as a text file?
 can you give any more detail about exactly how you parse it? it could
 be something else in your code.

 On Tue, Jun 14, 2016 at 10:24 AM, Kristoffer Sjögren 
  wrote:
> Hi
>
> We have log files that are written in base64 encoded text files
> (gzipped) where each line is ended with a new line character.
>
> For some reason a particular line [1] is split by Spark [2] making it
> unparsable by the base64 decoder. It does this consequently no matter
> if I gives it the particular file that contain the line or a bunch of
> files.
>
> I know the line is not corrupt because I can manually download the
> file from HDFS, gunzip it and read/decode all the lines without
> problems.
>
> Was thinking that 

Re: Spark corrupts text lines

2016-06-14 Thread Kristoffer Sjögren
That's funny. The line after it is the rest of the whole line that got
split in half. Every following line after that is fine.

I managed to reproduce it without gzip as well, so maybe it's not gzip's fault
after all...

I'm clueless...

On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren  wrote:
> Seems like it's the gzip. It works if download the file, gunzip and
> put it back to another directory and read it the same way.
>
> Hm.. I wonder what happens with the lines after it..
>
>
>
> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen  wrote:
>> What if you read it uncompressed from HDFS?
>> gzip compression is unfriendly to MR in that it can't split the file.
>> It still should just work, certainly if the line is in one file. But,
>> a data point worth having.
>>
>> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren  
>> wrote:
>>> The line is in one file. I did download the file manually from HDFS,
>>> read and decoded it line-by-line successfully without Spark.
>>>
>>>
>>>
>>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen  wrote:
 The only thing I can think of is that a line is being broken across two 
 files?
 Hadoop easily puts things back together in this case, or should. There
 could be some weird factor preventing that. One first place to look:
 are you using a weird line separator? or at least different from the
 host OS?

 On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren  
 wrote:
> I should mention that we're in the end want to store the input from
> Protobuf binary to Parquet using the following code. But this comes
> after the lines has been decoded from base64 into binary.
>
>
> public static  void save(JavaRDD rdd, Class
> clazz, String path) {
>   try {
> Job job = Job.getInstance();
> ParquetOutputFormat.setWriteSupportClass(job, 
> ProtoWriteSupport.class);
> ProtoParquetOutputFormat.setProtobufClass(job, clazz);
> rdd.mapToPair(order -> new Tuple2<>(null, order))
>   .saveAsNewAPIHadoopFile(path, Void.class, clazz,
> ParquetOutputFormat.class, job.getConfiguration());
>   } catch (IOException e) {
> throw new RuntimeException(e);
>   }
> }
>
>
>
> 
>   org.apache.parquet
>   parquet-protobuf
>   1.8.1
> 
>
> On Tue, Jun 14, 2016 at 11:37 AM, Kristoffer Sjögren  
> wrote:
>> I'm trying to figure out exactly what information could be useful but
>> it's all as straight forward.
>>
>> - It's text files
>> - Lines ends with a new line character.
>> - Files are gzipped before added to HDFS
>> - Files are read as gzipped files from HDFS by Spark
>> - There are some extra configuration
>>
>> conf.set("spark.files.overwrite", "true");
>> conf.set("spark.hadoop.validateOutputSpecs", "false");
>>
>> Here's the code using Java 8 Base64 class.
>>
>> context.textFile("/log.gz")
>> .map(line -> line.split("="))
>> .map(split -> Base64.getDecoder().decode(split[0]));
>>
>>
>> On Tue, Jun 14, 2016 at 11:26 AM, Sean Owen  wrote:
>>> It's really the MR InputSplit code that splits files into records.
>>> Nothing particularly interesting happens in that process, except for
>>> breaking on newlines.
>>>
>>> Do you have one huge line in the file? are you reading as a text file?
>>> can you give any more detail about exactly how you parse it? it could
>>> be something else in your code.
>>>
>>> On Tue, Jun 14, 2016 at 10:24 AM, Kristoffer Sjögren  
>>> wrote:
 Hi

 We have log files that are written in base64 encoded text files
 (gzipped) where each line is ended with a new line character.

 For some reason a particular line [1] is split by Spark [2] making it
 unparsable by the base64 decoder. It does this consequently no matter
 if I gives it the particular file that contain the line or a bunch of
 files.

 I know the line is not corrupt because I can manually download the
 file from HDFS, gunzip it and read/decode all the lines without
 problems.

 Was thinking that maybe there is a limit to number of characters per
 line but that doesn't sound right? Maybe the combination of characters
 makes Spark think it's new line?

 I'm clueless.

 Cheers,
 -Kristoffer

 [1] Original line:

 

Re: Create external table with partitions using sqlContext.createExternalTable

2016-06-14 Thread Mich Talebzadeh
it is good to be in control :)

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 14 June 2016 at 13:06, Patrick Duin  wrote:

> Thanks, yes I have something similar working as "the alternative
> solution". :)
>
> I was hoping to get away with not having to specify my schema so the
> sqlContext.createExternalTable seemed like a nice clean approach.
>
> 2016-06-14 13:59 GMT+02:00 Mich Talebzadeh :
>
>> Try this this will work
>>
>> sql("use test")
>> sql("drop table if exists test.orctype")
>> var sqltext: String = ""
>> sqltext = """
>> CREATE EXTERNAL TABLE test.orctype(
>>  prod_id bigint,
>>  cust_id bigint,
>>  time_id timestamp,
>>  channel_id bigint,
>>  promo_id bigint,
>>  quantity_sold decimal(10,0),
>>  amount_sold decimal(10,0))
>> PARTITIONED BY (
>>year int,
>>month int)
>> CLUSTERED BY (
>>  prod_id,
>>  cust_id,
>>  time_id,
>>  channel_id,
>>  promo_id)
>> INTO 256 BUCKETS
>> STORED AS ORC
>> LOCATION '/tmp'
>> TBLPROPERTIES ( "orc.compress"="SNAPPY",
>>   "orc.create.index"="true",
>>
>> "orc.bloom.filter.columns"="PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID",
>>   "orc.bloom.filter.fpp"="0.05",
>>   "orc.stripe.size"="268435456",
>>   "orc.row.index.stride"="1" )
>> """
>> sql(sqltext)
>> sql("select count(1) from test.orctype").show
>>
>> res2: org.apache.spark.sql.DataFrame = [result: string]
>> +---+
>> |_c0|
>> +---+
>> |  0|
>> +---+
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 14 June 2016 at 11:39, Patrick Duin  wrote:
>>
>>> Hi,
>>>
>>> I'm trying to use sqlContext.createExternalTable("my_table",
>>> "/tmp/location/", "orc") to create tables. This is working fine for
>>> non-partitioned tables. I'd like to create a partitioned table though, how
>>> do I do that?
>>>
>>> Can I add some information in the options: Map[String, String]
>>> parameter?
>>>
>>> Thanks,
>>>  Patrick
>>>
>>
>>
>


Re: Spark 2.0.0 : GLM problem

2016-06-14 Thread april_ZMQ
To update the post:

•   First problem: this can be solved by adding an epsilon (a very small value)
to the zero values, because the Poisson model does not allow the y value to be
zero. In general, though, Poisson regression does not have this requirement.

But now I encounter another problem with every GLM model:
"Values to assemble cannot be null"

I've found the relevant code in
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

Can you guys explain what that means?
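
[Editor's note: "Values to assemble cannot be null" is thrown by VectorAssembler when one of its
input columns contains a null, so the usual fix is to drop or impute nulls before assembling.
A minimal Scala sketch, assuming a DataFrame df with hypothetical feature columns x1 and x2:

import org.apache.spark.ml.feature.VectorAssembler

// Either drop rows with nulls, or impute them (0.0 here is an arbitrary fill value).
val cleaned = df.na.fill(0.0, Seq("x1", "x2"))   // or: df.na.drop()
val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")
val assembled = assembler.transform(cleaned)
]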


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-0-0-GLM-problem-tp27145p27164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Create external table with partitions using sqlContext.createExternalTable

2016-06-14 Thread Patrick Duin
Thanks, yes I have something similar working as "the alternative solution".
:)

I was hoping to get away with not having to specify my schema so the
sqlContext.createExternalTable seemed like a nice clean approach.

2016-06-14 13:59 GMT+02:00 Mich Talebzadeh :

> Try this this will work
>
> sql("use test")
> sql("drop table if exists test.orctype")
> var sqltext: String = ""
> sqltext = """
> CREATE EXTERNAL TABLE test.orctype(
>  prod_id bigint,
>  cust_id bigint,
>  time_id timestamp,
>  channel_id bigint,
>  promo_id bigint,
>  quantity_sold decimal(10,0),
>  amount_sold decimal(10,0))
> PARTITIONED BY (
>year int,
>month int)
> CLUSTERED BY (
>  prod_id,
>  cust_id,
>  time_id,
>  channel_id,
>  promo_id)
> INTO 256 BUCKETS
> STORED AS ORC
> LOCATION '/tmp'
> TBLPROPERTIES ( "orc.compress"="SNAPPY",
>   "orc.create.index"="true",
>
> "orc.bloom.filter.columns"="PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID",
>   "orc.bloom.filter.fpp"="0.05",
>   "orc.stripe.size"="268435456",
>   "orc.row.index.stride"="1" )
> """
> sql(sqltext)
> sql("select count(1) from test.orctype").show
>
> res2: org.apache.spark.sql.DataFrame = [result: string]
> +---+
> |_c0|
> +---+
> |  0|
> +---+
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 14 June 2016 at 11:39, Patrick Duin  wrote:
>
>> Hi,
>>
>> I'm trying to use sqlContext.createExternalTable("my_table",
>> "/tmp/location/", "orc") to create tables. This is working fine for
>> non-partitioned tables. I'd like to create a partitioned table though, how
>> do I do that?
>>
>> Can I add some information in the options: Map[String, String] parameter?
>>
>> Thanks,
>>  Patrick
>>
>
>


Re: Spark Streaming application failing with Kerboros issue while writing data to HBase

2016-06-14 Thread Kamesh
Thanks Ted.

Thanks & Regards
Kamesh.

On Mon, Jun 13, 2016 at 10:48 PM, Ted Yu  wrote:

> Can you show snippet of your code, please ?
>
> Please refer to obtainTokenForHBase() in
> yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
>
> Cheers
>
> On Mon, Jun 13, 2016 at 4:44 AM, Kamesh  wrote:
>
>> Hi All,
>>  We are building a spark streaming application and that application
>> writes data to HBase table. But writes/reads are failing with following
>> exception
>>
>> 16/06/13 04:35:16 ERROR ipc.AbstractRpcClient: SASL authentication
>> failed. The most likely cause is missing or invalid credentials. Consider
>> 'kinit'.
>>
>> javax.security.sasl.SaslException: GSS initiate failed [Caused by
>> GSSException: No valid credentials provided (Mechanism level: Failed to
>> find any Kerberos tgt)]
>>
>> at
>> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>>
>> at
>> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
>>
>> at
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
>>
>> This application is failing at Executor machine. Executor is not able to
>> pass the token. Can someone help me how to resolve this issue.
>>
>> *Environment Details*
>> Spark Version : 1.6.1
>> HBase Version : 1.0.0
>> Hadoop Version : 2.6.0
>>
>> --
>> Thanks & Regards
>> Kamesh.
>>
>
>


Re: Create external table with partitions using sqlContext.createExternalTable

2016-06-14 Thread Mich Talebzadeh
Try this; it will work:

sql("use test")
sql("drop table if exists test.orctype")
var sqltext: String = ""
sqltext = """
CREATE EXTERNAL TABLE test.orctype(
 prod_id bigint,
 cust_id bigint,
 time_id timestamp,
 channel_id bigint,
 promo_id bigint,
 quantity_sold decimal(10,0),
 amount_sold decimal(10,0))
PARTITIONED BY (
   year int,
   month int)
CLUSTERED BY (
 prod_id,
 cust_id,
 time_id,
 channel_id,
 promo_id)
INTO 256 BUCKETS
STORED AS ORC
LOCATION '/tmp'
TBLPROPERTIES ( "orc.compress"="SNAPPY",
  "orc.create.index"="true",

"orc.bloom.filter.columns"="PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID",
  "orc.bloom.filter.fpp"="0.05",
  "orc.stripe.size"="268435456",
  "orc.row.index.stride"="1" )
"""
sql(sqltext)
sql("select count(1) from test.orctype").show

res2: org.apache.spark.sql.DataFrame = [result: string]
+---+
|_c0|
+---+
|  0|
+---+
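
[Editor's note: since the table above is external and partitioned, data under LOCATION only
becomes visible once the partitions are registered, either one by one with ALTER TABLE ... ADD
PARTITION or, if the directories follow the year=/month= layout, with Hive's MSCK REPAIR TABLE.
A sketch; the partition values and path are placeholders:

sql("""ALTER TABLE test.orctype ADD IF NOT EXISTS
       PARTITION (year=2016, month=6) LOCATION '/tmp/year=2016/month=6'""")
]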

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 14 June 2016 at 11:39, Patrick Duin  wrote:

> Hi,
>
> I'm trying to use sqlContext.createExternalTable("my_table",
> "/tmp/location/", "orc") to create tables. This is working fine for
> non-partitioned tables. I'd like to create a partitioned table though, how
> do I do that?
>
> Can I add some information in the options: Map[String, String] parameter?
>
> Thanks,
>  Patrick
>


Running streaming applications in Production environment

2016-06-14 Thread Mail.com
Hi All,

Can you please advise on best practices for running streaming jobs in production 
that read from Kafka?

How do we trigger them - through a start script? And what are the best ways to 
monitor that the application is running and to send an alert when it is down, etc.?

Thanks,
Pradeep






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 2.0 Preview After caching query didn't work and can't kill job.

2016-06-14 Thread Chanh Le
I am testing Spark 2.0.
I load data from Alluxio and cache it, then I run queries. The first query is OK 
because it kicks off the cache action, but when I run the query again it gets 
stuck.
I ran this in spark-shell on a 5-node cluster.

Has anyone else had this issue?



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Create external table with partitions using sqlContext.createExternalTable

2016-06-14 Thread Patrick Duin
Hi,

I'm trying to use sqlContext.createExternalTable("my_table",
"/tmp/location/", "orc") to create tables. This is working fine for
non-partitioned tables. I'd like to create a partitioned table though; how
do I do that?

Can I add some information in the options: Map[String, String] parameter?

Thanks,
 Patrick


Re: Spark corrupts text lines

2016-06-14 Thread Jeff Zhang
Can you read this file using an MR job?

On Tue, Jun 14, 2016 at 5:26 PM, Sean Owen  wrote:

> It's really the MR InputSplit code that splits files into records.
> Nothing particularly interesting happens in that process, except for
> breaking on newlines.
>
> Do you have one huge line in the file? are you reading as a text file?
> can you give any more detail about exactly how you parse it? it could
> be something else in your code.
>
> On Tue, Jun 14, 2016 at 10:24 AM, Kristoffer Sjögren 
> wrote:
> > Hi
> >
> > We have log files that are written in base64 encoded text files
> > (gzipped) where each line is ended with a new line character.
> >
> > For some reason a particular line [1] is split by Spark [2] making it
> > unparsable by the base64 decoder. It does this consequently no matter
> > if I gives it the particular file that contain the line or a bunch of
> > files.
> >
> > I know the line is not corrupt because I can manually download the
> > file from HDFS, gunzip it and read/decode all the lines without
> > problems.
> >
> > Was thinking that maybe there is a limit to number of characters per
> > line but that doesn't sound right? Maybe the combination of characters
> > makes Spark think it's new line?
> >
> > I'm clueless.
> >
> > Cheers,
> > -Kristoffer
> >
> > [1] Original line:
> >
> >
> CsAJCtwGCghwYWdlVmlldxC4PhjM1v66BSJFaHR0cDovL25hLnNlL3Nwb3J0ZW4vc3BvcnR0dC8xLjM5MTU5MjEtdXBwZ2lmdGVyLXNtZWRlcm5hLW1vdC1rb25rdXJzKjhVcHBnaWZ0ZXI6IFNtZWRlcm5hIG1vdCBrb25rdXJzIC0gU3BvcnQgKFRUKSAtIHd3dy5uYS5zZTJXaHR0cDovL25hLnNlL255aGV0ZXIvb3JlYnJvLzEuMzk2OTU0My1rcnlwaGFsLW9wcG5hci1mb3Itb2JvLWF0dC1iZWhhbGxhLXRqYW5zdGViaWxhcm5hOqECcWdrZWplNGpmLHBkMzBmdDRuNCxwbHB0b3JqNncscGxwczBvamZvLHBkYjVrZGM4eCxwbHBzN293Y3UscGE0czN1bXp5LHBhNHJla25sNyxwYTRyd3dxam4scGE0c21ra2Z4LHBkM2tpa3BmMixwZDNqcjE5dGMscGQ0ZGQ0M2F3LHAwZ3MwbmlqMSxwYTRvZTNrbXoscGE0cWJ3eDZxLHBkM2s2NW00dyxwYTRyazc3Z3IscGQzMHAzdW8wLHBkNGM1ajV5dixwbHB0c211NmcscGM3bXNibmM5LHBhNHFpaTdsZCxwbHB0dnpqdnUscGE0bmlsdmFnLHBhNHB6cjN2cyxwZDNsZDNnYmkscGl1a2M2NmVlLHB5dHoyOThzNErIAgoTNzI0NTY2NzU0MzQxNTUyOTQ4ORAAGAAioQJxZ2tlamU0amYscGQzMGZ0NG40LHBscHRvcmo2dyxwbHBzMG9qZm8scGRiNWtkYzh4LHBscHM3b3djdSxwYTRzM3VtenkscGE0cmVrbmw3LHBhNHJ3d3FqbixwYTRzbWtrZngscGQza2lrcGYyLHBkM2pyMTl0YyxwZDRkZDQzYXcscDBnczBuaWoxLHBhNG9lM2tteixwYTRxYnd4NnEscGQzazY1bTR3LHBhNHJrNzdncixwZDMwcDN1bzAscGQ0YzVqNXl2LHBscHRzbXU2ZyxwYzdtc2JuYzkscGE0cWlpN2xkLHBscHR2emp2dSxwYTRuaWx2YWcscGE0cHpyM3ZzLHBkM2xkM2diaSxwaXVrYzY2ZWUscHl0ejI5OHM0KgkzOTUxLDM5NjAS3gIIxNjxhJTVsJcVEqUBTW96aWxsYS81LjAgKExpbnV4OyBBbmRyb2lkIDUuMS4xOyBTQU1TVU5HIFNNLUczODhGIEJ1aWxkL0xNWTQ4QikgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgU2Ftc3VuZ0Jyb3dzZXIvMy4zIENocm9tZS8zOC4wLjIxMjUuMTAyIE1vYmlsZSBTYWZhcmkvNTM3LjM2IjUKDDYyLjIwLjE5Ni44MBWgd3NBHRgibUIiAlNFKgfDlnJlYnJvMg5UZWxpYVNvbmVyYSBBQigAMdejcD0K1+s/OABCCAiAAhWamRlAQgcIURUAAOBAQggIlAEVzczMP0IHCFQVmpkJQUIICJYBFTMzE0BCBwhYFZqZ+UBCCAj6ARWamdk/QggImwEVzcysQEoHCAYVO6ysPkoHCAQVRYO4PkoHCAEVIg0APw===1465887564
> >
> >
> > [2] Line as spark hands it over:
> >
> >
> CsAJCtwGCghwYWdlVmlldxC4PhjM1v66BSJFaHR0cDovL25hLnNlL3Nwb3J0ZW4vc3BvcnR0dC8xLjM5MTU5MjEtdXBwZ2lmdGVyLXNtZWRlcm5hLW1vdC1rb25rdXJzKjhVcHBnaWZ0ZXI6IFNtZWRlcm5hIG1vdCBrb25rdXJzIC0gU3BvcnQgKFRUKSAtIHd3dy5uYS5zZTJXaHR0cDovL25hLnNlL255aGV0ZXIvb3JlYnJvLzEuMzk2OTU0My1rcnlwaGFsLW9wcG5hci1mb3Itb2JvLWF0dC1iZWhhbGxhLXRqYW5zdGViaWxhcm5hOqECcWdrZWplNGpmLHBkMzBmdDRuNCxwbHB0b3JqNncscGxwczBvamZvLHBkYjVrZGM4eCxwbHBzN293Y3UscGE0czN1bXp5LHBhNHJla25sNyxwYTRyd3dxam4scGE0c21ra2Z4LHBkM2tpa3BmMixwZDNqcjE5dGMscGQ0ZGQ0M2F3LHAwZ3MwbmlqMSxwYTRvZTNrbXoscGE0cWJ3eDZxLHBkM2s2NW00dyxwYTRyazc3Z3IscGQzMHAzdW8wLHBkNGM1ajV5dixwbHB0c211NmcscGM3bXNibmM5LHBhNHFpaTdsZCxwbHB0dnpqdnUscGE0bmlsdmFnLHBhNHB6cjN2cyxwZDNsZDNnYmkscGl1a2M2NmVlLHB5dHoyOThzNErIAgoTNzI0NTY2NzU0MzQxNTUyOTQ4ORAAGAAioQJxZ2tlamU0amYscGQzMGZ0NG40LHBscHRvcmo2dyxwbHBzMG9qZm8scGRiNWtkYzh4LHBscHM3b3djdSxwYTRzM3VtenkscGE0cmVrbmw3LHBhNHJ3d3FqbixwYTRzbWtrZngscGQza2lrcGYyLHBkM2pyMTl0YyxwZDRkZDQzYXcscDBnczBuaWoxLHBhNG9lM2tteixwYTRxYnd4NnEscGQzazY1bTR3LHBhNHJrNzdncixwZDMwcDN1bzAscGQ0YzVqNXl2LHBscHRzbXU2ZyxwYzdtc2JuYzkscGE0cWlpN2xkLHBscHR2emp2dSxwYTRuaWx2YWcscGE0
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Spark corrupts text lines

2016-06-14 Thread Sean Owen
It's really the MR InputSplit code that splits files into records.
Nothing particularly interesting happens in that process, except for
breaking on newlines.

Do you have one huge line in the file? Are you reading it as a text file?
Can you give any more detail about exactly how you parse it? It could
be something else in your code.

On Tue, Jun 14, 2016 at 10:24 AM, Kristoffer Sjögren  wrote:
> Hi
>
> We have log files that are written in base64 encoded text files
> (gzipped) where each line is ended with a new line character.
>
> For some reason a particular line [1] is split by Spark [2] making it
> unparsable by the base64 decoder. It does this consequently no matter
> if I gives it the particular file that contain the line or a bunch of
> files.
>
> I know the line is not corrupt because I can manually download the
> file from HDFS, gunzip it and read/decode all the lines without
> problems.
>
> Was thinking that maybe there is a limit to number of characters per
> line but that doesn't sound right? Maybe the combination of characters
> makes Spark think it's new line?
>
> I'm clueless.
>
> Cheers,
> -Kristoffer
>
> [1] Original line:
>
> CsAJCtwGCghwYWdlVmlldxC4PhjM1v66BSJFaHR0cDovL25hLnNlL3Nwb3J0ZW4vc3BvcnR0dC8xLjM5MTU5MjEtdXBwZ2lmdGVyLXNtZWRlcm5hLW1vdC1rb25rdXJzKjhVcHBnaWZ0ZXI6IFNtZWRlcm5hIG1vdCBrb25rdXJzIC0gU3BvcnQgKFRUKSAtIHd3dy5uYS5zZTJXaHR0cDovL25hLnNlL255aGV0ZXIvb3JlYnJvLzEuMzk2OTU0My1rcnlwaGFsLW9wcG5hci1mb3Itb2JvLWF0dC1iZWhhbGxhLXRqYW5zdGViaWxhcm5hOqECcWdrZWplNGpmLHBkMzBmdDRuNCxwbHB0b3JqNncscGxwczBvamZvLHBkYjVrZGM4eCxwbHBzN293Y3UscGE0czN1bXp5LHBhNHJla25sNyxwYTRyd3dxam4scGE0c21ra2Z4LHBkM2tpa3BmMixwZDNqcjE5dGMscGQ0ZGQ0M2F3LHAwZ3MwbmlqMSxwYTRvZTNrbXoscGE0cWJ3eDZxLHBkM2s2NW00dyxwYTRyazc3Z3IscGQzMHAzdW8wLHBkNGM1ajV5dixwbHB0c211NmcscGM3bXNibmM5LHBhNHFpaTdsZCxwbHB0dnpqdnUscGE0bmlsdmFnLHBhNHB6cjN2cyxwZDNsZDNnYmkscGl1a2M2NmVlLHB5dHoyOThzNErIAgoTNzI0NTY2NzU0MzQxNTUyOTQ4ORAAGAAioQJxZ2tlamU0amYscGQzMGZ0NG40LHBscHRvcmo2dyxwbHBzMG9qZm8scGRiNWtkYzh4LHBscHM3b3djdSxwYTRzM3VtenkscGE0cmVrbmw3LHBhNHJ3d3FqbixwYTRzbWtrZngscGQza2lrcGYyLHBkM2pyMTl0YyxwZDRkZDQzYXcscDBnczBuaWoxLHBhNG9lM2tteixwYTRxYnd4NnEscGQzazY1bTR3LHBhNHJrNzdncixwZDMwcDN1bzAscGQ0YzVqNXl2LHBscHRzbXU2ZyxwYzdtc2JuYzkscGE0cWlpN2xkLHBscHR2emp2dSxwYTRuaWx2YWcscGE0cHpyM3ZzLHBkM2xkM2diaSxwaXVrYzY2ZWUscHl0ejI5OHM0KgkzOTUxLDM5NjAS3gIIxNjxhJTVsJcVEqUBTW96aWxsYS81LjAgKExpbnV4OyBBbmRyb2lkIDUuMS4xOyBTQU1TVU5HIFNNLUczODhGIEJ1aWxkL0xNWTQ4QikgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgU2Ftc3VuZ0Jyb3dzZXIvMy4zIENocm9tZS8zOC4wLjIxMjUuMTAyIE1vYmlsZSBTYWZhcmkvNTM3LjM2IjUKDDYyLjIwLjE5Ni44MBWgd3NBHRgibUIiAlNFKgfDlnJlYnJvMg5UZWxpYVNvbmVyYSBBQigAMdejcD0K1+s/OABCCAiAAhWamRlAQgcIURUAAOBAQggIlAEVzczMP0IHCFQVmpkJQUIICJYBFTMzE0BCBwhYFZqZ+UBCCAj6ARWamdk/QggImwEVzcysQEoHCAYVO6ysPkoHCAQVRYO4PkoHCAEVIg0APw===1465887564
>
>
> [2] Line as spark hands it over:
>
> CsAJCtwGCghwYWdlVmlldxC4PhjM1v66BSJFaHR0cDovL25hLnNlL3Nwb3J0ZW4vc3BvcnR0dC8xLjM5MTU5MjEtdXBwZ2lmdGVyLXNtZWRlcm5hLW1vdC1rb25rdXJzKjhVcHBnaWZ0ZXI6IFNtZWRlcm5hIG1vdCBrb25rdXJzIC0gU3BvcnQgKFRUKSAtIHd3dy5uYS5zZTJXaHR0cDovL25hLnNlL255aGV0ZXIvb3JlYnJvLzEuMzk2OTU0My1rcnlwaGFsLW9wcG5hci1mb3Itb2JvLWF0dC1iZWhhbGxhLXRqYW5zdGViaWxhcm5hOqECcWdrZWplNGpmLHBkMzBmdDRuNCxwbHB0b3JqNncscGxwczBvamZvLHBkYjVrZGM4eCxwbHBzN293Y3UscGE0czN1bXp5LHBhNHJla25sNyxwYTRyd3dxam4scGE0c21ra2Z4LHBkM2tpa3BmMixwZDNqcjE5dGMscGQ0ZGQ0M2F3LHAwZ3MwbmlqMSxwYTRvZTNrbXoscGE0cWJ3eDZxLHBkM2s2NW00dyxwYTRyazc3Z3IscGQzMHAzdW8wLHBkNGM1ajV5dixwbHB0c211NmcscGM3bXNibmM5LHBhNHFpaTdsZCxwbHB0dnpqdnUscGE0bmlsdmFnLHBhNHB6cjN2cyxwZDNsZDNnYmkscGl1a2M2NmVlLHB5dHoyOThzNErIAgoTNzI0NTY2NzU0MzQxNTUyOTQ4ORAAGAAioQJxZ2tlamU0amYscGQzMGZ0NG40LHBscHRvcmo2dyxwbHBzMG9qZm8scGRiNWtkYzh4LHBscHM3b3djdSxwYTRzM3VtenkscGE0cmVrbmw3LHBhNHJ3d3FqbixwYTRzbWtrZngscGQza2lrcGYyLHBkM2pyMTl0YyxwZDRkZDQzYXcscDBnczBuaWoxLHBhNG9lM2tteixwYTRxYnd4NnEscGQzazY1bTR3LHBhNHJrNzdncixwZDMwcDN1bzAscGQ0YzVqNXl2LHBscHRzbXU2ZyxwYzdtc2JuYzkscGE0cWlpN2xkLHBscHR2emp2dSxwYTRuaWx2YWcscGE0
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark corrupts text lines

2016-06-14 Thread Kristoffer Sjögren
Hi

We have log files that are written in base64 encoded text files
(gzipped) where each line is ended with a new line character.

For some reason a particular line [1] is split by Spark [2], making it
unparsable by the base64 decoder. It does this consistently, no matter
whether I give it the particular file that contains the line or a bunch
of files.

I know the line is not corrupt because I can manually download the
file from HDFS, gunzip it and read/decode all the lines without
problems.

I was thinking that maybe there is a limit to the number of characters per
line, but that doesn't sound right. Maybe the combination of characters
makes Spark think it's a new line?

I'm clueless.
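
A minimal PySpark check along these lines (path is hypothetical) would at
least show whether Spark really produces more, shorter records than the
decompressed file contains:

from pyspark import SparkContext

sc = SparkContext(appName="line-split-check")

# Point this at the single file that contains the problematic line.
lines = sc.textFile("hdfs:///logs/events/part-00000.gz")

# If Spark splits a record, this count will be higher than what
# "gunzip -c file | wc -l" reports, and the longest record will be
# shorter than the longest line in the decompressed file.
print(lines.count())
print(lines.map(lambda l: len(l)).max())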

Cheers,
-Kristoffer

[1] Original line:

CsAJCtwGCghwYWdlVmlldxC4PhjM1v66BSJFaHR0cDovL25hLnNlL3Nwb3J0ZW4vc3BvcnR0dC8xLjM5MTU5MjEtdXBwZ2lmdGVyLXNtZWRlcm5hLW1vdC1rb25rdXJzKjhVcHBnaWZ0ZXI6IFNtZWRlcm5hIG1vdCBrb25rdXJzIC0gU3BvcnQgKFRUKSAtIHd3dy5uYS5zZTJXaHR0cDovL25hLnNlL255aGV0ZXIvb3JlYnJvLzEuMzk2OTU0My1rcnlwaGFsLW9wcG5hci1mb3Itb2JvLWF0dC1iZWhhbGxhLXRqYW5zdGViaWxhcm5hOqECcWdrZWplNGpmLHBkMzBmdDRuNCxwbHB0b3JqNncscGxwczBvamZvLHBkYjVrZGM4eCxwbHBzN293Y3UscGE0czN1bXp5LHBhNHJla25sNyxwYTRyd3dxam4scGE0c21ra2Z4LHBkM2tpa3BmMixwZDNqcjE5dGMscGQ0ZGQ0M2F3LHAwZ3MwbmlqMSxwYTRvZTNrbXoscGE0cWJ3eDZxLHBkM2s2NW00dyxwYTRyazc3Z3IscGQzMHAzdW8wLHBkNGM1ajV5dixwbHB0c211NmcscGM3bXNibmM5LHBhNHFpaTdsZCxwbHB0dnpqdnUscGE0bmlsdmFnLHBhNHB6cjN2cyxwZDNsZDNnYmkscGl1a2M2NmVlLHB5dHoyOThzNErIAgoTNzI0NTY2NzU0MzQxNTUyOTQ4ORAAGAAioQJxZ2tlamU0amYscGQzMGZ0NG40LHBscHRvcmo2dyxwbHBzMG9qZm8scGRiNWtkYzh4LHBscHM3b3djdSxwYTRzM3VtenkscGE0cmVrbmw3LHBhNHJ3d3FqbixwYTRzbWtrZngscGQza2lrcGYyLHBkM2pyMTl0YyxwZDRkZDQzYXcscDBnczBuaWoxLHBhNG9lM2tteixwYTRxYnd4NnEscGQzazY1bTR3LHBhNHJrNzdncixwZDMwcDN1bzAscGQ0YzVqNXl2LHBscHRzbXU2ZyxwYzdtc2JuYzkscGE0cWlpN2xkLHBscHR2emp2dSxwYTRuaWx2YWcscGE0cHpyM3ZzLHBkM2xkM2diaSxwaXVrYzY2ZWUscHl0ejI5OHM0KgkzOTUxLDM5NjAS3gIIxNjxhJTVsJcVEqUBTW96aWxsYS81LjAgKExpbnV4OyBBbmRyb2lkIDUuMS4xOyBTQU1TVU5HIFNNLUczODhGIEJ1aWxkL0xNWTQ4QikgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgU2Ftc3VuZ0Jyb3dzZXIvMy4zIENocm9tZS8zOC4wLjIxMjUuMTAyIE1vYmlsZSBTYWZhcmkvNTM3LjM2IjUKDDYyLjIwLjE5Ni44MBWgd3NBHRgibUIiAlNFKgfDlnJlYnJvMg5UZWxpYVNvbmVyYSBBQigAMdejcD0K1+s/OABCCAiAAhWamRlAQgcIURUAAOBAQggIlAEVzczMP0IHCFQVmpkJQUIICJYBFTMzE0BCBwhYFZqZ+UBCCAj6ARWamdk/QggImwEVzcysQEoHCAYVO6ysPkoHCAQVRYO4PkoHCAEVIg0APw===1465887564


[2] Line as spark hands it over:

CsAJCtwGCghwYWdlVmlldxC4PhjM1v66BSJFaHR0cDovL25hLnNlL3Nwb3J0ZW4vc3BvcnR0dC8xLjM5MTU5MjEtdXBwZ2lmdGVyLXNtZWRlcm5hLW1vdC1rb25rdXJzKjhVcHBnaWZ0ZXI6IFNtZWRlcm5hIG1vdCBrb25rdXJzIC0gU3BvcnQgKFRUKSAtIHd3dy5uYS5zZTJXaHR0cDovL25hLnNlL255aGV0ZXIvb3JlYnJvLzEuMzk2OTU0My1rcnlwaGFsLW9wcG5hci1mb3Itb2JvLWF0dC1iZWhhbGxhLXRqYW5zdGViaWxhcm5hOqECcWdrZWplNGpmLHBkMzBmdDRuNCxwbHB0b3JqNncscGxwczBvamZvLHBkYjVrZGM4eCxwbHBzN293Y3UscGE0czN1bXp5LHBhNHJla25sNyxwYTRyd3dxam4scGE0c21ra2Z4LHBkM2tpa3BmMixwZDNqcjE5dGMscGQ0ZGQ0M2F3LHAwZ3MwbmlqMSxwYTRvZTNrbXoscGE0cWJ3eDZxLHBkM2s2NW00dyxwYTRyazc3Z3IscGQzMHAzdW8wLHBkNGM1ajV5dixwbHB0c211NmcscGM3bXNibmM5LHBhNHFpaTdsZCxwbHB0dnpqdnUscGE0bmlsdmFnLHBhNHB6cjN2cyxwZDNsZDNnYmkscGl1a2M2NmVlLHB5dHoyOThzNErIAgoTNzI0NTY2NzU0MzQxNTUyOTQ4ORAAGAAioQJxZ2tlamU0amYscGQzMGZ0NG40LHBscHRvcmo2dyxwbHBzMG9qZm8scGRiNWtkYzh4LHBscHM3b3djdSxwYTRzM3VtenkscGE0cmVrbmw3LHBhNHJ3d3FqbixwYTRzbWtrZngscGQza2lrcGYyLHBkM2pyMTl0YyxwZDRkZDQzYXcscDBnczBuaWoxLHBhNG9lM2tteixwYTRxYnd4NnEscGQzazY1bTR3LHBhNHJrNzdncixwZDMwcDN1bzAscGQ0YzVqNXl2LHBscHRzbXU2ZyxwYzdtc2JuYzkscGE0cWlpN2xkLHBscHR2emp2dSxwYTRuaWx2YWcscGE0

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Suggestions on Lambda Architecture in Spark

2016-06-14 Thread Jörn Franke
You do not describe use cases, only technologies. First be clear about your
needs and then evaluate technologies.
Otherwise nobody can help you properly and you will end up with an inefficient
stack for your needs.


> On 14 Jun 2016, at 00:52, KhajaAsmath Mohammed  
> wrote:
> 
> Hi,
> 
> In my current project, we are planning to implement POC for lambda 
> architecture using spark streaming. My use case would be 
> 
> Kafka --> batch layer --> Spark SQL --> Cassandra
> 
> Kafka --> Speed layer --> Spark Streaming --> Cassandra
> 
> Serving layer --> contacts both layers, but I am not sure how the data is
> queried in this case.
> 
> Does anyone have any github links or tutorials on how to implement lambda 
> architecture especially the serving layer?
> 
> Thanks,
> Asmath.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



cluster mode for Python on standalone cluster

2016-06-14 Thread Jan Sourek
The official documentation states 'Currently only YARN supports cluster mode
for Python applications.'
I would like to know whether work is being done or planned to support cluster
mode for Python applications on standalone spark clusters. Does anyone know
if this is part of the roadmap for Spark 2.0 - or where should I ask such a
question?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cluster-mode-for-Python-on-standalone-cluster-tp27160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Limit pyspark.daemon threads

2016-06-14 Thread agateaaa
Hi,

I am seeing this issue too with pyspark (using Spark 1.6.1). I have set
spark.executor.cores to 1, but whenever a streaming batch starts processing
data, I see the python -m pyspark.daemon processes gradually increase to
about 5 (raising CPU% on the box about 4-5 times; each pyspark.daemon
takes up around 100% CPU).

After the processing is done, 4 of the pyspark.daemon processes go away and
we are left with one until the next batch run. Also, sometimes the CPU usage
of the executor process spikes to about 800% even though spark.executor.cores
is set to 1.

e.g. top output
  PID USER   PR  NI    VIRT    RES   SHR S  %CPU %MEM    TIME+ COMMAND
19634 spark  20   0 8871420 1.790g 32056 S 814.1  2.9  0:39.33 /usr/lib/j+   <-- EXECUTOR

13897 spark  20   0   46576  17916  6720 S 100.0  0.0  0:00.17 python -m +   <-- pyspark.daemon
13991 spark  20   0   46524  15572  4124 S  98.0  0.0  0:08.18 python -m +   <-- pyspark.daemon
14488 spark  20   0   46524  15636  4188 S  98.0  0.0  0:07.25 python -m +   <-- pyspark.daemon
14514 spark  20   0   46524  15636  4188 S  94.0  0.0  0:06.72 python -m +   <-- pyspark.daemon
14526 spark  20   0   48200  17172  4092 S   0.0  0.0  0:00.38 python -m +   <-- pyspark.daemon



Is there any way to control the number of pyspark.daemon processes that get
spawned?

Thank you
Agateaaa
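
For reference, a minimal sketch of the settings discussed in the reply
quoted below (values are illustrative only, and the exact daemon count is
not strictly guaranteed to match):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("daemon-limit-sketch")
        # One JVM task thread per executor, which roughly corresponds to
        # one pyspark.daemon worker per executor.
        .set("spark.executor.cores", "1")
        # Per-task cap before the Python worker starts spilling to disk.
        .set("spark.python.worker.memory", "512m")
        # Reuse Python workers between tasks instead of forking new ones.
        .set("spark.python.worker.reuse", "true"))

sc = SparkContext(conf=conf)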

On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser  wrote:

> Hey Ken,
>
> 1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap
> storage option using Alluxio, formerly Tachyon, with which I have no
> experience however.)
>
> 2. The worker memory setting is not a hard maximum unfortunately. What
> happens is that during aggregation the Python daemon will check its process
> size. If the size is larger than this setting, it will start spilling to
> disk. I've seen many occasions where my daemons grew larger. Also, you're
> relying on Python's memory management to free up space again once objects
> are evicted. In practice, leave this setting reasonably small but make sure
> there's enough free memory on the machine so you don't run into OOM
> conditions. If the lower memory setting causes strains for your users, make
> sure they increase the parallelism of their jobs (smaller partitions
> meaning less data is processed at a time).
>
> 3. I believe that is the behavior you can expect when setting
> spark.executor.cores. I've not experimented much with it and haven't looked
> at that part of the code, but what you describe also reflects my
> understanding. Please share your findings here, I'm sure those will be very
> helpful to others, too.
>
> One more suggestion for your users is to move to the Pyspark DataFrame
> API. Much of the processing will then happen in the JVM, and you will bump
> into fewer Python resource contention issues.
>
> Best,
> -Sven
>
>
> On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken 
> wrote:
>
>> This is extremely helpful!
>>
>> I’ll have to talk to my users about how the python memory limit should be
>> adjusted and what their expectations are. I’m fairly certain we bumped it
>> up in the dark past when jobs were failing because of insufficient memory
>> for the python processes.
>>
>> So just to make sure I’m understanding correctly:
>>
>>
>>- JVM memory (set by SPARK_EXECUTOR_MEMORY and/or SPARK_WORKER_MEMORY?)
>>is where the RDDs are stored. Currently both of those values are set to
>>90GB
>>- spark.python.worker.memory controls the maximum RAM each python task
>>can take (roughly speaking). Currently set to 4GB
>>- spark.task.cpus controls how many java worker threads will exist
>>and thus indirectly how many pyspark daemon processes will exist
>>
>>
>> I’m also looking into fixing my cron jobs so they don’t stack up by
>> implementing flock in the jobs and changing how teardowns of the spark
>> cluster work as far as failed workers.
>>
>> Thanks again,
>> —Ken
>>
>> On Mar 26, 2016, at 4:08 PM, Sven Krasser  wrote:
>>
>> My understanding is that the spark.executor.cores setting controls the
>> number of worker threads in the executor in the JVM. Each worker thread
>> communicates then with a pyspark daemon process (these are not threads) to
>> stream data into Python. There should be one daemon process per worker
>> thread (but as I mentioned I sometimes see a low multiple).
>>
>> Your 4GB limit for Python is fairly high, that means even for 12 workers
>> you're looking at a max of 48GB (and it goes frequently beyond that). You
>> will be better off using a lower number there and instead increasing the
>> parallelism of your job (i.e. dividing the job into more and smaller
>> partitions).
>>
>> On Sat, Mar 26, 2016 at 7:10 AM, Carlile, Ken 
>>  wrote:
>>
>>> Thanks, Sven!
>>>
>>> I know that I’ve messed up the memory allocation, but I’m trying not to
>>> think too much about that (because I’ve advertised it to my 

Re: Suggestions on Lambda Architecture in Spark

2016-06-14 Thread Sean Owen
Our labs project oryx is intended to be pretty much a POC of the
lambda architecture on Spark (for ML): http://oryx.io/ You might
consider reusing bits of that.
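
For the serving-layer part of the question, a minimal PySpark sketch of
merging a batch view and a speed view stored in Cassandra (keyspace, table,
and column names are hypothetical, both tables are assumed to share the
same schema, and the DataStax spark-cassandra-connector package is assumed
to be on the classpath):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="serving-layer-sketch")
sqlContext = SQLContext(sc)

def cassandra_table(keyspace, table):
    # Format name used by the DataStax spark-cassandra-connector.
    return (sqlContext.read
            .format("org.apache.spark.sql.cassandra")
            .options(keyspace=keyspace, table=table)
            .load())

# Batch layer writes precomputed counts, speed layer writes recent deltas.
batch_view = cassandra_table("analytics", "batch_counts")
speed_view = cassandra_table("analytics", "realtime_counts")

# Serving query: add the real-time deltas on top of the batch totals.
merged = (batch_view.unionAll(speed_view)
          .groupBy("key")
          .agg({"count": "sum"}))
merged.show()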

On Mon, Jun 13, 2016 at 11:52 PM, KhajaAsmath Mohammed
 wrote:
> Hi,
>
> In my current project, we are planning to implement POC for lambda
> architecture using spark streaming. My use case would be
>
> Kafka --> batch layer --> Spark SQL --> Cassandra
>
> Kafka --> Speed layer --> Spark Streaming --> Cassandra
>
> Serving layer --> contacts both layers, but I am not sure how the data is
> queried in this case.
>
> Does anyone have any github links or tutorials on how to implement lambda
> architecture especially the serving layer?
>
> Thanks,
> Asmath.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

2016-06-14 Thread Ravi Aggarwal
Hi,

Is there any breakthrough here?

I had one more observation while debugging the issue.
Here are the 4 types of data I had:

Da -> stored in parquet
Di -> stored in parquet
Dl1 -> parquet version of lookup
Dl2 -> hbase version of lookup

Joins performed and type of join done by spark:
Da and Di     Sort-merge        failed (OOM)
Da and Dl1    Broadcast-hash    passed
Da and Dl2    Sort-merge        passed
Di and Dl1    Broadcast-hash    passed
Di and Dl2    Sort-merge        failed (OOM)

From these entries I can deduce that the problem is with sort-merge joins
involving Di. So the hbase piece is out of the equation; that is not the
culprit. In the physical plan I could see only two operations that are done
additionally in sort-merge as compared to broadcast-hash:

-> Exchange hashpartitioning
-> Sort
And finally the sort-merge join itself.

Can we deduce anything from this?
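
A possible workaround while this is investigated would be to steer the
planner away from sort-merge for the failing pair, either by raising the
broadcast threshold or with an explicit broadcast hint. A minimal sketch
(PySpark syntax for brevity; the same broadcast() hint exists in the Scala
API, and the paths and column names here are hypothetical):

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast

sc = SparkContext(appName="broadcast-join-sketch")
sqlContext = SQLContext(sc)

# Hypothetical stand-ins for Di and the lookup table.
di = sqlContext.read.parquet("hdfs:///data/di")
lookup = sqlContext.read.parquet("hdfs:///data/lookup")

# Option 1: raise the threshold so the planner picks broadcast-hash joins
# for larger lookup tables (value in bytes, illustrative only).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold",
                   str(100 * 1024 * 1024))

# Option 2: force it for a single join with the broadcast() hint, which
# skips the Exchange hashpartitioning + Sort steps of a sort-merge join.
joined = di.join(broadcast(lookup), di["key"] == lookup["key"])
joined.explain()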

Thanks
Ravi
From: Ravi Aggarwal
Sent: Friday, June 10, 2016 12:31 PM
To: 'Ted Yu' 
Cc: user 
Subject: RE: OutOfMemory when doing joins in spark 2.0 while same code runs 
fine in spark 1.5.2

Hi Ted,
Thanks for the reply.

Here is the code:
Btw – df.count runs fine on a dataframe generated from this default source.
I think it is something in the combination of the join and the hbase data
source that is creating the issue, but I am not entirely sure.
I have also dumped the physical plans of both approaches (s3a/s3a join and
s3a/hbase join); in case you want them, let me know.

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, 
FileFormat}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.slf4j.LoggerFactory

class DefaultSource extends SchemaRelationProvider with FileFormat {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, 
String], schema: StructType) = {
new HBaseRelation(schema, parameters)(sqlContext)
  }

  def inferSchema(sparkSession: SparkSession,
  options: Map[String, String],
  files: Seq[FileStatus]): Option[StructType] = ???

  def prepareWrite(sparkSession: SparkSession,
   job: Job,
   options: Map[String, String],
   dataSchema: StructType): OutputWriterFactory = ???
}

object HBaseConfigurationUtil {
  lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
  val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set("hbase.mapred.outputtable", tableName)
conf.set("hbase.zookeeper.quorum", hbaseQuorum)
conf
  }
}

class HBaseRelation(val schema: StructType, parameters: Map[String, String])
   (@transient val sqlContext: SQLContext) extends BaseRelation 
with TableScan {

  import sqlContext.sparkContext

  override def buildScan(): RDD[Row] = {

val bcDataSchema = sparkContext.broadcast(schema)

val tableName = parameters.get("path") match {
  case Some(t) => t
  case _ => throw new RuntimeException("Table name (path) not provided in parameters")
}

val hbaseQuorum = parameters.get("hbaseQuorum") match {
  case Some(s: String) => s
  case _ => throw new RuntimeException("hbaseQuorum not provided in options")
}

val rdd = sparkContext.newAPIHadoopRDD(
  HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)

val rowRdd = rdd
  .map(tuple => tuple._2)
  .map { record =>

  val cells: java.util.List[Cell] = record.listCells()

  val splitRec =
    cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) { (a, b) =>
      a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
    }

  val keyFieldName = bcDataSchema.value.fields.filter(e => 
e.metadata.contains("isPrimary") && e.metadata.getBoolean("isPrimary"))(0).name

  val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) { (a, b) =>
    val fieldCell = b.asInstanceOf[Cell]
    a :+ new String(fieldCell.getQualifierArray).substring(
      fieldCell.getQualifierOffset,
      fieldCell.getQualifierLength + fieldCell.getQualifierOffset)
  }

  val res = Map(schemaArr.zip(splitRec).toArray: _*)

  val recordFields = res.map(value => {
val colDataType =
  try {