Re: Spark SaveMode

2019-07-19 Thread Jörn Franke
This is not an issue with Spark but with the underlying database. The primary key 
constraint has a purpose, and ignoring it would defeat that purpose. 
To handle your use case you would need to make several decisions, which may imply 
that you don't want a simple insert-if-not-exists. Maybe you want an upsert, and 
how do you want to take deleted data into account?
You could use a MERGE in Oracle to achieve what you have in mind. In Spark you 
would need to fetch the existing data from the Oracle database and then merge it 
with the new data in Spark, depending on your requirements.
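
As a rough illustration of that second option, the sketch below (Scala) reads the 
existing keys back from Oracle over JDBC, anti-joins them against the new data, and 
appends only the remaining rows. The table name DEST, the key column id, the 
connection properties and the Cassandra options are illustrative assumptions, not 
details from this thread:

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("insert-if-absent").getOrCreate()

// Assumed connection details -- replace with your own.
val jdbcUrl = "jdbc:oracle:thin:@//dbhost:1521/ORCL"
val props = new Properties()
props.setProperty("user", "app_user")
props.setProperty("password", "app_password")

// New data read from Cassandra (keyspace/table names are assumed).
val newDf = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "source"))
  .load()

// Keys already present in the Oracle destination table.
val existingKeys = spark.read.jdbc(jdbcUrl, "DEST", props).select("id")

// Keep only rows whose key does not exist yet, then append them.
val toInsert = newDf.join(existingKeys, Seq("id"), "left_anti")
toInsert.write.mode(SaveMode.Append).jdbc(jdbcUrl, "DEST", props)

Note that this is not atomic: rows inserted into Oracle between the key read and 
the append can still violate the constraint, which is why a database-side MERGE 
remains the more robust option.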

> On 20.07.2019 at 06:34, Richard  wrote:
> 
> Any reason why Spark's SaveMode doesn't have a mode that ignores Primary 
> Key/Unique constraint violations?
> 
> Let's say I'm using Spark to migrate some data from Cassandra to Oracle; I 
> want the insert operation to be "ignore if the primary key already exists" 
> instead of failing the whole batch.
> 
> Thanks, 
> Richard 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark SaveMode

2019-07-19 Thread Richard
Any reason why Spark's SaveMode doesn't have a mode that ignores Primary
Key/Unique constraint violations?

Let's say I'm using Spark to migrate some data from Cassandra to Oracle; I
want the insert operation to be "ignore if the primary key already exists"
instead of failing the whole batch.

Thanks,
Richard


Re: Spark dataset to explode json string

2019-07-19 Thread Richard
OK, thanks.
I have another way that currently works, but it is not efficient if I have to
extract a lot of fields, because it creates a separate UDF for each extraction:
df = df.withColumn("foo", getfoo.apply(col("jsonCol")))
.withColumn("bar", getbar.apply(col("jsonCol")));






On Fri, Jul 19, 2019 at 8:54 PM Mich Talebzadeh 
wrote:

> You can try to split the {"foo": "val1", "bar": "val2"} as below.
>
>
> /*
> This is an example of output!
> (c1003d93-5157-4092-86cf-0607157291d8,{"rowkey":"c1003d93-5157-4092-86cf-0607157291d8","ticker":"TSCO",
> "timeissued":"2019-07-01T09:10:55", "price":395.25})
> {"rowkey":"c1003d93-5157-4092-86cf-0607157291d8","ticker":"TSCO",
> "timeissued":"2019-07-01T09:10:55", "price":395.25}
> */
> // Then I do this to get individual values
>
>var rowkey =
> row._2.split(',').view(0).split(':').view(1).toString.drop(1).dropRight(1).trim
>var ticker = row._2.split(',').view(1).
> split(':').view(1).toString.drop(1).dropRight(1).trim
>var timeissued = row._2.split(',').view(2).
> toString.substring(14,35).drop(1).dropRight(1).trim
>var price =
> row._2.split(',').view(3).split(':').view(1).toString.dropRight(1).toDouble
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 20 Jul 2019 at 00:00, Richard  wrote:
>
>> example of jsonCol (String):
>> {"foo": "val1", "bar": "val2"}
>>
>> Thanks,
>>
>> On Fri, Jul 19, 2019 at 3:57 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Sure.
>>>
>>> Do you have an example of a record from Cassandra read into df by any
>>> chance? Only columns that need to go into Oracle.
>>>
>>> df.select('col1, 'col2, 'jsonCol).take(1).foreach(println)
>>>
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 19 Jul 2019 at 23:17, Richard  wrote:
>>>
 Thanks for the reply,
 my situation is little different than your sample:
 Following is the schema from source (df.printSchema();)

 root
  |-- id: string (nullable = true)
  |-- col1: string (nullable = true)
  |-- col2: string (nullable = true)
  |-- jsonCol: string (nullable = true)

 I want extract multiple fields from jsonCol to schema to be
 root
  |-- id: string (nullable = true)
  |-- col1: string (nullable = true)
  |-- col2: string (nullable = true)
  |-- jsonCol: string (nullable = true)
  |-- foo: string (nullable = true)
  |-- bar: string (nullable = true)
 ...
 Thanks,



 On Fri, Jul 19, 2019 at 2:26 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hi Richard,
>
> You can use the following to read JSON data into DF. The example is
> reading JSON from Kafka topic
>
>   val sc = spark.sparkContext
>  import spark.implicits._
>  // Use map to create the new RDD using the value portion of
> the pair.
>  val jsonRDD = pricesRDD.map(x => x._2)
>  // Create DataFrame from jsonRDD
>  val jsonDF = sqlContext.read.json(jsonRDD)
>
> This is an example of reading a MongoDB document into Spark
>
> dfrddMongoDB.printSchema
> /*
> root
>  |-- _id: struct (nullable = true)
>  ||-- oid: string (nullable = true)
>  |-- operation: struct (nullable = true)
>  ||-- op_type: integer (nullable = true)
>  ||-- op_time: string (nullable = true)
>  |-- priceInfo: struct (nullable = true)
>  ||-- key: string (nullable = true)
>  ||-- ticker: string (nullable = true)
>  ||-- timeissued: string (nullable = true)
>  ||-- price: double (nullable = true)
>  ||-- currency: string (nullable = true)
> // one example of mongo document from mongo collection
> {
> "_id" : 

Re: Spark dataset to explode json string

2019-07-19 Thread Mich Talebzadeh
You can try to split the {"foo": "val1", "bar": "val2"} as below.


/*
This is an example of output!
(c1003d93-5157-4092-86cf-0607157291d8,{"rowkey":"c1003d93-5157-4092-86cf-0607157291d8","ticker":"TSCO",
"timeissued":"2019-07-01T09:10:55", "price":395.25})
{"rowkey":"c1003d93-5157-4092-86cf-0607157291d8","ticker":"TSCO",
"timeissued":"2019-07-01T09:10:55", "price":395.25}
*/
// Then I do this to get individual values

   var rowkey =
row._2.split(',').view(0).split(':').view(1).toString.drop(1).dropRight(1).trim
   var ticker = row._2.split(',').view(1).
split(':').view(1).toString.drop(1).dropRight(1).trim
   var timeissued = row._2.split(',').view(2).
toString.substring(14,35).drop(1).dropRight(1).trim
   var price =
row._2.split(',').view(3).split(':').view(1).toString.dropRight(1).toDouble

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 20 Jul 2019 at 00:00, Richard  wrote:

> example of jsonCol (String):
> {"foo": "val1", "bar": "val2"}
>
> Thanks,
>
> On Fri, Jul 19, 2019 at 3:57 PM Mich Talebzadeh 
> wrote:
>
>> Sure.
>>
>> Do you have an example of a record from Cassandra read into df by any
>> chance? Only columns that need to go into Oracle.
>>
>> df.select('col1, 'col2, 'jsonCol).take(1).foreach(println)
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 19 Jul 2019 at 23:17, Richard  wrote:
>>
>>> Thanks for the reply,
>>> my situation is little different than your sample:
>>> Following is the schema from source (df.printSchema();)
>>>
>>> root
>>>  |-- id: string (nullable = true)
>>>  |-- col1: string (nullable = true)
>>>  |-- col2: string (nullable = true)
>>>  |-- jsonCol: string (nullable = true)
>>>
>>> I want extract multiple fields from jsonCol to schema to be
>>> root
>>>  |-- id: string (nullable = true)
>>>  |-- col1: string (nullable = true)
>>>  |-- col2: string (nullable = true)
>>>  |-- jsonCol: string (nullable = true)
>>>  |-- foo: string (nullable = true)
>>>  |-- bar: string (nullable = true)
>>> ...
>>> Thanks,
>>>
>>>
>>>
>>> On Fri, Jul 19, 2019 at 2:26 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Hi Richard,

 You can use the following to read JSON data into DF. The example is
 reading JSON from Kafka topic

   val sc = spark.sparkContext
  import spark.implicits._
  // Use map to create the new RDD using the value portion of
 the pair.
  val jsonRDD = pricesRDD.map(x => x._2)
  // Create DataFrame from jsonRDD
  val jsonDF = sqlContext.read.json(jsonRDD)

 This is an example of reading a MongoDB document into Spark

 dfrddMongoDB.printSchema
 /*
 root
  |-- _id: struct (nullable = true)
  ||-- oid: string (nullable = true)
  |-- operation: struct (nullable = true)
  ||-- op_type: integer (nullable = true)
  ||-- op_time: string (nullable = true)
  |-- priceInfo: struct (nullable = true)
  ||-- key: string (nullable = true)
  ||-- ticker: string (nullable = true)
  ||-- timeissued: string (nullable = true)
  ||-- price: double (nullable = true)
  ||-- currency: string (nullable = true)
 // one example of mongo document from mongo collection
 {
 "_id" : ObjectId("5cae4fa25d8b5279db785b43"),
 "priceInfo" : {
 "key" : "2ca8de24-eaf3-40d4-b0ef-c8b56534ceb5",
 "ticker" : "ORCL",
 "timeissued" : "2019-04-10T21:20:57",
 "price" : 41.13,
 "currency" : "GBP"
 },
 "operation" : {
 "op_type" : NumberInt(1),
 "op_time" : "1554927506012"
 }
 }
 */
 // Flatten the structs
 val df = dfrddMongoDB.
select(
 

Re: Spark and Oozie

2019-07-19 Thread William Shen
Dennis, do you know what's taking the additional time? Is it the Spark job,
or Oozie waiting for an allocation from YARN? Do you have a resource contention
issue in YARN?

On Fri, Jul 19, 2019 at 12:24 AM Bartek Dobija 
wrote:

> Hi Dennis,
>
> Oozie jobs shouldn't take that long in a well-configured cluster. Oozie
> allocates its own resources in YARN, which may require fine-tuning. Check
> whether YARN gives resources to the Oozie job immediately, as that may be one
> of the reasons, and adjust job priorities in the YARN scheduler configuration.
> 
> Alternatively, check the Apache Airflow project, which is a good alternative
> to Oozie.
>
> Regards,
> Bartek
>
> On Fri, Jul 19, 2019, 09:09 Dennis Suhari 
> wrote:
>
>>
>> Dear experts,
>>
>> I am using Spark for processing data from HDFS (Hadoop). These Spark
>> applications are data pipelines, data wrangling and machine learning
>> applications. Thus Spark submits its jobs using YARN.
>> This also works well. For scheduling I am now trying to use Apache Oozie,
>> but I am facing performance impacts. A Spark job which took 44 seconds
>> when submitted via the CLI now takes nearly 3 minutes.
>>
>> Have you had similar experiences using Oozie for scheduling Spark
>> application jobs? What alternative workflow tools are you using for
>> scheduling Spark jobs on Hadoop?
>>
>>
>> Br,
>>
>> Dennis
>>
>> Sent from my iPhone
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark dataset to explode json string

2019-07-19 Thread Richard
example of jsonCol (String):
{"foo": "val1", "bar": "val2"}

Thanks,

On Fri, Jul 19, 2019 at 3:57 PM Mich Talebzadeh 
wrote:

> Sure.
>
> Do you have an example of a record from Cassandra read into df by any
> chance? Only columns that need to go into Oracle.
>
> df.select('col1, 'col2, 'jsonCol).take(1).foreach(println)
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 19 Jul 2019 at 23:17, Richard  wrote:
>
>> Thanks for the reply,
>> my situation is little different than your sample:
>> Following is the schema from source (df.printSchema();)
>>
>> root
>>  |-- id: string (nullable = true)
>>  |-- col1: string (nullable = true)
>>  |-- col2: string (nullable = true)
>>  |-- jsonCol: string (nullable = true)
>>
>> I want extract multiple fields from jsonCol to schema to be
>> root
>>  |-- id: string (nullable = true)
>>  |-- col1: string (nullable = true)
>>  |-- col2: string (nullable = true)
>>  |-- jsonCol: string (nullable = true)
>>  |-- foo: string (nullable = true)
>>  |-- bar: string (nullable = true)
>> ...
>> Thanks,
>>
>>
>>
>> On Fri, Jul 19, 2019 at 2:26 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Richard,
>>>
>>> You can use the following to read JSON data into DF. The example is
>>> reading JSON from Kafka topic
>>>
>>>   val sc = spark.sparkContext
>>>  import spark.implicits._
>>>  // Use map to create the new RDD using the value portion of the
>>> pair.
>>>  val jsonRDD = pricesRDD.map(x => x._2)
>>>  // Create DataFrame from jsonRDD
>>>  val jsonDF = sqlContext.read.json(jsonRDD)
>>>
>>> This is an example of reading a MongoDB document into Spark
>>>
>>> dfrddMongoDB.printSchema
>>> /*
>>> root
>>>  |-- _id: struct (nullable = true)
>>>  ||-- oid: string (nullable = true)
>>>  |-- operation: struct (nullable = true)
>>>  ||-- op_type: integer (nullable = true)
>>>  ||-- op_time: string (nullable = true)
>>>  |-- priceInfo: struct (nullable = true)
>>>  ||-- key: string (nullable = true)
>>>  ||-- ticker: string (nullable = true)
>>>  ||-- timeissued: string (nullable = true)
>>>  ||-- price: double (nullable = true)
>>>  ||-- currency: string (nullable = true)
>>> // one example of mongo document from mongo collection
>>> {
>>> "_id" : ObjectId("5cae4fa25d8b5279db785b43"),
>>> "priceInfo" : {
>>> "key" : "2ca8de24-eaf3-40d4-b0ef-c8b56534ceb5",
>>> "ticker" : "ORCL",
>>> "timeissued" : "2019-04-10T21:20:57",
>>> "price" : 41.13,
>>> "currency" : "GBP"
>>> },
>>> "operation" : {
>>> "op_type" : NumberInt(1),
>>> "op_time" : "1554927506012"
>>> }
>>> }
>>> */
>>> // Flatten the structs
>>> val df = dfrddMongoDB.
>>>select(
>>> 'priceInfo.getItem("key").as("key")
>>>   , 'priceInfo.getItem("ticker").as("ticker")
>>>   , 'priceInfo.getItem("timeissued").as("timeissued")
>>>   , 'priceInfo.getItem("price").as("price")
>>>   , 'priceInfo.getItem("currency").as("currency")
>>>   , 'operation.getItem("op_type").as("op_type")
>>>   , 'operation.getItem("op_time").as("op_time")
>>>  )
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 19 Jul 2019 at 21:48, Richard  wrote:
>>>
 let's say I use spark to migrate some data from Cassandra table to
 Oracle table
 Cassandra Table:
 CREATE TABLE SOURCE(
 id UUID PRIMARY KEY,
 col1 text,
 col2 text,
 jsonCol text
 );
 example jsonCol value: {"foo": "val1", "bar", "val2"}

 I am trying to extract fields from the json column while importing to
 Oracle table
 

Re: Spark dataset to explode json string

2019-07-19 Thread Mich Talebzadeh
Sure.

Do you have an example of a record from Cassandra read into df by any
chance? Only columns that need to go into Oracle.

df.select('col1, 'col2, 'jsonCol).take(1).foreach(println)


HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 19 Jul 2019 at 23:17, Richard  wrote:

> Thanks for the reply,
> my situation is a little different from your sample:
> Following is the schema from source (df.printSchema();)
>
> root
>  |-- id: string (nullable = true)
>  |-- col1: string (nullable = true)
>  |-- col2: string (nullable = true)
>  |-- jsonCol: string (nullable = true)
>
> I want to extract multiple fields from jsonCol so that the schema becomes
> root
>  |-- id: string (nullable = true)
>  |-- col1: string (nullable = true)
>  |-- col2: string (nullable = true)
>  |-- jsonCol: string (nullable = true)
>  |-- foo: string (nullable = true)
>  |-- bar: string (nullable = true)
> ...
> Thanks,
>
>
>
> On Fri, Jul 19, 2019 at 2:26 PM Mich Talebzadeh 
> wrote:
>
>> Hi Richard,
>>
>> You can use the following to read JSON data into DF. The example is
>> reading JSON from Kafka topic
>>
>>   val sc = spark.sparkContext
>>  import spark.implicits._
>>  // Use map to create the new RDD using the value portion of the
>> pair.
>>  val jsonRDD = pricesRDD.map(x => x._2)
>>  // Create DataFrame from jsonRDD
>>  val jsonDF = sqlContext.read.json(jsonRDD)
>>
>> This is an example of reading a MongoDB document into Spark
>>
>> dfrddMongoDB.printSchema
>> /*
>> root
>>  |-- _id: struct (nullable = true)
>>  ||-- oid: string (nullable = true)
>>  |-- operation: struct (nullable = true)
>>  ||-- op_type: integer (nullable = true)
>>  ||-- op_time: string (nullable = true)
>>  |-- priceInfo: struct (nullable = true)
>>  ||-- key: string (nullable = true)
>>  ||-- ticker: string (nullable = true)
>>  ||-- timeissued: string (nullable = true)
>>  ||-- price: double (nullable = true)
>>  ||-- currency: string (nullable = true)
>> // one example of mongo document from mongo collection
>> {
>> "_id" : ObjectId("5cae4fa25d8b5279db785b43"),
>> "priceInfo" : {
>> "key" : "2ca8de24-eaf3-40d4-b0ef-c8b56534ceb5",
>> "ticker" : "ORCL",
>> "timeissued" : "2019-04-10T21:20:57",
>> "price" : 41.13,
>> "currency" : "GBP"
>> },
>> "operation" : {
>> "op_type" : NumberInt(1),
>> "op_time" : "1554927506012"
>> }
>> }
>> */
>> // Flatten the structs
>> val df = dfrddMongoDB.
>>select(
>> 'priceInfo.getItem("key").as("key")
>>   , 'priceInfo.getItem("ticker").as("ticker")
>>   , 'priceInfo.getItem("timeissued").as("timeissued")
>>   , 'priceInfo.getItem("price").as("price")
>>   , 'priceInfo.getItem("currency").as("currency")
>>   , 'operation.getItem("op_type").as("op_type")
>>   , 'operation.getItem("op_time").as("op_time")
>>  )
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 19 Jul 2019 at 21:48, Richard  wrote:
>>
>>> let's say I use spark to migrate some data from Cassandra table to
>>> Oracle table
>>> Cassandra Table:
>>> CREATE TABLE SOURCE(
>>> id UUID PRIMARY KEY,
>>> col1 text,
>>> col2 text,
>>> jsonCol text
>>> );
>>> example jsonCol value: {"foo": "val1", "bar", "val2"}
>>>
>>> I am trying to extract fields from the json column while importing to
>>> Oracle table
>>> Destination (
>>> id varchar2(50),
>>> col1 varchar(128).
>>> col2 varchar(128)
>>> raw_json clob,
>>> foo varchar2(256),
>>> bar varchar2(256)
>>> );
>>>
>>> What I have done:
>>> separate udf for foo and bar.
>>> This approach works, but that also means I need to deserialize raw json
>>> to json object twice, things getting worse if 

Re: Spark dataset to explode json string

2019-07-19 Thread Richard
Thanks for the reply,
my situation is a little different from your sample:
Following is the schema from source (df.printSchema();)

root
 |-- id: string (nullable = true)
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- jsonCol: string (nullable = true)

I want to extract multiple fields from jsonCol so that the schema becomes
root
 |-- id: string (nullable = true)
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- jsonCol: string (nullable = true)
 |-- foo: string (nullable = true)
 |-- bar: string (nullable = true)
...
Thanks,



On Fri, Jul 19, 2019 at 2:26 PM Mich Talebzadeh 
wrote:

> Hi Richard,
>
> You can use the following to read JSON data into DF. The example is
> reading JSON from Kafka topic
>
>   val sc = spark.sparkContext
>  import spark.implicits._
>  // Use map to create the new RDD using the value portion of the
> pair.
>  val jsonRDD = pricesRDD.map(x => x._2)
>  // Create DataFrame from jsonRDD
>  val jsonDF = sqlContext.read.json(jsonRDD)
>
> This is an example of reading a MongoDB document into Spark
>
> dfrddMongoDB.printSchema
> /*
> root
>  |-- _id: struct (nullable = true)
>  ||-- oid: string (nullable = true)
>  |-- operation: struct (nullable = true)
>  ||-- op_type: integer (nullable = true)
>  ||-- op_time: string (nullable = true)
>  |-- priceInfo: struct (nullable = true)
>  ||-- key: string (nullable = true)
>  ||-- ticker: string (nullable = true)
>  ||-- timeissued: string (nullable = true)
>  ||-- price: double (nullable = true)
>  ||-- currency: string (nullable = true)
> // one example of mongo document from mongo collection
> {
> "_id" : ObjectId("5cae4fa25d8b5279db785b43"),
> "priceInfo" : {
> "key" : "2ca8de24-eaf3-40d4-b0ef-c8b56534ceb5",
> "ticker" : "ORCL",
> "timeissued" : "2019-04-10T21:20:57",
> "price" : 41.13,
> "currency" : "GBP"
> },
> "operation" : {
> "op_type" : NumberInt(1),
> "op_time" : "1554927506012"
> }
> }
> */
> // Flatten the structs
> val df = dfrddMongoDB.
>select(
> 'priceInfo.getItem("key").as("key")
>   , 'priceInfo.getItem("ticker").as("ticker")
>   , 'priceInfo.getItem("timeissued").as("timeissued")
>   , 'priceInfo.getItem("price").as("price")
>   , 'priceInfo.getItem("currency").as("currency")
>   , 'operation.getItem("op_type").as("op_type")
>   , 'operation.getItem("op_time").as("op_time")
>  )
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 19 Jul 2019 at 21:48, Richard  wrote:
>
>> let's say I use spark to migrate some data from Cassandra table to Oracle
>> table
>> Cassandra Table:
>> CREATE TABLE SOURCE(
>> id UUID PRIMARY KEY,
>> col1 text,
>> col2 text,
>> jsonCol text
>> );
>> example jsonCol value: {"foo": "val1", "bar", "val2"}
>>
>> I am trying to extract fields from the json column while importing to
>> Oracle table
>> Destination (
>> id varchar2(50),
>> col1 varchar(128).
>> col2 varchar(128)
>> raw_json clob,
>> foo varchar2(256),
>> bar varchar2(256)
>> );
>>
>> What I have done:
>> separate udf for foo and bar.
>> This approach works, but that also means I need to deserialize raw json
>> to json object twice, things getting worse if i want to extract many fields
>> from the json.
>> example:
>> df = df.withColumn("foo", getFoo.apply(col("jsonCol")))
>>  .withColumn("bar", getBar.apply(col("jsonCol")));
>> // getFoo and getBar are UserDefinedFunction
>>
>> how do I parse raw json string only once and explode fields I need to
>> multiple columns into Oracle in spark?
>>
>> Thanks,
>>
>>
>>
>>
>>
>>


Re: Spark dataset to explode json string

2019-07-19 Thread Mich Talebzadeh
Hi Richard,

You can use the following to read JSON data into a DataFrame. The example reads
JSON from a Kafka topic.

  val sc = spark.sparkContext
 import spark.implicits._
 // Use map to create the new RDD using the value portion of the
pair.
 val jsonRDD = pricesRDD.map(x => x._2)
 // Create DataFrame from jsonRDD
 val jsonDF = sqlContext.read.json(jsonRDD)

This is an example of reading a MongoDB document into Spark

dfrddMongoDB.printSchema
/*
root
 |-- _id: struct (nullable = true)
 ||-- oid: string (nullable = true)
 |-- operation: struct (nullable = true)
 ||-- op_type: integer (nullable = true)
 ||-- op_time: string (nullable = true)
 |-- priceInfo: struct (nullable = true)
 ||-- key: string (nullable = true)
 ||-- ticker: string (nullable = true)
 ||-- timeissued: string (nullable = true)
 ||-- price: double (nullable = true)
 ||-- currency: string (nullable = true)
// one example of mongo document from mongo collection
{
"_id" : ObjectId("5cae4fa25d8b5279db785b43"),
"priceInfo" : {
"key" : "2ca8de24-eaf3-40d4-b0ef-c8b56534ceb5",
"ticker" : "ORCL",
"timeissued" : "2019-04-10T21:20:57",
"price" : 41.13,
"currency" : "GBP"
},
"operation" : {
"op_type" : NumberInt(1),
"op_time" : "1554927506012"
}
}
*/
// Flatten the structs
val df = dfrddMongoDB.
   select(
'priceInfo.getItem("key").as("key")
  , 'priceInfo.getItem("ticker").as("ticker")
  , 'priceInfo.getItem("timeissued").as("timeissued")
  , 'priceInfo.getItem("price").as("price")
  , 'priceInfo.getItem("currency").as("currency")
  , 'operation.getItem("op_type").as("op_type")
  , 'operation.getItem("op_time").as("op_time")
 )

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 19 Jul 2019 at 21:48, Richard  wrote:

> let's say I use spark to migrate some data from Cassandra table to Oracle
> table
> Cassandra Table:
> CREATE TABLE SOURCE(
> id UUID PRIMARY KEY,
> col1 text,
> col2 text,
> jsonCol text
> );
> example jsonCol value: {"foo": "val1", "bar": "val2"}
>
> I am trying to extract fields from the json column while importing to
> Oracle table
> Destination (
> id varchar2(50),
> col1 varchar(128),
> col2 varchar(128),
> raw_json clob,
> foo varchar2(256),
> bar varchar2(256)
> );
>
> What I have done:
> a separate UDF for foo and for bar.
> This approach works, but it also means I need to deserialize the raw JSON into
> a JSON object twice, and things get worse if I want to extract many fields
> from the JSON.
> example:
> df = df.withColumn("foo", getFoo.apply(col("jsonCol")))
>  .withColumn("bar", getBar.apply(col("jsonCol")));
> // getFoo and getBar are UserDefinedFunction
>
> How do I parse the raw JSON string only once in Spark and explode the fields I
> need into multiple columns before writing to Oracle?
>
> Thanks,
>
>
>
>
>
>


Spark dataset to explode json string

2019-07-19 Thread Richard
Let's say I use Spark to migrate some data from a Cassandra table to an Oracle
table.
Cassandra Table:
CREATE TABLE SOURCE(
id UUID PRIMARY KEY,
col1 text,
col2 text,
jsonCol text
);
example jsonCol value: {"foo": "val1", "bar": "val2"}

I am trying to extract fields from the json column while importing to
Oracle table
Destination (
id varchar2(50),
col1 varchar(128),
col2 varchar(128),
raw_json clob,
foo varchar2(256),
bar varchar2(256)
);

What I have done:
a separate UDF for foo and for bar.
This approach works, but it also means I need to deserialize the raw JSON into
a JSON object twice, and things get worse if I want to extract many fields
from the JSON.
example:
df = df.withColumn("foo", getFoo.apply(col("jsonCol")))
 .withColumn("bar", getBar.apply(col("jsonCol")));
// getFoo and getBar are UserDefinedFunction

How do I parse the raw JSON string only once in Spark and explode the fields I
need into multiple columns before writing to Oracle?

Thanks,


Spark ImportError: No module named XXX

2019-07-19 Thread zenglong chen
Hi all,
Caused by: org.apache.spark.api.python.PythonException: Traceback (most
recent call last):
  File "/home/ubuntu/spark-2.4.3/python/lib/pyspark.zip/pyspark/worker.py",
line 364, in main
func, profiler, deserializer, serializer = read_command(pickleSer,
infile)
  File "/home/ubuntu/spark-2.4.3/python/lib/pyspark.zip/pyspark/worker.py",
line 69, in read_command
command = serializer._read_with_length(file)
  File
"/home/ubuntu/spark-2.4.3/python/lib/pyspark.zip/pyspark/serializers.py",
line 172, in _read_with_length
return self.loads(obj)
  File
"/home/ubuntu/spark-2.4.3/python/lib/pyspark.zip/pyspark/serializers.py",
line 583, in loads
return pickle.loads(obj)
ImportError: No module named
risk_platform.mutli_process_thread.decision_tree

Is the problem caused by communication between the Spark master and workers
using pickle?
Python cannot pickle instance methods and code objects, so is that why the
load fails?
Thanks for any answer!


Unsubscribe

2019-07-19 Thread Aslan Bakirov



Re: Spark and Oozie

2019-07-19 Thread Bartek Dobija
Hi Dennis,

Oozie jobs shouldn't take that long in a well-configured cluster. Oozie
allocates its own resources in YARN, which may require fine-tuning. Check
whether YARN gives resources to the Oozie job immediately, as that may be one
of the reasons, and adjust job priorities in the YARN scheduler configuration.

Alternatively, check the Apache Airflow project, which is a good alternative
to Oozie.

Regards,
Bartek

On Fri, Jul 19, 2019, 09:09 Dennis Suhari 
wrote:

>
> Dear experts,
>
> I am using Spark for processing data from HDFS (Hadoop). These Spark
> applications are data pipelines, data wrangling and machine learning
> applications. Thus Spark submits its jobs using YARN.
> This also works well. For scheduling I am now trying to use Apache Oozie,
> but I am facing performance impacts. A Spark job which took 44 seconds
> when submitted via the CLI now takes nearly 3 minutes.
>
> Have you had similar experiences using Oozie for scheduling Spark
> application jobs? What alternative workflow tools are you using for
> scheduling Spark jobs on Hadoop?
>
>
> Br,
>
> Dennis
>
> Sent from my iPhone
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark and Oozie

2019-07-19 Thread Dennis Suhari


Dear experts,

I am using Spark for processing data from HDFS (Hadoop). These Spark 
applications are data pipelines, data wrangling and machine learning 
applications. Thus Spark submits its jobs using YARN. 
This also works well. For scheduling I am now trying to use Apache Oozie, but I 
am facing performance impacts. A Spark job which took 44 seconds when 
submitted via the CLI now takes nearly 3 minutes.

Have you had similar experiences using Oozie for scheduling Spark 
application jobs? What alternative workflow tools are you using for scheduling 
Spark jobs on Hadoop?


Br,

Dennis

Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Looking for a developer to help us with a small ETL project using Spark and Kubernetes

2019-07-19 Thread Sebastian Piu
Hey Warren,

I've done similar integrations in the past. Are you looking for a freelance
dev to achieve this? I'm based in the UK.

Cheers
Seb


On Thu, 18 Jul 2019, 11:47 pm Information Technologies, <
i...@digitalearthnetwork.com> wrote:

> Hello,
>
> We are looking for a developer to help us with a small ETL project using
> Spark and Kubernetes. Here are some of the requirements:
>
> 1. We need a REST API to run and schedule jobs. We would prefer this done
> in Node.js but can be done using Java. The REST API will not be available
> to the public.
> 2. We need an easy way to create new jobs in Java without deploying the
> whole server again.
> 3. We want jobs deployed/ran using Kubernetes.
> 4. Must be able to scale to 1000s of ETL jobs.
> 5. Source for data will be one REST API.
> 6. Destination for data will be one Couchbase Database cluster. (Couchbase
> also uses a REST API)
> 7. I am not sure how many records will be processed per job.
> 8. The data is mostly sales related data.
>
> I know there are commercial ETL solutions that do everything I want. We
> are looking for something simple and do not need a fancy UI to describe our
> ETL. We want to use Spark and Java to programmatically describe our ETL
> jobs.
>
> Please let me know if you are interested.
>
> Thanks,
>
> Warren Bell
> --
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>