Re: Read Time from a remote data source

2018-12-18 Thread jiaan.geng
You said your HDFS cluster and Spark cluster are running on different
clusters. This is not a good idea, because you should consider data
locality. Your Spark nodes need the HDFS client configuration.
A Spark job is composed of stages, and each stage has one or more
partitions. The parallelism of the job is decided by these partitions.
Whether a shuffle happens is decided by the operators you use, such as
reduceByKey, repartition, sortBy and so on.
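
For illustration, a minimal spark-shell sketch (the HDFS path and search
pattern are placeholders, not from this thread):

val lines = sc.textFile("hdfs://remote-nn:8020/data/input.txt")
// One partition per HDFS block by default, so parallelism follows the file layout.
println(lines.getNumPartitions)
// A narrow transformation such as filter keeps the partitioning: no shuffle.
val matches = lines.filter(_.contains("ERROR"))
// Wide operators such as reduceByKey, repartition or sortBy add a shuffle stage.
val counts = matches.map(line => (line, 1L)).reduceByKey(_ + _)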



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to update structured streaming apps gracefully

2018-12-18 Thread Yuta Morisawa

Hi Priya and  Vincent

Thank you for your reply!
It looks like the new feature is implemented only in the latest version.
But I'm using Spark 2.3.0, so, in my understanding, I need to stop and
reload apps.


Thanks

On 2018/12/19 9:09, vincent gromakowski wrote:

I totally missed this new feature. Thanks for the pointer

On Tue, Dec 18, 2018 at 21:18, Priya Matpadi wrote:


Changes in streaming query that allow or disallow recovery from
checkpoint is clearly provided in

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query.

On Tue, Dec 18, 2018 at 9:45 AM vincent gromakowski
<vincent.gromakow...@gmail.com> wrote:

Checkpointing is only used for failure recovery not for app
upgrades. You need to manually code the unload/load and save it
to a persistent store

On Tue, Dec 18, 2018 at 17:29, Priya Matpadi <pmatp...@gmail.com> wrote:

Using checkpointing for graceful updates is my understanding
as well, based on the writeup in

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing,
and some prototyping. Have you faced any missed events?

On Mon, Dec 17, 2018 at 6:56 PM Yuta Morisawa
<yu-moris...@kddi-research.jp> wrote:

Hi

Now I'm trying to update my structured streaming
application.
But I have no idea how to update it gracefully.

Should I stop it, replace a jar file then restart it?
In my understanding, in that case, all the state will be
recovered if I
use checkpoints.
Is this correct?

Thank you,


-- 




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to update structured streaming apps gracefully

2018-12-18 Thread vincent gromakowski
I totally missed this new feature. Thanks for the pointer

On Tue, Dec 18, 2018 at 21:18, Priya Matpadi wrote:

> Changes in streaming query that allow or disallow recovery from checkpoint
> is clearly provided in
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query
> .
>
> On Tue, Dec 18, 2018 at 9:45 AM vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> Checkpointing is only used for failure recovery not for app upgrades. You
>> need to manually code the unload/load and save it to a persistent store
>>
>> On Tue, Dec 18, 2018 at 17:29, Priya Matpadi wrote:
>>
>>> Using checkpointing for graceful updates is my understanding as well,
>>> based on the writeup in
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing,
>>> and some prototyping. Have you faced any missed events?
>>>
>>> On Mon, Dec 17, 2018 at 6:56 PM Yuta Morisawa <
>>> yu-moris...@kddi-research.jp> wrote:
>>>
 Hi

 Now I'm trying to update my structured streaming application.
 But I have no idea how to update it gracefully.

 Should I stop it, replace a jar file then restart it?
 In my understanding, in that case, all the state will be recovered if I
 use checkpoints.
 Is this correct?

 Thank you,


 --


 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: How to update structured streaming apps gracefully

2018-12-18 Thread Priya Matpadi
Changes in a streaming query that allow or disallow recovery from a checkpoint
are clearly described in
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query
.
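
For reference, a minimal sketch (the Kafka source, topic and checkpoint path
are hypothetical) of a query that, when restarted with the same
checkpointLocation, recovers its offsets and state, subject to the allowed
changes listed in the guide above:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // hypothetical broker
  .option("subscribe", "events")                      // hypothetical topic
  .load()

val query = df.groupBy("key").count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "hdfs:///checkpoints/my-app")  // hypothetical path
  .start()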

On Tue, Dec 18, 2018 at 9:45 AM vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Checkpointing is only used for failure recovery not for app upgrades. You
> need to manually code the unload/load and save it to a persistent store
>
> On Tue, Dec 18, 2018 at 17:29, Priya Matpadi wrote:
>
>> Using checkpointing for graceful updates is my understanding as well,
>> based on the writeup in
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing,
>> and some prototyping. Have you faced any missed events?
>>
>> On Mon, Dec 17, 2018 at 6:56 PM Yuta Morisawa <
>> yu-moris...@kddi-research.jp> wrote:
>>
>>> Hi
>>>
>>> Now I'm trying to update my structured streaming application.
>>> But I have no idea how to update it gracefully.
>>>
>>> Should I stop it, replace a jar file then restart it?
>>> In my understanding, in that case, all the state will be recovered if I
>>> use checkpoints.
>>> Is this correct?
>>>
>>> Thank you,
>>>
>>>
>>> --
>>>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Dataset experimental interfaces

2018-12-18 Thread Andrew Old
We are running Spark 2.2.0 in a Hadoop cluster, and I worked on a proof of
concept to read event-based data into Spark Datasets and operate over
those sets to calculate differences between the event data.

More specifically, ordered position data with odometer values and wanting
to calculate the number of miles traveled within certain jurisdictions by
vehicle.

My prototype utilizes some Dataset interfaces (such as map (using
Encoders) and groupByKey) that are marked experimental (even in the 2.4.0
release).  While I understand that experimental means changes may occur in
future releases, I would like to know whether others would avoid using the
experimental interfaces in any production code at all costs.  We would have
control over when we upgrade to newer versions of Spark, so we can test
for compatibility when new releases come out, but I'm still a bit hesitant
to count on these interfaces moving forward.

Since our prototype is showing success, we are considering using it for a
new application, and I would like to get feedback on whether I should consider
re-working it using non-experimental interfaces while I have some
time.  So far I have found Datasets great for processing data and
would like to keep using them.

Thanks.
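
(For illustration only, a hedged Scala sketch of the experimental APIs
mentioned above, groupByKey and mapGroups with encoders, using a hypothetical
event type that is not from the original message:)

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical event type, not from the original message.
case class PositionEvent(vehicleId: String, jurisdiction: String, odometer: Double)

// groupByKey and mapGroups are among the Dataset APIs marked @Experimental in 2.x.
def milesByVehicleAndJurisdiction(
    spark: SparkSession,
    events: Dataset[PositionEvent]): Dataset[(String, String, Double)] = {
  import spark.implicits._
  events
    .groupByKey(e => (e.vehicleId, e.jurisdiction))
    .mapGroups { case ((vehicle, jurisdiction), it) =>
      // Odometer range per vehicle and jurisdiction, as a rough "miles traveled" proxy.
      val odometers = it.map(_.odometer).toSeq
      (vehicle, jurisdiction, odometers.max - odometers.min)
    }
}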


Read Time from a remote data source

2018-12-18 Thread swastik mittal
Hi, 

I am new to Spark. I am running an HDFS file system on a remote cluster,
whereas my Spark workers are on another cluster. When my textFile RDD gets
executed, do the Spark workers read from the file according to HDFS partitions,
task by task, or do they read it once, after the block manager is set up at the
start of the first task, and distribute it among the memory of the Spark cluster?

I have this question because I have a situation where, when I have only one
worker executing a job, it shows less run time per task (shown in the history
server) than when I have two workers executing the same job in parallel,
even though the total duration is almost the same.

I am running a simple grep application with no shuffles within the cluster.
The text file is on a remote HDFS cluster and is 813 MB, distributed into 7
chunks of 128 MB, with the last chunk holding the leftover.
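
The job described here corresponds roughly to this sketch (the HDFS URI and
pattern are placeholders):

val lines = sc.textFile("hdfs://remote-hdfs-nn:8020/data/input.txt")  // 813 MB => 7 splits of ~128 MB
println(lines.getNumPartitions)                                       // expect 7
val hits = lines.filter(_.contains("some-pattern"))                   // narrow transformation, no shuffle
hits.count()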

Thanks 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Scala reading from Google Cloud BigQuery table throws error

2018-12-18 Thread Mich Talebzadeh
Thanks Jörn. I will try that. It requires installing sbt etc. on an ephemeral
compute server in Google Cloud to build an uber jar file.
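
As a hedged sketch only, a minimal build.sbt for such an uber jar (assuming the
sbt-assembly plugin is added in project/plugins.sbt; names and versions below
are illustrative, not taken from this thread):

// build.sbt, assuming sbt-assembly is available
name := "bigquery-spark-test"          // hypothetical project name
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided",
  "com.google.guava"  % "guava"     % "19.0"
)

// Shade Guava so the uber jar does not clash with the Guava bundled under Spark's jars/ directory.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)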



Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 18 Dec 2018 at 11:16, Jörn Franke  wrote:

> Maybe the guava version in your spark lib folder is not compatible (if
> your Spark version has a guava library)? In this case i propose to create a
> fat/uber jar potentially with a shaded guava dependency.
>
> On 18.12.2018 at 11:26, Mich Talebzadeh wrote:
>
> Hi,
>
> I am writing a small test code in spark-shell with attached jar
> dependencies
>
> spark-shell --jars
> /home/hduser/jars/bigquery-connector-0.13.4-hadoop3.jar,/home/hduser/jars/gcs-connector-1.9.4-hadoop3.jar,/home/hduser/jars/other/guava-19.0.jar,/home/hduser/jars/google-api-client-1.4.1-beta.jar,/home/hduser/jars/google-api-client-json-1.2.3-alpha.jar,/home/hduser/jars/google-api-services-bigquery-v2-rev20181202-1.27.0.jar
>
>  to read an already existing table in Google BigQuery as follows:
>
> import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
> import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
> import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
> import
> com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
> import
> com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
> import com.google.gson.JsonObject
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
> // Assumes you have a spark context (sc) -- running from spark-shell REPL.
> // Marked as transient since configuration is not Serializable. This should
> // only be necessary in spark-shell REPL.
> @transient
> val conf = sc.hadoopConfiguration
> // Input parameters.
> val fullyQualifiedInputTableId = "axial-glow-224522.accounts.ll_18740868"
> val projectId = conf.get("fs.gs.project.id")
> val bucket = conf.get("fs.gs.system.bucket")
> // Input configuration.
> conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
> conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
> BigQueryConfiguration.configureBigQueryInput(conf,
> fullyQualifiedInputTableId)
>
> The problem I have is that even after loading jars with spark-shell --jar
>
> I am getting the following error at the last line
>
> scala> BigQueryConfiguration.configureBigQueryInput(conf,
> fullyQualifiedInputTableId)
>
> java.lang.NoSuchMethodError:
> com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
>   at
> com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68)
>   at
> com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(BigQueryConfiguration.java:260)
>   ... 49 elided
>
> It says it cannot find method
>
> java.lang.NoSuchMethodError:
> com.google.common.base.Preconditions.checkArgument
>
> but I checked it and it is in the following jar file
>
> jar tvf guava-19.0.jar| grep common.base.Preconditions
>   5249 Wed Dec 09 15:58:14 UTC 2015
> com/google/common/base/Preconditions.class
>
> I have used different version of guava jar files but none works!
>
> The code is based on the following:
>
>
> https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>


Re: How to clean up logs-dirs and local-dirs of running spark streaming in yarn cluster mode

2018-12-18 Thread shyla deshpande
Is there a way to do this without stopping the streaming application in
yarn cluster mode?

On Mon, Dec 17, 2018 at 4:42 PM shyla deshpande 
wrote:

> I get the ERROR
> 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad:
> /var/log/hadoop-yarn/containers
>
> Is there a way to clean up these directories while the spark streaming
> application is running?
>
> Thanks
>


Re: Questions about caching

2018-12-18 Thread Reza Safi
Hi Andrew,
1) df2 will cache all the columns
2) In spark2 you will receive a warning like:

WARN execution.CacheManager: Asked to cache already cached data.
I don't recall whether it is the same in 1.6. It seems you are not using
Spark 2.
2a) I'm not sure whether you are suggesting a feature for Spark. Maybe
someone else with more experience with PySpark can respond to you.
3) Have you considered using an external block store? Also, I guess if this
is an academic environment, maybe there are much easier ways to handle this.

Best,
Reza.
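
(A small spark-shell sketch of points 1 and 2, assuming Spark 2.x and the
hypothetical 'test.parquet' file from the question:)

val df1 = spark.read.parquet("test.parquet")   // columns a, b, c
val df2 = df1.cache()                          // the cached plan covers all three columns
df2.select("a").count()                        // materializes the cache

println(df2.storageLevel)                      // confirms the dataframe is marked as cached

df2.cache()                                    // logs: WARN CacheManager: Asked to cache already cached data.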

On Tue, Dec 11, 2018 at 12:13 PM Andrew Melo  wrote:

> Greetings, Spark Aficionados-
>
> I'm working on a project to (ab-)use PySpark to do particle physics
> analysis, which involves iterating with a lot of transformations (to
> apply weights and select candidate events) and reductions (to produce
> histograms of relevant physics objects). We have a basic version
> working, but I'm looking to exploit some of Spark's caching behavior
> to speed up the interactive computation portion of the analysis,
> probably by writing a thin convenience wrapper. I have a couple
> questions I've been unable to find definitive answers to, which would
> help me design this wrapper in an efficient way:
>
> 1) When cache()-ing a dataframe where only a subset of the columns are
> used, is the entire dataframe placed into the cache, or only the used
> columns? E.g., does "df2" end up caching only "a", or all three
> columns?
>
> df1 = sc.read.load('test.parquet') # Has columns a, b, c
> df2 = df1.cache()
> df2.select('a').collect()
>
> 2) Are caches reference-based, or is there some sort of de-duplication
> based on the logical/physical plans? So, for instance, does Spark take
> advantage of the fact that these two dataframes should have the same
> content:
>
> df1 = sc.read.load('test.parquet').cache()
> df2 = sc.read.load('test.parquet').cache()
>
> ...or are df1 and df2 totally independent WRT caching behavior?
>
> 2a) If the cache is reference-based, is it sufficient to hold a
> weakref to the python object to keep the cache in-scope?
>
> 3) Finally, the spark.externalBlockStore.blockManager is intriguing in
> our environment where we have multiple users concurrently analyzing
> mostly the same input datasets. We have enough RAM in our clusters to
> cache a high percentage of the very common datasets, but only if users
> could somehow share their caches (which, conveniently, are the larger
> datasets). We also have very large edge SSD cache servers, used to
> cache trans-oceanic I/O, that we could throw at this as well.
>
> It looks, however, like that API was removed in 2.0.0 and there wasn't
> a replacement. There are products like Alluxio, but they aren't
> transparent, requiring the user to manually cache their dataframes by
> doing save/loads to external files using "alluxio://" URIs. Is there
> no way around this behavior now?
>
> Sorry for the long email, and thanks!
> Andrew
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to update structured streaming apps gracefully

2018-12-18 Thread vincent gromakowski
Checkpointing is only used for failure recovery not for app upgrades. You
need to manually code the unload/load and save it to a persistent store

On Tue, Dec 18, 2018 at 17:29, Priya Matpadi wrote:

> Using checkpointing for graceful updates is my understanding as well,
> based on the writeup in
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing,
> and some prototyping. Have you faced any missed events?
>
> On Mon, Dec 17, 2018 at 6:56 PM Yuta Morisawa <
> yu-moris...@kddi-research.jp> wrote:
>
>> Hi
>>
>> Now I'm trying to update my structured streaming application.
>> But I have no idea how to update it gracefully.
>>
>> Should I stop it, replace a jar file then restart it?
>> In my understanding, in that case, all the state will be recovered if I
>> use checkpoints.
>> Is this correct?
>>
>> Thank you,
>>
>>
>> --
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: How to update structured streaming apps gracefully

2018-12-18 Thread Priya Matpadi
Using checkpointing for graceful updates is my understanding as well, based
on the writeup in
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing,
and some prototyping. Have you faced any missed events?

On Mon, Dec 17, 2018 at 6:56 PM Yuta Morisawa 
wrote:

> Hi
>
> Now I'm trying to update my structured streaming application.
> But I have no idea how to update it gracefully.
>
> Should I stop it, replace a jar file then restart it?
> In my understanding, in that case, all the state will be recovered if I
> use checkpoints.
> Is this correct?
>
> Thank you,
>
>
> --
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Apache Beam] Custom DataSourceV2 instanciation: parameters passing and Encoders

2018-12-18 Thread Etienne Chauchot
Hi everyone,
Does anyone have comments on this question?
CCing user ML.
Thanks,
Etienne
On Tuesday, December 11, 2018 at 19:02 +0100, Etienne Chauchot wrote:
> Hi Spark guys,
> I'm Etienne Chauchot and I'm a committer on the Apache Beam project.
> We have what we call runners. They are pieces of software that translate
> pipelines written using the Beam API into pipelines that use a native
> execution engine API. Currently, the Spark runner uses the old RDD / DStream
> APIs. I'm writing a new runner that will use structured streaming (but not
> continuous processing, and also no schema for now).
> I am just starting. I'm currently trying to map our sources to yours. I'm
> targeting the new DataSourceV2 API. It maps pretty well to Beam sources, but
> I have a problem with instantiation of the custom source. I searched for an
> answer on Stack Overflow and the user ML with no luck. I guess it is too
> specific a question:
> When visiting the Beam DAG I have access to Beam objects such as Source and
> Reader that I need to map to MicroBatchReader and InputPartitionReader. As
> far as I understand, a custom DataSourceV2 is instantiated automatically by
> Spark thanks to sparkSession.readStream().format(providerClassName) or
> similar code. The problem is that I can only pass options of primitive types
> + String, so I cannot pass the Beam Source to the DataSourceV2. => Is there a
> way to do so?
>
> Also, I get as an output a Dataset<Row>. The Row contains an instance of Beam
> WindowedValue<T>, where T is the type parameter of the Source. I do a map on
> the Dataset<Row> to transform it to a Dataset<WindowedValue<T>>. I have a
> question related to the Encoder: => how to properly create an Encoder for the
> generic type WindowedValue<T> to use in the map?
> Here is the code:
> https://github.com/apache/beam/tree/spark-runner_structured-streaming
> And more specifically:
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
> Thanks,
> Etienne
> 
> 
> 
> 
> 
> 
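
(Not from this thread, but as one hedged sketch for the Encoder question: when
no bean or product encoder fits a generic type, a Kryo-based encoder is one
option. Whether it meets the runner's needs is not settled here.)

import scala.reflect.ClassTag
import org.apache.spark.sql.{Encoder, Encoders}

// Generic Kryo-backed encoder; WindowedValue in the usage line stands for Beam's class.
def kryoEncoder[T](implicit ct: ClassTag[T]): Encoder[T] = Encoders.kryo[T]

// e.g. val enc: Encoder[WindowedValue[MyType]] = kryoEncoder[WindowedValue[MyType]]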


Re: Add column value in the dataset on the basis of a condition

2018-12-18 Thread Devender Yadav
Thanks, Yunus. It solved my problem.


Regards,
Devender

From: Shahab Yunus 
Sent: Tuesday, December 18, 2018 8:27:51 PM
To: Devender Yadav
Cc: user@spark.apache.org
Subject: Re: Add column value in the dataset on the basis of a condition

Sorry Devender, I hit the send button sooner by mistake. I meant to add more 
info.

So what I was trying to say was that you can use withColumn with when/otherwise 
clauses to add a column conditionally. See an example here:
https://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally

On Tue, Dec 18, 2018 at 9:55 AM Shahab Yunus
<shahab.yu...@gmail.com> wrote:
Have you tried using withColumn? You can add a boolean column based on whether 
the age exists or not and then drop the older age column. You wouldn't need 
union of dataframes then

On Tue, Dec 18, 2018 at 8:58 AM Devender Yadav
<devender.ya...@impetus.co.in> wrote:
Hi All,


useful code:

public class EmployeeBean implements Serializable {

private Long id;

private String name;

private Long salary;

private Integer age;

// getters and setters

}


Relevant spark code:

SparkSession spark =
SparkSession.builder().master("local[2]").appName("play-with-spark").getOrCreate();
List<EmployeeBean> employees1 = populateEmployees(1, 10);

Dataset<EmployeeBean> ds1 = spark.createDataset(employees1,
Encoders.bean(EmployeeBean.class));
ds1.show();
ds1.printSchema();

Dataset<Row> ds2 = ds1.where("age is null").withColumn("is_age_null",
lit(true));
Dataset<Row> ds3 = ds1.where("age is not null").withColumn("is_age_null",
lit(false));

Dataset<Row> ds4 = ds2.union(ds3);
ds4.show();


Relevant Output:


ds1

+----+---+----+------+
| age| id|name|salary|
+----+---+----+------+
|null|  1|dev1| 11000|
|   2|  2|dev2| 12000|
|null|  3|dev3| 13000|
|   4|  4|dev4| 14000|
|null|  5|dev5| 15000|
+----+---+----+------+


ds4

+----+---+----+------+-----------+
| age| id|name|salary|is_age_null|
+----+---+----+------+-----------+
|null|  1|dev1| 11000|       true|
|null|  3|dev3| 13000|       true|
|null|  5|dev5| 15000|       true|
|   2|  2|dev2| 12000|      false|
|   4|  4|dev4| 14000|      false|
+----+---+----+------+-----------+


Is there any better solution to add this column in the dataset rather than 
creating two datasets and performing union?

https://stackoverflow.com/questions/53834286/add-column-value-in-spark-dataset-on-the-basis-of-the-condition



Regards,
Devender








NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org








NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Add column value in the dataset on the basis of a condition

2018-12-18 Thread Shahab Yunus
Sorry Devender, I hit the send button sooner by mistake. I meant to add
more info.

So what I was trying to say was that you can use withColumn with
when/otherwise clauses to add a column conditionally. See an example here:
https://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally
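
(A hedged Scala sketch of that suggestion; the thread's code is Java, but the
same when/otherwise approach applies. ds1 refers to the dataset from the
question quoted below:)

import org.apache.spark.sql.functions.{col, lit, when}

// One conditional column instead of splitting the dataset and unioning it back.
val ds4 = ds1.withColumn("is_age_null", when(col("age").isNull, lit(true)).otherwise(lit(false)))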

On Tue, Dec 18, 2018 at 9:55 AM Shahab Yunus  wrote:

> Have you tried using withColumn? You can add a boolean column based on
> whether the age exists or not and then drop the older age column. You
> wouldn't need union of dataframes then
>
> On Tue, Dec 18, 2018 at 8:58 AM Devender Yadav <
> devender.ya...@impetus.co.in> wrote:
>
>> Hi All,
>>
>>
>> useful code:
>>
>> public class EmployeeBean implements Serializable {
>>
>> private Long id;
>>
>> private String name;
>>
>> private Long salary;
>>
>> private Integer age;
>>
>> // getters and setters
>>
>> }
>>
>>
>> Relevant spark code:
>>
>> SparkSession spark =
>> SparkSession.builder().master("local[2]").appName("play-with-spark").getOrCreate();
>> List<EmployeeBean> employees1 = populateEmployees(1, 10);
>>
>> Dataset<EmployeeBean> ds1 = spark.createDataset(employees1,
>> Encoders.bean(EmployeeBean.class));
>> ds1.show();
>> ds1.printSchema();
>>
>> Dataset<Row> ds2 = ds1.where("age is null").withColumn("is_age_null",
>> lit(true));
>> Dataset<Row> ds3 = ds1.where("age is not null").withColumn("is_age_null",
>> lit(false));
>>
>> Dataset<Row> ds4 = ds2.union(ds3);
>> ds4.show();
>>
>>
>> Relevant Output:
>>
>>
>> ds1
>>
>> +----+---+----+------+
>> | age| id|name|salary|
>> +----+---+----+------+
>> |null|  1|dev1| 11000|
>> |   2|  2|dev2| 12000|
>> |null|  3|dev3| 13000|
>> |   4|  4|dev4| 14000|
>> |null|  5|dev5| 15000|
>> +----+---+----+------+
>>
>>
>> ds4
>>
>> +----+---+----+------+-----------+
>> | age| id|name|salary|is_age_null|
>> +----+---+----+------+-----------+
>> |null|  1|dev1| 11000|       true|
>> |null|  3|dev3| 13000|       true|
>> |null|  5|dev5| 15000|       true|
>> |   2|  2|dev2| 12000|      false|
>> |   4|  4|dev4| 14000|      false|
>> +----+---+----+------+-----------+
>>
>>
>> Is there any better solution to add this column in the dataset rather
>> than creating two datasets and performing union?
>>
>> <
>> https://stackoverflow.com/questions/53834286/add-column-value-in-spark-dataset-on-the-basis-of-the-condition
>> >
>>
>>
>>
>> Regards,
>> Devender
>>
>> 
>>
>>
>>
>>
>>
>>
>> NOTE: This message may contain information that is confidential,
>> proprietary, privileged or otherwise protected by law. The message is
>> intended solely for the named addressee. If received in error, please
>> destroy and notify the sender. Any use of this email is prohibited when
>> received in error. Impetus does not represent, warrant and/or guarantee,
>> that the integrity of this communication has been maintained nor that the
>> communication is free of errors, virus, interception or interference.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Add column value in the dataset on the basis of a condition

2018-12-18 Thread Shahab Yunus
Have you tried using withColumn? You can add a boolean column based on
whether the age exists or not and then drop the older age column. You
wouldn't need union of dataframes then

On Tue, Dec 18, 2018 at 8:58 AM Devender Yadav 
wrote:

> Hi All,
>
>
> useful code:
>
> public class EmployeeBean implements Serializable {
>
> private Long id;
>
> private String name;
>
> private Long salary;
>
> private Integer age;
>
> // getters and setters
>
> }
>
>
> Relevant spark code:
>
> SparkSession spark =
> SparkSession.builder().master("local[2]").appName("play-with-spark").getOrCreate();
> List<EmployeeBean> employees1 = populateEmployees(1, 10);
>
> Dataset<EmployeeBean> ds1 = spark.createDataset(employees1,
> Encoders.bean(EmployeeBean.class));
> ds1.show();
> ds1.printSchema();
>
> Dataset<Row> ds2 = ds1.where("age is null").withColumn("is_age_null",
> lit(true));
> Dataset<Row> ds3 = ds1.where("age is not null").withColumn("is_age_null",
> lit(false));
>
> Dataset<Row> ds4 = ds2.union(ds3);
> ds4.show();
>
>
> Relevant Output:
>
>
> ds1
>
> +----+---+----+------+
> | age| id|name|salary|
> +----+---+----+------+
> |null|  1|dev1| 11000|
> |   2|  2|dev2| 12000|
> |null|  3|dev3| 13000|
> |   4|  4|dev4| 14000|
> |null|  5|dev5| 15000|
> +----+---+----+------+
>
>
> ds4
>
> +----+---+----+------+-----------+
> | age| id|name|salary|is_age_null|
> +----+---+----+------+-----------+
> |null|  1|dev1| 11000|       true|
> |null|  3|dev3| 13000|       true|
> |null|  5|dev5| 15000|       true|
> |   2|  2|dev2| 12000|      false|
> |   4|  4|dev4| 14000|      false|
> +----+---+----+------+-----------+
>
>
> Is there any better solution to add this column in the dataset rather than
> creating two datasets and performing union?
>
> <
> https://stackoverflow.com/questions/53834286/add-column-value-in-spark-dataset-on-the-basis-of-the-condition
> >
>
>
>
> Regards,
> Devender
>
> 
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Add column value in the dataset on the basis of a condition

2018-12-18 Thread Devender Yadav
Hi All,


useful code:

public class EmployeeBean implements Serializable {

private Long id;

private String name;

private Long salary;

private Integer age;

// getters and setters

}


Relevant spark code:

SparkSession spark =
SparkSession.builder().master("local[2]").appName("play-with-spark").getOrCreate();
List<EmployeeBean> employees1 = populateEmployees(1, 10);

Dataset<EmployeeBean> ds1 = spark.createDataset(employees1,
Encoders.bean(EmployeeBean.class));
ds1.show();
ds1.printSchema();

Dataset<Row> ds2 = ds1.where("age is null").withColumn("is_age_null",
lit(true));
Dataset<Row> ds3 = ds1.where("age is not null").withColumn("is_age_null",
lit(false));

Dataset<Row> ds4 = ds2.union(ds3);
ds4.show();


Relevant Output:


ds1

+----+---+----+------+
| age| id|name|salary|
+----+---+----+------+
|null|  1|dev1| 11000|
|   2|  2|dev2| 12000|
|null|  3|dev3| 13000|
|   4|  4|dev4| 14000|
|null|  5|dev5| 15000|
+----+---+----+------+


ds4

+----+---+----+------+-----------+
| age| id|name|salary|is_age_null|
+----+---+----+------+-----------+
|null|  1|dev1| 11000|       true|
|null|  3|dev3| 13000|       true|
|null|  5|dev5| 15000|       true|
|   2|  2|dev2| 12000|      false|
|   4|  4|dev4| 14000|      false|
+----+---+----+------+-----------+


Is there any better solution to add this column in the dataset rather than 
creating two datasets and performing union?





Regards,
Devender








NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Spark Scala reading from Google Cloud BigQuery table throws error

2018-12-18 Thread Jörn Franke
Maybe the guava version in your spark lib folder is not compatible (if your
Spark version has a guava library)? In this case I propose creating a fat/uber
jar, potentially with a shaded guava dependency.
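
(To check that hypothesis, a quick spark-shell snippet showing which jar
actually provides the Preconditions class at runtime:)

val src = Class.forName("com.google.common.base.Preconditions")
  .getProtectionDomain.getCodeSource
// Prints the jar the class was loaded from, or notes a parent/boot classloader.
println(if (src == null) "loaded from a parent/boot classloader" else src.getLocation)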

> On 18.12.2018 at 11:26, Mich Talebzadeh wrote:
> 
> Hi,
> 
> I am writing a small test code in spark-shell with attached jar dependencies
> 
> spark-shell --jars 
> /home/hduser/jars/bigquery-connector-0.13.4-hadoop3.jar,/home/hduser/jars/gcs-connector-1.9.4-hadoop3.jar,/home/hduser/jars/other/guava-19.0.jar,/home/hduser/jars/google-api-client-1.4.1-beta.jar,/home/hduser/jars/google-api-client-json-1.2.3-alpha.jar,/home/hduser/jars/google-api-services-bigquery-v2-rev20181202-1.27.0.jar
> 
>  to read an already existing table in Google BigQuery as follows:
> 
> import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
> import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
> import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
> import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
> import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
> import com.google.gson.JsonObject
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
> // Assumes you have a spark context (sc) -- running from spark-shell REPL.
> // Marked as transient since configuration is not Serializable. This should
> // only be necessary in spark-shell REPL.
> @transient
> val conf = sc.hadoopConfiguration
> // Input parameters.
> val fullyQualifiedInputTableId = "axial-glow-224522.accounts.ll_18740868"
> val projectId = conf.get("fs.gs.project.id")
> val bucket = conf.get("fs.gs.system.bucket")
> // Input configuration.
> conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
> conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
> BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
> 
> The problem I have is that even after loading jars with spark-shell --jar 
> 
> I am getting the following error at the last line
> 
> scala> BigQueryConfiguration.configureBigQueryInput(conf, 
> fullyQualifiedInputTableId)
> 
> java.lang.NoSuchMethodError: 
> com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
>   at 
> com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68)
>   at 
> com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(BigQueryConfiguration.java:260)
>   ... 49 elided
> 
> It says it cannot find method
> 
> java.lang.NoSuchMethodError: 
> com.google.common.base.Preconditions.checkArgument
> 
> but I checked it and it is in the following jar file
> 
> jar tvf guava-19.0.jar| grep common.base.Preconditions
>   5249 Wed Dec 09 15:58:14 UTC 2015 com/google/common/base/Preconditions.class
> 
> I have used different version of guava jar files but none works!
> 
> The code is based on the following:
> 
> https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
> 
> Thanks
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  


Spark Scala reading from Google Cloud BigQuery table throws error

2018-12-18 Thread Mich Talebzadeh
Hi,

I am writing a small test code in spark-shell with attached jar dependencies

spark-shell --jars
/home/hduser/jars/bigquery-connector-0.13.4-hadoop3.jar,/home/hduser/jars/gcs-connector-1.9.4-hadoop3.jar,/home/hduser/jars/other/guava-19.0.jar,/home/hduser/jars/google-api-client-1.4.1-beta.jar,/home/hduser/jars/google-api-client-json-1.2.3-alpha.jar,/home/hduser/jars/google-api-services-bigquery-v2-rev20181202-1.27.0.jar

 to read an already existing table in Google BigQuery as follows:

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import
com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import
com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
// Assumes you have a spark context (sc) -- running from spark-shell REPL.
// Marked as transient since configuration is not Serializable. This should
// only be necessary in spark-shell REPL.
@transient
val conf = sc.hadoopConfiguration
// Input parameters.
val fullyQualifiedInputTableId = "axial-glow-224522.accounts.ll_18740868"
val projectId = conf.get("fs.gs.project.id")
val bucket = conf.get("fs.gs.system.bucket")
// Input configuration.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
BigQueryConfiguration.configureBigQueryInput(conf,
fullyQualifiedInputTableId)

The problem I have is that even after loading jars with spark-shell --jars

I am getting the following error at the last line

scala> BigQueryConfiguration.configureBigQueryInput(conf,
fullyQualifiedInputTableId)

java.lang.NoSuchMethodError:
com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
  at
com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68)
  at
com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(BigQueryConfiguration.java:260)
  ... 49 elided

It says it cannot find method

java.lang.NoSuchMethodError:
com.google.common.base.Preconditions.checkArgument

but I checked it and it is in the following jar file

jar tvf guava-19.0.jar| grep common.base.Preconditions
  5249 Wed Dec 09 15:58:14 UTC 2015
com/google/common/base/Preconditions.class

I have used different versions of the guava jar files but none work!

The code is based on the following:

https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example

Thanks

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.