Re: Use derived column for other derived column in the same statement

2019-04-21 Thread Shraddha Shah
Also, the same question applies to a groupBy/agg operation: how can we use one
aggregated result (say min(amount)) to derive another aggregated column?
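
A minimal sketch of one approach, assuming a hypothetical DataFrame `df` with
columns `group` and `amount`: once an aggregate is aliased in agg(), it is an
ordinary column of the grouped result, so a subsequent withColumn can derive
from it.

```scala
import org.apache.spark.sql.functions.{col, max, min}

// min_amount and max_amount are ordinary columns after agg(), so
// withColumn can build a derived column from them.
val agg = df.groupBy("group")
  .agg(min("amount").alias("min_amount"),
       max("amount").alias("max_amount"))
  .withColumn("amount_range", col("max_amount") - col("min_amount"))
```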

On Sun, Apr 21, 2019 at 11:24 PM Rishi Shah wrote:

> Hello All,
>
> How can we use a derived column (derived1) to derive another column in the
> same dataframe operation statement?
>
> something like:
>
> df = (df.withColumn('derived1', lit('something'))
>         .withColumn('derived2', col('derived1') == 'something'))
>
> --
> Regards,
>
> Rishi Shah
>


Use derived column for other derived column in the same statement

2019-04-21 Thread Rishi Shah
Hello All,

How can we use a derived column (derived1) to derive another column in the
same dataframe operation statement?

something like:

df = (df.withColumn('derived1', lit('something'))
        .withColumn('derived2', col('derived1') == 'something'))
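
Scala equivalent of the chain above, for reference: each withColumn call
returns a new DataFrame, so derived1 already exists when derived2 is defined.

```scala
import org.apache.spark.sql.functions.{col, lit}

// The second withColumn sees the DataFrame returned by the first,
// so it can reference the freshly added derived1 column.
val out = df
  .withColumn("derived1", lit("something"))
  .withColumn("derived2", col("derived1") === "something")
```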

-- 
Regards,

Rishi Shah


Usage of Explicit Future in Spark program

2019-04-21 Thread Chetan Khatri
Hello Spark Users,

Someone has suggested breaking 5-6 independent transformation blocks into
Future[String] tasks, claiming this can tune the performance. I am wondering
whether this is an appropriate use of explicit Futures in Spark?

Sample code is below:

import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

// `logger` comes from the surrounding application; the buffer is
// declared here so the snippet is self-contained.
val writeDataFutures = ListBuffer[Future[String]]()

def writeData(tableName: String): Future[String] = Future {
  // some heavy lifting Spark transformations, 5-6 read -> transform -> load
  tableName // return the table name so the caller knows which job finished
}

writeDataFutures += writeData("dynamicFieldData")

writeDataFutures foreach { writeDataFuture =>
  Await.ready(writeDataFuture, Duration.Inf).onComplete {
    case Success(table) => logger.info("All Success")
    case Failure(e) => e.printStackTrace()
  }
}
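
For what it's worth, a common variant of the pattern above submits all the
independent jobs and blocks once on the combined future, rather than one
Await per future. A sketch, assuming writeData is defined as above; the
second table name is hypothetical:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Launch all independent write jobs, then wait for all of them at once.
val tables = Seq("dynamicFieldData", "otherFieldData") // second name hypothetical
val futures = tables.map(writeData)
val finished: Seq[String] = Await.result(Future.sequence(futures), Duration.Inf)
```

Whether this actually tunes performance depends on the jobs being independent
and on the cluster having spare capacity: Spark can schedule jobs submitted
from separate threads concurrently, but a single saturated cluster gains little.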


Please advise on this technical doubt.

Thanks


Re: Writing to Aerospike from Spark with bulk write with user authentication fails

2019-04-21 Thread Mich Talebzadeh
Just as an add on I see this in aerospike log

Apr 21 2019 17:52:24 GMT: INFO (security): (security.c:5483) permitted |
client: 50.140.197.220:33466 | authenticated user: mich | action: login |
detail: user=mich

Apr 21 2019 17:52:25 GMT: INFO (security): (security.c:5483) not
authenticated | client: 50.140.197.220:33468 | authenticated user: | action:
info request | detail:

So the first connection, with user=mich, is authenticated.

However, the second one, made for the bulk write, is missing the
authenticated user. So there must be a way of passing the username?

Thanks

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 21 Apr 2019 at 11:30, Mich Talebzadeh wrote:

> Aerospike Enterprise version if anyone has worked with user authentication!
>
> As far as I know one can create a client with aerospike authentication as
> follows that works for single put
>
> import com.aerospike.spark.sql._
> import com.aerospike.client.Bin
> import com.aerospike.client.Key
> import com.aerospike.client.Value
> import com.aerospike.client.AerospikeClient
> import com.aerospike.client.Host
> import com.aerospike.client.policy.ClientPolicy
> import com.aerospike.client.policy.WritePolicy
>
> var hosts = {
> new Host("rhes75", 3000)
> }
>
> var dbHost = "rhes75"
> var dbPort = "3000"
> var dbConnection = "xyz"
> var namespace = "trading"
> var dbPassword = "xx"
> var dbSet = "testSet"
> val policy = new ClientPolicy()
> policy.user = dbConnection
> policy.password = dbPassword
> val client = new AerospikeClient(policy, hosts)
> val TEST_COUNT = 100
> for (i <- 1 to TEST_COUNT) {
>val key = new Key(namespace, namespace, dbSet, i)
>client.put(null, key,
>new Bin("one", i),
>new Bin("two", "two:"+i),
>new Bin("three", i.toDouble)
>   )
> }
> However, I have no way of passing user credentials when doing a bulk
> write using a dataframe:
>
> df.write.
>   mode(SaveMode.Overwrite).
>   format("com.aerospike.spark.sql").
>   option("aerospike.namespace", namespace).
>   option("aerospike.set", dbSet).
>   option("aerospike.updateByKey", "id").
>   option("aerospike.keyColumn", "__id").
>   option("aerospike.batchMax", 5000).
>   option("aerospike.keyPath", "/etc/aerospike/features.conf").
>   save()
>
> When I turn off security in the Aerospike conf file this works fine.
> Otherwise I get the following error:
>
> Caused by: com.aerospike.client.AerospikeException$Connection: Error -8:
> Failed to connect to host(s): rhes75 3000 Error 80: not authenticated
>
> Any ideas how this can be done with user authentication?
>
> Thanks,
>
> Mich
>


Writing to Aerospike from Spark with bulk write with user authentication fails

2019-04-21 Thread Mich Talebzadeh
Aerospike Enterprise version if anyone has worked with user authentication!

As far as I know one can create a client with aerospike authentication as
follows that works for single put

import com.aerospike.spark.sql._
import com.aerospike.client.Bin
import com.aerospike.client.Key
import com.aerospike.client.Value
import com.aerospike.client.AerospikeClient
import com.aerospike.client.Host
import com.aerospike.client.policy.ClientPolicy
import com.aerospike.client.policy.WritePolicy

var hosts = {
new Host("rhes75", 3000)
}

var dbHost = "rhes75"
var dbPort = "3000"
var dbConnection = "xyz"
var namespace = "trading"
var dbPassword = "xx"
var dbSet = "testSet"
val policy = new ClientPolicy()
policy.user = dbConnection
policy.password = dbPassword
val client = new AerospikeClient(policy, hosts)
val TEST_COUNT = 100
for (i <- 1 to TEST_COUNT) {
  val key = new Key(namespace, namespace, dbSet, i)
  client.put(null, key,
    new Bin("one", i),
    new Bin("two", "two:" + i),
    new Bin("three", i.toDouble)
  )
}
However, I have no way of passing user credentials when doing a bulk
write using a dataframe:

df.write.
  mode(SaveMode.Overwrite).
  format("com.aerospike.spark.sql").
  option("aerospike.namespace", namespace).
  option("aerospike.set", dbSet).
  option("aerospike.updateByKey", "id").
  option("aerospike.keyColumn", "__id").
  option("aerospike.batchMax", 5000).
  option("aerospike.keyPath", "/etc/aerospike/features.conf").
  save()
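
For illustration only: if the connector exposes credential options, one
would expect the bulk write to take them alongside the other options. The
option names `aerospike.user` and `aerospike.password` below are assumptions,
not confirmed against the connector's documentation:

```scala
// Hypothetical sketch: passing the same credentials used for ClientPolicy
// as writer options. "aerospike.user" / "aerospike.password" are assumed
// option names - check the aerospike-spark connector docs.
df.write.
  mode(SaveMode.Overwrite).
  format("com.aerospike.spark.sql").
  option("aerospike.namespace", namespace).
  option("aerospike.set", dbSet).
  option("aerospike.user", dbConnection).   // username, as in policy.user
  option("aerospike.password", dbPassword). // as in policy.password
  save()
```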

When I turn off security in the Aerospike conf file this works fine.
Otherwise I get the following error:

Caused by: com.aerospike.client.AerospikeException$Connection: Error -8:
Failed to connect to host(s): rhes75 3000 Error 80: not authenticated

Any ideas how this can be done with user authentication?

Thanks,

Mich

Dr Mich Talebzadeh





Re: Spark job running for long time

2019-04-21 Thread rajat kumar
Hi Yeikel,

I cannot copy anything from the system, but I have seen the explain output.

It was doing a sortMergeJoin for all tables. There are 10 tables, all of
them joined with left outer joins.

Out of the 10 tables, one is about 50 MB and a second is about 200 MB; the
rest are big tables. The data is in Avro format.

I am using Spark 2.2.

I suspect a broadcast join could help, but I am not sure, because by default
broadcast only applies to tables smaller than about 10 MB.
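
A hedged sketch of forcing the broadcast, assuming hypothetical DataFrames
`bigDf`, `df50mb` and `df200mb` joined on a hypothetical `key` column: the
broadcast() hint overrides the auto-broadcast threshold, and the threshold
itself (spark.sql.autoBroadcastJoinThreshold, 10 MB by default) can also be
raised.

```scala
import org.apache.spark.sql.functions.broadcast

// Hint Spark to broadcast the two small tables so their left outer joins
// become broadcast joins instead of sortMergeJoin.
val joined = bigDf
  .join(broadcast(df50mb), Seq("key"), "left_outer")
  .join(broadcast(df200mb), Seq("key"), "left_outer")

// Alternatively, raise the auto-broadcast threshold (value in bytes):
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (256L * 1024 * 1024).toString)
```

Broadcasting a 200 MB table only works if the driver and executors have the
memory for it, so this is worth testing rather than assuming.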

Thanks
Rajat

On Wed, 17 Apr 2019, 23:53 Yeikel wrote:

> Can you share the output of df.explain() ?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>