Re: Questions about count() performance with dataframes and parquet files

2020-02-17 Thread Enrico Minack
It is not about the table being very large or small; it is about how large
your cluster is relative to your data. Caching is only useful if you have
the respective memory available across your executors. Otherwise you could
either materialize the Dataframe on HDFS (e.g. as parquet or via checkpoint)
or indeed do the join twice. It is a memory-over-CPU trade-off.
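
For illustration, a minimal PySpark sketch of that trade-off, reusing the df_actions, spark_session and sql_context names from the code further down; the paths are hypothetical:

# Option A: cache in executor memory (only helps if the data actually fits).
df_actions.cache()

# Option B: materialize to HDFS as parquet and read it back,
# trading extra I/O for executor memory.
df_actions.write.mode('overwrite').parquet('/tmp/df_actions_materialized.parquet')
df_actions = sql_context.read.parquet('/tmp/df_actions_materialized.parquet')

# Option C: checkpoint (a checkpoint directory must be set first).
spark_session.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')
df_actions = df_actions.checkpoint()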


Enrico


Am 17.02.20 um 22:06 schrieb Nicolas PARIS:

.dropDuplicates() \ .cache()
Since df_actions is cached, you can count inserts and updates quickly
with only that one join in df_actions:

Hi Enrico. I am wondering if this is OK for very large tables? Is
caching faster than recomputing both inserts and updates?

Thanks

Enrico Minack  writes:


Ashley,

I want to suggest a few optimizations. The problem might go away, but
at least performance should improve.
The freeze problems could have many reasons; the Spark UI SQL pages
and stage detail pages would be useful. You can send them privately
if you wish.

1. The repartition(1) should be replaced by coalesce(1). The former
will shuffle all data, while the latter will read in the existing
partitions and not shuffle them again.
2. Repartitioning to a single partition is discouraged, unless you can
guarantee the data fits into one worker's memory.
3. You can compute Insert and Update in one go, so that you don't have
to join with df_reference twice:

df_actions = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="left") \
    .withColumn('_action', when(col('b.hashkey').isNull(), 'Insert')
                           .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
    .select(col('_action'), *df_source_hashed) \
    .dropDuplicates() \
    .cache()

Since df_actions is cached, you can count inserts and updates quickly
with only that one join in df_actions:

inserts_count = df_actions.where(col('_action') == 'Insert').count()
updates_count = df_actions.where(col('_action') == 'Update').count()

And you can get rid of the union:

df_output = df_actions.where(col('_action').isNotNull())

If you have to write that output to parquet anyway, then you can get
the count quickly from the parquet file if it is partitioned by the
_action column (Spark then only looks into parquet's metadata to get
the count; it does not read any rows):

df_output.repartition(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')
df_output = sql_context.read.parquet('/path/to/output.parquet')

inserts_count = df_output.where(col('_action') == 'Insert').count()
updates_count = df_output.where(col('_action') == 'Update').count()

These are all just sketches, but I am sure you get the idea.

Enrico


Am 13.02.20 um 05:08 schrieb Ashley Hoff:

Hi,

I am currently working on an app using PySpark to produce an insert
and update daily delta capture, being outputted as Parquet.  This is
running on an 8-core, 32 GB Linux server in standalone mode (set to 6
worker cores of 2 GB memory each) running Spark 2.4.3.

This is being achieved by reading in data from a TSQL database, into
a dataframe, which has a hash of all records appended to it and
comparing it to a dataframe from yesterday's data (which has been
saved also as parquet).

As part of the monitoring and logging, I am trying to count the
number of records for the respective actions.  Example code:
df_source = spark_session.read.format('jdbc').
df_reference = sql_context.read.parquet('/path/to/reference.parquet')
df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
    .cache()

df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
    .select(lit('Insert').alias('_action'), *df_source_hashed) \
    .dropDuplicates() \
    .cache()
inserts_count = df_inserts.count()

df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
    .select(lit('Update').alias('_action'), *df_source_hashed) \
    .where(col('a.hashkey') != col('b.hashkey')) \
    .dropDuplicates() \
    .cache()
updates_count = df_updates.count()

df_output = df_inserts.union(df_updates)
df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
The above code is run as two concurrent occurrences via Python
threading.Thread (this is to try and overcome the network bottleneck
of connecting to the database server).
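
For context, a rough sketch of that concurrency pattern (the run_capture function, table names and jdbc_url below are hypothetical stand-ins, not the actual job code):

import threading

def run_capture(table_name):
    # Hypothetical per-table delta capture standing in for the logic above;
    # JDBC url/credentials and the hashing/join/write steps are elided.
    df_source = spark_session.read.format('jdbc') \
        .option('url', jdbc_url) \
        .option('dbtable', table_name) \
        .load()
    # ... hashkey column, joins against the reference parquet, counts, write ...

threads = [threading.Thread(target=run_capture, args=(name,)) for name in ('table_a', 'table_b')]
for t in threads:
    t.start()
for t in threads:
    t.join()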

What I am finding is some very inconsistent behavior
with the counts.  Occasionally, it appears that it will freeze up on
a count operation for a few minutes and quite often that specific
data frame will have zero records in it.  According to the DAG
(which I am not 100% sure how to read) the following is the
processing flow:

Exchange/ShuffledRowRDD [74]count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [75]count at
NativeMethodAccessorImpl.java:0  =>
InMemoryTableScan/MapPartitionsRDD [78]count at
NativeMethodAccessorImpl.java:0 => MapPartitionsRDD [79]count at

Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Mich Talebzadeh
I stripped everything from the jar list. This is all I have

spark-shell --jars shc-core-1.1.1-2.1-s_2.11.jar, \
  json4s-native_2.11-3.5.3.jar, \
  json4s-jackson_2.11-3.5.3.jar, \
  hbase-client-1.2.3.jar, \
  hbase-common-1.2.3.jar

Now I still get the same error!

scala> val df = withCatalog(catalog)
java.lang.NoSuchMethodError:
org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
  at
org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
  at
org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
  at
org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
  at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
  at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
  at withCatalog(<console>:54)

Thanks


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 17 Feb 2020 at 21:37, Mich Talebzadeh 
wrote:

>
> Many thanks both.
>
> Let me check and confirm.
>
> regards,
>
> Mich
>
>
> On Mon, 17 Feb 2020 at 21:33, Jörn Franke  wrote:
>
>> Is there a reason why different Scala (it seems at least 2.10/2.11)
>> versions are mixed? This never works.
>> Do you by accident include a dependency with an old Scala version, i.e.
>> the HBase datasource maybe?
>>
>>
>> Am 17.02.2020 um 22:15 schrieb Mich Talebzadeh > >:
>>
>> 
>> Thanks Muthu,
>>
>>
>> I am using the following jar files for now in local mode i.e.  
>> spark-shell_local
>> --jars …..
>>
>> json4s-jackson_2.10-3.2.10.jar
>> json4s_2.11-3.2.11.jar
>> json4s-native_2.10-3.4.0.jar
>>
>> Which one is the incorrect one, please?
>>
>> Regards,
>>
>> Mich
>>
>>
>>
>>
>>
>>
>>
>> On Mon, 17 Feb 2020 at 20:28, Muthu Jayakumar  wrote:
>>
>>> I suspect the spark job is somehow having an incorrect (newer) version
>>> of json4s in the classpath. json4s 3.5.3 is the highest version that can be
>>> used.
>>>
>>> Thanks,
>>> Muthu
>>>
>>> On Mon, Feb 17, 2020, 06:43 Mich Talebzadeh 
>>> wrote:
>>>
 Hi,

 Spark version 2.4.3
 Hbase 1.2.7

 Data is stored in HBase as JSON; an example of a row is shown below
 

 I am trying to read this table in Spark Scala

 import org.apache.spark.sql.{SQLContext, _}
 import org.apache.spark.sql.execution.datasources.hbase._
 import org.apache.spark.{SparkConf, SparkContext}
 import spark.sqlContext.implicits._
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 import org.json4s.jackson.Serialization.{read => JsonRead}
 import org.json4s.jackson.Serialization.{read, write}
 def catalog = s"""{
  | "table":{"namespace":"trading", "name":"MARKETDATAHBASEBATCH",
  | "rowkey":"key",
  | "columns":{
  | "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
  | |"ticker":{"cf":"PRICE_INFO", "col":"ticker",
 "type":"string"},
  | |"timeissued":{"cf":"PRICE_INFO", "col":"timeissued",
 "type":"string"},
  | |"price":{"cf":"PRICE_INFO", "col":"price", "type":"string"}
  | |}
  | |}""".stripMargin
 def withCatalog(cat: String): DataFrame = {
spark.sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
 }
 val df = withCatalog(catalog)


 However, I am 

Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Mich Talebzadeh
Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Many thanks both.

Let me check and confirm.

regards,

Mich


On Mon, 17 Feb 2020 at 21:33, Jörn Franke  wrote:

> Is there a reason why different Scala (it seems at least 2.10/2.11)
> versions are mixed? This never works.
> Do you by accident include a dependency with an old Scala version, i.e.
> the HBase datasource maybe?
>
>
> Am 17.02.2020 um 22:15 schrieb Mich Talebzadeh  >:
>
> 
> Thanks Muthu,
>
>
> I am using the following jar files for now in local mode i.e.  
> spark-shell_local
> --jars …..
>
> json4s-jackson_2.10-3.2.10.jar
> json4s_2.11-3.2.11.jar
> json4s-native_2.10-3.4.0.jar
>
> Which one is the incorrect one, please?
>
> Regards,
>
> Mich
>
>
>
>
>
>
>
> On Mon, 17 Feb 2020 at 20:28, Muthu Jayakumar  wrote:
>
>> I suspect the spark job is somehow having an incorrect (newer) version of
>> json4s in the classpath. json4s 3.5.3 is the highest version that can be
>> used.
>>
>> Thanks,
>> Muthu
>>
>> On Mon, Feb 17, 2020, 06:43 Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> Spark version 2.4.3
>>> Hbase 1.2.7
>>>
>>> Data is stored in HBase as JSON; an example of a row is shown below
>>> 
>>>
>>> I am trying to read this table in Spark Scala
>>>
>>> import org.apache.spark.sql.{SQLContext, _}
>>> import org.apache.spark.sql.execution.datasources.hbase._
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> import spark.sqlContext.implicits._
>>> import org.json4s._
>>> import org.json4s.jackson.JsonMethods._
>>> import org.json4s.jackson.Serialization.{read => JsonRead}
>>> import org.json4s.jackson.Serialization.{read, write}
>>> def catalog = s"""{
>>>  | "table":{"namespace":"trading", "name":"MARKETDATAHBASEBATCH",
>>>  | "rowkey":"key",
>>>  | "columns":{
>>>  | "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
>>>  | |"ticker":{"cf":"PRICE_INFO", "col":"ticker",
>>> "type":"string"},
>>>  | |"timeissued":{"cf":"PRICE_INFO", "col":"timeissued",
>>> "type":"string"},
>>>  | |"price":{"cf":"PRICE_INFO", "col":"price", "type":"string"}
>>>  | |}
>>>  | |}""".stripMargin
>>> def withCatalog(cat: String): DataFrame = {
>>>spark.sqlContext
>>>.read
>>>.options(Map(HBaseTableCatalog.tableCatalog->cat))
>>>.format("org.apache.spark.sql.execution.datasources.hbase")
>>>.load()
>>> }
>>> val df = withCatalog(catalog)
>>>
>>>
>>> However, I am getting this error
>>>
>>> Spark session available as 'spark'.
>>> java.lang.NoSuchMethodError:
>>> org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
>>>   at
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
>>>   at
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
>>>   at
>>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>>   at
>>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>>>   at
>>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>>>   at withCatalog(testme.scala:49)
>>>   ... 65 elided
>>>
>>> I have Googled it but with little luck!
>>>
>>> Thanks,
>>> Mich
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>>
>>>
>>


Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Jörn Franke
Is there a reason why different Scala (it seems at least 2.10/2.11) versions 
are mixed? This never works.
Do you by accident include a dependency with an old Scala version, i.e. the
HBase datasource maybe?


> Am 17.02.2020 um 22:15 schrieb Mich Talebzadeh :
> 
> 
> Thanks Muthu,
> 
> 
> I am using the following jar files for now in local mode i.e.  
> spark-shell_local --jars …..
> 
> json4s-jackson_2.10-3.2.10.jar
> json4s_2.11-3.2.11.jar
> json4s-native_2.10-3.4.0.jar
> 
> Which one is the incorrect one, please?
> 
> Regards,
> 
> Mich
> 
> 
> 
>  
> 
> 
>> On Mon, 17 Feb 2020 at 20:28, Muthu Jayakumar  wrote:
>> I suspect the spark job is somehow having an incorrect (newer) version of 
>> json4s in the classpath. json4s 3.5.3 is the highest version that can be
>> used. 
>> 
>> Thanks,
>> Muthu
>> 
>>> On Mon, Feb 17, 2020, 06:43 Mich Talebzadeh  
>>> wrote:
>>> Hi,
>>> 
>>> Spark version 2.4.3
>>> Hbase 1.2.7
>>> 
>>> Data is stored in HBase as JSON; an example of a row is shown below
>>> 
>>> 
>>> I am trying to read this table in Spark Scala
>>> 
>>> import org.apache.spark.sql.{SQLContext, _}
>>> import org.apache.spark.sql.execution.datasources.hbase._
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> import spark.sqlContext.implicits._
>>> import org.json4s._
>>> import org.json4s.jackson.JsonMethods._
>>> import org.json4s.jackson.Serialization.{read => JsonRead}
>>> import org.json4s.jackson.Serialization.{read, write}
>>> def catalog = s"""{
>>>  | "table":{"namespace":"trading", "name":"MARKETDATAHBASEBATCH",
>>>  | "rowkey":"key",
>>>  | "columns":{
>>>  | "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
>>>  | |"ticker":{"cf":"PRICE_INFO", "col":"ticker", "type":"string"},
>>>  | |"timeissued":{"cf":"PRICE_INFO", "col":"timeissued", 
>>> "type":"string"},
>>>  | |"price":{"cf":"PRICE_INFO", "col":"price", "type":"string"}
>>>  | |}
>>>  | |}""".stripMargin
>>> def withCatalog(cat: String): DataFrame = {
>>>spark.sqlContext
>>>.read
>>>.options(Map(HBaseTableCatalog.tableCatalog->cat))
>>>.format("org.apache.spark.sql.execution.datasources.hbase")
>>>.load()
>>> }
>>> val df = withCatalog(catalog)
>>> 
>>> 
>>> However, I am getting this error
>>> 
>>> Spark session available as 'spark'.
>>> java.lang.NoSuchMethodError: 
>>> org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
>>>   at 
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
>>>   at 
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
>>>   at 
>>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>>   at 
>>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>>>   at 
>>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>>>   at withCatalog(testme.scala:49)
>>>   ... 65 elided
>>> 
>>> I have Googled it but with little luck!
>>> 
>>> Thanks,
>>> Mich
>>> 
>>> http://talebzadehmich.wordpress.com
>>> 
>>>  


Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Muthu Jayakumar
Hello Mich,

Thank you for the mail. From what I can understand of the json4s history,
Spark, and the versions you have:
1. Apache Spark 2.4.3 uses json4s 3.5.3 (to be specific, it uses
json4s-jackson).
2. json4s 3.2.11 and 3.2.10 are not compatible (ref:
https://github.com/json4s/json4s/issues/212).
3. I notice that you are using Scala 2.10 and Scala 2.11 versions of the jars.
I believe Spark 2.4.3 supports Scala 2.11 or 2.12 only.

I would suggest that json4s-jackson, json4s and json4s-native all be at
version 3.5.3 (for Scala 2.11 or 2.12, depending on your Spark build). If
you want to use an older version, make sure none of them is older
than 3.2.11.
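
For example, a hypothetical spark-shell invocation along those lines, aligning everything on Scala 2.11 and json4s 3.5.3 (the jar file names follow the usual Maven naming and are assumptions; whether shc-core itself is compatible with 3.5.3 still needs checking):

spark-shell --jars shc-core-1.1.1-2.1-s_2.11.jar,json4s-core_2.11-3.5.3.jar,json4s-jackson_2.11-3.5.3.jar,json4s-native_2.11-3.5.3.jar,hbase-client-1.2.3.jar,hbase-common-1.2.3.jar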

Hope it helps.

Thanks,
Muthu


On Mon, Feb 17, 2020 at 1:15 PM Mich Talebzadeh 
wrote:

> Thanks Muthu,
>
>
> I am using the following jar files for now in local mode i.e.  
> spark-shell_local
> --jars …..
>
> json4s-jackson_2.10-3.2.10.jar
> json4s_2.11-3.2.11.jar
> json4s-native_2.10-3.4.0.jar
>
> Which one is the incorrect one, please?
>
> Regards,
>
> Mich
>
>
>
>
>
>
>
> On Mon, 17 Feb 2020 at 20:28, Muthu Jayakumar  wrote:
>
>> I suspect the spark job is somehow having an incorrect (newer) version of
>> json4s in the classpath. json4s 3.5.3 is the highest version that can be
>> used.
>>
>> Thanks,
>> Muthu
>>
>> On Mon, Feb 17, 2020, 06:43 Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> Spark version 2.4.3
>>> Hbase 1.2.7
>>>
>>> Data is stored in HBase as JSON; an example of a row is shown below
>>> [image: image.png]
>>> I am trying to read this table in Spark Scala
>>>
>>> import org.apache.spark.sql.{SQLContext, _}
>>> import org.apache.spark.sql.execution.datasources.hbase._
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> import spark.sqlContext.implicits._
>>> import org.json4s._
>>> import org.json4s.jackson.JsonMethods._
>>> import org.json4s.jackson.Serialization.{read => JsonRead}
>>> import org.json4s.jackson.Serialization.{read, write}
>>> def catalog = s"""{
>>>  | "table":{"namespace":"trading", "name":"MARKETDATAHBASEBATCH",
>>>  | "rowkey":"key",
>>>  | "columns":{
>>>  | "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
>>>  | |"ticker":{"cf":"PRICE_INFO", "col":"ticker",
>>> "type":"string"},
>>>  | |"timeissued":{"cf":"PRICE_INFO", "col":"timeissued",
>>> "type":"string"},
>>>  | |"price":{"cf":"PRICE_INFO", "col":"price", "type":"string"}
>>>  | |}
>>>  | |}""".stripMargin
>>> def withCatalog(cat: String): DataFrame = {
>>>spark.sqlContext
>>>.read
>>>.options(Map(HBaseTableCatalog.tableCatalog->cat))
>>>.format("org.apache.spark.sql.execution.datasources.hbase")
>>>.load()
>>> }
>>> val df = withCatalog(catalog)
>>>
>>>
>>> However, I am getting this error
>>>
>>> Spark session available as 'spark'.
>>> java.lang.NoSuchMethodError:
>>> org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
>>>   at
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
>>>   at
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
>>>   at
>>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>>   at
>>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>>>   at
>>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>>>   at withCatalog(testme.scala:49)
>>>   ... 65 elided
>>>
>>> I have Googled it but with little luck!
>>>
>>> Thanks,
>>> Mich
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>>
>>>
>>


Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Mich Talebzadeh
Thanks Muthu,


I am using the following jar files for now in local mode i.e.
spark-shell_local
--jars …..

json4s-jackson_2.10-3.2.10.jar
json4s_2.11-3.2.11.jar
json4s-native_2.10-3.4.0.jar

Which one is the incorrect one, please?

Regards,

Mich



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 17 Feb 2020 at 20:28, Muthu Jayakumar  wrote:

> I suspect the spark job is somehow having an incorrect (newer) version of
> json4s in the classpath. json4s 3.5.3 is the highest version that can be
> used.
>
> Thanks,
> Muthu
>
> On Mon, Feb 17, 2020, 06:43 Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> Spark version 2.4.3
>> Hbase 1.2.7
>>
>> Data is stored in HBase as JSON; an example of a row is shown below
>> [image: image.png]
>> I am trying to read this table in Spark Scala
>>
>> import org.apache.spark.sql.{SQLContext, _}
>> import org.apache.spark.sql.execution.datasources.hbase._
>> import org.apache.spark.{SparkConf, SparkContext}
>> import spark.sqlContext.implicits._
>> import org.json4s._
>> import org.json4s.jackson.JsonMethods._
>> import org.json4s.jackson.Serialization.{read => JsonRead}
>> import org.json4s.jackson.Serialization.{read, write}
>> def catalog = s"""{
>>  | "table":{"namespace":"trading", "name":"MARKETDATAHBASEBATCH",
>>  | "rowkey":"key",
>>  | "columns":{
>>  | "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
>>  | |"ticker":{"cf":"PRICE_INFO", "col":"ticker", "type":"string"},
>>  | |"timeissued":{"cf":"PRICE_INFO", "col":"timeissued",
>> "type":"string"},
>>  | |"price":{"cf":"PRICE_INFO", "col":"price", "type":"string"}
>>  | |}
>>  | |}""".stripMargin
>> def withCatalog(cat: String): DataFrame = {
>>spark.sqlContext
>>.read
>>.options(Map(HBaseTableCatalog.tableCatalog->cat))
>>.format("org.apache.spark.sql.execution.datasources.hbase")
>>.load()
>> }
>> val df = withCatalog(catalog)
>>
>>
>> However, I am getting this error
>>
>> Spark session available as 'spark'.
>> java.lang.NoSuchMethodError:
>> org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
>>   at
>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
>>   at
>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
>>   at
>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>   at
>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>>   at
>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>>   at withCatalog(testme.scala:49)
>>   ... 65 elided
>>
>> I have Googled it but with little luck!
>>
>> Thanks,
>> Mich
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>>
>


Re: Questions about count() performance with dataframes and parquet files

2020-02-17 Thread Nicolas PARIS


> .dropDuplicates() \ .cache()
> Since df_actions is cached, you can count inserts and updates quickly
> with only that one join in df_actions:

Hi Enrico. I am wondering if this is OK for very large tables? Is
caching faster than recomputing both inserts and updates?

Thanks

Enrico Minack  writes:

> Ashley,
>
> I want to suggest a few optimizations. The problem might go away but
> at least performance should improve.
> The freeze problems could have many reasons, the Spark UI SQL pages
> and stages detail pages would be useful. You can send them privately,
> if you wish.
>
> 1. the repartition(1) should be replaced by coalesce(1). The former
> will shuffle all data, while the latter will read in the existing
> partitions and not shuffle them again.
> 2. Repartitioning to a single partition is discouraged, unless you can
> guarantee the data fit into one worker's memory.
> 3. You can compute Insert and Update in one go, so that you don't have
> to join with df_reference twice.
>
> df_actions = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="left") \
>     .withColumn('_action', when(col('b.hashkey').isNull(), 'Insert')
>                            .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
>     .select(col('_action'), *df_source_hashed) \
>     .dropDuplicates() \
>     .cache()
>
> Since df_actions is cached, you can count inserts and updates quickly
> with only that one join in df_actions:
>
> inserts_count = df_actions.where(col('_action') == 'Insert').count()
> updates_count = df_actions.where(col('_action') == 'Update').count()
>
> And you can get rid of the union:
>
> df_output = df_actions.where(col('_action').isNotNull())
>
> If you have to write that output to parquet anyway, then you can get
> the count quickly from the parquet file if it is partitioned by the
> _action column (Spark then only looks into parquet's metadata to get
> the count, it does not read any row):
>
> df_output.repartition(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')
> df_output = sql_context.read.parquet('/path/to/output.parquet')
>
> inserts_count = df_output.where(col('_action') == 'Insert').count()
> updates_count = df_output.where(col('_action') == 'Update').count()
>
> These are all just sketches, but I am sure you get the idea.
>
> Enrico
>
>
> Am 13.02.20 um 05:08 schrieb Ashley Hoff:
>> Hi,
>>
>> I am currently working on an app using PySpark to produce an insert
>> and update daily delta capture, being outputted as Parquet.  This is
>> running on an 8-core, 32 GB Linux server in standalone mode (set to 6
>> worker cores of 2 GB memory each) running Spark 2.4.3.
>>
>> This is being achieved by reading in data from a TSQL database, into
>> a dataframe, which has a hash of all records appended to it and
>> comparing it to a dataframe from yesterday's data (which has been
>> saved also as parquet).
>>
>> As part of the monitoring and logging, I am trying to count the
>> number of records for the respective actions.  Example code:
>> df_source = spark_session.read.format('jdbc').
>> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
>>     .cache()
>>
>> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>>     .select(lit('Insert').alias('_action'), *df_source_hashed) \
>>     .dropDuplicates() \
>>     .cache()
>> inserts_count = df_inserts.count()
>>
>> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
>>     .select(lit('Update').alias('_action'), *df_source_hashed) \
>>     .where(col('a.hashkey') != col('b.hashkey')) \
>>     .dropDuplicates() \
>>     .cache()
>> updates_count = df_updates.count()
>>
>> df_output = df_inserts.union(df_updates)
>> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>> The above code is running two occurrences concurrently via Python
>> threading.Thread (this is to try and overcome the network bottleneck
>> of connecting to the database server).
>>
>> What I am finding is I am getting some very inconsistent behavior
>> with the counts.  Occasionally, it appears that it will freeze up on
>> a count operation for a few minutes and quite often that specific
>> data frame will have zero records in it.  According to the DAG
>> (which I am not 100% sure how to read) the following is the
>> processing flow:
>>
>> Exchange/ShuffledRowRDD [74]count at NativeMethodAccessorImpl.java:0
>>  => WholeStageCodegen/MapPartitionsRDD [75]count at
>> NativeMethodAccessorImpl.java:0  =>
>> InMemoryTableScan/MapPartitionsRDD [78]count at
>> NativeMethodAccessorImpl.java:0 => MapPartitionsRDD [79]count at
>> NativeMethodAccessorImpl.java:0 =>
>> WholeStageCodegen/MapPartitionsRDD [80]count at
>> NativeMethodAccessorImpl.java:0 => Exchange/MapPartitionsRDD
>> [81]count at NativeMethodAccessorImpl.java:0
>>
>> 

Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Muthu Jayakumar
I suspect the spark job is somehow having an incorrect (newer) version of
json4s in the classpath. json4s 3.5.3 is the highest version that can be
used.

Thanks,
Muthu

On Mon, Feb 17, 2020, 06:43 Mich Talebzadeh 
wrote:

> Hi,
>
> Spark version 2.4.3
> Hbase 1.2.7
>
> Data is stored in HBase as JSON; an example of a row is shown below
> [image: image.png]
> I am trying to read this table in Spark Scala
>
> import org.apache.spark.sql.{SQLContext, _}
> import org.apache.spark.sql.execution.datasources.hbase._
> import org.apache.spark.{SparkConf, SparkContext}
> import spark.sqlContext.implicits._
> import org.json4s._
> import org.json4s.jackson.JsonMethods._
> import org.json4s.jackson.Serialization.{read => JsonRead}
> import org.json4s.jackson.Serialization.{read, write}
> def catalog = s"""{
>  | "table":{"namespace":"trading", "name":"MARKETDATAHBASEBATCH",
>  | "rowkey":"key",
>  | "columns":{
>  | "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
>  | |"ticker":{"cf":"PRICE_INFO", "col":"ticker", "type":"string"},
>  | |"timeissued":{"cf":"PRICE_INFO", "col":"timeissued",
> "type":"string"},
>  | |"price":{"cf":"PRICE_INFO", "col":"price", "type":"string"}
>  | |}
>  | |}""".stripMargin
> def withCatalog(cat: String): DataFrame = {
>spark.sqlContext
>.read
>.options(Map(HBaseTableCatalog.tableCatalog->cat))
>.format("org.apache.spark.sql.execution.datasources.hbase")
>.load()
> }
> val df = withCatalog(catalog)
>
>
> However, I am getting this error
>
> Spark session available as 'spark'.
> java.lang.NoSuchMethodError:
> org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
>   at
> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
>   at
> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
>   at
> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>   at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>   at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>   at withCatalog(testme.scala:49)
>   ... 65 elided
>
> I have Googled it but with little luck!
>
> Thanks,
> Mich
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>


Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Mich Talebzadeh
Hi,

Spark version 2.4.3
Hbase 1.2.7

Data is stored in HBase as JSON; an example of a row is shown below
[image: image.png]
I am trying to read this table in Spark Scala

import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}
import spark.sqlContext.implicits._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization.{read => JsonRead}
import org.json4s.jackson.Serialization.{read, write}
def catalog = s"""{
 | "table":{"namespace":"trading", "name":"MARKETDATAHBASEBATCH",
 | "rowkey":"key",
 | "columns":{
 | "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
 | |"ticker":{"cf":"PRICE_INFO", "col":"ticker", "type":"string"},
 | |"timeissued":{"cf":"PRICE_INFO", "col":"timeissued",
"type":"string"},
 | |"price":{"cf":"PRICE_INFO", "col":"price", "type":"string"}
 | |}
 | |}""".stripMargin
def withCatalog(cat: String): DataFrame = {
   spark.sqlContext
   .read
   .options(Map(HBaseTableCatalog.tableCatalog->cat))
   .format("org.apache.spark.sql.execution.datasources.hbase")
   .load()
}
val df = withCatalog(catalog)


However, I am getting this error

Spark session available as 'spark'.
java.lang.NoSuchMethodError:
org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
  at
org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
  at
org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
  at
org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
  at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
  at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
  at withCatalog(testme.scala:49)
  ... 65 elided

I have Googled it but with little luck!

Thanks,
Mich

http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Apache Arrow support for Apache Spark

2020-02-17 Thread Chris Teoh
1. I'd also consider how you're structuring the data before applying the
join; naively doing the join could be expensive, so a bit of data
preparation may be necessary to improve join performance. Try to get a
baseline as well. Arrow would help improve it.

2. Try storing it back as Parquet but in a way the next application can
take advantage of predicate pushdown.
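
For point 2, a minimal PySpark sketch of what that could look like (the paths, column names and filter values are made up for illustration, assuming a SparkSession named spark):

# Write the joined result partitioned by a column the downstream job filters on.
joined_df.write \
    .mode('overwrite') \
    .partitionBy('event_date') \
    .parquet('hdfs:///data/joined_result')

# The next application reads only the partitions it needs; the remaining
# filter is pushed down to the Parquet reader.
subset = spark.read.parquet('hdfs:///data/joined_result') \
    .filter("event_date = '2020-02-17' AND country = 'SG'")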



On Mon, 17 Feb 2020, 6:41 pm Subash Prabakar, 
wrote:

> Hi Team,
>
> I have two questions regarding Arrow and Spark integration,
>
> 1. I am joining two huge tables (1 PB each) - will the performance gain be
> significant when I use the Arrow format before shuffling? Will the
> serialization/deserialization cost improve significantly?
>
> 2. Can we store the final data in Arrow format on HDFS and read it back
> in another Spark application? If so, how could I do that?
> Note: the dataset is transient - the separation of responsibility is for
> easier management. Though resiliency is handled inside Spark, we use
> different languages (in our case Java and Python).
>
> Thanks,
> Subash
>
>


[ML] [How-to]: How to unload the loaded W2V model in Pyspark?

2020-02-17 Thread Zhefu PENG
Hi all,

I'm using PySpark and Spark ML to train and use a Word2Vec model; here is
the logic of my program:

model = Word2VecModel.load("save path")

result_list = model.findSynonymsArray(target, top_N)

Then I use graphframes and result_list to create a graph and do some
computing. However, the program failed due to an out-of-memory error: "xxx
is running beyond physical memory limits". As a result, I want to delete the
Word2Vec model to free memory, since I don't need it after getting
result_list.

I tried using Python's del statement, and
spark.sparkContext._gateway.detach(model._java_obj)
as
https://stackoverflow.com/questions/58759929/how-to-free-the-memory-taken-by-a-pyspark-model-javamodel
suggested.
But neither of the two worked.
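
For concreteness, a minimal sketch of what those two attempts looked like (target, top_N and the save path are placeholders as above):

from pyspark.ml.feature import Word2VecModel

model = Word2VecModel.load("save path")
result_list = model.findSynonymsArray(target, top_N)

# Attempt 1: detach the JVM-side model object from the Py4J gateway.
spark.sparkContext._gateway.detach(model._java_obj)

# Attempt 2: drop the Python reference to the model.
del model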

Is there any way to unload or delete the loaded Word2Vec model in Spark or
PySpark?

I really appreciate any reply and help.

Best,
Zhefu