Spark UI crashes on Large Workloads

2017-07-17 Thread saatvikshah1994
Hi,

I have a PySpark app which, when given a very large amount of input data,
sometimes throws the error described here:
https://stackoverflow.com/questions/32340639/unable-to-understand-error-sparklistenerbus-has-already-stopped-dropping-event.
All my code runs inside the main function, and the only slightly
peculiar thing I am doing in this app is using a custom PySpark ML
Transformer (adapted from
https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml).
Could this be the issue? How can I debug why this is happening?






Re: Running Spark und YARN on AWS EMR

2017-07-17 Thread Takashi Sasaki
Hi Josh,


As you say, I have also run into this problem; I recall getting a warning
when working with a very large data set.

We also adjust the partition size, but we do it via command-line options
rather than relying on the defaults or setting it in code.


Regards,

Takashi

2017-07-18 6:48 GMT+09:00 Josh Holbrook :
> I just ran into this issue! Small world.
>
> As far as I can tell, by default spark on EMR is completely untuned, but it
> comes with a flag that you can set to tell EMR to autotune spark. In your
> configuration.json file, you can add something like:
>
>   {
> "Classification": "spark",
> "Properties": {
>   "maximizeResourceAllocation": "true"
> }
>   },
>
> but keep in mind that, again as far as I can tell, the default parallelism
> with this config is merely twice the number of executor cores--so for a 10
> machine cluster w/ 3 active cores each, 60 partitions. This is pretty low,
> so you'll likely want to adjust this--I'm currently using the following
> because spark chokes on datasets that are bigger than about 2g per
> partition:
>
>   {
> "Classification": "spark-defaults",
> "Properties": {
>   "spark.default.parallelism": "1000"
> }
>   }
>
> Good luck, and I hope this is helpful!
>
> --Josh
>
>
> On Mon, Jul 17, 2017 at 4:59 PM, Takashi Sasaki 
> wrote:
>>
>> Hi Pascal,
>>
>> The error also occurred frequently in our project.
>>
>> As a solution, it was effective to specify the memory size directly
>> with spark-submit command.
>>
>> eg. spark-submit executor-memory 2g
>>
>>
>> Regards,
>>
>> Takashi
>>
>> > 2017-07-18 5:18 GMT+09:00 Pascal Stammer :
>> >> Hi,
>> >>
>> >> I am running a Spark 2.1.x Application on AWS EMR with YARN and get
>> >> following error that kill my application:
>> >>
>> >> AM Container for appattempt_1500320286695_0001_01 exited with
>> >> exitCode:
>> >> -104
>> >> For more detailed output, check application tracking
>> >>
>> >> page:http://ip-172-31-35-192.eu-central-1.compute.internal:8088/cluster/app/application_1500320286695_0001Then,
>> >> click on links to logs of each attempt.
>> >> Diagnostics: Container
>> >> [pid=9216,containerID=container_1500320286695_0001_01_01] is
>> >> running
>> >> beyond physical memory limits. Current usage: 1.4 GB of 1.4 GB physical
>> >> memory used; 3.3 GB of 6.9 GB virtual memory used. Killing container.
>> >>
>> >>
>> >> I already change spark.yarn.executor.memoryOverhead but the error still
>> >> occurs. Does anybody have a hint for me which parameter or
>> >> configuration I
>> >> have to adapt.
>> >>
>> >> Thank you very much.
>> >>
>> >> Regards,
>> >>
>> >> Pascal Stammer
>> >>
>> >>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>




Re: Running Spark und YARN on AWS EMR

2017-07-17 Thread Josh Holbrook
I just ran into this issue! Small world.

As far as I can tell, Spark on EMR is completely untuned by default, but it
comes with a flag you can set to tell EMR to auto-tune Spark. In your
configuration.json file, you can add something like:

  {
"Classification": "spark",
"Properties": {
  "maximizeResourceAllocation": "true"
}
  },

but keep in mind that, again as far as I can tell, the default parallelism
with this config is merely twice the number of executor cores--so for a
10-machine cluster with 3 active cores each, 60 partitions. This is pretty
low, so you'll likely want to adjust it--I'm currently using the following
because Spark chokes on datasets that are bigger than about 2 GB per
partition:

  {
"Classification": "spark-defaults",
"Properties": {
  "spark.default.parallelism": "1000"
}
  }
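
As a quick sanity check, here is a minimal Scala sketch (the input path and
target partition count below are hypothetical, not from this thread) for
confirming what parallelism actually took effect and spreading out an
oversized input:

// Run from spark-shell on the cluster; `spark` is the Spark 2.x SparkSession.
val defaultPar = spark.sparkContext.defaultParallelism
println(s"defaultParallelism = $defaultPar")

// If a dataset would otherwise exceed roughly 2 GB per partition, spread it out.
val df = spark.read.parquet("s3://my-bucket/big-input")   // hypothetical path
val repartitioned = df.repartition(1000)                  // match spark.default.parallelism above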

Good luck, and I hope this is helpful!

--Josh


On Mon, Jul 17, 2017 at 4:59 PM, Takashi Sasaki 
wrote:

> Hi Pascal,
>
> The error also occurred frequently in our project.
>
> As a solution, it was effective to specify the memory size directly
> with spark-submit command.
>
> eg. spark-submit executor-memory 2g
>
>
> Regards,
>
> Takashi
>
> > 2017-07-18 5:18 GMT+09:00 Pascal Stammer :
> >> Hi,
> >>
> >> I am running a Spark 2.1.x Application on AWS EMR with YARN and get
> >> following error that kill my application:
> >>
> >> AM Container for appattempt_1500320286695_0001_01 exited with
> exitCode:
> >> -104
> >> For more detailed output, check application tracking
> >> page:http://ip-172-31-35-192.eu-central-1.compute.internal:
> 8088/cluster/app/application_1500320286695_0001Then,
> >> click on links to logs of each attempt.
> >> Diagnostics: Container
> >> [pid=9216,containerID=container_1500320286695_0001_01_01] is
> running
> >> beyond physical memory limits. Current usage: 1.4 GB of 1.4 GB physical
> >> memory used; 3.3 GB of 6.9 GB virtual memory used. Killing container.
> >>
> >>
> >> I already change spark.yarn.executor.memoryOverhead but the error still
> >> occurs. Does anybody have a hint for me which parameter or
> configuration I
> >> have to adapt.
> >>
> >> Thank you very much.
> >>
> >> Regards,
> >>
> >> Pascal Stammer
> >>
> >>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Running Spark und YARN on AWS EMR

2017-07-17 Thread Pascal Stammer

Hi Takashi,

Thanks for your help. After further investigation, I figured out that the
killed container was the driver process. After setting
spark.yarn.driver.memoryOverhead instead of spark.yarn.executor.memoryOverhead,
the error was gone and the application ran without error. Maybe it will
help you as well.

Regards,

Pascal 




> On 17 Jul 2017, at 22:59, Takashi Sasaki wrote:
> 
> Hi Pascal,
> 
> The error also occurred frequently in our project.
> 
> As a solution, it was effective to specify the memory size directly
> with spark-submit command.
> 
> eg. spark-submit executor-memory 2g
> 
> 
> Regards,
> 
> Takashi
> 
>> 2017-07-18 5:18 GMT+09:00 Pascal Stammer :
>>> Hi,
>>> 
>>> I am running a Spark 2.1.x Application on AWS EMR with YARN and get
>>> following error that kill my application:
>>> 
>>> AM Container for appattempt_1500320286695_0001_01 exited with exitCode:
>>> -104
>>> For more detailed output, check application tracking
>>> page:http://ip-172-31-35-192.eu-central-1.compute.internal:8088/cluster/app/application_1500320286695_0001Then,
>>> click on links to logs of each attempt.
>>> Diagnostics: Container
>>> [pid=9216,containerID=container_1500320286695_0001_01_01] is running
>>> beyond physical memory limits. Current usage: 1.4 GB of 1.4 GB physical
>>> memory used; 3.3 GB of 6.9 GB virtual memory used. Killing container.
>>> 
>>> 
>>> I already change spark.yarn.executor.memoryOverhead but the error still
>>> occurs. Does anybody have a hint for me which parameter or configuration I
>>> have to adapt.
>>> 
>>> Thank you very much.
>>> 
>>> Regards,
>>> 
>>> Pascal Stammer
>>> 
>>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Spark Streaming handling Kafka exceptions

2017-07-17 Thread Jean-Francois Gosselin
How can I handle a Kafka error with my DirectStream (network issue,
ZooKeeper or broker going down)? For example, when the consumer fails to
connect to Kafka (at startup) I only get a DEBUG log (not even an ERROR)
and no exception is thrown...

I'm using Spark 2.1.1 and spark-streaming-kafka-0-10.

16:50:23.149 [ForkJoinPool-1-worker-5] DEBUG o.a.kafka.common.network.Selector - Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)


Thanks


Re: Running Spark und YARN on AWS EMR

2017-07-17 Thread Takashi Sasaki
Hi Pascal,

The error also occurred frequently in our project.

As a solution, it was effective to specify the memory size directly
with the spark-submit command, e.g.:

spark-submit --executor-memory 2g


Regards,

Takashi

> 2017-07-18 5:18 GMT+09:00 Pascal Stammer :
>> Hi,
>>
>> I am running a Spark 2.1.x Application on AWS EMR with YARN and get
>> following error that kill my application:
>>
>> AM Container for appattempt_1500320286695_0001_01 exited with exitCode:
>> -104
>> For more detailed output, check application tracking
>> page:http://ip-172-31-35-192.eu-central-1.compute.internal:8088/cluster/app/application_1500320286695_0001Then,
>> click on links to logs of each attempt.
>> Diagnostics: Container
>> [pid=9216,containerID=container_1500320286695_0001_01_01] is running
>> beyond physical memory limits. Current usage: 1.4 GB of 1.4 GB physical
>> memory used; 3.3 GB of 6.9 GB virtual memory used. Killing container.
>>
>>
>> I already change spark.yarn.executor.memoryOverhead but the error still
>> occurs. Does anybody have a hint for me which parameter or configuration I
>> have to adapt.
>>
>> Thank you very much.
>>
>> Regards,
>>
>> Pascal Stammer
>>
>>




Re: Slowness of Spark Thrift Server

2017-07-17 Thread Maciej Bryński
I did the test on Spark 2.2.0 and the problem still exists.

Any ideas how to fix it?

Regards,
Maciek

2017-07-11 11:52 GMT+02:00 Maciej Bryński :

> Hi,
> I have following issue.
> I'm trying to use Spark as a proxy to Cassandra.
> The problem is the thrift server overhead.
>
> I'm using the following query:
> select * from table where primary_key = 123
>
> Job time (from the Jobs tab) is around 50 ms (and it's similar to the
> query time from the SQL tab).
> Unfortunately, the query time from the JDBC/ODBC server is 650 ms.
> Any ideas why? What could cause such an overhead?
>
> Regards,
> --
> Maciek Bryński
>



-- 
Maciek Bryński


Running Spark und YARN on AWS EMR

2017-07-17 Thread Pascal Stammer
Hi,

I am running a Spark 2.1.x application on AWS EMR with YARN and get the
following error, which kills my application:

AM Container for appattempt_1500320286695_0001_01 exited with exitCode: -104
For more detailed output, check the application tracking page:
http://ip-172-31-35-192.eu-central-1.compute.internal:8088/cluster/app/application_1500320286695_0001
Then, click on links to logs of each attempt.
Diagnostics: Container [pid=9216,containerID=container_1500320286695_0001_01_01]
is running beyond physical memory limits. Current usage: 1.4 GB of 1.4 GB
physical memory used; 3.3 GB of 6.9 GB virtual memory used. Killing container.

I already changed spark.yarn.executor.memoryOverhead but the error still
occurs. Does anybody have a hint as to which parameter or configuration I
have to adapt?

Thank you very much.

Regards,

Pascal Stammer




Re: [ANNOUNCE] Announcing Apache Spark 2.2.0

2017-07-17 Thread Sam Elamin
Well done! This is amazing news :) Congrats, and I really can't wait to
spread the Structured Streaming love!

On Mon, Jul 17, 2017 at 5:25 PM, kant kodali  wrote:

> +1
>
> On Tue, Jul 11, 2017 at 3:56 PM, Jean Georges Perrin  wrote:
>
>> Awesome! Congrats! Can't wait!!
>>
>> jg
>>
>>
>> On Jul 11, 2017, at 18:48, Michael Armbrust 
>> wrote:
>>
>> Hi all,
>>
>> Apache Spark 2.2.0 is the third release of the Spark 2.x line. This
>> release removes the experimental tag from Structured Streaming. In
>> addition, this release focuses on usability, stability, and polish,
>> resolving over 1100 tickets.
>>
>> We'd like to thank our contributors and users for their contributions and
>> early feedback to this release. This release would not have been possible
>> without you.
>>
>> To download Spark 2.2.0, head over to the download page:
>> http://spark.apache.org/downloads.html
>>
>> To view the release notes: https://spark.apache.org/releases/spark-release-2-2-0.html
>>
>> *(note: If you see any issues with the release notes, webpage or
>> published artifacts, please contact me directly off-list) *
>>
>> Michael
>>
>>
>


[ANNOUNCE] Apache Bahir 2.1.1 Released

2017-07-17 Thread Luciano Resende
Apache Bahir provides extensions to multiple distributed analytic
platforms, extending their reach with a diversity of streaming connectors
and SQL data sources.
The Apache Bahir community is pleased to announce the release of Apache
Bahir 2.1.1 which provides the following extensions for Apache Spark 2.1.1:

   - Apache CouchDB/Cloudant SQL data source (new)
   - Apache CouchDB/Cloudant Streaming (new)
   - Akka Streaming
   - Akka Structured Streaming (new)
   - Google Cloud Pub/Sub Streaming connector (new)
   - MQTT Streaming
   - MQTT Structured Streaming
   - Twitter Streaming
   - ZeroMQ Streaming

For more information about Apache Bahir and to download the
latest release go to:

http://bahir.apache.org

The Apache Bahir streaming connectors are also available at:

https://spark-packages.org/?q=bahir


-- 
Luciano Resende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: running spark job with fat jar file

2017-07-17 Thread ayan guha
Hi Mitch - YARN uses a specific folder convention comprising application
id, container id, attempt number and so on. Once you run a spark-submit
using YARN, you can see your application in the YARN RM UI page. Once the
app finishes, you can see all the logs using

yarn logs -applicationId <application id>

In this log, you can see all the details of the transient folders, what
goes where, and so on.

These local folders get created on the OS filesystem, not on HDFS. But they
are transient, so once your job finishes, YARN cleans them up.

On Tue, Jul 18, 2017 at 5:46 AM, Mich Talebzadeh 
wrote:

> great Ayan.
>
> Is that local folder on HDFS? Will that be a hidden folder specific to the
> user executing the spark job?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 17 July 2017 at 19:34, ayan guha  wrote:
>
>> Hi
>>
>> Here is my understanding:
>>
>> 1. For each container, there will be a local folder be created and
>> application jar will be copied over there
>> 2. Jars mentioned in --jars switch will be copied over to container to
>> the class path of the application.
>>
>> So to your question, --jars is not required to be copied over to all
>> nodes during submission time. YARN will take care of it.
>>
>> Best
>> Ayan
>>
>> On Tue, Jul 18, 2017 at 4:10 AM, Marcelo Vanzin 
>> wrote:
>>
>>> Yes.
>>>
>>> On Mon, Jul 17, 2017 at 10:47 AM, Mich Talebzadeh
>>>  wrote:
>>> > thanks Marcelo.
>>> >
>>> > are these files distributed through hdfs?
>>> >
>>> > Dr Mich Talebzadeh
>>> >
>>> >
>>> >
>>> > LinkedIn
>>> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJ
>>> d6zP6AcPCCdOABUrV8Pw
>>> >
>>> >
>>> >
>>> > http://talebzadehmich.wordpress.com
>>> >
>>> >
>>> > Disclaimer: Use it at your own risk. Any and all responsibility for any
>>> > loss, damage or destruction of data or any other property which may
>>> arise
>>> > from relying on this email's technical content is explicitly
>>> disclaimed. The
>>> > author will in no case be liable for any monetary damages arising from
>>> such
>>> > loss, damage or destruction.
>>> >
>>> >
>>> >
>>> >
>>> > On 17 July 2017 at 18:46, Marcelo Vanzin  wrote:
>>> >>
>>> >> The YARN backend distributes all files and jars you submit with your
>>> >> application.
>>> >>
>>> >> On Mon, Jul 17, 2017 at 10:45 AM, Mich Talebzadeh
>>> >>  wrote:
>>> >> > thanks guys.
>>> >> >
>>> >> > just to clarify let us assume i am doing spark-submit as below:
>>> >> >
>>> >> > ${SPARK_HOME}/bin/spark-submit \
>>> >> > --packages ${PACKAGES} \
>>> >> > --driver-memory 2G \
>>> >> > --num-executors 2 \
>>> >> > --executor-memory 2G \
>>> >> > --executor-cores 2 \
>>> >> > --master yarn \
>>> >> > --deploy-mode client \
>>> >> > --conf "${SCHEDULER}" \
>>> >> > --conf "${EXTRAJAVAOPTIONS}" \
>>> >> > --jars ${JARS} \
>>> >> > --class "${FILE_NAME}" \
>>> >> > --conf "${SPARKUIPORT}" \
>>> >> > --conf "${SPARKDRIVERPORT}" \
>>> >> > --conf "${SPARKFILESERVERPORT}" \
>>> >> > --conf "${SPARKBLOCKMANAGERPORT}" \
>>> >> > --conf "${SPARKKRYOSERIALIZERBUFFERMAX}" \
>>> >> > ${JAR_FILE}
>>> >> >
>>> >> > The ${JAR_FILE} is the one. As I understand Spark should distribute
>>> that
>>> >> > ${JAR_FILE} to each container?
>>> >> >
>>> >> > Also --jars ${JARS} are the list of normal jar files that need to
>>> exist
>>> >> > in
>>> >> > the same directory on each executor node?
>>> >> >
>>> >> > cheers,
>>> >> >
>>> >> >
>>> >> >
>>> >> > Dr Mich Talebzadeh
>>> >> >
>>> >> >
>>> >> >
>>> >> > LinkedIn
>>> >> >
>>> >> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJ
>>> d6zP6AcPCCdOABUrV8Pw
>>> >> >
>>> >> >
>>> >> >
>>> >> > http://talebzadehmich.wordpress.com
>>> >> >
>>> >> >
>>> >> > Disclaimer: Use it at your own risk. Any and all responsibility for
>>> any
>>> >> > loss, damage or destruction of data or any other property which may
>>> >> > arise
>>> >> > from relying on this email's technical content is explicitly
>>> disclaimed.
>>> >> > The
>>> >> > author will in no case be liable for any monetary damages arising
>>> from
>>> >> > such
>>> >> > loss, 

Re: running spark job with fat jar file

2017-07-17 Thread Mich Talebzadeh
Great, thanks Ayan.

Is that local folder on HDFS? Will it be a hidden folder specific to the
user executing the Spark job?

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 17 July 2017 at 19:34, ayan guha  wrote:

> Hi
>
> Here is my understanding:
>
> 1. For each container, there will be a local folder be created and
> application jar will be copied over there
> 2. Jars mentioned in --jars switch will be copied over to container to the
> class path of the application.
>
> So to your question, --jars is not required to be copied over to all nodes
> during submission time. YARN will take care of it.
>
> Best
> Ayan
>
> On Tue, Jul 18, 2017 at 4:10 AM, Marcelo Vanzin 
> wrote:
>
>> Yes.
>>
>> On Mon, Jul 17, 2017 at 10:47 AM, Mich Talebzadeh
>>  wrote:
>> > thanks Marcelo.
>> >
>> > are these files distributed through hdfs?
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJ
>> d6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>> > Disclaimer: Use it at your own risk. Any and all responsibility for any
>> > loss, damage or destruction of data or any other property which may
>> arise
>> > from relying on this email's technical content is explicitly
>> disclaimed. The
>> > author will in no case be liable for any monetary damages arising from
>> such
>> > loss, damage or destruction.
>> >
>> >
>> >
>> >
>> > On 17 July 2017 at 18:46, Marcelo Vanzin  wrote:
>> >>
>> >> The YARN backend distributes all files and jars you submit with your
>> >> application.
>> >>
>> >> On Mon, Jul 17, 2017 at 10:45 AM, Mich Talebzadeh
>> >>  wrote:
>> >> > thanks guys.
>> >> >
>> >> > just to clarify let us assume i am doing spark-submit as below:
>> >> >
>> >> > ${SPARK_HOME}/bin/spark-submit \
>> >> > --packages ${PACKAGES} \
>> >> > --driver-memory 2G \
>> >> > --num-executors 2 \
>> >> > --executor-memory 2G \
>> >> > --executor-cores 2 \
>> >> > --master yarn \
>> >> > --deploy-mode client \
>> >> > --conf "${SCHEDULER}" \
>> >> > --conf "${EXTRAJAVAOPTIONS}" \
>> >> > --jars ${JARS} \
>> >> > --class "${FILE_NAME}" \
>> >> > --conf "${SPARKUIPORT}" \
>> >> > --conf "${SPARKDRIVERPORT}" \
>> >> > --conf "${SPARKFILESERVERPORT}" \
>> >> > --conf "${SPARKBLOCKMANAGERPORT}" \
>> >> > --conf "${SPARKKRYOSERIALIZERBUFFERMAX}" \
>> >> > ${JAR_FILE}
>> >> >
>> >> > The ${JAR_FILE} is the one. As I understand Spark should distribute
>> that
>> >> > ${JAR_FILE} to each container?
>> >> >
>> >> > Also --jars ${JARS} are the list of normal jar files that need to
>> exist
>> >> > in
>> >> > the same directory on each executor node?
>> >> >
>> >> > cheers,
>> >> >
>> >> >
>> >> >
>> >> > Dr Mich Talebzadeh
>> >> >
>> >> >
>> >> >
>> >> > LinkedIn
>> >> >
>> >> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJ
>> d6zP6AcPCCdOABUrV8Pw
>> >> >
>> >> >
>> >> >
>> >> > http://talebzadehmich.wordpress.com
>> >> >
>> >> >
>> >> > Disclaimer: Use it at your own risk. Any and all responsibility for
>> any
>> >> > loss, damage or destruction of data or any other property which may
>> >> > arise
>> >> > from relying on this email's technical content is explicitly
>> disclaimed.
>> >> > The
>> >> > author will in no case be liable for any monetary damages arising
>> from
>> >> > such
>> >> > loss, damage or destruction.
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > On 17 July 2017 at 18:18, ayan guha  wrote:
>> >> >>
>> >> >> Hi Mitch
>> >> >>
>> >> >> your jar file can be anywhere in the file system, including hdfs.
>> >> >>
>> >> >> If using yarn, preferably use cluster mode in terms of deployment.
>> >> >>
>> >> >> Yarn will distribute the jar to each container.
>> >> >>
>> >> >> Best
>> >> >> Ayan
>> >> >>
>> >> >> On Tue, 18 Jul 2017 at 2:17 am, Marcelo Vanzin > >
>> >> >> wrote:
>> >> >>>
>> >> >>> Spark distributes your application jar for you.
>> >> >>>
>> >> >>> On Mon, Jul 17, 2017 at 8:41 AM, Mich Talebzadeh
>> >> >>>  wrote:
>> >> >>> > hi guys,
>> >> >>> >
>> 

Re: splitting columns into new columns

2017-07-17 Thread ayan guha
Hi

Please use explode, which is written to solve exactly your problem.

Consider below:

>>> s = ["ERN~58XX7~^EPN~5X551~|1000"]

>>> df = sc.parallelize(s).map(lambda t: t.split('|')).toDF(['phone','id'])

>>> df.registerTempTable("t")

>>> resDF = sqlContext.sql("select id,explode(phone) phones from (select id, split(phone,'~') as phone from t) x")

>>> resDF.show(truncate=False)

+----+------+
|id  |phones|
+----+------+
|1000|ERN   |
|1000|58XX7 |
|1000|^EPN  |
|1000|5X551 |
|1000|      |
+----+------+

HTH.
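
Since the original code in this thread is Scala, here is a minimal sketch of
the same approach with the Scala DataFrame API, assuming Spark 2.x and the
same sample record as above:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

val spark = SparkSession.builder().appName("explode-example").getOrCreate()
import spark.implicits._

// Same sample record as above: '~'-separated phone values, id after '|'.
val df = Seq(("ERN~58XX7~^EPN~5X551~", "1000")).toDF("phone", "id")

val exploded = df.select($"id", explode(split($"phone", "~")).as("phones"))
exploded.show(truncate = false)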



On Tue, Jul 18, 2017 at 3:15 AM, nayan sharma 
wrote:

> Hi Pralabh,
>
> Thanks for your help.
>
> val xx = columnList.map(x => x->0).toMap
> val opMap = dataFrame.rdd.flatMap { row =>
> columnList.foldLeft(xx) { case (y, col) =>
> val s = row.getAs[String](col).split("\\^").length
> if (y(col) < s)
> y.updated(col, s)
> else
> y
> }.toList
> }
>
>
> val colMaxSizeMap = opMap.groupBy(x => x._1).map(x => x._2.toList.maxBy(x
> => x._2)).collect().toMap
> val x = dataFrame.rdd.map{x =>
> val op = columnList.flatMap{ y =>
> val op = x.getAs[String](y).split("\\^")
> op++List.fill(colMaxSizeMap(y)-op.size)("")
> }
> Row.fromSeq(op)
> }
>
> val structFieldList = columnList.flatMap{colName =>
> List.range(0,colMaxSizeMap(colName),1).map{ i =>
> StructField(s"$colName"+s"$i",StringType)
> }
> }
> val schema = StructType(structFieldList)
> val data1=spark.createDataFrame(x,schema)
>
> opMap
> res13: org.apache.spark.rdd.RDD[(String, Int)]
>
> But It is failing when opMap has null value.It is throwing java.lang.
> NullPointerException
> trying to figure out.
>
> val opMap1=opMap.filter(_._2 !="")
>
> tried doing this but it is also failing with same exception.
>
> Thanks,
> Nayan
>
>
>
>
> On 17-Jul-2017, at 4:54 PM, Pralabh Kumar  wrote:
>
> Hi Nayan
>
> Please find the solution of your problem which work on spark 2.
>
> val spark = SparkSession.builder().appName("practice").
> enableHiveSupport().getOrCreate()
>   val sc = spark.sparkContext
>   val sqlContext = spark.sqlContext
>   import spark.implicits._
>   val dataFrame = sc.parallelize(List("ERN~58XX7~^EPN~5X551~|C~
> MXXX~MSO~^CAxxE~~3XXX5"))
>   .map(s=>s.split("\\|")).map(s=>(s(0),s(1)))
> .toDF("phone","contact")
>   dataFrame.show()
>   val newDataSet= dataFrame.rdd.map(data=>{
> val  t1 =  ArrayBuffer[String] ()
> for (i <- 0.to(1)) {
>   val col = data.get(i).asInstanceOf[String]
>   val dd= col.split("\\^").toSeq
>   for(col<-dd){
> t1 +=(col)
>   }
> }
> Row.fromSeq(t1.seq)
>   })
>
>   val firtRow = dataFrame.select("*").take(1)(0)
>   dataFrame.schema.fieldNames
>   var schema =""
>
>   for ((colNames,idx) <- dataFrame.schema.fieldNames.zipWithIndex.view) {
> val data = firtRow(idx).asInstanceOf[String].split("\\^")
> var j = 0
> for(d<-data){
>   schema = schema + colNames + j + ","
>   j = j+1
> }
>   }
>   schema=schema.substring(0,schema.length-1)
>   val sqlSchema = StructType(schema.split(",").map(s=>StructField(s,
> StringType,false)))
>   sqlContext.createDataFrame(newDataSet,sqlSchema).show()
>
> Regards
> Pralabh Kumar
>
>
> On Mon, Jul 17, 2017 at 1:55 PM, nayan sharma 
> wrote:
>
>> If I have 2-3 values in a column then I can easily separate it and create
>> new columns with withColumn option.
>> but I am trying to achieve it in loop and dynamically generate the new
>> columns as many times the ^ has occurred in column values
>>
>> Can it be achieve in this way.
>>
>> On 17-Jul-2017, at 3:29 AM, ayan guha  wrote:
>>
>> You are looking for explode function.
>>
>> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma 
>> wrote:
>>
>>> I’ve a Dataframe where in some columns there are multiple values, always
>>> separated by ^
>>>
>>> phone|contact|
>>> ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|
>>>
>>> phone1|phone2|contact1|contact2|
>>> ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|
>>>
>>> How can this be achieved using loop as the separator between column
>>> values
>>> are not constant.
>>> data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2"))
>>>  I though of doing this way but the problem is  column are having 100+
>>> separator between the column values
>>>
>>>
>>>
>>> Thank you,
>>> Nayan
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: running spark job with fat jar file

2017-07-17 Thread ayan guha
Hi

Here is my understanding:

1. For each container, a local folder will be created and the application
jar will be copied there.
2. Jars mentioned in the --jars switch will be copied over to the container
and put on the class path of the application.

So, to your question: the --jars files do not need to be copied to all nodes
at submission time. YARN will take care of it.

Best
Ayan

On Tue, Jul 18, 2017 at 4:10 AM, Marcelo Vanzin  wrote:

> Yes.
>
> On Mon, Jul 17, 2017 at 10:47 AM, Mich Talebzadeh
>  wrote:
> > thanks Marcelo.
> >
> > are these files distributed through hdfs?
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCd
> OABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> > Disclaimer: Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> The
> > author will in no case be liable for any monetary damages arising from
> such
> > loss, damage or destruction.
> >
> >
> >
> >
> > On 17 July 2017 at 18:46, Marcelo Vanzin  wrote:
> >>
> >> The YARN backend distributes all files and jars you submit with your
> >> application.
> >>
> >> On Mon, Jul 17, 2017 at 10:45 AM, Mich Talebzadeh
> >>  wrote:
> >> > thanks guys.
> >> >
> >> > just to clarify let us assume i am doing spark-submit as below:
> >> >
> >> > ${SPARK_HOME}/bin/spark-submit \
> >> > --packages ${PACKAGES} \
> >> > --driver-memory 2G \
> >> > --num-executors 2 \
> >> > --executor-memory 2G \
> >> > --executor-cores 2 \
> >> > --master yarn \
> >> > --deploy-mode client \
> >> > --conf "${SCHEDULER}" \
> >> > --conf "${EXTRAJAVAOPTIONS}" \
> >> > --jars ${JARS} \
> >> > --class "${FILE_NAME}" \
> >> > --conf "${SPARKUIPORT}" \
> >> > --conf "${SPARKDRIVERPORT}" \
> >> > --conf "${SPARKFILESERVERPORT}" \
> >> > --conf "${SPARKBLOCKMANAGERPORT}" \
> >> > --conf "${SPARKKRYOSERIALIZERBUFFERMAX}" \
> >> > ${JAR_FILE}
> >> >
> >> > The ${JAR_FILE} is the one. As I understand Spark should distribute
> that
> >> > ${JAR_FILE} to each container?
> >> >
> >> > Also --jars ${JARS} are the list of normal jar files that need to
> exist
> >> > in
> >> > the same directory on each executor node?
> >> >
> >> > cheers,
> >> >
> >> >
> >> >
> >> > Dr Mich Talebzadeh
> >> >
> >> >
> >> >
> >> > LinkedIn
> >> >
> >> > https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> >
> >> >
> >> >
> >> > http://talebzadehmich.wordpress.com
> >> >
> >> >
> >> > Disclaimer: Use it at your own risk. Any and all responsibility for
> any
> >> > loss, damage or destruction of data or any other property which may
> >> > arise
> >> > from relying on this email's technical content is explicitly
> disclaimed.
> >> > The
> >> > author will in no case be liable for any monetary damages arising from
> >> > such
> >> > loss, damage or destruction.
> >> >
> >> >
> >> >
> >> >
> >> > On 17 July 2017 at 18:18, ayan guha  wrote:
> >> >>
> >> >> Hi Mitch
> >> >>
> >> >> your jar file can be anywhere in the file system, including hdfs.
> >> >>
> >> >> If using yarn, preferably use cluster mode in terms of deployment.
> >> >>
> >> >> Yarn will distribute the jar to each container.
> >> >>
> >> >> Best
> >> >> Ayan
> >> >>
> >> >> On Tue, 18 Jul 2017 at 2:17 am, Marcelo Vanzin 
> >> >> wrote:
> >> >>>
> >> >>> Spark distributes your application jar for you.
> >> >>>
> >> >>> On Mon, Jul 17, 2017 at 8:41 AM, Mich Talebzadeh
> >> >>>  wrote:
> >> >>> > hi guys,
> >> >>> >
> >> >>> >
> >> >>> > an uber/fat jar file has been created to run with spark in CDH
> yarc
> >> >>> > client
> >> >>> > mode.
> >> >>> >
> >> >>> > As usual job is submitted to the edge node.
> >> >>> >
> >> >>> > does the jar file has to be placed in the same directory ewith
> spark
> >> >>> > is
> >> >>> > running in the cluster to make it work?
> >> >>> >
> >> >>> > Also what will happen if say out of 9 nodes running spark, 3 have
> >> >>> > not
> >> >>> > got
> >> >>> > the jar file. will that job fail or it will carry on on the
> fremaing
> >> >>> > 6
> >> >>> > nodes
> >> >>> > that have that jar file?
> >> >>> >
> >> >>> > thanks
> >> >>> >
> >> >>> > Dr Mich Talebzadeh
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> > LinkedIn
> >> >>> >
> >> >>> >
> >> >>> > https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> > 

Re: running spark job with fat jar file

2017-07-17 Thread Marcelo Vanzin
Yes.

On Mon, Jul 17, 2017 at 10:47 AM, Mich Talebzadeh
 wrote:
> thanks Marcelo.
>
> are these files distributed through hdfs?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed. The
> author will in no case be liable for any monetary damages arising from such
> loss, damage or destruction.
>
>
>
>
> On 17 July 2017 at 18:46, Marcelo Vanzin  wrote:
>>
>> The YARN backend distributes all files and jars you submit with your
>> application.
>>
>> On Mon, Jul 17, 2017 at 10:45 AM, Mich Talebzadeh
>>  wrote:
>> > thanks guys.
>> >
>> > just to clarify let us assume i am doing spark-submit as below:
>> >
>> > ${SPARK_HOME}/bin/spark-submit \
>> > --packages ${PACKAGES} \
>> > --driver-memory 2G \
>> > --num-executors 2 \
>> > --executor-memory 2G \
>> > --executor-cores 2 \
>> > --master yarn \
>> > --deploy-mode client \
>> > --conf "${SCHEDULER}" \
>> > --conf "${EXTRAJAVAOPTIONS}" \
>> > --jars ${JARS} \
>> > --class "${FILE_NAME}" \
>> > --conf "${SPARKUIPORT}" \
>> > --conf "${SPARKDRIVERPORT}" \
>> > --conf "${SPARKFILESERVERPORT}" \
>> > --conf "${SPARKBLOCKMANAGERPORT}" \
>> > --conf "${SPARKKRYOSERIALIZERBUFFERMAX}" \
>> > ${JAR_FILE}
>> >
>> > The ${JAR_FILE} is the one. As I understand Spark should distribute that
>> > ${JAR_FILE} to each container?
>> >
>> > Also --jars ${JARS} are the list of normal jar files that need to exist
>> > in
>> > the same directory on each executor node?
>> >
>> > cheers,
>> >
>> >
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>> > Disclaimer: Use it at your own risk. Any and all responsibility for any
>> > loss, damage or destruction of data or any other property which may
>> > arise
>> > from relying on this email's technical content is explicitly disclaimed.
>> > The
>> > author will in no case be liable for any monetary damages arising from
>> > such
>> > loss, damage or destruction.
>> >
>> >
>> >
>> >
>> > On 17 July 2017 at 18:18, ayan guha  wrote:
>> >>
>> >> Hi Mitch
>> >>
>> >> your jar file can be anywhere in the file system, including hdfs.
>> >>
>> >> If using yarn, preferably use cluster mode in terms of deployment.
>> >>
>> >> Yarn will distribute the jar to each container.
>> >>
>> >> Best
>> >> Ayan
>> >>
>> >> On Tue, 18 Jul 2017 at 2:17 am, Marcelo Vanzin 
>> >> wrote:
>> >>>
>> >>> Spark distributes your application jar for you.
>> >>>
>> >>> On Mon, Jul 17, 2017 at 8:41 AM, Mich Talebzadeh
>> >>>  wrote:
>> >>> > hi guys,
>> >>> >
>> >>> >
>> >>> > an uber/fat jar file has been created to run with spark in CDH yarc
>> >>> > client
>> >>> > mode.
>> >>> >
>> >>> > As usual job is submitted to the edge node.
>> >>> >
>> >>> > does the jar file has to be placed in the same directory ewith spark
>> >>> > is
>> >>> > running in the cluster to make it work?
>> >>> >
>> >>> > Also what will happen if say out of 9 nodes running spark, 3 have
>> >>> > not
>> >>> > got
>> >>> > the jar file. will that job fail or it will carry on on the fremaing
>> >>> > 6
>> >>> > nodes
>> >>> > that have that jar file?
>> >>> >
>> >>> > thanks
>> >>> >
>> >>> > Dr Mich Talebzadeh
>> >>> >
>> >>> >
>> >>> >
>> >>> > LinkedIn
>> >>> >
>> >>> >
>> >>> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >>> >
>> >>> >
>> >>> >
>> >>> > http://talebzadehmich.wordpress.com
>> >>> >
>> >>> >
>> >>> > Disclaimer: Use it at your own risk. Any and all responsibility for
>> >>> > any
>> >>> > loss, damage or destruction of data or any other property which may
>> >>> > arise
>> >>> > from relying on this email's technical content is explicitly
>> >>> > disclaimed. The
>> >>> > author will in no case be liable for any monetary damages arising
>> >>> > from
>> >>> > such
>> >>> > loss, damage or destruction.
>> >>> >
>> >>> >
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Marcelo
>> >>>
>> >>> -
>> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>>
>> >> --
>> >> Best Regards,
>> >> Ayan Guha
>> >
>> >
>>
>>
>>
>> --
>> Marcelo
>
>



-- 
Marcelo


Re: running spark job with fat jar file

2017-07-17 Thread Mich Talebzadeh
Thanks Marcelo.

Are these files distributed through HDFS?

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 17 July 2017 at 18:46, Marcelo Vanzin  wrote:

> The YARN backend distributes all files and jars you submit with your
> application.
>
> On Mon, Jul 17, 2017 at 10:45 AM, Mich Talebzadeh
>  wrote:
> > thanks guys.
> >
> > just to clarify let us assume i am doing spark-submit as below:
> >
> > ${SPARK_HOME}/bin/spark-submit \
> > --packages ${PACKAGES} \
> > --driver-memory 2G \
> > --num-executors 2 \
> > --executor-memory 2G \
> > --executor-cores 2 \
> > --master yarn \
> > --deploy-mode client \
> > --conf "${SCHEDULER}" \
> > --conf "${EXTRAJAVAOPTIONS}" \
> > --jars ${JARS} \
> > --class "${FILE_NAME}" \
> > --conf "${SPARKUIPORT}" \
> > --conf "${SPARKDRIVERPORT}" \
> > --conf "${SPARKFILESERVERPORT}" \
> > --conf "${SPARKBLOCKMANAGERPORT}" \
> > --conf "${SPARKKRYOSERIALIZERBUFFERMAX}" \
> > ${JAR_FILE}
> >
> > The ${JAR_FILE} is the one. As I understand Spark should distribute that
> > ${JAR_FILE} to each container?
> >
> > Also --jars ${JARS} are the list of normal jar files that need to exist
> in
> > the same directory on each executor node?
> >
> > cheers,
> >
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCd
> OABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> > Disclaimer: Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> The
> > author will in no case be liable for any monetary damages arising from
> such
> > loss, damage or destruction.
> >
> >
> >
> >
> > On 17 July 2017 at 18:18, ayan guha  wrote:
> >>
> >> Hi Mitch
> >>
> >> your jar file can be anywhere in the file system, including hdfs.
> >>
> >> If using yarn, preferably use cluster mode in terms of deployment.
> >>
> >> Yarn will distribute the jar to each container.
> >>
> >> Best
> >> Ayan
> >>
> >> On Tue, 18 Jul 2017 at 2:17 am, Marcelo Vanzin 
> >> wrote:
> >>>
> >>> Spark distributes your application jar for you.
> >>>
> >>> On Mon, Jul 17, 2017 at 8:41 AM, Mich Talebzadeh
> >>>  wrote:
> >>> > hi guys,
> >>> >
> >>> >
> >>> > an uber/fat jar file has been created to run with spark in CDH yarc
> >>> > client
> >>> > mode.
> >>> >
> >>> > As usual job is submitted to the edge node.
> >>> >
> >>> > does the jar file has to be placed in the same directory ewith spark
> is
> >>> > running in the cluster to make it work?
> >>> >
> >>> > Also what will happen if say out of 9 nodes running spark, 3 have not
> >>> > got
> >>> > the jar file. will that job fail or it will carry on on the fremaing
> 6
> >>> > nodes
> >>> > that have that jar file?
> >>> >
> >>> > thanks
> >>> >
> >>> > Dr Mich Talebzadeh
> >>> >
> >>> >
> >>> >
> >>> > LinkedIn
> >>> >
> >>> > https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >>> >
> >>> >
> >>> >
> >>> > http://talebzadehmich.wordpress.com
> >>> >
> >>> >
> >>> > Disclaimer: Use it at your own risk. Any and all responsibility for
> any
> >>> > loss, damage or destruction of data or any other property which may
> >>> > arise
> >>> > from relying on this email's technical content is explicitly
> >>> > disclaimed. The
> >>> > author will in no case be liable for any monetary damages arising
> from
> >>> > such
> >>> > loss, damage or destruction.
> >>> >
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> Marcelo
> >>>
> >>> -
> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>
> >> --
> >> Best Regards,
> >> Ayan Guha
> >
> >
>
>
>
> --
> Marcelo
>


Re: running spark job with fat jar file

2017-07-17 Thread Marcelo Vanzin
The YARN backend distributes all files and jars you submit with your
application.

On Mon, Jul 17, 2017 at 10:45 AM, Mich Talebzadeh
 wrote:
> thanks guys.
>
> just to clarify let us assume i am doing spark-submit as below:
>
> ${SPARK_HOME}/bin/spark-submit \
> --packages ${PACKAGES} \
> --driver-memory 2G \
> --num-executors 2 \
> --executor-memory 2G \
> --executor-cores 2 \
> --master yarn \
> --deploy-mode client \
> --conf "${SCHEDULER}" \
> --conf "${EXTRAJAVAOPTIONS}" \
> --jars ${JARS} \
> --class "${FILE_NAME}" \
> --conf "${SPARKUIPORT}" \
> --conf "${SPARKDRIVERPORT}" \
> --conf "${SPARKFILESERVERPORT}" \
> --conf "${SPARKBLOCKMANAGERPORT}" \
> --conf "${SPARKKRYOSERIALIZERBUFFERMAX}" \
> ${JAR_FILE}
>
> The ${JAR_FILE} is the one. As I understand Spark should distribute that
> ${JAR_FILE} to each container?
>
> Also --jars ${JARS} are the list of normal jar files that need to exist in
> the same directory on each executor node?
>
> cheers,
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed. The
> author will in no case be liable for any monetary damages arising from such
> loss, damage or destruction.
>
>
>
>
> On 17 July 2017 at 18:18, ayan guha  wrote:
>>
>> Hi Mitch
>>
>> your jar file can be anywhere in the file system, including hdfs.
>>
>> If using yarn, preferably use cluster mode in terms of deployment.
>>
>> Yarn will distribute the jar to each container.
>>
>> Best
>> Ayan
>>
>> On Tue, 18 Jul 2017 at 2:17 am, Marcelo Vanzin 
>> wrote:
>>>
>>> Spark distributes your application jar for you.
>>>
>>> On Mon, Jul 17, 2017 at 8:41 AM, Mich Talebzadeh
>>>  wrote:
>>> > hi guys,
>>> >
>>> >
>>> > an uber/fat jar file has been created to run with spark in CDH yarc
>>> > client
>>> > mode.
>>> >
>>> > As usual job is submitted to the edge node.
>>> >
>>> > does the jar file has to be placed in the same directory ewith spark is
>>> > running in the cluster to make it work?
>>> >
>>> > Also what will happen if say out of 9 nodes running spark, 3 have not
>>> > got
>>> > the jar file. will that job fail or it will carry on on the fremaing 6
>>> > nodes
>>> > that have that jar file?
>>> >
>>> > thanks
>>> >
>>> > Dr Mich Talebzadeh
>>> >
>>> >
>>> >
>>> > LinkedIn
>>> >
>>> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> >
>>> >
>>> >
>>> > http://talebzadehmich.wordpress.com
>>> >
>>> >
>>> > Disclaimer: Use it at your own risk. Any and all responsibility for any
>>> > loss, damage or destruction of data or any other property which may
>>> > arise
>>> > from relying on this email's technical content is explicitly
>>> > disclaimed. The
>>> > author will in no case be liable for any monetary damages arising from
>>> > such
>>> > loss, damage or destruction.
>>> >
>>> >
>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>> --
>> Best Regards,
>> Ayan Guha
>
>



-- 
Marcelo




Re: running spark job with fat jar file

2017-07-17 Thread Mich Talebzadeh
Thanks guys.

Just to clarify, let us assume I am doing spark-submit as below:

${SPARK_HOME}/bin/spark-submit \
--packages ${PACKAGES} \
--driver-memory 2G \
--num-executors 2 \
--executor-memory 2G \
--executor-cores 2 \
--master yarn \
--deploy-mode client \
--conf "${SCHEDULER}" \
--conf "${EXTRAJAVAOPTIONS}" \
--jars ${JARS} \
--class "${FILE_NAME}" \
--conf "${SPARKUIPORT}" \
--conf "${SPARKDRIVERPORT}" \
--conf "${SPARKFILESERVERPORT}" \
--conf "${SPARKBLOCKMANAGERPORT}" \
--conf "${SPARKKRYOSERIALIZERBUFFERMAX}" \
${JAR_FILE}

The ${JAR_FILE} is the one in question. As I understand it, Spark should
distribute that ${JAR_FILE} to each container?

Also, --jars ${JARS} is the list of normal jar files that need to exist in
the same directory on each executor node?

cheers,



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 17 July 2017 at 18:18, ayan guha  wrote:

> Hi Mitch
>
> your jar file can be anywhere in the file system, including hdfs.
>
> If using yarn, preferably use cluster mode in terms of deployment.
>
> Yarn will distribute the jar to each container.
>
> Best
> Ayan
>
> On Tue, 18 Jul 2017 at 2:17 am, Marcelo Vanzin 
> wrote:
>
>> Spark distributes your application jar for you.
>>
>> On Mon, Jul 17, 2017 at 8:41 AM, Mich Talebzadeh
>>  wrote:
>> > hi guys,
>> >
>> >
>> > an uber/fat jar file has been created to run with spark in CDH yarc
>> client
>> > mode.
>> >
>> > As usual job is submitted to the edge node.
>> >
>> > does the jar file has to be placed in the same directory ewith spark is
>> > running in the cluster to make it work?
>> >
>> > Also what will happen if say out of 9 nodes running spark, 3 have not
>> got
>> > the jar file. will that job fail or it will carry on on the fremaing 6
>> nodes
>> > that have that jar file?
>> >
>> > thanks
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCd
>> OABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>> > Disclaimer: Use it at your own risk. Any and all responsibility for any
>> > loss, damage or destruction of data or any other property which may
>> arise
>> > from relying on this email's technical content is explicitly
>> disclaimed. The
>> > author will in no case be liable for any monetary damages arising from
>> such
>> > loss, damage or destruction.
>> >
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: running spark job with fat jar file

2017-07-17 Thread ayan guha
Hi Mitch

Your jar file can be anywhere in the file system, including HDFS.

If using YARN, preferably use cluster mode in terms of deployment.

YARN will distribute the jar to each container.

Best
Ayan

On Tue, 18 Jul 2017 at 2:17 am, Marcelo Vanzin  wrote:

> Spark distributes your application jar for you.
>
> On Mon, Jul 17, 2017 at 8:41 AM, Mich Talebzadeh
>  wrote:
> > hi guys,
> >
> >
> > an uber/fat jar file has been created to run with spark in CDH yarc
> client
> > mode.
> >
> > As usual job is submitted to the edge node.
> >
> > does the jar file has to be placed in the same directory ewith spark is
> > running in the cluster to make it work?
> >
> > Also what will happen if say out of 9 nodes running spark, 3 have not got
> > the jar file. will that job fail or it will carry on on the fremaing 6
> nodes
> > that have that jar file?
> >
> > thanks
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> > Disclaimer: Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> The
> > author will in no case be liable for any monetary damages arising from
> such
> > loss, damage or destruction.
> >
> >
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: splitting columns into new columns

2017-07-17 Thread nayan sharma
Hi Pralabh,

Thanks for your help.

val xx = columnList.map(x => x -> 0).toMap   // start each column's count at 0

// For every row, emit (column, number of '^'-separated values) pairs.
val opMap = dataFrame.rdd.flatMap { row =>
  columnList.foldLeft(xx) { case (y, col) =>
    val s = row.getAs[String](col).split("\\^").length
    if (y(col) < s) y.updated(col, s) else y
  }.toList
}

// Global maximum number of values per column.
val colMaxSizeMap = opMap.groupBy(x => x._1)
  .map(x => x._2.toList.maxBy(x => x._2))
  .collect().toMap

// Split each column and right-pad with empty strings up to the column maximum.
val x = dataFrame.rdd.map { x =>
  val op = columnList.flatMap { y =>
    val op = x.getAs[String](y).split("\\^")
    op ++ List.fill(colMaxSizeMap(y) - op.size)("")
  }
  Row.fromSeq(op)
}

// One StructField per generated column: colName0, colName1, ...
val structFieldList = columnList.flatMap { colName =>
  List.range(0, colMaxSizeMap(colName), 1).map { i =>
    StructField(s"$colName" + s"$i", StringType)
  }
}
val schema = StructType(structFieldList)
val data1 = spark.createDataFrame(x, schema)

opMap
res13: org.apache.spark.rdd.RDD[(String, Int)]

But it is failing when opMap has a null value; it is throwing a
java.lang.NullPointerException. I'm trying to figure out why.

val opMap1 = opMap.filter(_._2 != "")

I tried doing this, but it also fails with the same exception.
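
If the NullPointerException comes from calling split on a null value returned
by getAs (an assumption, not confirmed here), a null-safe variant of the first
step would look like this sketch (same variable names as above):

// Sketch only: treat a null column value as an empty string before splitting.
val opMapSafe = dataFrame.rdd.flatMap { row =>
  columnList.foldLeft(xx) { case (y, col) =>
    val raw = Option(row.getAs[String](col)).getOrElse("")
    val s = raw.split("\\^").length
    if (y(col) < s) y.updated(col, s) else y
  }.toList
}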

Thanks,
Nayan




> On 17-Jul-2017, at 4:54 PM, Pralabh Kumar  wrote:
> 
> Hi Nayan
> 
> Please find the solution of your problem which work on spark 2.
> 
> val spark = 
> SparkSession.builder().appName("practice").enableHiveSupport().getOrCreate()
>   val sc = spark.sparkContext
>   val sqlContext = spark.sqlContext
>   import spark.implicits._
>   val dataFrame = 
> sc.parallelize(List("ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5"))
>   .map(s=>s.split("\\|")).map(s=>(s(0),s(1)))
> .toDF("phone","contact")
>   dataFrame.show()
>   val newDataSet= dataFrame.rdd.map(data=>{
> val  t1 =  ArrayBuffer[String] ()
> for (i <- 0.to (1)) {
>   val col = data.get(i).asInstanceOf[String]
>   val dd= col.split("\\^").toSeq
>   for(col<-dd){
> t1 +=(col)
>   }
> }
> Row.fromSeq(t1.seq)
>   })
> 
>   val firtRow = dataFrame.select("*").take(1)(0)
>   dataFrame.schema.fieldNames
>   var schema =""
> 
>   for ((colNames,idx) <- dataFrame.schema.fieldNames.zipWithIndex.view) {
> val data = firtRow(idx).asInstanceOf[String].split("\\^")
> var j = 0
> for(d<-data){
>   schema = schema + colNames + j + ","
>   j = j+1
> }
>   }
>   schema=schema.substring(0,schema.length-1)
>   val sqlSchema = 
> StructType(schema.split(",").map(s=>StructField(s,StringType,false)))
>   sqlContext.createDataFrame(newDataSet,sqlSchema).show()
> 
> Regards
> Pralabh Kumar
> 
> 
> On Mon, Jul 17, 2017 at 1:55 PM, nayan sharma  > wrote:
> If I have 2-3 values in a column then I can easily separate it and create new 
> columns with withColumn option.
> but I am trying to achieve it in loop and dynamically generate the new 
> columns as many times the ^ has occurred in column values
> 
> Can it be achieve in this way.
> 
>> On 17-Jul-2017, at 3:29 AM, ayan guha > > wrote:
>> 
>> You are looking for explode function.
>> 
>> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma > > wrote:
>> I’ve a Dataframe where in some columns there are multiple values, always 
>> separated by ^
>> 
>> phone|contact|
>> ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|
>> 
>> phone1|phone2|contact1|contact2| 
>> ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|
>> How can this be achieved using loop as the separator between column values
>> are not constant.
>> 
>> data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2"))
>>  I though of doing this way but the problem is  column are having 100+ 
>> separator between the column values
>> 
>> 
>> 
>> Thank you,
>> Nayan
>> -- 
>> Best Regards,
>> Ayan Guha
> 
> 



Re: [ANNOUNCE] Announcing Apache Spark 2.2.0

2017-07-17 Thread kant kodali
+1

On Tue, Jul 11, 2017 at 3:56 PM, Jean Georges Perrin  wrote:

> Awesome! Congrats! Can't wait!!
>
> jg
>
>
> On Jul 11, 2017, at 18:48, Michael Armbrust 
> wrote:
>
> Hi all,
>
> Apache Spark 2.2.0 is the third release of the Spark 2.x line. This
> release removes the experimental tag from Structured Streaming. In
> addition, this release focuses on usability, stability, and polish,
> resolving over 1100 tickets.
>
> We'd like to thank our contributors and users for their contributions and
> early feedback to this release. This release would not have been possible
> without you.
>
> To download Spark 2.2.0, head over to the download page:
> http://spark.apache.org/downloads.html
>
> To view the release notes: https://spark.apache.org/releases/spark-release-2-2-0.html
>
> *(note: If you see any issues with the release notes, webpage or published
> artifacts, please contact me directly off-list) *
>
> Michael
>
>


Re: running spark job with fat jar file

2017-07-17 Thread Marcelo Vanzin
Spark distributes your application jar for you.

On Mon, Jul 17, 2017 at 8:41 AM, Mich Talebzadeh
 wrote:
> hi guys,
>
>
> an uber/fat jar file has been created to run with spark in CDH yarc client
> mode.
>
> As usual job is submitted to the edge node.
>
> does the jar file has to be placed in the same directory ewith spark is
> running in the cluster to make it work?
>
> Also what will happen if say out of 9 nodes running spark, 3 have not got
> the jar file. will that job fail or it will carry on on the fremaing 6 nodes
> that have that jar file?
>
> thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed. The
> author will in no case be liable for any monetary damages arising from such
> loss, damage or destruction.
>
>



-- 
Marcelo




Re: Reading Hive tables Parallel in Spark

2017-07-17 Thread Fretz Nuson
I was getting a NullPointerException when trying to call Spark SQL from
foreach. After debugging, I found out that the Spark session is not
available on the executors, and I could not successfully pass it to them.

What I am doing is tablesRDD.foreach.collect(), and it works, but it runs
sequentially.
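
For reference, here is a minimal sketch of the driver-side pattern being
discussed: the SparkSession is only ever used from driver threads (Scala
Futures here), never inside an executor-side closure. The table names, output
path, and FAIR-scheduler setting are assumptions for illustration, not taken
from this thread:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("txt-to-parquet")
  .config("spark.scheduler.mode", "FAIR")   // let the concurrent jobs share executors
  .enableHiveSupport()
  .getOrCreate()

// Hypothetical table list and output location.
val tables = Seq("archive.table1", "archive.table2", "archive.table3")

// Each Future triggers a separate Spark job from the driver.
val conversions = tables.map { t =>
  Future {
    spark.table(t).write.mode("overwrite").parquet(s"/archive/parquet/$t")
  }
}

// Waiting on the driver keeps track of every job and surfaces any failure.
Await.result(Future.sequence(conversions), Duration.Inf)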

On Mon, Jul 17, 2017 at 5:58 PM, Simon Kitching <
simon.kitch...@unbelievable-machine.com> wrote:

> Have you tried simply making a list with your tables in it, then using
> SparkContext.makeRDD(Seq)? ie
>
> val tablenames = List("table1", "table2", "table3", ...)
> val tablesRDD = sc.makeRDD(tablenames, nParallelTasks)
> tablesRDD.foreach()
>
> > Am 17.07.2017 um 14:12 schrieb FN :
> >
> > Hi
> > I am currently trying to parallelize reading multiple tables from Hive .
> As
> > part of an archival framework, i need to convert few hundred tables which
> > are in txt format to Parquet. For now i am calling a Spark SQL inside a
> for
> > loop for conversion. But this execute sequential and entire process takes
> > longer time to finish.
> >
> > I tired  submitting 4 different Spark jobs ( each having set of tables
> to be
> > converted), it did give me some parallelism , but i would like to do
> this in
> > single Spark job due to few limitation of our cluster and process
> >
> > Any help will be greatly appreciated
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>


Re: Reading Hive tables Parallel in Spark

2017-07-17 Thread Fretz Nuson
I did try threading, but I got many failed tasks and they were not
reprocessed. I am guessing the driver lost track of the threaded tasks. I
also tried Scala's Future and .par, with the same result as above.

On Mon, Jul 17, 2017 at 5:56 PM, Pralabh Kumar 
wrote:

> Run the Spark context in a multithreaded way.
>
> Something like this:
>
> val spark =  SparkSession.builder()
>   .appName("practice")
>   .config("spark.scheduler.mode","FAIR")
>   .enableHiveSupport().getOrCreate()
> val sc = spark.sparkContext
> val hc = spark.sqlContext
>
>
> val thread1 = new Thread {
>  override def run {
>hc.sql("select * from table1")
>  }
>}
>
>val thread2 = new Thread {
>  override def run {
>hc.sql("select * from table2")
>  }
>}
>
>thread1.start()
>thread2.start()
>
>
>
> On Mon, Jul 17, 2017 at 5:42 PM, FN  wrote:
>
>> Hi
>> I am currently trying to parallelize reading multiple tables from Hive. As
>> part of an archival framework, I need to convert a few hundred tables which
>> are in txt format to Parquet. For now I am calling Spark SQL inside a for
>> loop for the conversion, but this executes sequentially and the entire
>> process takes a long time to finish.
>>
>> I tried submitting 4 different Spark jobs (each having a set of tables to be
>> converted), which did give me some parallelism, but I would like to do this
>> in a single Spark job due to a few limitations of our cluster and process.
>>
>> Any help will be greatly appreciated
>>
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


running spark job with fat jar file

2017-07-17 Thread Mich Talebzadeh
hi guys,


An uber/fat jar file has been created to run with Spark in CDH yarn client
mode.

As usual, the job is submitted to the edge node.

Does the jar file have to be placed in the same directory where Spark is
running in the cluster to make it work?

Also, what will happen if, say, out of 9 nodes running Spark, 3 have not got
the jar file? Will that job fail, or will it carry on on the remaining 6
nodes that have that jar file?

thanks

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Spark 2.1.1 Error:java.lang.NoSuchMethodError: org.apache.spark.network.client.TransportClient.getChannel()Lio/netty/channel/Channel;

2017-07-17 Thread zzcclp
Thanks for your reply. Can you describe it in more detail? Which dependency
mismatch?
It works well sometimes, but sometimes fails because of the error 'NoSuchMethodError'.


Thanks.




------------------ Original message ------------------
From: "vaquar khan"
Date: 2017-07-17 (Mon) 9:38
To: <441586...@qq.com>
Cc: "User"
Subject: Re: Spark 2.1.1 Error:java.lang.NoSuchMethodError:
org.apache.spark.network.client.TransportClient.getChannel()Lio/netty/channel/Channel;



We are getting the following error because of a dependency mismatch.

Regards, 
vaquar khan 


On Jul 17, 2017 3:50 AM, "zzcclp" <441586...@qq.com> wrote:
Hi guys:
   I am using Spark 2.1.1 for testing on CDH 5.7.1. When I run on YARN with the
 following command, the error 'NoSuchMethodError:
 org.apache.spark.network.client.TransportClient.getChannel()Lio/netty/channel/Channel;'
 appears sometimes:
 
   command:
   *su cloudera-scm -s "/bin/sh" -c "/opt/spark2/bin/spark-shell --master
 yarn --deploy-mode client --files
 /opt/spark2/conf/log4j_all.properties#log4j.properties --driver-memory 8g
 --num-executors 2 --executor-memory 8g --executor-cores 5
 --driver-library-path :/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
 --driver-class-path /opt/spark2/libs/mysql-connector-java-5.1.36.jar --jars
 /opt/spark2/libs/mysql-connector-java-5.1.36.jar " *
 
   error messages:
   2017-07-17 17:15:25,255 - WARN -
 
org.apache.spark.network.server.TransportChannelHandler.exceptionCaught(TransportChannelHandler.java:78)
 - rpc-client-1-1 -Exception in connection from /ip:60099
 java.lang.NoSuchMethodError:
 
org.apache.spark.network.client.TransportClient.getChannel()Lio/netty/channel/Channel;
 at
 org.apache.spark.rpc.netty.NettyRpcHandler.channelActive(NettyRpcEnv.scala:614)
 at
 
org.apache.spark.network.server.TransportRequestHandler.channelActive(TransportRequestHandler.java:87)
 at
 
org.apache.spark.network.server.TransportChannelHandler.channelActive(TransportChannelHandler.java:88)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198)
 at
 
org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64)
 at
 
org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelActive(IdleStateHandler.java:251)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198)
 at
 
org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198)
 at
 
org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198)
 at
 
org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1282)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
 at
 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
 at
 
org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:887)
 at
 
org.spark_project.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:262)
 at
 

Re: Spark 2.1.1 Error:java.lang.NoSuchMethodError: org.apache.spark.network.client.TransportClient.getChannel()Lio/netty/channel/Channel;

2017-07-17 Thread vaquar khan
We are getting the following error because of a dependency mismatch: a
NoSuchMethodError on a Spark class generally means the class was loaded from a
different Spark build than the one the application was compiled and launched
against (for example, jars from the CDH-bundled Spark ending up on the
classpath of the separately installed Spark 2.1.1). Aligning the Spark jars on
the driver and executor classpaths should resolve it.

Regards,
vaquar khan

On Jul 17, 2017 3:50 AM, "zzcclp" <441586...@qq.com> wrote:

Hi guys:
  I am using Spark 2.1.1 for testing on CDH 5.7.1. When I run on YARN with the
following command, the error 'NoSuchMethodError:
org.apache.spark.network.client.TransportClient.getChannel()Lio/netty/channel/Channel;'
appears sometimes:

  command:
  *su cloudera-scm -s "/bin/sh" -c "/opt/spark2/bin/spark-shell --master
yarn --deploy-mode client --files
/opt/spark2/conf/log4j_all.properties#log4j.properties --driver-memory 8g
--num-executors 2 --executor-memory 8g --executor-cores 5
--driver-library-path :/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
--driver-class-path /opt/spark2/libs/mysql-connector-java-5.1.36.jar --jars
/opt/spark2/libs/mysql-connector-java-5.1.36.jar " *

  error messages:
  2017-07-17 17:15:25,255 - WARN -
org.apache.spark.network.server.TransportChannelHandler.exceptionCaught(
TransportChannelHandler.java:78)
- rpc-client-1-1 -Exception in connection from /ip:60099
java.lang.NoSuchMethodError:
org.apache.spark.network.client.TransportClient.
getChannel()Lio/netty/channel/Channel;
at
org.apache.spark.rpc.netty.NettyRpcHandler.channelActive(
NettyRpcEnv.scala:614)
at
org.apache.spark.network.server.TransportRequestHandler.channelActive(
TransportRequestHandler.java:87)
at
org.apache.spark.network.server.TransportChannelHandler.channelActive(
TransportChannelHandler.java:88)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
fireChannelActive(AbstractChannelHandlerContext.java:198)
at
org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter.
channelActive(ChannelInboundHandlerAdapter.java:64)
at
org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelActive(
IdleStateHandler.java:251)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
fireChannelActive(AbstractChannelHandlerContext.java:198)
at
org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter.
channelActive(ChannelInboundHandlerAdapter.java:64)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
fireChannelActive(AbstractChannelHandlerContext.java:198)
at
org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter.
channelActive(ChannelInboundHandlerAdapter.java:64)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
fireChannelActive(AbstractChannelHandlerContext.java:198)
at
org.spark_project.io.netty.channel.DefaultChannelPipeline$
HeadContext.channelActive(DefaultChannelPipeline.java:1282)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.
invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelActive(
DefaultChannelPipeline.java:887)
at
org.spark_project.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.
fulfillConnectPromise(AbstractNioChannel.java:262)
at
org.spark_project.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.
finishConnect(AbstractNioChannel.java:292)
at
org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(
NioEventLoop.java:640)
at
org.spark_project.io.netty.channel.nio.NioEventLoop.
processSelectedKeysOptimized(NioEventLoop.java:575)
at
org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(
NioEventLoop.java:489)
at
org.spark_project.io.netty.channel.nio.NioEventLoop.run(
NioEventLoop.java:451)
at
org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$2.
run(SingleThreadEventExecutor.java:140)
at

Re: Reading Hive tables Parallel in Spark

2017-07-17 Thread Rick Moritz
Put your jobs into a parallel collection using .par -- then you can submit
them very easily to Spark, using .foreach. The jobs will then run using the
FIFO scheduler in Spark.

The advantage over the prior approaches is that you won't have to deal
with threads, and that you can leave parallelism completely to Spark.
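
A minimal sketch of that approach (the table names, target path, and existing
`spark` session below are assumptions, not taken from this thread):

// Driver-side parallel collection: each table conversion is submitted from
// its own thread, so several Spark jobs can be in flight at once.
val tables = List("table1", "table2", "table3").par

tables.foreach { t =>
  spark.table(t)
    .write
    .mode("overwrite")
    .parquet(s"/archive/parquet/$t")
}

By default .par uses a fork-join pool sized to the number of driver cores,
which caps how many jobs are submitted concurrently.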

On Mon, Jul 17, 2017 at 2:28 PM, Simon Kitching <
simon.kitch...@unbelievable-machine.com> wrote:

> Have you tried simply making a list with your tables in it, then using
> SparkContext.makeRDD(Seq)? ie
>
> val tablenames = List("table1", "table2", "table3", ...)
> val tablesRDD = sc.makeRDD(tablenames, nParallelTasks)
> tablesRDD.foreach()
>
> > Am 17.07.2017 um 14:12 schrieb FN :
> >
> > Hi
> > I am currently trying to parallelize reading multiple tables from Hive. As
> > part of an archival framework, I need to convert a few hundred tables which
> > are in txt format to Parquet. For now I am calling Spark SQL inside a for
> > loop for the conversion, but this executes sequentially and the entire
> > process takes a long time to finish.
> >
> > I tried submitting 4 different Spark jobs (each having a set of tables to be
> > converted), which did give me some parallelism, but I would like to do this
> > in a single Spark job due to a few limitations of our cluster and process.
> >
> > Any help will be greatly appreciated
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Reading Hive tables Parallel in Spark

2017-07-17 Thread Simon Kitching
Have you tried simply making a list with your tables in it, then using 
SparkContext.makeRDD(Seq)? ie

val tablenames = List("table1", "table2", "table3", ...)
val tablesRDD = sc.makeRDD(tablenames, nParallelTasks)
tablesRDD.foreach()

> Am 17.07.2017 um 14:12 schrieb FN :
> 
> Hi
> I am currently trying to parallelize reading multiple tables from Hive. As
> part of an archival framework, I need to convert a few hundred tables which
> are in txt format to Parquet. For now I am calling Spark SQL inside a for
> loop for the conversion, but this executes sequentially and the entire
> process takes a long time to finish.
> 
> I tried submitting 4 different Spark jobs (each having a set of tables to be
> converted), which did give me some parallelism, but I would like to do this
> in a single Spark job due to a few limitations of our cluster and process.
> 
> Any help will be greatly appreciated 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: how to identify the alive master spark via Zookeeper ?

2017-07-17 Thread Alonso Isidoro Roman
Not sure if this can help, but a quick search on Stack Overflow returned this

and this one

...
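
One workaround that is sometimes used (a sketch only, not from this thread;
the endpoint and field names should be verified against your Spark version):
each standalone master exposes its state as JSON on its web UI under /json,
including whether it is ALIVE or in STANDBY, so the proxy side can poll both
masters and pick the one reporting ALIVE. Host names below are hypothetical.

import scala.io.Source
import scala.util.Try

val masters = Seq("spark-master-1", "spark-master-2")  // hypothetical hosts

// Pick the first master whose web UI reports itself as ALIVE.
val alive: Option[String] = masters.find { host =>
  Try(Source.fromURL(s"http://$host:8080/json/").mkString).toOption
    .exists(_.contains("ALIVE"))
}

println(alive.getOrElse("no ALIVE master found"))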



Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-07-17 14:21 GMT+02:00 :

> Hello,
>
>
>
>
>
> In our project, we have a Spark cluster with 2 masters and 4 workers, and
> Zookeeper decides which master is alive.
>
> We have a problem with our reverse proxy to display the Spark Web UI. The
> RP redirects to a master whose IP address is configured in the initial
> configuration, but if Zookeeper decides to change the master, our Spark Web
> UI is no longer accessible because the IP address of the master changed.
>
> We want to find out dynamically which master is elected each time.
>
> We searched the Internet for a way to know, via Zookeeper, which master is
> alive, but we did not find anything. It is possible with confd to watch
> whether a property changed, but no property is saved in Zookeeper. In the
> /spark folder in Zookeeper, nothing is logged.
>
> Why does Spark not send a property to Zookeeper indicating which IP address
> or hostname is elected? In your class ZooKeeperLeaderElectionAgent.scala,
> you log which master is elected, but perhaps it would also be a good
> solution to send a property to Zookeeper indicating the host.
>
> We already asked the Zookeeper user mailing list and they said:
>
> "This question may be better suited for the Spark mailing lists as
> Zookeeper doesn't really "decide" which master is alive but rather provides
> a mechanism for the application to make the correct decision."
>
> So, we think that we are not alone with this type of problem, but we can't
> find anything on the Internet.
>
> Can you help us to solve this problem?
>
> Regards,
>
> Marina
>
> _
>
> This message and its attachments may contain confidential or privileged 
> information that may be protected by law;
> they should not be distributed, used or copied without authorisation.
> If you have received this email in error, please notify the sender and delete 
> this message and its attachments.
> As emails may be altered, Orange is not liable for messages that have been 
> modified, changed or falsified.
> Thank you.
>
>


Re: Reading Hive tables Parallel in Spark

2017-07-17 Thread Pralabh Kumar
Run the Spark context in a multithreaded way.

Something like this:

val spark = SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode", "FAIR")
  .enableHiveSupport()
  .getOrCreate()
val sc = spark.sparkContext
val hc = spark.sqlContext

val thread1 = new Thread {
  override def run {
    hc.sql("select * from table1")
  }
}

val thread2 = new Thread {
  override def run {
    hc.sql("select * from table2")
  }
}

thread1.start()
thread2.start()
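
One detail worth adding to the sketch above: if the driver needs both queries
to have finished before it continues (for example before calling spark.stop()),
join the threads as well:

thread1.join()
thread2.join()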



On Mon, Jul 17, 2017 at 5:42 PM, FN  wrote:

> Hi
> I am currently trying to parallelize reading multiple tables from Hive. As
> part of an archival framework, I need to convert a few hundred tables which
> are in txt format to Parquet. For now I am calling Spark SQL inside a for
> loop for the conversion, but this executes sequentially and the entire
> process takes a long time to finish.
>
> I tried submitting 4 different Spark jobs (each having a set of tables to be
> converted), which did give me some parallelism, but I would like to do this
> in a single Spark job due to a few limitations of our cluster and process.
>
> Any help will be greatly appreciated
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Reading Hive tables Parallel in Spark

2017-07-17 Thread Matteo Cossu
Hello,
have you tried to use threads instead of the loop?

On 17 July 2017 at 14:12, FN  wrote:

> Hi
> I am currently trying to parallelize reading multiple tables from Hive. As
> part of an archival framework, I need to convert a few hundred tables which
> are in txt format to Parquet. For now I am calling Spark SQL inside a for
> loop for the conversion, but this executes sequentially and the entire
> process takes a long time to finish.
>
> I tried submitting 4 different Spark jobs (each having a set of tables to be
> converted), which did give me some parallelism, but I would like to do this
> in a single Spark job due to a few limitations of our cluster and process.
>
> Any help will be greatly appreciated
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


how to identify the alive master spark via Zookeeper ?

2017-07-17 Thread marina.brunel
Hello,




In our project, we have a Spark cluster with 2 masters and 4 workers, and 
Zookeeper decides which master is alive.
We have a problem with our reverse proxy to display the Spark Web UI. The RP 
redirects to a master whose IP address is configured in the initial configuration, 
but if Zookeeper decides to change the master, our Spark Web UI is no longer 
accessible because the IP address of the master changed.
We want to find out dynamically which master is elected each time.
We searched the Internet for a way to know, via Zookeeper, which master is alive, 
but we did not find anything. It is possible with confd to watch whether a property 
changed, but no property is saved in Zookeeper. In the /spark folder in Zookeeper, 
nothing is logged.
Why does Spark not send a property to Zookeeper indicating which IP address or 
hostname is elected? In your class ZooKeeperLeaderElectionAgent.scala, you 
log which master is elected, but perhaps it would also be a good solution to 
send a property to Zookeeper indicating the host.

We already asked the Zookeeper user mailing list and they said:
"This question may be better suited for the Spark mailing lists as Zookeeper 
doesn't really "decide" which master is alive but rather provides a mechanism 
for the application to make the correct decision."

So, we think that we are not alone with this type of problem, but we can't find 
anything on the Internet.

Can you help us to solve this problem?
Regards,
Marina

_

This message and its attachments may contain confidential or privileged 
information that may be protected by law;
they should not be distributed, used or copied without authorisation.
If you have received this email in error, please notify the sender and delete 
this message and its attachments.
As emails may be altered, Orange is not liable for messages that have been 
modified, changed or falsified.
Thank you.



Reading Hive tables Parallel in Spark

2017-07-17 Thread FN
Hi
I am currently trying to parallelize reading multiple tables from Hive. As
part of an archival framework, I need to convert a few hundred tables which
are in txt format to Parquet. For now I am calling Spark SQL inside a for
loop for the conversion, but this executes sequentially and the entire
process takes a long time to finish.

I tried submitting 4 different Spark jobs (each having a set of tables to be
converted), which did give me some parallelism, but I would like to do this in a
single Spark job due to a few limitations of our cluster and process.

Any help will be greatly appreciated 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: splitting columns into new columns

2017-07-17 Thread Pralabh Kumar
Hi Nayan

Please find a solution to your problem, which works on Spark 2:

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  import scala.collection.mutable.ArrayBuffer

  val spark =
    SparkSession.builder().appName("practice").enableHiveSupport().getOrCreate()
  val sc = spark.sparkContext
  val sqlContext = spark.sqlContext
  import spark.implicits._
  val dataFrame =
sc.parallelize(List("ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5"))
  .map(s=>s.split("\\|")).map(s=>(s(0),s(1)))
.toDF("phone","contact")
  dataFrame.show()
  val newDataSet= dataFrame.rdd.map(data=>{
val  t1 =  ArrayBuffer[String] ()
for (i <- 0.to(1)) {
  val col = data.get(i).asInstanceOf[String]
  val dd= col.split("\\^").toSeq
  for(col<-dd){
t1 +=(col)
  }
}
Row.fromSeq(t1.seq)
  })

  val firtRow = dataFrame.select("*").take(1)(0)
  dataFrame.schema.fieldNames
  var schema =""

  for ((colNames,idx) <- dataFrame.schema.fieldNames.zipWithIndex.view) {
val data = firtRow(idx).asInstanceOf[String].split("\\^")
var j = 0
for(d<-data){
  schema = schema + colNames + j + ","
  j = j+1
}
  }
  schema=schema.substring(0,schema.length-1)
  val sqlSchema =
StructType(schema.split(",").map(s=>StructField(s,StringType,false)))
  sqlContext.createDataFrame(newDataSet,sqlSchema).show()

Regards
Pralabh Kumar


On Mon, Jul 17, 2017 at 1:55 PM, nayan sharma 
wrote:

> If I have 2-3 values in a column, then I can easily separate them and create
> new columns with the withColumn option, but I am trying to achieve it in a
> loop and dynamically generate as many new columns as there are ^-separated
> values.
>
> Can it be achieved in this way?
>
> On 17-Jul-2017, at 3:29 AM, ayan guha  wrote:
>
> You are looking for explode function.
>
> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma 
> wrote:
>
>> I’ve a Dataframe where in some columns there are multiple values, always
>> separated by ^
>>
>> phone|contact|
>> ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|
>>
>> phone1|phone2|contact1|contact2|
>> ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|
>>
>> How can this be achieved using a loop, as the number of separators between
>> column values is not constant?
>> data.withColumn("phone", split($"phone", "\\^")).select($"phone".getItem(0).as("phone1"), $"phone".getItem(1).as("phone2"))
>> I thought of doing it this way, but the problem is that the columns have 100+
>> separators between the column values.
>>
>>
>>
>> Thank you,
>> Nayan
>>
> --
> Best Regards,
> Ayan Guha
>
>
>


Spark 2.1.1 Error:java.lang.NoSuchMethodError: org.apache.spark.network.client.TransportClient.getChannel()Lio/netty/channel/Channel;

2017-07-17 Thread zzcclp
Hi guys:
  I am using Spark 2.1.1 for testing on CDH 5.7.1. When I run on YARN with the
following command, the error 'NoSuchMethodError:
org.apache.spark.network.client.TransportClient.getChannel()Lio/netty/channel/Channel;'
appears sometimes:

  command:
  *su cloudera-scm -s "/bin/sh" -c "/opt/spark2/bin/spark-shell --master
yarn --deploy-mode client --files
/opt/spark2/conf/log4j_all.properties#log4j.properties --driver-memory 8g
--num-executors 2 --executor-memory 8g --executor-cores 5
--driver-library-path :/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
--driver-class-path /opt/spark2/libs/mysql-connector-java-5.1.36.jar --jars
/opt/spark2/libs/mysql-connector-java-5.1.36.jar " *

  error messages:
  2017-07-17 17:15:25,255 - WARN -
org.apache.spark.network.server.TransportChannelHandler.exceptionCaught(TransportChannelHandler.java:78)
- rpc-client-1-1 -Exception in connection from /ip:60099
java.lang.NoSuchMethodError:
org.apache.spark.network.client.TransportClient.getChannel()Lio/netty/channel/Channel;
at
org.apache.spark.rpc.netty.NettyRpcHandler.channelActive(NettyRpcEnv.scala:614)
at
org.apache.spark.network.server.TransportRequestHandler.channelActive(TransportRequestHandler.java:87)
at
org.apache.spark.network.server.TransportChannelHandler.channelActive(TransportChannelHandler.java:88)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198)
at
org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64)
at
org.spark_project.io.netty.handler.timeout.IdleStateHandler.channelActive(IdleStateHandler.java:251)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198)
at
org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198)
at
org.spark_project.io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198)
at
org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1282)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:219)
at
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:205)
at
org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:887)
at
org.spark_project.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:262)
at
org.spark_project.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:292)
at
org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:640)
at
org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at
org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at
org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at
org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at
org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
2017-07-17 17:15:25,256 - DEBUG -

RE: how to identify the alive master spark via Zookeeper ?

2017-07-17 Thread marina.brunel
Hello,

I am sending this mail again, as I have had no response to my question.

Regards

Marina

From: BRUNEL Marina OBS/OAB
Sent: Thursday, July 13, 2017 10:43
To: user@spark.apache.org
Cc: DL PINK MALIMA
Subject: how to identify the alive master spark via Zookeeper ?


Hello,




In our project, we have a Spark cluster with 2 masters and 4 workers, and 
Zookeeper decides which master is alive.
We have a problem with our reverse proxy to display the Spark Web UI. The RP 
redirects to a master whose IP address is configured in the initial configuration, 
but if Zookeeper decides to change the master, our Spark Web UI is no longer 
accessible because the IP address of the master changed.
We want to find out dynamically which master is elected each time.
We searched the Internet for a way to know, via Zookeeper, which master is alive, 
but we did not find anything. It is possible with confd to watch whether a property 
changed, but no property is saved in Zookeeper. In the /spark folder in Zookeeper, 
nothing is logged.
Why does Spark not send a property to Zookeeper indicating which IP address or 
hostname is elected? In your class ZooKeeperLeaderElectionAgent.scala, you 
log which master is elected, but perhaps it would also be a good solution to 
send a property to Zookeeper indicating the host.

We already asked the Zookeeper user mailing list and they said:
"This question may be better suited for the Spark mailing lists as Zookeeper 
doesn't really "decide" which master is alive but rather provides a mechanism 
for the application to make the correct decision."

So, we think that we are not alone with this type of problem, but we can't find 
anything on the Internet.

Can you help us to solve this problem?
Regards,
Marina

_

This message and its attachments may contain confidential or privileged 
information that may be protected by law;
they should not be distributed, used or copied without authorisation.
If you have received this email in error, please notify the sender and delete 
this message and its attachments.
As emails may be altered, Orange is not liable for messages that have been 
modified, changed or falsified.
Thank you.



Re: splitting columns into new columns

2017-07-17 Thread nayan sharma
If I have 2-3 values in a column, then I can easily separate them and create new
columns with the withColumn option, but I am trying to achieve it in a loop and
dynamically generate as many new columns as there are ^-separated values.

Can it be achieved in this way?
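
One way to do that dynamically (a sketch under assumptions: the frame is called
data, the column is phone, and ^ is the only delimiter; none of this is taken
from the thread) is to split once into an array column, work out the maximum
number of pieces, and then generate one column per index:

import org.apache.spark.sql.functions.{col, split}

// Split into an array column, find the widest row, then add one column per slot.
val withParts = data.withColumn("phone_parts", split(col("phone"), "\\^"))
val n = withParts.selectExpr("max(size(phone_parts))").first().getInt(0)

val result = (0 until n).foldLeft(withParts) { (df, i) =>
  df.withColumn(s"phone${i + 1}", col("phone_parts").getItem(i))
}.drop("phone_parts")

The same loop can be applied to the contact column; Pralabh's reply above shows
a lower-level RDD-and-Row variant of the same idea.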

> On 17-Jul-2017, at 3:29 AM, ayan guha  wrote:
> 
> You are looking for explode function.
> 
> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma  > wrote:
> I’ve a Dataframe where in some columns there are multiple values, always 
> separated by ^
> 
> phone|contact|
> ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|
> 
> phone1|phone2|contact1|contact2| 
> ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|
> How can this be achieved using a loop, as the number of separators between
> column values is not constant?
> 
> data.withColumn("phone", split($"phone", "\\^")).select($"phone".getItem(0).as("phone1"), $"phone".getItem(1).as("phone2"))
> I thought of doing it this way, but the problem is that the columns have 100+
> separators between the column values.
> 
> 
> 
> Thank you,
> Nayan
> -- 
> Best Regards,
> Ayan Guha