Dynamic executor scaling for Spark on Kubernetes

2019-04-16 Thread purna pradeep
Hello,

Is dynamic executor scaling for Spark on Kubernetes available in the latest
release of Spark?

I mean scaling the executors based on the workload, rather than preallocating
a fixed number of executors for a Spark job.

Thanks,
Purna
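
For reference: in the 2.x line the upstream Kubernetes scheduler backend did not
support dynamic allocation, since it required an external shuffle service that
the K8s backend did not provide, so executors still had to be sized up front.
The properties below are the standard dynamic-allocation switches; a minimal
Scala sketch, assuming a Spark release and cluster manager where the feature is
supported, with placeholder executor counts and timeout:

import org.apache.spark.sql.SparkSession

// Minimal sketch: the standard dynamic-allocation properties, set
// programmatically here but equally valid as --conf flags on spark-submit.
// Executor counts and the idle timeout are placeholder values.
val spark = SparkSession.builder()
  .appName("dynamic-allocation-sketch")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "10")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .getOrCreate()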


Re: [ANNOUNCE] Announcing Apache Spark 2.4.0

2018-11-09 Thread purna pradeep
Thanks, this is great news!

Can you please let me know if dynamic resource allocation is available in
Spark 2.4?

I’m using Spark 2.3.2 on Kubernetes. Do I still need to provide executor
memory options as part of the spark-submit command, or will Spark manage the
required executor memory based on the size of the Spark job?
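
For reference: on deployments where dynamic allocation is not available,
executor resources still have to be specified explicitly. A minimal Scala
sketch with placeholder values, using the configuration keys behind
spark-submit's executor sizing options:

import org.apache.spark.sql.SparkSession

// Minimal sketch, placeholder values: spark.executor.instances,
// spark.executor.memory and spark.executor.cores size the executors
// explicitly; they can also be passed as --conf flags to spark-submit.
val spark = SparkSession.builder()
  .appName("explicit-executor-resources-sketch")
  .config("spark.executor.instances", "5")
  .config("spark.executor.memory", "4g")
  .config("spark.executor.cores", "2")
  .getOrCreate()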

On Thu, Nov 8, 2018 at 2:18 PM Marcelo Vanzin 
wrote:

> +user@
>
> >> -- Forwarded message -
> >> From: Wenchen Fan 
> >> Date: Thu, Nov 8, 2018 at 10:55 PM
> >> Subject: [ANNOUNCE] Announcing Apache Spark 2.4.0
> >> To: Spark dev list 
> >>
> >>
> >> Hi all,
> >>
> >> Apache Spark 2.4.0 is the fifth release in the 2.x line. This release
> adds Barrier Execution Mode for better integration with deep learning
> frameworks, introduces 30+ built-in and higher-order functions to deal with
> complex data types more easily, improves the K8s integration, and adds
> experimental Scala 2.12 support. Other major updates include the built-in
> Avro data source, the Image data source, flexible streaming sinks,
> elimination of the 2GB block size limitation during transfer, and Pandas UDF
> improvements. In addition, this release continues to focus on usability,
> stability, and polish while resolving around 1100 tickets.
> >>
> >> We'd like to thank our contributors and users for their contributions
> and early feedback to this release. This release would not have been
> possible without you.
> >>
> >> To download Spark 2.4.0, head over to the download page:
> http://spark.apache.org/downloads.html
> >>
> >> To view the release notes:
> https://spark.apache.org/releases/spark-release-2-4-0.html
> >>
> >> Thanks,
> >> Wenchen
> >>
> >> PS: If you see any issues with the release notes, webpage or published
> artifacts, please contact me directly off-list.
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark 2.3.1: k8s driver pods stuck in Initializing state

2018-09-26 Thread purna pradeep
Hello,


We're running Spark 2.3.1 on Kubernetes v1.11.0, and our driver pods are
getting stuck in the Initializing state like so:

NAME
READY STATUS RESTARTS   AGE

my-pod-fd79926b819d3b34b05250e23347d0e7-driver   0/1   Init:0/1   0
  18h


And from *kubectl describe pod*:

*Warning  FailedMount  9m (x128 over 4h) * kubelet, 10.47.96.167  Unable to
mount volumes for pod
"my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)":
timeout expired waiting for volumes to attach or mount for pod
"spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted
volumes=[spark-init-properties]. list of unattached
volumes=[spark-init-properties download-jars-volume download-files-volume
spark-token-tfpvp]
  *Warning  FailedMount  4m (x153 over 4h)  kubelet,* 10.47.96.167
MountVolume.SetUp failed for volume "spark-init-properties" : configmaps
"my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in *kubectl get configmap*, the init configmap for the
driver pod isn't there.

Am I correct in assuming that since the configmap isn't being created, the
driver pod will never start (hence it's stuck in init)?

Where does the init configmap come from?

Why would it not be created?


Please suggest.

Thanks,
Purna


Spark 2.3.1: k8s driver pods stuck in Initializing state

2018-09-26 Thread Purna Pradeep Mamillapalli
We're running Spark 2.3.1 on Kubernetes v1.11.0, and our driver pods are
getting stuck in the Initializing state like so:

NAME
READY STATUS RESTARTS   AGE

my-pod-fd79926b819d3b34b05250e23347d0e7-driver   0/1   Init:0/1   0
  18h


And from *kubectl describe pod*:

*Warning  FailedMount  9m (x128 over 4h) * kubelet, 10.47.96.167  Unable to
mount volumes for pod
"my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)":
timeout expired waiting for volumes to attach or mount for pod
"spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted
volumes=[spark-init-properties]. list of unattached
volumes=[spark-init-properties download-jars-volume download-files-volume
spark-token-tfpvp]
  *Warning  FailedMount  4m (x153 over 4h)  kubelet,* 10.47.96.167
MountVolume.SetUp failed for volume "spark-init-properties" : configmaps
"my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in *kubectl get configmap*, the init configmap for the
driver pod isn't there.

Am I correct in assuming that since the configmap isn't being created, the
driver pod will never start (hence it's stuck in init)?

Where does the init configmap come from?

Why would it not be created?

Thanks,
Christopher Carney


Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes

2018-08-17 Thread purna pradeep
Resurfacing the question to get more attention.

Hello,
>
> I'm running a Spark 2.3 job on a Kubernetes cluster
>>
>> kubectl version
>>
>> Client Version: version.Info{Major:"1", Minor:"9",
>> GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b",
>> GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z",
>> GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
>>
>> Server Version: version.Info{Major:"1", Minor:"8",
>> GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd",
>> GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z",
>> GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}
>>
>>
>>
>> When I run spark-submit against the k8s master, the driver pod gets stuck in
>> the Waiting: PodInitializing state.
>> I had to manually kill the driver pod and submit a new job in this case;
>> then it works. How can this be handled in production?
>>
> This happens with executor pods as well
>

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128
>
>
>>
>> This happens if I submit the jobs almost in parallel, i.e. I submit 5 jobs
>> one after the other, almost simultaneously.
>>
>> I'm running the Spark jobs on 20 nodes, each having the configuration below.
>>
>> I tried kubectl describe node on the node where the driver pod is
>> running, and this is what I got. I do see that resources are overcommitted,
>> but I expected the Kubernetes scheduler not to schedule if resources on the
>> node are overcommitted or the node is in Not Ready state. In this case the
>> node is in Ready state, but I observe the same behaviour if the node is in
>> "Not Ready" state.
>>
>>
>>
>> Name:   **
>>
>> Roles:  worker
>>
>> Labels: beta.kubernetes.io/arch=amd64
>>
>> beta.kubernetes.io/os=linux
>>
>> kubernetes.io/hostname=
>>
>> node-role.kubernetes.io/worker=true
>>
>> Annotations:node.alpha.kubernetes.io/ttl=0
>>
>>
>> volumes.kubernetes.io/controller-managed-attach-detach=true
>>
>> Taints: 
>>
>> CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400
>>
>> Conditions:
>>
>>   Type Status  LastHeartbeatTime
>> LastTransitionTimeReason   Message
>>
>>    --  -
>> ----   ---
>>
>>   OutOfDiskFalse   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
>> Jul 2018 09:59:24 -0400   KubeletHasSufficientDisk kubelet has
>> sufficient disk space available
>>
>>   MemoryPressure   False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
>> Jul 2018 09:59:24 -0400   KubeletHasSufficientMemory   kubelet has
>> sufficient memory available
>>
>>   DiskPressure False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
>> Jul 2018 09:59:24 -0400   KubeletHasNoDiskPressure kubelet has no disk
>> pressure
>>
>>   ReadyTrueTue, 14 Aug 2018 09:31:20 -0400   Sat, 11
>> Aug 2018 00:41:27 -0400   KubeletReady kubelet is posting
>> ready status. AppArmor enabled
>>
>> Addresses:
>>
>>   InternalIP:  *
>>
>>   Hostname:**
>>
>> Capacity:
>>
>>  cpu: 16
>>
>>  memory:  125827288Ki
>>
>>  pods:110
>>
>> Allocatable:
>>
>>  cpu: 16
>>
>>  memory:  125724888Ki
>>
>>  pods:110
>>
>> System Info:
>>
>>  Machine ID: *
>>
>>  System UUID:**
>>
>>  Boot ID:1493028d-0a80-4f2f-b0f1-48d9b8910e9f
>>
>>  Kernel Version: 4.4.0-1062-aws
>>
>>  OS Image:   Ubuntu 16.04.4 LTS
>>
>>  Operating System:   linux
>>
>>  Architecture:   amd64
>>
>>  Container Runtime Version:  docker://Unknown
>>
>>  Kubelet Version:v1.8.3
>>
>>  Kube-Proxy Version: v1.8.3
>>
>> PodCIDR: **
>>
>> ExternalID:  **
>>
>> Non-terminated Pods: (11 in total)
>>
>>   Namespace  Name
>>CPU Requests  CPU Limits  Memory Requests  Memory
>> Limits
>>
>>   -  
>>  --  ---
>>  -
>>
>>   kube-systemcalico-node-gj5mb
>> 250m (1%) 0 (0%)  0 (0%)   0 (0%)
>>
>>   kube-system
>>  kube-proxy- 100m (0%)
>> 0 (0%)  0 (0%)   0 (0%)
>>
>>   kube-system
>>  prometheus-prometheus-node-exporter-9cntq   100m (0%)
>> 200m (1%)   30Mi (0%)50Mi (0%)
>>
>>   logging
>>  elasticsearch-elasticsearch-data-69df997486-gqcwg   400m (2%)
>> 1 (6%)  8Gi (6%) 16Gi (13%)
>>
>>   logging

Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes

2018-08-16 Thread purna pradeep
Hello,

I'm running a Spark 2.3 job on a Kubernetes cluster.
>
> kubectl version
>
> Client Version: version.Info{Major:"1", Minor:"9",
> GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b",
> GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z",
> GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
>
> Server Version: version.Info{Major:"1", Minor:"8",
> GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd",
> GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z",
> GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}
>
>
>
> When I run spark-submit against the k8s master, the driver pod gets stuck in
> the Waiting: PodInitializing state.
> I had to manually kill the driver pod and submit a new job in this case;
> then it works. How can this be handled in production?
>

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128


>
> This happens if I submit the jobs almost in parallel, i.e. I submit 5 jobs
> one after the other, almost simultaneously.
>
> I'm running the Spark jobs on 20 nodes, each having the configuration below.
>
> I tried kubectl describe node on the node where the driver pod is running,
> and this is what I got. I do see that resources are overcommitted, but I
> expected the Kubernetes scheduler not to schedule if resources on the node are
> overcommitted or the node is in Not Ready state. In this case the node is in
> Ready state, but I observe the same behaviour if the node is in "Not Ready" state.
>
>
>
> Name:   **
>
> Roles:  worker
>
> Labels: beta.kubernetes.io/arch=amd64
>
> beta.kubernetes.io/os=linux
>
> kubernetes.io/hostname=
>
> node-role.kubernetes.io/worker=true
>
> Annotations:node.alpha.kubernetes.io/ttl=0
>
>
> volumes.kubernetes.io/controller-managed-attach-detach=true
>
> Taints: 
>
> CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400
>
> Conditions:
>
>   Type Status  LastHeartbeatTime
> LastTransitionTimeReason   Message
>
>    --  -
> ----   ---
>
>   OutOfDiskFalse   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
> Jul 2018 09:59:24 -0400   KubeletHasSufficientDisk kubelet has
> sufficient disk space available
>
>   MemoryPressure   False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
> Jul 2018 09:59:24 -0400   KubeletHasSufficientMemory   kubelet has
> sufficient memory available
>
>   DiskPressure False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
> Jul 2018 09:59:24 -0400   KubeletHasNoDiskPressure kubelet has no disk
> pressure
>
>   ReadyTrueTue, 14 Aug 2018 09:31:20 -0400   Sat, 11
> Aug 2018 00:41:27 -0400   KubeletReady kubelet is posting
> ready status. AppArmor enabled
>
> Addresses:
>
>   InternalIP:  *
>
>   Hostname:**
>
> Capacity:
>
>  cpu: 16
>
>  memory:  125827288Ki
>
>  pods:110
>
> Allocatable:
>
>  cpu: 16
>
>  memory:  125724888Ki
>
>  pods:110
>
> System Info:
>
>  Machine ID: *
>
>  System UUID:**
>
>  Boot ID:1493028d-0a80-4f2f-b0f1-48d9b8910e9f
>
>  Kernel Version: 4.4.0-1062-aws
>
>  OS Image:   Ubuntu 16.04.4 LTS
>
>  Operating System:   linux
>
>  Architecture:   amd64
>
>  Container Runtime Version:  docker://Unknown
>
>  Kubelet Version:v1.8.3
>
>  Kube-Proxy Version: v1.8.3
>
> PodCIDR: **
>
> ExternalID:  **
>
> Non-terminated Pods: (11 in total)
>
>   Namespace  Name
>CPU Requests  CPU Limits  Memory Requests  Memory
> Limits
>
>   -  
>  --  ---
>  -
>
>   kube-systemcalico-node-gj5mb
>   250m (1%) 0 (0%)  0 (0%)   0 (0%)
>
>   kube-system
>  kube-proxy- 100m (0%)
> 0 (0%)  0 (0%)   0 (0%)
>
>   kube-systemprometheus-prometheus-node-exporter-9cntq
>   100m (0%) 200m (1%)   30Mi (0%)50Mi (0%)
>
>   logging
>  elasticsearch-elasticsearch-data-69df997486-gqcwg   400m (2%)
> 1 (6%)  8Gi (6%) 16Gi (13%)
>
>   loggingfluentd-fluentd-elasticsearch-tj7nd
>   200m (1%) 0 (0%)  612Mi (0%)   0 (0%)
>
>   rook   rook-agent-6jtzm
>0 (0%)0 (0%)  0 (0%)   0 (0%)

spark driver pod stuck in Waiting: PodInitializing state in Kubernetes

2018-08-15 Thread purna pradeep
I'm running a Spark 2.3 job on a Kubernetes cluster.

kubectl version

Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3",
GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean",
BuildDate:"2018-02-09T21:51:06Z", GoVersion:"go1.9.4", Compiler:"gc",
Platform:"darwin/amd64"}

Server Version: version.Info{Major:"1", Minor:"8", GitVersion:"v1.8.3",
GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean",
BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc",
Platform:"linux/amd64"}



When I run spark-submit against the k8s master, the driver pod gets stuck in the
Waiting: PodInitializing state.
I had to manually kill the driver pod and submit a new job in this case;
then it works.


This happens if I submit the jobs almost in parallel, i.e. I submit 5 jobs one
after the other, almost simultaneously.

I'm running the Spark jobs on 20 nodes, each having the configuration below.

I tried kubectl describe node on the node where the driver pod is running,
and this is what I got. I do see that resources are overcommitted, but I
expected the Kubernetes scheduler not to schedule if resources on the node are
overcommitted or the node is in Not Ready state. In this case the node is in
Ready state, but I observe the same behaviour if the node is in "Not Ready" state.



Name:   **

Roles:  worker

Labels: beta.kubernetes.io/arch=amd64

beta.kubernetes.io/os=linux

kubernetes.io/hostname=

node-role.kubernetes.io/worker=true

Annotations:node.alpha.kubernetes.io/ttl=0


volumes.kubernetes.io/controller-managed-attach-detach=true

Taints: 

CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400

Conditions:

  Type Status  LastHeartbeatTime
LastTransitionTimeReason   Message

   --  -
----   ---

  OutOfDiskFalse   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
Jul 2018 09:59:24 -0400   KubeletHasSufficientDisk kubelet has
sufficient disk space available

  MemoryPressure   False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
Jul 2018 09:59:24 -0400   KubeletHasSufficientMemory   kubelet has
sufficient memory available

  DiskPressure False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
Jul 2018 09:59:24 -0400   KubeletHasNoDiskPressure kubelet has no disk
pressure

  ReadyTrueTue, 14 Aug 2018 09:31:20 -0400   Sat, 11
Aug 2018 00:41:27 -0400   KubeletReady kubelet is posting
ready status. AppArmor enabled

Addresses:

  InternalIP:  *

  Hostname:**

Capacity:

 cpu: 16

 memory:  125827288Ki

 pods:110

Allocatable:

 cpu: 16

 memory:  125724888Ki

 pods:110

System Info:

 Machine ID: *

 System UUID:**

 Boot ID:1493028d-0a80-4f2f-b0f1-48d9b8910e9f

 Kernel Version: 4.4.0-1062-aws

 OS Image:   Ubuntu 16.04.4 LTS

 Operating System:   linux

 Architecture:   amd64

 Container Runtime Version:  docker://Unknown

 Kubelet Version:v1.8.3

 Kube-Proxy Version: v1.8.3

PodCIDR: **

ExternalID:  **

Non-terminated Pods: (11 in total)

  Namespace  Name
 CPU Requests  CPU Limits  Memory Requests  Memory
Limits

  -  
   --  ---
 -

  kube-systemcalico-node-gj5mb
  250m (1%) 0 (0%)  0 (0%)   0 (0%)

  kube-system
 kube-proxy- 100m (0%)
0 (0%)  0 (0%)   0 (0%)

  kube-systemprometheus-prometheus-node-exporter-9cntq
  100m (0%) 200m (1%)   30Mi (0%)50Mi (0%)

  logging
 elasticsearch-elasticsearch-data-69df997486-gqcwg   400m (2%)
1 (6%)  8Gi (6%) 16Gi (13%)

  loggingfluentd-fluentd-elasticsearch-tj7nd
  200m (1%) 0 (0%)  612Mi (0%)   0 (0%)

  rook   rook-agent-6jtzm
 0 (0%)0 (0%)  0 (0%)   0 (0%)

  rook
rook-ceph-osd-10-6-42-250.accel.aws-cardda.cb4good.com-gwb8j0 (0%)
   0 (0%)  0 (0%)   0 (0%)

  spark
 accelerate-test-5-a3bfb8a597e83d459193a183e17f13b5-exec-1   2 (12%)
0 (0%)  10Gi (8%)12Gi (10%)

  spark
 accelerate-testing-1-8ed0482f3bfb3c0a83da30bb7d433dff-exec-52 (12%)
0 (0%)  10Gi (8%)12Gi 

Re: Executor lost for unknown reasons error Spark 2.3 on kubernetes

2018-07-31 Thread purna pradeep
$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)

at java.lang.Thread.run(Thread.java:748)

2018-07-30 19:58:42 INFO  BlockManagerMasterEndpoint:54 - Trying to remove
executor 7 from BlockManagerMaster.

2018-07-30 19:58:42 WARN  BlockManagerMasterEndpoint:66 - No more replicas
available for rdd_9_37 !

MasterEndpoint:54 - Removing block manager BlockManagerId(7, 10.*.*.*.*,
43888, None)

2018-07-30 19:58:42 INFO  BlockManagerMaster:54 - Removed 7 successfully in
removeExecutor

2018-07-30 19:58:42 INFO  DAGScheduler:54 - Shuffle files lost for
executor: 7 (epoch 1)

2018-07-30 19:58:42 ERROR ContextCleaner:91 - Error cleaning broadcast 11

org.apache.spark.SparkException: Exception thrown in awaitResult:

at
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)

at
org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

at
org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:155)

at
org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:321)

at
org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)

at
org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)

at
org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:238)

at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:194)

at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:185)

at scala.Option.foreach(Option.scala:257)

at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:185)

at
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)

at org.apache.spark.ContextCleaner.org
$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)

at
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)

Caused by: java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

at
sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

at sun.nio.ch.IOUtil.read(IOUtil.java:192)

at
sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

at
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)

at
io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)

at
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)

at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)

at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)

at
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)

at
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)

at
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)

at java.lang.Thread.run(Thread.java:

On Tue, Jul 31, 2018 at 8:32 AM purna pradeep 
wrote:

>
> Hello,
>>
>>
>>
>> I’m getting the below error in the Spark driver pod logs, and executor pods
>> are getting killed midway through while the job is running. Even the driver
>> pod terminates with the below intermittent error; this happens if I run
>> multiple jobs in parallel.
>>
>>
>>
>> I am not able to see executor logs, as the executor pods are killed.
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 23 in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in
>> stage 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure
>> (executor 1 exited caused by one of the running tasks) Reason: Executor
>> lost for unknown reasons.
>>
>> Driver stacktrace:
>>
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
>>
>> at
>> org.apache.spark.scheduler

Executor lost for unknown reasons error Spark 2.3 on kubernetes

2018-07-31 Thread purna pradeep
> Hello,
>
>
>
> I’m getting the below error in the Spark driver pod logs, and executor pods
> are getting killed midway through while the job is running. Even the driver
> pod terminates with the below intermittent error; this happens if I run
> multiple jobs in parallel.
>
>
>
> I am not able to see executor logs, as the executor pods are killed.
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 23
> in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage
> 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1
> exited caused by one of the running tasks) Reason: Executor lost for
> unknown reasons.
>
> Driver stacktrace:
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>
> at scala.Option.foreach(Option.scala:257)
>
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
>
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
>
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
>
> ... 42 mor
>


Executor lost for unknown reasons error Spark 2.3 on kubernetes

2018-07-30 Thread purna pradeep
Hello,



I’m getting the below error in the Spark driver pod logs, and executor pods are
getting killed midway through while the job is running. Even the driver pod
terminates with the below intermittent error; this happens if I run multiple
jobs in parallel.



I am not able to see executor logs, as the executor pods are killed.



org.apache.spark.SparkException: Job aborted due to stage failure: Task 23
in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage
36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1
exited caused by one of the running tasks) Reason: Executor lost for
unknown reasons.

Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at scala.Option.foreach(Option.scala:257)

at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)

at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)

... 42 mor


Executor lost for unknown reasons error Spark 2.3 on kubernetes

2018-07-30 Thread Mamillapalli, Purna Pradeep
Hello,

I’m getting the below error in the Spark driver pod logs, and executor pods are
getting killed midway through while the job is running. Even the driver pod
terminates with the below intermittent error; this happens if I run multiple
jobs in parallel.

I am not able to see executor logs, as the executor pods are killed.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in 
stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage 36.0 
(TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1 exited 
caused by one of the running tasks) Reason: Executor lost for unknown reasons.
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
... 42 more


Thanks,
Purna


Spark 2.3 Kubernetes error

2018-07-06 Thread purna pradeep
> Hello,
>
>
>
> When I try to set the below options on the spark-submit command on the k8s
> master, I get the below error in the spark-driver pod logs:
>
>
>
> --conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost
> -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
>
> --conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost
> -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
>
>
>
> But when I set these extraJavaOptions as system properties in the Spark
> application jar, everything works fine.
>
>
>
> 2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing
> SparkContext.
>
> org.apache.spark.SparkException: External scheduler cannot be instantiated
>
> at
> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
>
> at
> org.apache.spark.SparkContext.init(SparkContext.scala:492)
>
> at
> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
>
> at
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
>
> at
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
>
> at scala.Option.getOrElse(Option.scala:121)
>
> at
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
>
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
> Operation: [get]  for kind: [Pod]  with name:
> [test-657e2f715ada3f91ae32c588aa178f63-driver]  in namespace: [test]
> failed.
>
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
>
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
>
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
>
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
>
> at
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.init(KubernetesClusterSchedulerBackend.scala:70)
>
> at
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
>
> at
> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
>
> ... 12 more
>
> Caused by: javax.net.ssl.SSLHandshakeException:
> sun.security.validator.ValidatorException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
> valid certification path to requested target
>
> at
> sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>
> at
> sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)
>
> at
> sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
>
> at
> sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>
> at
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>
> at
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>
> at
> sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>
> at
> sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
>
> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
>
> at
> sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
>
> at
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
>
> at
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
>
> at
> okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)
>
> at
> okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)
>
> at
> okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)
>
> at
> okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)
>
> at
> okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
>
> at
> okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
>
> at
> okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
>
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
>
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
>
> at
> okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
>
> at
> 

Spark 2.3 Kubernetes error

2018-07-05 Thread purna pradeep
Hello,



When I try to set the below options on the spark-submit command on the k8s
master, I get the below error in the spark-driver pod logs:



--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \

--conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \



But when I set these extraJavaOptions as system properties in the Spark
application jar, everything works fine.
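
A minimal Scala sketch of that workaround: setting the proxy-related system
properties from the application code itself before the SparkSession is created
(host, port and app name are placeholders taken from the example above):

import org.apache.spark.sql.SparkSession

// Sketch of the workaround: set the proxy system properties in the
// application instead of via extraJavaOptions. Host and port are the
// placeholder values from the spark-submit example above.
System.setProperty("https.proxyHost", "myhost")
System.setProperty("https.proxyPort", "8099")
System.setProperty("https.protocols", "TLSv1.2")

val spark = SparkSession.builder()
  .appName("proxy-system-properties-sketch")
  .getOrCreate()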



2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing SparkContext.

org.apache.spark.SparkException: External scheduler cannot be instantiated

at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)

at
org.apache.spark.SparkContext.init(SparkContext.scala:492)

at
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)

at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)

at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)

at scala.Option.getOrElse(Option.scala:121)

at
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)

Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
Operation: [get]  for kind: [Pod]  with name:
[test-657e2f715ada3f91ae32c588aa178f63-driver]  in namespace: [test]
failed.

at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)

at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)

at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)

at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)

at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.init(KubernetesClusterSchedulerBackend.scala:70)

at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)

at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)

... 12 more

Caused by: javax.net.ssl.SSLHandshakeException:
sun.security.validator.ValidatorException: PKIX path building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to find
valid certification path to requested target

at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)

at
sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)

at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)

at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)

at
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)

at
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)

at
sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)

at
sun.security.ssl.Handshaker.process_record(Handshaker.java:961)

at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)

at
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)

at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)

at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)

at
okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)

at
okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)

at
okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)

at
okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)

at
okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)

at
okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)

at
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at

Spark 2.3 Kubernetes error

2018-07-05 Thread Mamillapalli, Purna Pradeep
Hello,

When I try to set the below options on the spark-submit command on the k8s
master, I get the below error in the spark-driver pod logs:



--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost 
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \

--conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost 
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \


But when I set these extraJavaOptions as system properties in the Spark
application jar, everything works fine.


2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing SparkContext.

org.apache.spark.SparkException: External scheduler cannot be instantiated

at 
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)

at 
org.apache.spark.SparkContext.init(SparkContext.scala:492)

at 
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)

at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)

at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)

at scala.Option.getOrElse(Option.scala:121)

at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: 
[get]  for kind: [Pod]  with name: 
[test-657e2f715ada3f91ae32c588aa178f63-driver]  in namespace: [test]  failed.

at 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)

at 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)

at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)

at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)

at 
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.init(KubernetesClusterSchedulerBackend.scala:70)

at 
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)

at 
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)

... 12 more

Caused by: javax.net.ssl.SSLHandshakeException: 
sun.security.validator.ValidatorException: PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target

at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)

at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)

at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)

at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)

at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)

at 
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)

at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)

at 
sun.security.ssl.Handshaker.process_record(Handshaker.java:961)

at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)

at 
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)

at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)

at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)

at 
okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)

at 
okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)

at 
okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)

at 
okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)

at 
okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)

at 
okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)

at 
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at 
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at 

Spark 2.3 driver pod stuck in Running state — Kubernetes

2018-06-08 Thread purna pradeep
Hello,

When I run spark-submit on the k8s cluster, I see the driver pod stuck in the
Running state, and when I pull the driver pod logs I see the log below.

I do understand that this warning might be due to a lack of CPU/memory,
but I would expect the driver pod to be in the “Pending” state rather than the
“Running” state, since it isn't actually running.

So I had to kill the driver pod and resubmit the job.

Please suggest!

2018-06-08 14:38:01 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources

2018-06-08 14:38:16 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources

2018-06-08 14:38:31 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources

2018-06-08 14:38:46 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources

2018-06-08 14:39:01 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources


spark partitionBy with partitioned column in json output

2018-06-04 Thread purna pradeep
I'm reading the below JSON in Spark:

{"bucket": "B01", "actionType": "A1", "preaction": "NULL",
"postaction": "NULL"}
{"bucket": "B02", "actionType": "A2", "preaction": "NULL",
"postaction": "NULL"}
{"bucket": "B03", "actionType": "A3", "preaction": "NULL",
"postaction": "NULL"}

val df=spark.read.json("actions.json").toDF()

Now I'm writing the same to JSON output as below:

df.write.format("json").mode("append")
  .partitionBy("bucket", "actionType").save("output.json")


and the output.json is as below

{"preaction":"NULL","postaction":"NULL"}

The bucket and actionType columns are missing in the JSON output; I need the
partitionBy columns in the output as well.
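
This is expected behaviour: partitionBy() moves the partition columns into the
output directory names (e.g. output.json/bucket=B01/actionType=A1/part-*.json)
instead of repeating them in every record. A minimal Scala sketch of two ways
to get the columns back; the *_out column names are hypothetical:

import org.apache.spark.sql.functions.col

val df = spark.read.json("actions.json")

// Option 1: read the partitioned directory back; Spark restores bucket and
// actionType as columns from the bucket=/actionType= directory names.
val restored = spark.read.json("output.json")
restored.select("bucket", "actionType", "preaction", "postaction").show()

// Option 2: if every JSON record must also contain the values, duplicate the
// columns under new (hypothetical) names and partition on the originals.
df.withColumn("bucket_out", col("bucket"))
  .withColumn("actionType_out", col("actionType"))
  .write.format("json")
  .mode("append")
  .partitionBy("bucket", "actionType")
  .save("output.json")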


Re: Spark 2.3 error on Kubernetes

2018-05-29 Thread purna pradeep
Anirudh,

Thanks for your response.

I'm running the k8s cluster on AWS and the kube-dns pods are running fine.
Also, as I mentioned, only 1 executor pod is running though I requested 5;
the other 4 were killed with the below error, and I do have enough resources
available.

On Tue, May 29, 2018 at 6:28 PM Anirudh Ramanathan 
wrote:

> This looks to me like a kube-dns error that's causing the driver DNS
> address to not resolve.
> It would be worth double checking that kube-dns is indeed running (in the
> kube-system namespace).
> Often, with environments like minikube, kube-dns may exit/crashloop due to
> lack of resource.
>
> On Tue, May 29, 2018 at 3:18 PM, purna pradeep 
> wrote:
>
>> Hello,
>>
>> I’m getting the below error when I spark-submit a Spark 2.3 app on
>> Kubernetes *v1.8.3*; some of the executor pods were killed with the below
>> error as soon as they came up:
>>
>> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
>>
>> at
>> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
>>
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
>>
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
>>
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
>>
>> Caused by: org.apache.spark.SparkException: Exception thrown in
>> awaitResult:
>>
>> at
>> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>>
>> at
>> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>>
>> at
>> org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
>>
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
>>
>> at
>> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
>>
>> at
>> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
>>
>> at java.security.AccessController.doPrivileged(Native
>> Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>
>> ... 4 more
>>
>> Caused by: java.io.IOException: Failed to connect to
>> spark-1527629824987-driver-svc.spark.svc:7078
>>
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
>>
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>>
>> at
>> org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
>>
>> at
>> org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
>>
>> at
>> org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
>>
>> at
>> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.net.UnknownHostException:
>> spark-1527629824987-driver-svc.spark.svc
>>
>> at
>> java.net.InetAddress.getAllByName0(InetAddress.java:1280)
>>
>> at
>> java.net.InetAddress.getAllByName(InetAddress.java:1192)
>>
>> at
>> java.net.InetAddress.getAllByName(InetAddress.java:1126)
>>
>> at java.net.InetAddress.getByName(InetAddress.java:1076)
>>
>> at
>> io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
>>
>> at
>> io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
>>
>> at java.security.AccessController.doPri

Spark 2.3 error on Kubernetes

2018-05-29 Thread purna pradeep
Hello,

I’m getting the below error when I spark-submit a Spark 2.3 app on Kubernetes
*v1.8.3*; some of the executor pods were killed with the below error as soon
as they came up:

Exception in thread "main" java.lang.reflect.UndeclaredThrowableException

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)

at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)

at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)

at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)

at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)

Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:

at
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)

at
org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

at
org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)

at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)

at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)

at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)

at java.security.AccessController.doPrivileged(Native
Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

... 4 more

Caused by: java.io.IOException: Failed to connect to
spark-1527629824987-driver-svc.spark.svc:7078

at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)

at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)

at
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)

at
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)

at
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.net.UnknownHostException:
spark-1527629824987-driver-svc.spark.svc

at java.net.InetAddress.getAllByName0(InetAddress.java:1280)

at java.net.InetAddress.getAllByName(InetAddress.java:1192)

at java.net.InetAddress.getAllByName(InetAddress.java:1126)

at java.net.InetAddress.getByName(InetAddress.java:1076)

at
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)

at
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)

at java.security.AccessController.doPrivileged(Native
Method)

at
io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)

at
io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)

at
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)

at
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)

at
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)

at
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)

at
io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)

at
io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)

at
io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)

at
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)

at
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)

at
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)

at
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)

at
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)

at
io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)

at
io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)

at

Spark 2.3 error on kubernetes

2018-05-29 Thread Mamillapalli, Purna Pradeep
Hello,


I’m getting the below intermittent error when I spark-submit a Spark 2.3 app on
Kubernetes v1.8.3; some of the executor pods were killed with the below error as
soon as they came up:


Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at 
org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at 
org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
... 4 more
Caused by: java.io.IOException: Failed to connect to 
spark-1527629824987-driver-svc.spark.svc:7078
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
at 
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
at 
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: 
spark-1527629824987-driver-svc.spark.svc
at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
at java.net.InetAddress.getAllByName(InetAddress.java:1192)
at java.net.InetAddress.getAllByName(InetAddress.java:1126)
at java.net.InetAddress.getByName(InetAddress.java:1076)
at 
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
at 
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
at java.security.AccessController.doPrivileged(Native Method)
at 
io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)
at 
io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)
at 
io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)
at 
io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)
at 
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)
at 
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)
at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
at 
io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at 
io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
at 


Spark driver pod garbage collection

2018-05-23 Thread purna pradeep
Hello,

Currently I observe that dead pods are not getting garbage collected (i.e. Spark
driver pods which have completed execution), so pods could potentially sit in the
namespace for weeks. This makes listing, parsing, and reading pods slower, as
well as leaving junk sitting on the cluster.

I believe the minimum-container-ttl-duration kubelet flag is set to 0 minutes by
default, but I don’t see the completed Spark driver pods being garbage
collected.

Do I need to set any flag explicitly @ kubelet level?
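
For reference: as far as I know the kubelet's minimum-container-ttl-duration only
governs garbage collection of dead containers, and the control plane's pod garbage
collector (the kube-controller-manager flag --terminated-pod-gc-threshold) only prunes
terminated pods once their count crosses that threshold, so completed driver pods
usually have to be deleted explicitly. A minimal cleanup sketch using the official
Kubernetes Python client; the "spark" namespace and the one-day retention window are
illustrative assumptions, not something prescribed in this thread:

# cleanup_driver_pods.py - delete Succeeded Spark driver pods older than MAX_AGE.
from datetime import datetime, timedelta, timezone

from kubernetes import client, config

NAMESPACE = "spark"          # assumption: drivers run in the "spark" namespace
MAX_AGE = timedelta(days=1)  # assumption: keep finished drivers for one day

def main():
    config.load_kube_config()  # or config.load_incluster_config() inside the cluster
    v1 = client.CoreV1Api()
    now = datetime.now(timezone.utc)
    # Completed driver pods end up in phase Succeeded (use Failed for crashed ones).
    pods = v1.list_namespaced_pod(NAMESPACE, field_selector="status.phase=Succeeded")
    for pod in pods.items:
        age = now - pod.metadata.creation_timestamp
        if age > MAX_AGE:
            print("deleting %s (age %s)" % (pod.metadata.name, age))
            v1.delete_namespaced_pod(pod.metadata.name, NAMESPACE)

if __name__ == "__main__":
    main()

Run from a cron job or a small CronJob in the cluster, something along these lines keeps
the namespace free of finished driver pods without touching kubelet flags.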


Spark driver pod eviction Kubernetes

2018-05-22 Thread purna pradeep
Hi,

What would be the recommended approach to wait for the Spark driver pod to
complete its currently running job before it gets evicted to new nodes while
maintenance on the current node (kernel upgrade, hardware maintenance, etc.) is
going on, using the drain command?

I don’t think I can use a PodDisruptionBudget, as the Spark pods’ deployment
yaml(s) are taken care of by Kubernetes.

Please suggest !
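
One possible approach, as a sketch only: cordon the node first so nothing new is
scheduled onto it, wait until no driver pod on that node is still Running, and only
then drain it. This assumes the official Kubernetes Python client and the
spark-role=driver label that Spark on Kubernetes puts on driver pods; the node and
namespace names are placeholders:

# wait_for_drivers.py - cordon a node and wait for Spark drivers on it to finish
# before draining. A sketch, not an officially recommended procedure.
import time

from kubernetes import client, config

NODE = "worker-1"    # placeholder: node under maintenance
NAMESPACE = "spark"  # placeholder: namespace running the Spark jobs

config.load_kube_config()
v1 = client.CoreV1Api()

# Cordon: mark the node unschedulable so no new driver/executor pods land on it.
v1.patch_node(NODE, {"spec": {"unschedulable": True}})

# Wait until no driver pod on this node is still running its job.
while True:
    pods = v1.list_namespaced_pod(
        NAMESPACE,
        label_selector="spark-role=driver",
        field_selector="spec.nodeName=%s,status.phase=Running" % NODE,
    )
    if not pods.items:
        break
    print("still waiting on %s" % [p.metadata.name for p in pods.items])
    time.sleep(30)

print("no running drivers left on %s; safe to run `kubectl drain %s`" % (NODE, NODE))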


Re: S3keysonsor

2018-05-21 Thread purna pradeep
+ Joe



On Mon, May 21, 2018 at 2:56 PM purna pradeep <purna2prad...@gmail.com>
wrote:

> I do know only to some extent , I mean If you see my sample s3 locations
>
> s3a://mybucket/20180425_111447_data1/_SUCCESS
>
> s3a://mybucket/20180424_111241_data1/_SUCCESS
>
>
>
> The only values which are static in above location are
>
> s3a://mybucket/
>
> data1/_SUCCESS
>
> Now I want to configure tolerance for _SUCCESS file as latest or 1 day
> older based on this configuration it should pick the right time stamp
> folder which has _SUCCESS file
>
> On Mon, May 21, 2018 at 2:35 PM Joe Napolitano <joe.napolit...@wework.com>
> wrote:
>
>> Purna, with regards to "this path is not completely static," can you
>> clarify what you mean?
>>
>> Do you mean that you don't know the actual key name beforehand? E.g.
>> pertaining to "111447", "111241", and "111035" in your example?
>>
>> On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
>> br...@heisenbergwoodworking.com> wrote:
>>
>> > I suggest it’ll work for your needs.
>> >
>> > Sent from a device with less than stellar autocorrect
>> >
>> > > On May 21, 2018, at 10:16 AM, purna pradeep <purna2prad...@gmail.com>
>> > wrote:
>> > >
>> > > Hi ,
>> > >
>> > > I’m trying to evaluate airflow to see if it suits my needs.
>> > >
>> > > Basically i can have below steps in a DAG
>> > >
>> > >
>> > >
>> > > 1)Look for a file arrival on given s3 location (this path is not
>> > completely
>> > > static) (i can use S3Keysensor in this step)
>> > >
>> > >  i should be able to specify to look either for latest folder or
>> 24hrs or
>> > > n number of days older folder which has _SUCCESS file as mentioned
>> below
>> > >
>> > >  sample file location(s):
>> > >
>> > >  s3a://mybucket/20180425_111447_data1/_SUCCESS
>> > >
>> > >
>
>
> s3a://mybucket/20180424_111241_data1/_SUCCESS
>> > >
>> > >  s3a://mybucket/20180424_111035_data1/_SUCCESS
>> > >
>> > >
>> > >
>> > > 2)invoke a simple restapi using HttpSimpleOperator once the above
>> > > dependency is met ,i can set upstream for step2 as step1
>> > >
>> > >
>> > >
>> > > Does S3keysensor supports step1 out of the box?
>> > >
>> > > Also in some cases i may to have a DAG without start date & end date
>> it
>> > > just needs to be triggered once file is available in a given s3
>> location
>> > >
>> > >
>> > >
>> > > *Please suggest !*
>> >
>>
>


Re: S3keysonsor

2018-05-21 Thread purna pradeep
I only know it to some extent. I mean, if you look at my sample s3 locations:

s3a://mybucket/20180425_111447_data1/_SUCCESS

s3a://mybucket/20180424_111241_data1/_SUCCESS



The only values which are static in above location are

s3a://mybucket/

data1/_SUCCESS

Now I want to configure a tolerance for the _SUCCESS file (either latest or up to
1 day old); based on this configuration it should pick the right timestamp folder
which has a _SUCCESS file.
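
A minimal sketch of what such a "tolerant" check could look like as a custom Airflow
sensor, since as far as I can tell the built-in S3KeySensor matches a concrete key
(optionally with a wildcard) rather than a freshness window. This assumes boto3 and an
Airflow 1.x-style BaseSensorOperator; the bucket, the <timestamp>_data1/_SUCCESS layout
and the one-day lookback follow the examples above, everything else is illustrative:

# success_marker_sensor.py - succeed once a <YYYYMMDD_HHMMSS>_data1/_SUCCESS
# marker newer than the lookback window exists in the bucket. A sketch only;
# pagination of large listings is not handled here.
from datetime import datetime, timedelta

import boto3
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class SuccessMarkerSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, bucket, suffix="_data1/_SUCCESS", lookback_days=1,
                 *args, **kwargs):
        super(SuccessMarkerSensor, self).__init__(*args, **kwargs)
        self.bucket = bucket
        self.suffix = suffix
        self.lookback_days = lookback_days

    def poke(self, context):
        s3 = boto3.client("s3")
        cutoff = datetime.utcnow() - timedelta(days=self.lookback_days)
        resp = s3.list_objects_v2(Bucket=self.bucket)
        for obj in resp.get("Contents", []):
            key = obj["Key"]  # e.g. 20180425_111447_data1/_SUCCESS
            if not key.endswith(self.suffix):
                continue
            try:
                stamp = datetime.strptime(key.split("_data1")[0], "%Y%m%d_%H%M%S")
            except ValueError:
                continue  # folder name does not carry the expected timestamp
            if stamp >= cutoff:
                self.log.info("found fresh marker %s", key)
                return True
        return False

Wired into a DAG with a poke_interval and timeout, the REST call from step 2 can then
simply be set as its direct downstream, which also covers the "just trigger once the
file is available" case from the original question.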

On Mon, May 21, 2018 at 2:35 PM Joe Napolitano <joe.napolit...@wework.com>
wrote:

> Purna, with regards to "this path is not completely static," can you
> clarify what you mean?
>
> Do you mean that you don't know the actual key name beforehand? E.g.
> pertaining to "111447", "111241", and "111035" in your example?
>
> On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
> br...@heisenbergwoodworking.com> wrote:
>
> > I suggest it’ll work for your needs.
> >
> > Sent from a device with less than stellar autocorrect
> >
> > > On May 21, 2018, at 10:16 AM, purna pradeep <purna2prad...@gmail.com>
> > wrote:
> > >
> > > Hi ,
> > >
> > > I’m trying to evaluate airflow to see if it suits my needs.
> > >
> > > Basically i can have below steps in a DAG
> > >
> > >
> > >
> > > 1)Look for a file arrival on given s3 location (this path is not
> > completely
> > > static) (i can use S3Keysensor in this step)
> > >
> > >  i should be able to specify to look either for latest folder or 24hrs
> or
> > > n number of days older folder which has _SUCCESS file as mentioned
> below
> > >
> > >  sample file location(s):
> > >
> > >  s3a://mybucket/20180425_111447_data1/_SUCCESS
> > >
> > >


s3a://mybucket/20180424_111241_data1/_SUCCESS
> > >
> > >  s3a://mybucket/20180424_111035_data1/_SUCCESS
> > >
> > >
> > >
> > > 2)invoke a simple restapi using HttpSimpleOperator once the above
> > > dependency is met ,i can set upstream for step2 as step1
> > >
> > >
> > >
> > > Does S3keysensor supports step1 out of the box?
> > >
> > > Also in some cases i may to have a DAG without start date & end date it
> > > just needs to be triggered once file is available in a given s3
> location
> > >
> > >
> > >
> > > *Please suggest !*
> >
>


Re: Oozie for spark jobs without Hadoop

2018-05-21 Thread purna pradeep
Here you go !


   - Add oozie.service.HadoopAccessorService.supported.filesystems as * in
   oozie-site.xml
   - Include hadoop-aws-2.8.3.jar
   - Rebuild Oozie with -Dhttpclient.version=4.5.5 -Dhttpcore.version=4.4.9
   - Set jetty_opts with the proxy values (a quick connectivity check is sketched
   below)


On Sat, May 19, 2018 at 2:17 AM Peter Cseh <gezap...@cloudera.com> wrote:

> Wow, great work!
> Can you please summarize the required steps? This would be useful for
> others so we probably should add it to our documentation.
> Thanks in advance!
> Peter
>
> On Fri, May 18, 2018 at 11:33 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> I got this fixed by setting jetty_opts with proxy values.
>>
>> Thanks Peter!!
>>
>> On Thu, May 17, 2018 at 4:05 PM purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Ok I fixed this by adding aws keys in oozie
>>>
>>> But I’m getting below error
>>>
>>> I have tried setting proxy in core-site.xml but no luck
>>>
>>>
>>> 2018-05-17 15:39:20,602 ERROR CoordInputLogicEvaluatorPhaseOne:517 -
>>> SERVER[localhost] USER[-] GROUP[-] TOKEN[-] APP[-]
>>> JOB[000-180517144113498-oozie-xjt0-C] ACTION[000-
>>> 180517144113498-oozie-xjt0-C@2] 
>>> org.apache.oozie.service.HadoopAccessorException:
>>> E0902: Exception occurred: [doesBucketExist on cmsegmentation-qa:
>>> com.amazonaws.SdkClientException: Unable to execute HTTP request:
>>> Connect to mybucket.s3.amazonaws.com:443
>>> <http://cmsegmentation-qa.s3.amazonaws.com:443/> [mybucket.
>>> s3.amazonaws.com/52.216.165.155
>>> <http://cmsegmentation-qa.s3.amazonaws.com/52.216.165.155>] failed:
>>> connect timed out]
>>>
>>> org.apache.oozie.service.HadoopAccessorException: E0902: Exception
>>> occurred: [doesBucketExist on cmsegmentation-qa:
>>> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect
>>> to mybucket.s3.amazonaws.com:443
>>> <http://cmsegmentation-qa.s3.amazonaws.com:443/> [mybucket
>>> .s3.amazonaws.com
>>> <http://cmsegmentation-qa.s3.amazonaws.com/52.216.165.155> failed:
>>> connect timed out]
>>>
>>> at
>>> org.apache.oozie.service.HadoopAccessorService.createFileSystem(HadoopAccessorService.java:630)
>>>
>>> at
>>> org.apache.oozie.service.HadoopAccessorService.createFileSystem(HadoopAccessorService.java:594)
>>> at org.apache.oozie.dependency.
>>> FSURIHandler.getFileSystem(FSURIHandler.java:184)-env.sh
>>>
>>> But now I’m getting this error
>>>
>>>
>>>
>>> On Thu, May 17, 2018 at 2:53 PM purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> Ok I got passed this error
>>>>
>>>> By rebuilding oozie with Dhttpclient.version=4.5.5
>>>> -Dhttpcore.version=4.4.9
>>>>
>>>> now getting this error
>>>>
>>>>
>>>>
>>>> ACTION[000-180517144113498-oozie-xjt0-C@1]
>>>> org.apache.oozie.service.HadoopAccessorException: E0902: Exception
>>>> occurred: [doesBucketExist on mybucketcom.amazonaws.AmazonClientException:
>>>> No AWS Credentials provided by BasicAWSCredentialsProvider
>>>> EnvironmentVariableCredentialsProvider
>>>> SharedInstanceProfileCredentialsProvider :
>>>> com.amazonaws.SdkClientException: Unable to load credentials from service
>>>> endpoint]
>>>>
>>>> org.apache.oozie.service.HadoopAccessorException: E0902: Exception
>>>> occurred: [doesBucketExist on cmsegmentation-qa:
>>>> com.amazonaws.AmazonClientException: No AWS Credentials provided by
>>>> BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider
>>>> SharedInstanceProfileCredentialsProvider :
>>>> com.amazonaws.SdkClientException: Unable to load credentials from service
>>>> endpoint]
>>>>
>>>> On Thu, May 17, 2018 at 12:24 PM purna pradeep <purna2prad...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Peter,
>>>>>
>>>>> Also When I submit a job with new http client jar, I get
>>>>>
>>>>> ```Error: IO_ERROR : java.io.IOException: Error while connecting Oozie
>>>>> server. No of retries = 1. Exception = Could not authenticate,
>>>>> Authentication failed, status: 500, message: Server Error```
>>>>>
>>>&g

Event trigger Oozie datasets

2018-05-20 Thread purna pradeep
Hello ,

Event trigger Oozie datasets

 1) Does Oozie support event triggers?

   Trigger a workflow based on a file arrival on AWS S3.

   As per my understanding, based on the start date mentioned on the coordinator it
can poll for a file on S3 and, once the dependency is met, execute an
action/SparkAction. But my requirement is to trigger the workflow based on a file
arrival, compare the current date with the start time (if a start time is
configured, otherwise execute the action based on the event alone), and execute
the action/SparkAction if it is time to do so.



2) Also, I see that on datasets we need to specify an initial-instance, and the
dataset location is derived from the initial-instance value.

For example, a dataset whose uri-template is

   s3a://app/logs/${YEAR}_${MONTH}_${DAY}_${HOUR}

with an input-event instance of

   ${coord:latest(0)}
  



Then, the dataset instances for the input events for the coordinator
action will be:

 s3a://app/logs/2009_01_10



But in my case I’m not sure of the dataset generation timestamp, and I’m also not
sure of the frequency of the dataset generation.

My requirement is:

 the dataset location could be s3a://app/logs/2018_02_10 (i.e. it may be
generated every day), and when I run my job on 2018/02/11 I should be able to
specify whether to consider the latest, 24-hour-old, or n-days-old (from the day
I run the workflow) dataset as the dependency for the action/SparkAction
which I’m trying to execute.

Please suggest !
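
For what it's worth, the "latest instance with a _SUCCESS marker, at most N days old"
resolution can be expressed outside Oozie's dataset/initial-instance model, e.g. as a
small boto3 helper that a wrapper script could run before submitting the coordinator.
The bucket, prefix and folder layout follow the example above; the helper itself is an
illustrative sketch, not an Oozie feature:

# resolve_latest_instance.py - find the newest s3a://app/logs/<YYYY_MM_DD> folder
# that already contains a _SUCCESS marker and is at most MAX_AGE_DAYS old.
from datetime import datetime, timedelta

import boto3

BUCKET = "app"       # assumption: bucket from the example above
PREFIX = "logs/"     # assumption: dated folders live under logs/
MAX_AGE_DAYS = 1     # tolerance: latest or up to one day old

def latest_ready_instance():
    s3 = boto3.client("s3")
    cutoff = datetime.utcnow().date() - timedelta(days=MAX_AGE_DAYS)
    candidates = []
    resp = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX, Delimiter="/")
    for cp in resp.get("CommonPrefixes", []):
        folder = cp["Prefix"]  # e.g. logs/2018_02_10/
        try:
            stamp = datetime.strptime(folder.rstrip("/").split("/")[-1], "%Y_%m_%d").date()
        except ValueError:
            continue  # not a dated folder
        if stamp < cutoff:
            continue  # older than the tolerance window
        marker = s3.list_objects_v2(Bucket=BUCKET, Prefix=folder + "_SUCCESS", MaxKeys=1)
        if marker.get("KeyCount", 0) > 0:  # only count datasets that finished writing
            candidates.append((stamp, folder))
    if not candidates:
        return None
    return "s3a://%s/%s" % (BUCKET, max(candidates)[1])

if __name__ == "__main__":
    print(latest_ready_instance())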


Re: Oozie for spark jobs without Hadoop

2018-05-17 Thread purna pradeep
Ok I fixed this by adding aws keys in oozie

But I’m getting below error

I have tried setting proxy in core-site.xml but no luck


2018-05-17 15:39:20,602 ERROR CoordInputLogicEvaluatorPhaseOne:517 -
SERVER[localhost] USER[-] GROUP[-] TOKEN[-] APP[-]
JOB[000-180517144113498-oozie-xjt0-C] ACTION[000-
180517144113498-oozie-xjt0-C@2]
org.apache.oozie.service.HadoopAccessorException:
E0902: Exception occurred: [doesBucketExist on cmsegmentation-qa:
com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to
 mybucket.s3.amazonaws.com:443
<http://cmsegmentation-qa.s3.amazonaws.com:443/> [mybucket.
s3.amazonaws.com/52.216.165.155
<http://cmsegmentation-qa.s3.amazonaws.com/52.216.165.155>] failed: connect
timed out]

org.apache.oozie.service.HadoopAccessorException: E0902: Exception
occurred: [doesBucketExist on cmsegmentation-qa:
com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect
to mybucket.s3.amazonaws.com:443
<http://cmsegmentation-qa.s3.amazonaws.com:443/> [mybucket.s3.amazonaws.com
<http://cmsegmentation-qa.s3.amazonaws.com/52.216.165.155> failed: connect
timed out]

at
org.apache.oozie.service.HadoopAccessorService.createFileSystem(HadoopAccessorService.java:630)

at
org.apache.oozie.service.HadoopAccessorService.createFileSystem(HadoopAccessorService.java:594)
at org.apache.oozie.dependency.FSURIHandler.getFileSystem(FSURIHandler.java:184)

But now I’m getting this error



On Thu, May 17, 2018 at 2:53 PM purna pradeep <purna2prad...@gmail.com>
wrote:

> Ok I got passed this error
>
> By rebuilding oozie with Dhttpclient.version=4.5.5 -Dhttpcore.version=4.4.9
>
> now getting this error
>
>
>
> ACTION[000-180517144113498-oozie-xjt0-C@1]
> org.apache.oozie.service.HadoopAccessorException: E0902: Exception
> occurred: [doesBucketExist on mybucketcom.amazonaws.AmazonClientException:
> No AWS Credentials provided by BasicAWSCredentialsProvider
> EnvironmentVariableCredentialsProvider
> SharedInstanceProfileCredentialsProvider :
> com.amazonaws.SdkClientException: Unable to load credentials from service
> endpoint]
>
> org.apache.oozie.service.HadoopAccessorException: E0902: Exception
> occurred: [doesBucketExist on cmsegmentation-qa:
> com.amazonaws.AmazonClientException: No AWS Credentials provided by
> BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider
> SharedInstanceProfileCredentialsProvider :
> com.amazonaws.SdkClientException: Unable to load credentials from service
> endpoint]
>
> On Thu, May 17, 2018 at 12:24 PM purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>>
>> Peter,
>>
>> Also When I submit a job with new http client jar, I get
>>
>> ```Error: IO_ERROR : java.io.IOException: Error while connecting Oozie
>> server. No of retries = 1. Exception = Could not authenticate,
>> Authentication failed, status: 500, message: Server Error```
>>
>>
>> On Thu, May 17, 2018 at 12:14 PM purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Ok I have tried this
>>>
>>> It appears that s3a support requires httpclient 4.4.x and oozie is
>>> bundled with httpclient 4.3.6. When httpclient is upgraded, the ext UI
>>> stops loading.
>>>
>>>
>>>
>>> On Thu, May 17, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com>
>>> wrote:
>>>
>>>> Purna,
>>>>
>>>> Based on
>>>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3
>>>> you should try to go for s3a.
>>>> You'll have to include the aws-jdk as well if I see it correctly:
>>>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
>>>> Also, the property names are slightly different so you'll have to
>>>> change the example I've given.
>>>>
>>>>
>>>>
>>>> On Thu, May 17, 2018 at 4:16 PM, purna pradeep <purna2prad...@gmail.com
>>>> > wrote:
>>>>
>>>>> Peter,
>>>>>
>>>>> I’m using latest oozie 5.0.0 and I have tried below changes but no
>>>>> luck
>>>>>
>>>>> Is this for s3 or s3a ?
>>>>>
>>>>> I’m using s3 but if this is for s3a do you know which jar I need to
>>>>> include I mean Hadoop-aws jar or any other jar if required
>>>>>
>>>>> Hadoop-aws-2.8.3.jar is what I’m using
>>>>>
>>>>> On Wed, May 16, 2018 at 5:19 PM Peter Cseh <gezap...@cloudera.com>
>>>>> 

Re: Oozie for spark jobs without Hadoop

2018-05-17 Thread purna pradeep
Ok, I got past this error by rebuilding Oozie with -Dhttpclient.version=4.5.5
-Dhttpcore.version=4.4.9.

Now I'm getting this error:



ACTION[000-180517144113498-oozie-xjt0-C@1]
org.apache.oozie.service.HadoopAccessorException: E0902: Exception
occurred: [doesBucketExist on mybucketcom.amazonaws.AmazonClientException:
No AWS Credentials provided by BasicAWSCredentialsProvider
EnvironmentVariableCredentialsProvider
SharedInstanceProfileCredentialsProvider :
com.amazonaws.SdkClientException: Unable to load credentials from service
endpoint]

org.apache.oozie.service.HadoopAccessorException: E0902: Exception
occurred: [doesBucketExist on cmsegmentation-qa:
com.amazonaws.AmazonClientException: No AWS Credentials provided by
BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider
SharedInstanceProfileCredentialsProvider :
com.amazonaws.SdkClientException: Unable to load credentials from service
endpoint]

On Thu, May 17, 2018 at 12:24 PM purna pradeep <purna2prad...@gmail.com>
wrote:

>
> Peter,
>
> Also When I submit a job with new http client jar, I get
>
> ```Error: IO_ERROR : java.io.IOException: Error while connecting Oozie
> server. No of retries = 1. Exception = Could not authenticate,
> Authentication failed, status: 500, message: Server Error```
>
>
> On Thu, May 17, 2018 at 12:14 PM purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Ok I have tried this
>>
>> It appears that s3a support requires httpclient 4.4.x and oozie is
>> bundled with httpclient 4.3.6. When httpclient is upgraded, the ext UI
>> stops loading.
>>
>>
>>
>> On Thu, May 17, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com>
>> wrote:
>>
>>> Purna,
>>>
>>> Based on
>>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3
>>> you should try to go for s3a.
>>> You'll have to include the aws-jdk as well if I see it correctly:
>>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
>>> Also, the property names are slightly different so you'll have to change
>>> the example I've given.
>>>
>>>
>>>
>>> On Thu, May 17, 2018 at 4:16 PM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> Peter,
>>>>
>>>> I’m using latest oozie 5.0.0 and I have tried below changes but no luck
>>>>
>>>> Is this for s3 or s3a ?
>>>>
>>>> I’m using s3 but if this is for s3a do you know which jar I need to
>>>> include I mean Hadoop-aws jar or any other jar if required
>>>>
>>>> Hadoop-aws-2.8.3.jar is what I’m using
>>>>
>>>> On Wed, May 16, 2018 at 5:19 PM Peter Cseh <gezap...@cloudera.com>
>>>> wrote:
>>>>
>>>>> Ok, I've found it:
>>>>>
>>>>> If you are using 4.3.0 or newer this is the part which checks for
>>>>> dependencies:
>>>>>
>>>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java#L914-L926
>>>>> It passes the coordinator action's configuration and even does
>>>>> impersonation to check for the dependencies:
>>>>>
>>>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java#L159
>>>>>
>>>>> Have you tried the following in the coordinator xml:
>>>>>
>>>>>  
>>>>> 
>>>>>   hdfs://bar:9000/usr/joe/logsprocessor-wf
>>>>>   
>>>>> 
>>>>>   fs.s3.awsAccessKeyId
>>>>>   [YOURKEYID]
>>>>> 
>>>>> 
>>>>>   fs.s3.awsSecretAccessKey
>>>>>   [YOURKEY]
>>>>> 
>>>>>  
>>>>>
>>>>>   
>>>>>
>>>>> Based on the source this should be able to poll s3 periodically.
>>>>>
>>>>> On Wed, May 16, 2018 at 10:57 PM, purna pradeep <
>>>>> purna2prad...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> I have tried with coordinator's configuration too but no luck ☹️
>>>>>>
>>>>>> On Wed, May 16, 2018 at 3:54 PM Peter Cseh <gezap...@cloudera.com>
>>>>>> wrote:
>>>>>>
>>>>

Re: Oozie for spark jobs without Hadoop

2018-05-17 Thread purna pradeep
Peter,

Also When I submit a job with new http client jar, I get

```Error: IO_ERROR : java.io.IOException: Error while connecting Oozie
server. No of retries = 1. Exception = Could not authenticate,
Authentication failed, status: 500, message: Server Error```


On Thu, May 17, 2018 at 12:14 PM purna pradeep <purna2prad...@gmail.com>
wrote:

> Ok I have tried this
>
> It appears that s3a support requires httpclient 4.4.x and oozie is bundled
> with httpclient 4.3.6. When httpclient is upgraded, the ext UI stops
> loading.
>
>
>
> On Thu, May 17, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com> wrote:
>
>> Purna,
>>
>> Based on
>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3
>> you should try to go for s3a.
>> You'll have to include the aws-jdk as well if I see it correctly:
>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
>> Also, the property names are slightly different so you'll have to change
>> the example I've given.
>>
>>
>>
>> On Thu, May 17, 2018 at 4:16 PM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Peter,
>>>
>>> I’m using latest oozie 5.0.0 and I have tried below changes but no luck
>>>
>>> Is this for s3 or s3a ?
>>>
>>> I’m using s3 but if this is for s3a do you know which jar I need to
>>> include I mean Hadoop-aws jar or any other jar if required
>>>
>>> Hadoop-aws-2.8.3.jar is what I’m using
>>>
>>> On Wed, May 16, 2018 at 5:19 PM Peter Cseh <gezap...@cloudera.com>
>>> wrote:
>>>
>>>> Ok, I've found it:
>>>>
>>>> If you are using 4.3.0 or newer this is the part which checks for
>>>> dependencies:
>>>>
>>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java#L914-L926
>>>> It passes the coordinator action's configuration and even does
>>>> impersonation to check for the dependencies:
>>>>
>>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java#L159
>>>>
>>>> Have you tried the following in the coordinator xml:
>>>>
>>>>  
>>>>     
>>>>   hdfs://bar:9000/usr/joe/logsprocessor-wf
>>>>   
>>>> 
>>>>   fs.s3.awsAccessKeyId
>>>>   [YOURKEYID]
>>>> 
>>>> 
>>>>   fs.s3.awsSecretAccessKey
>>>>   [YOURKEY]
>>>> 
>>>>  
>>>>
>>>>   
>>>>
>>>> Based on the source this should be able to poll s3 periodically.
>>>>
>>>> On Wed, May 16, 2018 at 10:57 PM, purna pradeep <
>>>> purna2prad...@gmail.com> wrote:
>>>>
>>>>>
>>>>> I have tried with coordinator's configuration too but no luck ☹️
>>>>>
>>>>> On Wed, May 16, 2018 at 3:54 PM Peter Cseh <gezap...@cloudera.com>
>>>>> wrote:
>>>>>
>>>>>> Great progress there purna! :)
>>>>>>
>>>>>> Have you tried adding these properites to the coordinator's
>>>>>> configuration? we usually use the action config to build up connection to
>>>>>> the distributed file system.
>>>>>> Although I'm not sure we're using these when polling the dependencies
>>>>>> for coordinators, but I'm excited about you trying to make it work!
>>>>>>
>>>>>> I'll get back with a - hopefully - more helpful answer soon, I have
>>>>>> to check the code in more depth first.
>>>>>> gp
>>>>>>
>>>>>> On Wed, May 16, 2018 at 9:45 PM, purna pradeep <
>>>>>> purna2prad...@gmail.com> wrote:
>>>>>>
>>>>>>> Peter,
>>>>>>>
>>>>>>> I got rid of this error by adding
>>>>>>> hadoop-aws-2.8.3.jar and jets3t-0.9.4.jar
>>>>>>>
>>>>>>> But I’m getting below error now
>>>>>>>
>>>>>>> java.lang.IllegalArgumentException: AWS Access Key ID and Secret
>>>>>>> Access Key must be specified by setting the fs.s3.awsAccessKeyI

Re: Oozie for spark jobs without Hadoop

2018-05-17 Thread purna pradeep
Ok I have tried this

It appears that s3a support requires httpclient 4.4.x and oozie is bundled
with httpclient 4.3.6. When httpclient is upgraded, the ext UI stops
loading.



On Thu, May 17, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com> wrote:

> Purna,
>
> Based on
> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3
> you should try to go for s3a.
> You'll have to include the aws-jdk as well if I see it correctly:
> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
> Also, the property names are slightly different so you'll have to change
> the example I've given.
>
>
>
> On Thu, May 17, 2018 at 4:16 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Peter,
>>
>> I’m using latest oozie 5.0.0 and I have tried below changes but no luck
>>
>> Is this for s3 or s3a ?
>>
>> I’m using s3 but if this is for s3a do you know which jar I need to
>> include I mean Hadoop-aws jar or any other jar if required
>>
>> Hadoop-aws-2.8.3.jar is what I’m using
>>
>> On Wed, May 16, 2018 at 5:19 PM Peter Cseh <gezap...@cloudera.com> wrote:
>>
>>> Ok, I've found it:
>>>
>>> If you are using 4.3.0 or newer this is the part which checks for
>>> dependencies:
>>>
>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java#L914-L926
>>> It passes the coordinator action's configuration and even does
>>> impersonation to check for the dependencies:
>>>
>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java#L159
>>>
>>> Have you tried the following in the coordinator xml:
>>>
>>>  
>>> 
>>>   hdfs://bar:9000/usr/joe/logsprocessor-wf
>>>   
>>> 
>>>   fs.s3.awsAccessKeyId
>>>   [YOURKEYID]
>>> 
>>> 
>>>   fs.s3.awsSecretAccessKey
>>>   [YOURKEY]
>>> 
>>>  
>>>
>>>   
>>>
>>> Based on the source this should be able to poll s3 periodically.
>>>
>>> On Wed, May 16, 2018 at 10:57 PM, purna pradeep <purna2prad...@gmail.com
>>> > wrote:
>>>
>>>>
>>>> I have tried with coordinator's configuration too but no luck ☹️
>>>>
>>>> On Wed, May 16, 2018 at 3:54 PM Peter Cseh <gezap...@cloudera.com>
>>>> wrote:
>>>>
>>>>> Great progress there purna! :)
>>>>>
>>>>> Have you tried adding these properites to the coordinator's
>>>>> configuration? we usually use the action config to build up connection to
>>>>> the distributed file system.
>>>>> Although I'm not sure we're using these when polling the dependencies
>>>>> for coordinators, but I'm excited about you trying to make it work!
>>>>>
>>>>> I'll get back with a - hopefully - more helpful answer soon, I have to
>>>>> check the code in more depth first.
>>>>> gp
>>>>>
>>>>> On Wed, May 16, 2018 at 9:45 PM, purna pradeep <
>>>>> purna2prad...@gmail.com> wrote:
>>>>>
>>>>>> Peter,
>>>>>>
>>>>>> I got rid of this error by adding
>>>>>> hadoop-aws-2.8.3.jar and jets3t-0.9.4.jar
>>>>>>
>>>>>> But I’m getting below error now
>>>>>>
>>>>>> java.lang.IllegalArgumentException: AWS Access Key ID and Secret
>>>>>> Access Key must be specified by setting the fs.s3.awsAccessKeyId and
>>>>>> fs.s3.awsSecretAccessKey properties (respectively)
>>>>>>
>>>>>> I have tried adding AWS access ,secret keys in
>>>>>>
>>>>>> oozie-site.xml and hadoop core-site.xml , and hadoop-config.xml
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 16, 2018 at 2:30 PM purna pradeep <
>>>>>> purna2prad...@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> I have tried this ,just added s3 instead of *
>>>>>>>
>>>>>>> 
>>>>>>>
>>>>>>>
>>&g

Re: Spark 2.3 in oozie

2018-05-16 Thread purna pradeep
Thanks Peter!

I’m able to run spark pi example on Kubernetes cluster from oozie after
this change

On Wed, May 16, 2018 at 10:27 AM Peter Cseh <gezap...@cloudera.com> wrote:

> The version of the xml schema has nothing to do with the version of the
> component you're using.
>
> Thanks for verifying that -Dspark.scala.binary.verstion=2.11 is required
> for compilation with Spark 2.3.0
>
> Oozie does not pull in Spark's Kubernetes artifact.
> To make it part of the Oozie Spark sharelib you'll have to include the
> spark-kubernetes.jar
> <
> https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-kubernetes_2.11%7C2.3.0%7Cjar
> >
> in
> the sharelib/spark/pom.xml as a compile-time dependency.
>
> gp
>
> On Tue, May 15, 2018 at 9:04 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
> > I’m able to compile successfully with after adding this override option
> >
> > -Dspark.scala.binary.version=2.11
> >
> > Dspark.version = 2.3.0
> >
> > But when I’m running a spark action with spark-pi example jar against
> > Kubernetes master I’m getting below error in stderr log
> >
> >
> > *Error:Could not load KUBERNETES classes.This copy of spark may not have
> > been compiled with Kubernetes support*
> >
> > Below is my workflow.xml
> >
> > <*spark xmlns="uri:oozie:spark-action:1.0">*
> >
> > *${resourceManager}*
> >
> > *${nameNode}*
> >
> > *k8s://<***.com>*
> >
> > *Python-Spark-Pi*
> >
> > *spark-examples_2.11-2.3.0.jar*
> >
> > *--class org.apache.spark.examples.SparkPi --conf
> > spark.executor.instances=2 --conf spark.kubernetes.namespace=spark --conf
> > spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf
> > spark.kubernetes.container.image=artifactory.cloud.
> > capitalone.com/kubespark/spark-quantum:v2.3.0
> > <http://artifactory.cloud.capitalone.com/kubespark/spark-quantum:v2.3.0>
> > --conf
> spark.kubernetes.node.selector.node-role.kubernetes.io/worker=true
> > <
> http://spark.kubernetes.node.selector.node-role.kubernetes.io/worker=true
> > >
> > --conf
> > spark.kubernetes.driver.label.application=is1-driver --conf
> > spark.kubernetes.executor.label.application=is1-exec*utor
> > local:///opt/spark/examples/jars/spark-examples_2.11-2.3.
> > 0.jar
> >
> > 
> >
> >
> > Is this because of uri:oozie:spark-action:1.0 in spark xml tag? Does it
> > needs to be spark-action:2.0 as I’m using spark 2.3?
> >
> >
> > Please suggest!
> >
> >
> > On Tue, May 15, 2018 at 12:43 PM Peter Cseh <gezap...@cloudera.com>
> wrote:
> >
> > > I think the error is related to the Scala version being present in the
> > > artifact name.
> > > I'll take a look at this tomorrow.
> > > Gp
> > >
> > > On Tue, May 15, 2018, 18:28 Artem Ervits <artemerv...@gmail.com>
> wrote:
> > >
> > > > Did you run
> > > > mvn clean install first on the parent directory?
> > > >
> > > > On Tue, May 15, 2018, 11:35 AM purna pradeep <
> purna2prad...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks peter,
> > > > >
> > > > > I have tried changing Dspark.version to 2.3.0 and compiled oozie
> I’m
> > > > > getting below error from oozie examples
> > > > >
> > > > >
> > > > > *ERROR] Failed to execute goal on project oozie-examples: Could not
> > > > resolve
> > > > > dependencies for project org.apache.oozie:oozie-examples:jar:5.0.0:
> > > Could
> > > > > not find artifact org.apache.spark:spark-core_2.10:jar:2.3.0 in
> > > > resolution
> > > > > *
> > > > >
> > > > > On Tue, May 15, 2018 at 11:14 AM Peter Cseh <gezap...@cloudera.com
> >
> > > > wrote:
> > > > >
> > > > > > Oozie has a spark-2 profile that is currently hard-coded to Spark
> > > 2.1:
> > > > > > https://github.com/apache/oozie/blob/master/pom.xml#L1983
> > > > > > I'm sure if you overwrite the -Dspark.version and compile Oozie
> > that
> > > > way
> > > > > it
> > > > > > will work.
> > > > > > gp
> > > > > >
> > > > > >
> > > > > > On Tue, May 15, 2018 

Re: Oozie for spark jobs without Hadoop

2018-05-16 Thread purna pradeep
Peter,

I got rid of this error by adding
hadoop-aws-2.8.3.jar and jets3t-0.9.4.jar

But I’m getting below error now

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
must be specified by setting the fs.s3.awsAccessKeyId and
fs.s3.awsSecretAccessKey properties (respectively)

I have tried adding the AWS access and secret keys in oozie-site.xml, hadoop
core-site.xml, and hadoop-config.xml.




On Wed, May 16, 2018 at 2:30 PM purna pradeep <purna2prad...@gmail.com>
wrote:

>
> I have tried this ,just added s3 instead of *
>
> 
>
> oozie.service.HadoopAccessorService.supported.filesystems
>
> hdfs,hftp,webhdfs,s3
>
> 
>
>
> Getting below error
>
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3a.S3AFileSystem not found
>
> at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369)
>
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
>
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
>
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
>
> at
> org.apache.oozie.service.HadoopAccessorService$5.run(HadoopAccessorService.java:625)
>
> at
> org.apache.oozie.service.HadoopAccessorService$5.run(HadoopAccessorService.java:623
>
>
> On Wed, May 16, 2018 at 2:19 PM purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> This is what is in the logs
>>
>> 2018-05-16 14:06:13,500  INFO URIHandlerService:520 - SERVER[localhost]
>> Loaded urihandlers [org.apache.oozie.dependency.FSURIHandler]
>>
>> 2018-05-16 14:06:13,501  INFO URIHandlerService:520 - SERVER[localhost]
>> Loaded default urihandler org.apache.oozie.dependency.FSURIHandler
>>
>>
>> On Wed, May 16, 2018 at 12:27 PM Peter Cseh <gezap...@cloudera.com>
>> wrote:
>>
>>> That's strange, this exception should not happen in that case.
>>> Can you check the server logs for messages like this?
>>> LOG.info("Loaded urihandlers {0}", Arrays.toString(classes));
>>> LOG.info("Loaded default urihandler {0}",
>>> defaultHandler.getClass().getName());
>>> Thanks
>>>
>>> On Wed, May 16, 2018 at 5:47 PM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> This is what I already have in my oozie-site.xml
>>>>
>>>> 
>>>>
>>>>
>>>> oozie.service.HadoopAccessorService.supported.filesystems
>>>>
>>>> *
>>>>
>>>> 
>>>>
>>>> On Wed, May 16, 2018 at 11:37 AM Peter Cseh <gezap...@cloudera.com>
>>>> wrote:
>>>>
>>>>> You'll have to configure
>>>>> oozie.service.HadoopAccessorService.supported.filesystems
>>>>> hdfs,hftp,webhdfs Enlist
>>>>> the different filesystems supported for federation. If wildcard "*" is
>>>>> specified, then ALL file schemes will be allowed.properly.
>>>>>
>>>>> For testing purposes it's ok to put * in there in oozie-site.xml
>>>>>
>>>>> On Wed, May 16, 2018 at 5:29 PM, purna pradeep <
>>>>> purna2prad...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Peter,
>>>>> >
>>>>> > I have tried to specify dataset with uri starting with s3://, s3a://
>>>>> and
>>>>> > s3n:// and I am getting exception
>>>>> >
>>>>> >
>>>>> >
>>>>> > Exception occurred:E0904: Scheme [s3] not supported in uri
>>>>> > [s3://mybucket/input.data] Making the job failed
>>>>> >
>>>>> > org.apache.oozie.dependency.URIHandlerException: E0904: Scheme [s3]
>>>>> not
>>>>> > supported in uri [s3:// mybucket /input.data]
>>>>> >
>>>>> > at
>>>>> > org.apache.oozie.service.URIHandlerService.getURIHandler(
>>>>> > URIHandlerService.java:185)
>>>>> >
>>>>> > at
>>>>> > org.apache.oozie.service.URIHandlerService.getURIHandler(
>>>>> > URIHandlerService.java:168)
>>>>>

Re: Oozie for spark jobs without Hadoop

2018-05-16 Thread purna pradeep
I have tried this, just added s3 instead of *:

<property>
  <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
  <value>hdfs,hftp,webhdfs,s3</value>
</property>




Getting below error

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.s3a.S3AFileSystem not found

at
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369)

at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)

at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)

at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)

at
org.apache.oozie.service.HadoopAccessorService$5.run(HadoopAccessorService.java:625)

at
org.apache.oozie.service.HadoopAccessorService$5.run(HadoopAccessorService.java:623


On Wed, May 16, 2018 at 2:19 PM purna pradeep <purna2prad...@gmail.com>
wrote:

> This is what is in the logs
>
> 2018-05-16 14:06:13,500  INFO URIHandlerService:520 - SERVER[localhost]
> Loaded urihandlers [org.apache.oozie.dependency.FSURIHandler]
>
> 2018-05-16 14:06:13,501  INFO URIHandlerService:520 - SERVER[localhost]
> Loaded default urihandler org.apache.oozie.dependency.FSURIHandler
>
>
> On Wed, May 16, 2018 at 12:27 PM Peter Cseh <gezap...@cloudera.com> wrote:
>
>> That's strange, this exception should not happen in that case.
>> Can you check the server logs for messages like this?
>> LOG.info("Loaded urihandlers {0}", Arrays.toString(classes));
>> LOG.info("Loaded default urihandler {0}",
>> defaultHandler.getClass().getName());
>> Thanks
>>
>> On Wed, May 16, 2018 at 5:47 PM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> This is what I already have in my oozie-site.xml
>>>
>>> 
>>>
>>>
>>> oozie.service.HadoopAccessorService.supported.filesystems
>>>
>>> *
>>>
>>> 
>>>
>>> On Wed, May 16, 2018 at 11:37 AM Peter Cseh <gezap...@cloudera.com>
>>> wrote:
>>>
>>>> You'll have to configure
>>>> oozie.service.HadoopAccessorService.supported.filesystems
>>>> hdfs,hftp,webhdfs Enlist
>>>> the different filesystems supported for federation. If wildcard "*" is
>>>> specified, then ALL file schemes will be allowed.properly.
>>>>
>>>> For testing purposes it's ok to put * in there in oozie-site.xml
>>>>
>>>> On Wed, May 16, 2018 at 5:29 PM, purna pradeep <purna2prad...@gmail.com
>>>> >
>>>> wrote:
>>>>
>>>> > Peter,
>>>> >
>>>> > I have tried to specify dataset with uri starting with s3://, s3a://
>>>> and
>>>> > s3n:// and I am getting exception
>>>> >
>>>> >
>>>> >
>>>> > Exception occurred:E0904: Scheme [s3] not supported in uri
>>>> > [s3://mybucket/input.data] Making the job failed
>>>> >
>>>> > org.apache.oozie.dependency.URIHandlerException: E0904: Scheme [s3]
>>>> not
>>>> > supported in uri [s3:// mybucket /input.data]
>>>> >
>>>> > at
>>>> > org.apache.oozie.service.URIHandlerService.getURIHandler(
>>>> > URIHandlerService.java:185)
>>>> >
>>>> > at
>>>> > org.apache.oozie.service.URIHandlerService.getURIHandler(
>>>> > URIHandlerService.java:168)
>>>> >
>>>> > at
>>>> > org.apache.oozie.service.URIHandlerService.getURIHandler(
>>>> > URIHandlerService.java:160)
>>>> >
>>>> > at
>>>> > org.apache.oozie.command.coord.CoordCommandUtils.createEarlyURIs(
>>>> > CoordCommandUtils.java:465)
>>>> >
>>>> > at
>>>> > org.apache.oozie.command.coord.CoordCommandUtils.
>>>> > separateResolvedAndUnresolved(CoordCommandUtils.java:404)
>>>> >
>>>> > at
>>>> > org.apache.oozie.command.coord.CoordCommandUtils.
>>>> > materializeInputDataEvents(CoordCommandUtils.java:731)
>>>> >
>>>> > at
>>>> >
>>>> org.apache.oozie.command.coord.CoordCommandUtils.materializeOneInstance(
>>>> > CoordCommandUtils.java:546)
>>>> >
>

Re: Oozie for spark jobs without Hadoop

2018-05-16 Thread purna pradeep
This is what is in the logs

2018-05-16 14:06:13,500  INFO URIHandlerService:520 - SERVER[localhost]
Loaded urihandlers [org.apache.oozie.dependency.FSURIHandler]

2018-05-16 14:06:13,501  INFO URIHandlerService:520 - SERVER[localhost]
Loaded default urihandler org.apache.oozie.dependency.FSURIHandler


On Wed, May 16, 2018 at 12:27 PM Peter Cseh <gezap...@cloudera.com> wrote:

> That's strange, this exception should not happen in that case.
> Can you check the server logs for messages like this?
> LOG.info("Loaded urihandlers {0}", Arrays.toString(classes));
> LOG.info("Loaded default urihandler {0}",
> defaultHandler.getClass().getName());
> Thanks
>
> On Wed, May 16, 2018 at 5:47 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> This is what I already have in my oozie-site.xml
>>
>> 
>>
>>
>> oozie.service.HadoopAccessorService.supported.filesystems
>>
>> *
>>
>> 
>>
>> On Wed, May 16, 2018 at 11:37 AM Peter Cseh <gezap...@cloudera.com>
>> wrote:
>>
>>> You'll have to configure
>>> oozie.service.HadoopAccessorService.supported.filesystems
>>> hdfs,hftp,webhdfs Enlist
>>> the different filesystems supported for federation. If wildcard "*" is
>>> specified, then ALL file schemes will be allowed.properly.
>>>
>>> For testing purposes it's ok to put * in there in oozie-site.xml
>>>
>>> On Wed, May 16, 2018 at 5:29 PM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>> > Peter,
>>> >
>>> > I have tried to specify dataset with uri starting with s3://, s3a://
>>> and
>>> > s3n:// and I am getting exception
>>> >
>>> >
>>> >
>>> > Exception occurred:E0904: Scheme [s3] not supported in uri
>>> > [s3://mybucket/input.data] Making the job failed
>>> >
>>> > org.apache.oozie.dependency.URIHandlerException: E0904: Scheme [s3] not
>>> > supported in uri [s3:// mybucket /input.data]
>>> >
>>> > at
>>> > org.apache.oozie.service.URIHandlerService.getURIHandler(
>>> > URIHandlerService.java:185)
>>> >
>>> > at
>>> > org.apache.oozie.service.URIHandlerService.getURIHandler(
>>> > URIHandlerService.java:168)
>>> >
>>> > at
>>> > org.apache.oozie.service.URIHandlerService.getURIHandler(
>>> > URIHandlerService.java:160)
>>> >
>>> > at
>>> > org.apache.oozie.command.coord.CoordCommandUtils.createEarlyURIs(
>>> > CoordCommandUtils.java:465)
>>> >
>>> > at
>>> > org.apache.oozie.command.coord.CoordCommandUtils.
>>> > separateResolvedAndUnresolved(CoordCommandUtils.java:404)
>>> >
>>> > at
>>> > org.apache.oozie.command.coord.CoordCommandUtils.
>>> > materializeInputDataEvents(CoordCommandUtils.java:731)
>>> >
>>> > at
>>> >
>>> org.apache.oozie.command.coord.CoordCommandUtils.materializeOneInstance(
>>> > CoordCommandUtils.java:546)
>>> >
>>> > at
>>> > org.apache.oozie.command.coord.CoordMaterializeTransitionXCom
>>> > mand.materializeActions(CoordMaterializeTransitionXCommand.java:492)
>>> >
>>> > at
>>> > org.apache.oozie.command.coord.CoordMaterializeTransitionXCom
>>> > mand.materialize(CoordMaterializeTransitionXCommand.java:362)
>>> >
>>> > at
>>> > org.apache.oozie.command.MaterializeTransitionXCommand.execute(
>>> > MaterializeTransitionXCommand.java:73)
>>> >
>>> > at
>>> > org.apache.oozie.command.MaterializeTransitionXCommand.execute(
>>> > MaterializeTransitionXCommand.java:29)
>>> >
>>> > at org.apache.oozie.command.XCommand.call(XCommand.java:290)
>>> >
>>> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> >
>>> > at
>>> > org.apache.oozie.service.CallableQueueService$CallableWrapper.run(
>>> > CallableQueueService.java:181)
>>> >
>>> > at
>>> > java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> > ThreadPoolExecutor.java:1149)
>>> >
>>> > at
>>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> > ThreadPoolExecutor.java:624)
>>> >
>>&

Re: Oozie for spark jobs without Hadoop

2018-05-16 Thread purna pradeep
This is what I already have in my oozie-site.xml:

<property>
  <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
  <value>*</value>
</property>



On Wed, May 16, 2018 at 11:37 AM Peter Cseh <gezap...@cloudera.com> wrote:

> You'll have to configure
> oozie.service.HadoopAccessorService.supported.filesystems
> hdfs,hftp,webhdfs Enlist
> the different filesystems supported for federation. If wildcard "*" is
> specified, then ALL file schemes will be allowed.properly.
>
> For testing purposes it's ok to put * in there in oozie-site.xml
>
> On Wed, May 16, 2018 at 5:29 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
> > Peter,
> >
> > I have tried to specify dataset with uri starting with s3://, s3a:// and
> > s3n:// and I am getting exception
> >
> >
> >
> > Exception occurred:E0904: Scheme [s3] not supported in uri
> > [s3://mybucket/input.data] Making the job failed
> >
> > org.apache.oozie.dependency.URIHandlerException: E0904: Scheme [s3] not
> > supported in uri [s3:// mybucket /input.data]
> >
> > at
> > org.apache.oozie.service.URIHandlerService.getURIHandler(
> > URIHandlerService.java:185)
> >
> > at
> > org.apache.oozie.service.URIHandlerService.getURIHandler(
> > URIHandlerService.java:168)
> >
> > at
> > org.apache.oozie.service.URIHandlerService.getURIHandler(
> > URIHandlerService.java:160)
> >
> > at
> > org.apache.oozie.command.coord.CoordCommandUtils.createEarlyURIs(
> > CoordCommandUtils.java:465)
> >
> > at
> > org.apache.oozie.command.coord.CoordCommandUtils.
> > separateResolvedAndUnresolved(CoordCommandUtils.java:404)
> >
> > at
> > org.apache.oozie.command.coord.CoordCommandUtils.
> > materializeInputDataEvents(CoordCommandUtils.java:731)
> >
> > at
> > org.apache.oozie.command.coord.CoordCommandUtils.materializeOneInstance(
> > CoordCommandUtils.java:546)
> >
> > at
> > org.apache.oozie.command.coord.CoordMaterializeTransitionXCom
> > mand.materializeActions(CoordMaterializeTransitionXCommand.java:492)
> >
> > at
> > org.apache.oozie.command.coord.CoordMaterializeTransitionXCom
> > mand.materialize(CoordMaterializeTransitionXCommand.java:362)
> >
> > at
> > org.apache.oozie.command.MaterializeTransitionXCommand.execute(
> > MaterializeTransitionXCommand.java:73)
> >
> > at
> > org.apache.oozie.command.MaterializeTransitionXCommand.execute(
> > MaterializeTransitionXCommand.java:29)
> >
> > at org.apache.oozie.command.XCommand.call(XCommand.java:290)
> >
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >
> > at
> > org.apache.oozie.service.CallableQueueService$CallableWrapper.run(
> > CallableQueueService.java:181)
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1149)
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:624)
> >
> > at java.lang.Thread.run(Thread.java:748)
> >
> >
> >
> > Is S3 support specific to CDH distribution or should it work in Apache
> > Oozie as well? I’m not using CDH yet so
> >
> > On Wed, May 16, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com>
> wrote:
> >
> > > I think it should be possible for Oozie to poll S3. Check out this
> > > <
> > > https://www.cloudera.com/documentation/enterprise/5-9-
> > x/topics/admin_oozie_s3.html
> > > >
> > > description on how to make it work in jobs, something similar should
> work
> > > on the server side as well
> > >
> > > On Tue, May 15, 2018 at 4:43 PM, purna pradeep <
> purna2prad...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Andras,
> > > >
> > > > Also I also would like to know if oozie supports Aws S3 as input
> events
> > > to
> > > > poll for a dependency file before kicking off a spark action
> > > >
> > > >
> > > > For example: I don’t want to kick off a spark action until a file is
> > > > arrived on a given AWS s3 location
> > > >
> > > > On Tue, May 15, 2018 at 10:17 AM Andras Piros <
> > andras.pi...@cloudera.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Oozie needs HDFS to store workflow, coordinator, or bundle
> > definitions,
> > &g

Re: Oozie for spark jobs without Hadoop

2018-05-16 Thread purna pradeep
+Peter

On Wed, May 16, 2018 at 11:29 AM purna pradeep <purna2prad...@gmail.com>
wrote:

> Peter,
>
> I have tried to specify dataset with uri starting with s3://, s3a:// and
> s3n:// and I am getting exception
>
>
>
> Exception occurred:E0904: Scheme [s3] not supported in uri
> [s3://mybucket/input.data] Making the job failed
>
> org.apache.oozie.dependency.URIHandlerException: E0904: Scheme [s3] not
> supported in uri [s3:// mybucket /input.data]
>
> at
> org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:185)
>
> at
> org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:168)
>
> at
> org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:160)
>
> at
> org.apache.oozie.command.coord.CoordCommandUtils.createEarlyURIs(CoordCommandUtils.java:465)
>
> at
> org.apache.oozie.command.coord.CoordCommandUtils.separateResolvedAndUnresolved(CoordCommandUtils.java:404)
>
> at
> org.apache.oozie.command.coord.CoordCommandUtils.materializeInputDataEvents(CoordCommandUtils.java:731)
>
> at
> org.apache.oozie.command.coord.CoordCommandUtils.materializeOneInstance(CoordCommandUtils.java:546)
>
> at
> org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand.materializeActions(CoordMaterializeTransitionXCommand.java:492)
>
> at
> org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand.materialize(CoordMaterializeTransitionXCommand.java:362)
>
> at
> org.apache.oozie.command.MaterializeTransitionXCommand.execute(MaterializeTransitionXCommand.java:73)
>
> at
> org.apache.oozie.command.MaterializeTransitionXCommand.execute(MaterializeTransitionXCommand.java:29)
>
> at org.apache.oozie.command.XCommand.call(XCommand.java:290)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:181)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
>
>
> Is S3 support specific to CDH distribution or should it work in Apache
> Oozie as well? I’m not using CDH yet so
>
> On Wed, May 16, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com> wrote:
>
>> I think it should be possible for Oozie to poll S3. Check out this
>> <
>> https://www.cloudera.com/documentation/enterprise/5-9-x/topics/admin_oozie_s3.html
>> >
>> description on how to make it work in jobs, something similar should work
>> on the server side as well
>>
>> On Tue, May 15, 2018 at 4:43 PM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>> > Thanks Andras,
>> >
>> > Also I also would like to know if oozie supports Aws S3 as input events
>> to
>> > poll for a dependency file before kicking off a spark action
>> >
>> >
>> > For example: I don’t want to kick off a spark action until a file is
>> > arrived on a given AWS s3 location
>> >
>> > On Tue, May 15, 2018 at 10:17 AM Andras Piros <
>> andras.pi...@cloudera.com>
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > Oozie needs HDFS to store workflow, coordinator, or bundle
>> definitions,
>> > as
>> > > well as sharelib files in a safe, distributed and scalable way. Oozie
>> > needs
>> > > YARN to run almost all of its actions, Spark action being no
>> exception.
>> > >
>> > > At the moment it's not feasible to install Oozie without those Hadoop
>> > > components. How to install Oozie please *find here
>> > > <https://oozie.apache.org/docs/5.0.0/AG_Install.html>*.
>> > >
>> > > Regards,
>> > >
>> > > Andras
>> > >
>> > > On Tue, May 15, 2018 at 4:11 PM, purna pradeep <
>> purna2prad...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > Would like to know if I can use sparkaction in oozie without having
>> > > Hadoop
>> > > > cluster?
>> > > >
>> > > > I want to use oozie to schedule spark jobs on Kubernetes cluster
>> > > >
>> > > > I’m a beginner in oozie
>> > > >
>> > > > Thanks
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> *Peter Cseh *| Software Engineer
>> cloudera.com <https://www.cloudera.com>
>>
>> --
>>
>


Re: Oozie for spark jobs without Hadoop

2018-05-16 Thread purna pradeep
Peter,

I have tried to specify a dataset with a URI starting with s3://, s3a:// and
s3n://, and I am getting the exception below



Exception occurred:E0904: Scheme [s3] not supported in uri
[s3://mybucket/input.data] Making the job failed

org.apache.oozie.dependency.URIHandlerException: E0904: Scheme [s3] not
supported in uri [s3:// mybucket /input.data]

at
org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:185)

at
org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:168)

at
org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:160)

at
org.apache.oozie.command.coord.CoordCommandUtils.createEarlyURIs(CoordCommandUtils.java:465)

at
org.apache.oozie.command.coord.CoordCommandUtils.separateResolvedAndUnresolved(CoordCommandUtils.java:404)

at
org.apache.oozie.command.coord.CoordCommandUtils.materializeInputDataEvents(CoordCommandUtils.java:731)

at
org.apache.oozie.command.coord.CoordCommandUtils.materializeOneInstance(CoordCommandUtils.java:546)

at
org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand.materializeActions(CoordMaterializeTransitionXCommand.java:492)

at
org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand.materialize(CoordMaterializeTransitionXCommand.java:362)

at
org.apache.oozie.command.MaterializeTransitionXCommand.execute(MaterializeTransitionXCommand.java:73)

at
org.apache.oozie.command.MaterializeTransitionXCommand.execute(MaterializeTransitionXCommand.java:29)

at org.apache.oozie.command.XCommand.call(XCommand.java:290)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:181)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)



Is S3 support specific to the CDH distribution, or should it work in Apache
Oozie as well? I'm not using CDH yet, so I'd like to confirm before going further.
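
In case it helps, this is the kind of oozie-site.xml change I'm experimenting
with, just a sketch based on the CDH doc linked below, assuming the same
property applies to Apache Oozie:

  <property>
    <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
    <value>hdfs,s3,s3a,s3n</value>
  </property>

plus the hadoop-aws and matching aws-java-sdk jars on the Oozie server
classpath so the s3/s3a filesystems can actually be instantiated.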

On Wed, May 16, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com> wrote:

> I think it should be possible for Oozie to poll S3. Check out this
> <
> https://www.cloudera.com/documentation/enterprise/5-9-x/topics/admin_oozie_s3.html
> >
> description on how to make it work in jobs, something similar should work
> on the server side as well
>
> On Tue, May 15, 2018 at 4:43 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
> > Thanks Andras,
> >
> > Also I also would like to know if oozie supports Aws S3 as input events
> to
> > poll for a dependency file before kicking off a spark action
> >
> >
> > For example: I don’t want to kick off a spark action until a file is
> > arrived on a given AWS s3 location
> >
> > On Tue, May 15, 2018 at 10:17 AM Andras Piros <andras.pi...@cloudera.com
> >
> > wrote:
> >
> > > Hi,
> > >
> > > Oozie needs HDFS to store workflow, coordinator, or bundle definitions,
> > as
> > > well as sharelib files in a safe, distributed and scalable way. Oozie
> > needs
> > > YARN to run almost all of its actions, Spark action being no exception.
> > >
> > > At the moment it's not feasible to install Oozie without those Hadoop
> > > components. How to install Oozie please *find here
> > > <https://oozie.apache.org/docs/5.0.0/AG_Install.html>*.
> > >
> > > Regards,
> > >
> > > Andras
> > >
> > > On Tue, May 15, 2018 at 4:11 PM, purna pradeep <
> purna2prad...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Would like to know if I can use sparkaction in oozie without having
> > > Hadoop
> > > > cluster?
> > > >
> > > > I want to use oozie to schedule spark jobs on Kubernetes cluster
> > > >
> > > > I’m a beginner in oozie
> > > >
> > > > Thanks
> > > >
> > >
> >
>
>
>
> --
> *Peter Cseh *| Software Engineer
> cloudera.com <https://www.cloudera.com>
>
> --
>


Re: Spark 2.3 in oozie

2018-05-15 Thread purna pradeep
I'm able to compile successfully after adding these override options:

-Dspark.scala.binary.version=2.11
-Dspark.version=2.3.0

But when I run a spark action with the spark-pi example jar against a
Kubernetes master, I get the below error in the stderr log:

Error: Could not load KUBERNETES classes. This copy of spark may not have
been compiled with Kubernetes support

Below is my workflow.xml

<spark xmlns="uri:oozie:spark-action:1.0">
    <resource-manager>${resourceManager}</resource-manager>
    <name-node>${nameNode}</name-node>
    <master>k8s://<***.com></master>
    <name>Python-Spark-Pi</name>
    <jar>spark-examples_2.11-2.3.0.jar</jar>
    <spark-opts>--class org.apache.spark.examples.SparkPi
        --conf spark.executor.instances=2
        --conf spark.kubernetes.namespace=spark
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
        --conf spark.kubernetes.container.image=artifactory.cloud.capitalone.com/kubespark/spark-quantum:v2.3.0
        --conf spark.kubernetes.node.selector.node-role.kubernetes.io/worker=true
        --conf spark.kubernetes.driver.label.application=is1-driver
        --conf spark.kubernetes.executor.label.application=is1-executor</spark-opts>
    <arg>local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar</arg>
</spark>




Is this because of uri:oozie:spark-action:1.0 in the spark xml tag? Does it
need to be spark-action:2.0 since I'm using Spark 2.3?


Please suggest!
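
For what it's worth, my current guess is that the Spark jars used by the
Oozie spark action were not built with the kubernetes profile, so I'm planning
to try rebuilding the Spark distribution roughly like this (a sketch; the
flags are the ones from the Spark build docs, the name is mine):

./dev/make-distribution.sh --name spark-2.3.0-k8s --tgz -Pkubernetes -Phadoop-2.7

and then putting the resulting spark jars into the spark sharelib.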


On Tue, May 15, 2018 at 12:43 PM Peter Cseh <gezap...@cloudera.com> wrote:

> I think the error is related to the Scala version being present in the
> artifact name.
> I'll take a look at this tomorrow.
> Gp
>
> On Tue, May 15, 2018, 18:28 Artem Ervits <artemerv...@gmail.com> wrote:
>
> > Did you run
> > mvn clean install first on the parent directory?
> >
> > On Tue, May 15, 2018, 11:35 AM purna pradeep <purna2prad...@gmail.com>
> > wrote:
> >
> > > Thanks peter,
> > >
> > > I have tried changing Dspark.version to 2.3.0 and compiled oozie I’m
> > > getting below error from oozie examples
> > >
> > >
> > > *ERROR] Failed to execute goal on project oozie-examples: Could not
> > resolve
> > > dependencies for project org.apache.oozie:oozie-examples:jar:5.0.0:
> Could
> > > not find artifact org.apache.spark:spark-core_2.10:jar:2.3.0 in
> > resolution
> > > *
> > >
> > > On Tue, May 15, 2018 at 11:14 AM Peter Cseh <gezap...@cloudera.com>
> > wrote:
> > >
> > > > Oozie has a spark-2 profile that is currently hard-coded to Spark
> 2.1:
> > > > https://github.com/apache/oozie/blob/master/pom.xml#L1983
> > > > I'm sure if you overwrite the -Dspark.version and compile Oozie that
> > way
> > > it
> > > > will work.
> > > > gp
> > > >
> > > >
> > > > On Tue, May 15, 2018 at 5:07 PM, purna pradeep <
> > purna2prad...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > Does oozie supports spark 2.3? Or will it even care of the spark
> > > version
> > > > >
> > > > > I want to use spark action
> > > > >
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Purna
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > *Peter Cseh *| Software Engineer
> > > > cloudera.com <https://www.cloudera.com>
> > > >
> > > > --
> > > >
> > >
> >
>


Re: Spark 2.3 in oozie

2018-05-15 Thread purna pradeep
Thanks peter,

I have tried changing -Dspark.version to 2.3.0 and compiling Oozie, but I'm
getting the below error from the oozie-examples module:


*ERROR] Failed to execute goal on project oozie-examples: Could not resolve
dependencies for project org.apache.oozie:oozie-examples:jar:5.0.0: Could
not find artifact org.apache.spark:spark-core_2.10:jar:2.3.0 in resolution *
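
For reference, this is roughly the build command I'm running (a sketch;
-DskipTests is only there to speed things up):

mvn clean install -DskipTests -Dspark.version=2.3.0

I'm wondering whether I also need to override the Scala binary version
(something like -Dspark.scala.binary.version=2.11), since the missing
artifact is the Scala 2.10 one.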

On Tue, May 15, 2018 at 11:14 AM Peter Cseh <gezap...@cloudera.com> wrote:

> Oozie has a spark-2 profile that is currently hard-coded to Spark 2.1:
> https://github.com/apache/oozie/blob/master/pom.xml#L1983
> I'm sure if you overwrite the -Dspark.version and compile Oozie that way it
> will work.
> gp
>
>
> On Tue, May 15, 2018 at 5:07 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
> > Hello,
> >
> > Does oozie supports spark 2.3? Or will it even care of the spark version
> >
> > I want to use spark action
> >
> >
> >
> > Thanks,
> > Purna
> >
>
>
>
> --
> *Peter Cseh *| Software Engineer
> cloudera.com <https://www.cloudera.com>
>
> --
>


Spark 2.3 in oozie

2018-05-15 Thread purna pradeep
Hello,

Does oozie supports spark 2.3? Or will it even care of the spark version

I want to use spark action



Thanks,
Purna


Re: Oozie for spark jobs without Hadoop

2018-05-15 Thread purna pradeep
Thanks Andras,

I would also like to know whether Oozie supports AWS S3 as an input event to
poll for a dependency file before kicking off a spark action.

For example: I don't want to kick off a spark action until a file has arrived
at a given AWS S3 location.

On Tue, May 15, 2018 at 10:17 AM Andras Piros <andras.pi...@cloudera.com>
wrote:

> Hi,
>
> Oozie needs HDFS to store workflow, coordinator, or bundle definitions, as
> well as sharelib files in a safe, distributed and scalable way. Oozie needs
> YARN to run almost all of its actions, Spark action being no exception.
>
> At the moment it's not feasible to install Oozie without those Hadoop
> components. How to install Oozie please *find here
> <https://oozie.apache.org/docs/5.0.0/AG_Install.html>*.
>
> Regards,
>
> Andras
>
> On Tue, May 15, 2018 at 4:11 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Would like to know if I can use sparkaction in oozie without having
> Hadoop
> > cluster?
> >
> > I want to use oozie to schedule spark jobs on Kubernetes cluster
> >
> > I’m a beginner in oozie
> >
> > Thanks
> >
>


Workflow S3 listener

2018-05-13 Thread purna pradeep
Hi,

I'm very new to Oozie. I would like to run Spark 2.3 jobs from Oozie based on
file arrival on AWS S3, where the arriving file is a dependency for the job.


I see some examples which use S3 in input event datasets, with a dataset
uri-template like the following:

s3n://mybucket/a/b/${YEAR}/${MONTH}/${DAY}



So my question is: does Oozie listen for file arrival on AWS S3 to check the
dependency before kicking off the spark job?
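
Concretely, something like the following coordinator snippet is what I have
in mind; this is only a sketch on my side, the element names follow the
standard coordinator schema, and the bucket, path, frequency and done-flag
are placeholders:

<datasets>
    <dataset name="input" frequency="${coord:days(1)}"
             initial-instance="2018-05-01T00:00Z" timezone="UTC">
        <uri-template>s3n://mybucket/a/b/${YEAR}/${MONTH}/${DAY}</uri-template>
        <done-flag>_SUCCESS</done-flag>
    </dataset>
</datasets>

<input-events>
    <data-in name="inputReady" dataset="input">
        <instance>${coord:current(0)}</instance>
    </data-in>
</input-events>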

Thanks


Oozie with spark 2.3 in Kubernetes

2018-05-11 Thread purna pradeep
Hello,

Would like to know if anyone tried oozie with spark 2.3 actions on
Kubernetes for scheduling spark jobs .


Thanks,
Purna


Re: Scala program to spark-submit on k8 cluster

2018-04-04 Thread purna pradeep
Yes, a REST application that submits a Spark job to a k8s cluster by running
spark-submit programmatically. I would also like to expose it as a Kubernetes
service so that clients can call it like any other REST API.
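
The rough idea I'm prototyping looks like the sketch below, using Spark's
SparkLauncher API; the master URL, image and jar path are placeholders, and
I'm assuming SPARK_HOME points at a Spark 2.3 build with Kubernetes support:

import org.apache.spark.launcher.SparkLauncher

val handle = new SparkLauncher()
  .setMaster("k8s://https://<k8s-apiserver>:6443")   // placeholder endpoint
  .setDeployMode("cluster")
  .setMainClass("org.apache.spark.examples.SparkPi")
  .setAppResource("local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar")
  .setConf("spark.kubernetes.container.image", "<spark-image>")  // placeholder image
  .startApplication()   // returns a SparkAppHandle the REST layer can poll for state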

On Wed, Apr 4, 2018 at 12:25 PM Yinan Li  wrote:

> Hi Kittu,
>
> What do you mean by "a Scala program"? Do you mean a program that submits
> a Spark job to a k8s cluster by running spark-submit programmatically, or
> some example Scala application that is to run on the cluster?
>
> On Wed, Apr 4, 2018 at 4:45 AM, Kittu M  wrote:
>
>> Hi,
>>
>> I’m looking for a Scala program to spark submit a Scala application
>> (spark 2.3 job) on k8 cluster .
>>
>> Any help  would be much appreciated. Thanks
>>
>>
>>
>


unsubscribe

2018-04-02 Thread purna pradeep
unsubscribe


unsubscribe

2018-03-28 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Unsubscribe

2018-03-28 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster

2018-03-21 Thread purna pradeep
Thanks Yinan,

Looks like this is still in alpha.

I would like to know whether there is any REST interface for Spark 2.3 job
submission, similar to what was available with Spark 2.2, as I need to submit
Spark applications to the k8s master based on different events (a cron
schedule or an S3 file-based trigger).
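
In the meantime I'm reading up on the operator you mentioned; as I understand
it, the submission would be expressed as a SparkApplication object, roughly
like the sketch below (field names are my best reading of the operator's
examples, and the apiVersion, image and paths are placeholders I still need
to verify against the operator docs):

apiVersion: sparkoperator.k8s.io/v1alpha1
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark
spec:
  type: Scala
  mode: cluster
  image: <spark-image>
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark
  executor:
    instances: 2
    cores: 1
    memory: 512m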

On Tue, Mar 20, 2018 at 11:50 PM Yinan Li <liyinan...@gmail.com> wrote:

> One option is the Spark Operator
> <https://github.com/GoogleCloudPlatform/spark-on-k8s-operator>. It allows
> specifying and running Spark applications on Kubernetes using Kubernetes
> custom resources objects. It takes SparkApplication CRD objects and
> automatically submits the applications to run on a Kubernetes cluster.
>
> Yinan
>
> On Tue, Mar 20, 2018 at 7:47 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Im using kubernetes cluster on AWS to run spark jobs ,im using spark 2.3
>> ,now i want to run spark-submit from AWS lambda function to k8s
>> master,would like to know if there is any REST interface to run Spark
>> submit on k8s Master
>
>
>


Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster

2018-03-20 Thread purna pradeep
I'm using a Kubernetes cluster on AWS to run Spark jobs, and I'm using Spark
2.3. Now I want to run spark-submit from an AWS Lambda function against the
k8s master, and I would like to know whether there is any REST interface to
run spark-submit on the k8s master.


[kubernetes-users] Cluster-autoscaler v1.0.4 error

2018-03-14 Thread purna pradeep
Hello, I'm using cluster-autoscaler (CA) v1.0.4 from Kubernetes for AWS node
autoscaling (Kubernetes version 1.8.3).



https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler/cloudprovider/aws

And I'm getting the below error in the logs of the CA pod:


node registry: RequestError: send request failed
caused by: Post https://autoscaling.us-west-2.amazonaws.com/: dial tcp: i/o
timeout


I have tried deploying kube-dns and CA on the master nodes as suggested in
this git issue, still no luck:

https://github.com/kubernetes/autoscaler/issues/113

I also have autoscaling:* in the IAM role policy.


Please suggest !
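
To narrow it down, I've been checking DNS and outbound connectivity from
inside the pod with something like this (a sketch; I'm assuming the CA pod
runs in kube-system with the app=cluster-autoscaler label and that the image
has nslookup/wget available):

kubectl -n kube-system get pods -l app=cluster-autoscaler
kubectl -n kube-system exec -it <cluster-autoscaler-pod> -- nslookup autoscaling.us-west-2.amazonaws.com
kubectl -n kube-system exec -it <cluster-autoscaler-pod> -- wget -qO- https://autoscaling.us-west-2.amazonaws.com/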



Re: Spark 2.3 submit on Kubernetes error

2018-03-12 Thread purna pradeep
Thanks Yinan,

I'm able to get the kube-dns endpoints when I run this command:

kubectl get ep kube-dns --namespace=kube-system

Do I need to deploy under the kube-system namespace instead of the default
namespace?

And please let me know if you have any insight into Error 1.
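
For Error 2 (the UnknownHostException on kubernetes.default.svc), this is how
I'm checking DNS resolution from inside the cluster; just a sketch, assuming
a throwaway busybox pod is acceptable:

kubectl run -it --rm dns-test --image=busybox --restart=Never -- nslookup kubernetes.default.svc
kubectl run -it --rm dns-test --image=busybox --restart=Never -- nslookup kubernetes.default.svc.cluster.local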

On Sun, Mar 11, 2018 at 8:26 PM Yinan Li <liyinan...@gmail.com> wrote:

> Spark on Kubernetes requires the presence of the kube-dns add-on properly
> configured. The executors connect to the driver through a headless
> Kubernetes service using the DNS name of the service. Can you check if you
> have the add-on installed in your cluster? This issue
> https://github.com/apache-spark-on-k8s/spark/issues/558 might help.
>
>
> On Sun, Mar 11, 2018 at 5:01 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Getting below errors when I’m trying to run spark-submit on k8 cluster
>>
>>
>> *Error 1*:This looks like a warning it doesn’t interrupt the app running
>> inside executor pod but keeps on getting this warning
>>
>>
>> *2018-03-09 11:15:21 WARN  WatchConnectionManager:192 - Exec Failure*
>> *java.io.EOFException*
>> *   at
>> okio.RealBufferedSource.require(RealBufferedSource.java:60)*
>> *   at
>> okio.RealBufferedSource.readByte(RealBufferedSource.java:73)*
>> *   at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:113)*
>> *   at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:97)*
>> *   at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262)*
>> *   at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201)*
>> *   at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)*
>> *   at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)*
>> *   at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)*
>> *   at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)*
>> *   at java.lang.Thread.run(Thread.java:748)*
>>
>>
>>
>> *Error2:* This is intermittent error  which is failing the executor pod
>> to run
>>
>>
>> *org.apache.spark.SparkException: External scheduler cannot be
>> instantiated*
>> * at
>> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)*
>> * at org.apache.spark.SparkContext.(SparkContext.scala:492)*
>> * at
>> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)*
>> * at
>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)*
>> * at
>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)*
>> * at scala.Option.getOrElse(Option.scala:121)*
>> * at
>> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)*
>> * at
>> com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)*
>> * at
>> com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)*
>> * at
>> com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)*
>> * at
>> com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)*
>> * at
>> com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)*
>> *Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
>> Operation: [get]  for kind: [Pod]  with name:
>> [myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver]  in namespace:
>> [default]  failed.*
>> * at
>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)*
>> * at
>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)*
>> * at
>> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)*
>> * at
>> io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)*
>> * at
>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.(KubernetesClusterSchedulerBackend.scala:70)*
>> * at
>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)*
>> * at
>> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$cr

Spark 2.3 submit on Kubernetes error

2018-03-11 Thread purna pradeep
Getting below errors when I’m trying to run spark-submit on k8 cluster


*Error 1*:This looks like a warning it doesn’t interrupt the app running
inside executor pod but keeps on getting this warning


*2018-03-09 11:15:21 WARN  WatchConnectionManager:192 - Exec Failure*
*java.io.EOFException*
*   at okio.RealBufferedSource.require(RealBufferedSource.java:60)*
*   at okio.RealBufferedSource.readByte(RealBufferedSource.java:73)*
*   at
okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:113)*
*   at
okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:97)*
*   at
okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262)*
*   at
okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201)*
*   at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)*
*   at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)*
*   at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)*
*   at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)*
*   at java.lang.Thread.run(Thread.java:748)*



*Error2:* This is intermittent error  which is failing the executor pod to
run


*org.apache.spark.SparkException: External scheduler cannot be
instantiated*
* at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)*
* at org.apache.spark.SparkContext.(SparkContext.scala:492)*
* at
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)*
* at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)*
* at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)*
* at scala.Option.getOrElse(Option.scala:121)*
* at
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)*
* at
com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)*
* at
com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)*
* at
com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)*
* at
com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)*
* at
com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)*
*Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
Operation: [get]  for kind: [Pod]  with name:
[myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver]  in namespace:
[default]  failed.*
* at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)*
* at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)*
* at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)*
* at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)*
* at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.(KubernetesClusterSchedulerBackend.scala:70)*
* at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)*
* at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)*
* ... 11 more*
*Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try
again*
* at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)*
* at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)*
* at
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)*
* at java.net.InetAddress.getAllByName0(InetAddress.java:1276)*
* at java.net.InetAddress.getAllByName(InetAddress.java:1192)*
* at java.net.InetAddress.getAllByName(InetAddress.java:1126)*
* at okhttp3.Dns$1.lookup(Dns.java:39)*
* at
okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)*
* at
okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)*
* at
okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)*
* at
okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)*
* at
okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)*
* at
okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)*
* at
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)*
* at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)*
* at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)*
* at
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)*
* at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)*
* 

handling Remote dependencies for spark-submit in spark 2.3 with kubernetes

2018-03-08 Thread purna pradeep
I'm trying to run spark-submit against a Kubernetes cluster with the Spark 2.3
Docker container image.

The challenge I'm facing is that the application has a mainapplication.jar and
other dependency files and jars located in a remote location like AWS S3. As
per the Spark 2.3 documentation there is a Kubernetes init-container that
downloads remote dependencies, but in this case I'm not creating any pod spec
myself to include init-containers; as per the documentation, Spark 2.3 on
Kubernetes creates the pods (driver, executor) internally. So I'm not sure how
I can use the init-container for spark-submit when there are remote
dependencies.

https://spark.apache.org/docs/latest/running-on-kubernetes.html#using-remote-dependencies

Please suggest
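
To make it concrete, this is roughly the invocation I'm trying; only a
sketch, where the API server URL, images, bucket and class name are
placeholders, and I'm assuming the jar URL is reachable from inside the
cluster and that the init-container image property from the 2.3 docs is the
right knob:

bin/spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --deploy-mode cluster \
  --name my-app \
  --class com.example.MyApp \
  --conf spark.kubernetes.container.image=<spark-image> \
  --conf spark.kubernetes.initContainer.image=<spark-init-image> \
  https://<bucket>.s3.amazonaws.com/jars/mainapplication.jar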


Unsubscribe

2018-02-27 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Unsubscribe

2018-02-26 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Unsubscribe

2018-02-26 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Unsubscribe

2018-02-11 Thread purna pradeep
Unsubscribe


Executor not getting added SparkUI & Spark Eventlog in deploymode:cluster

2017-11-14 Thread Mamillapalli, Purna Pradeep
Hi all,

I'm performing spark-submit using the Spark REST API POST operation on port
6066 with the below config:

> Launch Command:
> "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.141-1.b16.el7_3.x86_64/jre/bin/java"
> "-cp" "/usr/local/spark/conf/:/usr/local/spark/jars/*" "-Xmx4096M"
> "-Dspark.eventLog.enabled=true"
> "-Dspark.app.name=WorkflowApp"
> "-Dspark.submit.deployMode=cluster"
> "-Dspark.local.dir=/data0,/data1,/data2,/data3"
> "-Dspark.executor.cores=2" "-Dspark.master=spark://:7077"
> "-Dspark.serializer=org.apache.spark.serializer.KryoSerializer"
> "-Dspark.jars=s3a://<***>.jar" "-Dspark.driver.supervise=false"
> "-Dspark.history.fs.logDirectory=s3a://<*>/"
> "-Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256"
> "-Dspark.driver.memory=4G" "-Dspark.executor.memory=4G"
> "-Dspark.eventLog.dir=s3a://<*>/"
> "org.apache.spark.deploy.worker.DriverWrapper" "spark://Worker@<***>"
> "/usr/local/spark/work/driver-<***>.jar" "MyApp" "-c" "s3a://<***>"


When I looked into the Spark event log, below is what I observed:

{"Event":"SparkListenerExecutorAdded","Timestamp":1510633498623,"Executor 
ID":"driver","Executor Info":{"Host":"localhost","Total Cores":2,"Log Urls":{}}}
"spark.master":"local[*]"


Though I ran with deploy mode cluster, the slave IP is not shown in the Host
section and spark.master is shown as local[*] above. Because of this the job
runs only on the driver, and therefore when the job is submitted it does not
show up on the master web UI (port 8080) under Running and Completed
Applications; it shows only under Running Drivers and Completed Drivers.
Please suggest
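
For reference, the submission payload I'm POSTing looks roughly like the
sketch below (the standalone cluster REST API on port 6066; host, bucket, jar
and class names are placeholders, and the field names follow the
create-submission request format as I understand it). I'm trying to work out
whether spark.master is being overridden somewhere between this request and
the SparkSession the application builds:

curl -X POST http://<master>:6066/v1/submissions/create \
  --header "Content-Type:application/json" \
  --data '{
    "action": "CreateSubmissionRequest",
    "clientSparkVersion": "<spark-version>",
    "appResource": "s3a://<bucket>/myapp.jar",
    "mainClass": "MyApp",
    "appArgs": ["-c", "s3a://<bucket>/config"],
    "environmentVariables": {"SPARK_ENV_LOADED": "1"},
    "sparkProperties": {
      "spark.master": "spark://<master>:7077",
      "spark.submit.deployMode": "cluster",
      "spark.app.name": "WorkflowApp",
      "spark.jars": "s3a://<bucket>/myapp.jar",
      "spark.eventLog.enabled": "true"
    }
  }'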





Spark http: Not showing completed apps

2017-11-08 Thread purna pradeep
Hi,

I'm using Spark standalone on AWS EC2, and I'm using the Spark REST API (the
master web UI's :8080/json endpoint) to get completed apps, but the JSON shows
completed apps as an empty array even though the job ran successfully.


Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-30 Thread purna pradeep
@Andres I need the latest income, but it should be less than 10 months old
based on the income_age column, and I don't want to use SQL here.
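
Roughly what I'm after with the DataFrame API instead of SQL; a sketch only,
assuming the column names from the example (id, income_age_ts), a M/d/yy date
format, Spark 2.2+ for to_date with a format string, and df as the input
DataFrame:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// parse the timestamp column so we can both filter on age and order by recency
val withDate = df.withColumn("income_age_d", to_date(col("income_age_ts"), "M/d/yy"))

// keep only incomes newer than 10 months, then take the latest row per id
val w = Window.partitionBy(col("id")).orderBy(col("income_age_d").desc)

val latest = withDate
  .filter(col("income_age_d") >= add_months(current_date(), -10))
  .withColumn("r", row_number().over(w))
  .where(col("r") === 1)
  .drop("r", "income_age_d")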

On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi <iaiva...@gmail.com> wrote:

> Hi, if you need the last value from income in window function you can use
> last_value.
> No tested but meaby with @ayan sql
>
> spark.sql("select *, row_number(), last_value(income) over (partition by
> id order by income_age_ts desc) r from t")
>
>
> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> @ayan,
>>
>> Thanks for your response
>>
>> I would like to have functions in this case  calculateIncome and the
>> reason why I need function is to reuse in other parts of the application
>> ..that's the reason I'm planning for mapgroups with function as argument
>> which takes rowiterator ..but not sure if this is the best to implement as
>> my initial dataframe is very large
>>
>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> the tool you are looking for is window function.  Example:
>>>
>>> >>> df.show()
>>> +++---+--+-+
>>> |JoinDate|dept| id|income|income_age_ts|
>>> +++---+--+-+
>>> | 4/20/13|  ES|101| 19000|  4/20/17|
>>> | 4/20/13|  OS|101| 1|  10/3/15|
>>> | 4/20/12|  DS|102| 13000|   5/9/17|
>>> | 4/20/12|  CS|102| 12000|   5/8/17|
>>> | 4/20/10|  EQ|103| 1|   5/9/17|
>>> | 4/20/10|  MD|103|  9000|   5/8/17|
>>> +++---+--+-+
>>>
>>> >>> res = spark.sql("select *, row_number() over (partition by id order
>>> by income_age_ts desc) r from t")
>>> >>> res.show()
>>> +++---+--+-+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +++---+--+-+---+
>>> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
>>> | 4/20/10|  MD|103|  9000|   5/8/17|  2|
>>> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
>>> | 4/20/13|  OS|101| 1|  10/3/15|  2|
>>> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
>>> | 4/20/12|  CS|102| 12000|   5/8/17|  2|
>>> +++---+--+-+---+
>>>
>>> >>> res = spark.sql("select * from (select *, row_number() over
>>> (partition by id order by income_age_ts desc) r from t) x where r=1")
>>> >>> res.show()
>>> +++---+--+-+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +++---+--+-----+---+
>>> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
>>> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
>>> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
>>> +++---+--+-+---+
>>>
>>> This should be better because it uses all in-built optimizations in
>>> Spark.
>>>
>>> Best
>>> Ayan
>>>
>>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com
>>> > wrote:
>>>
>>>> Please click on unnamed text/html  link for better view
>>>>
>>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> -- Forwarded message -
>>>>> From: Mamillapalli, Purna Pradeep <
>>>>> purnapradeep.mamillapa...@capitalone.com>
>>>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>>>> Subject: Spark question
>>>>> To: purna pradeep <purna2prad...@gmail.com>
>>>>>
>>>>> Below is the input Dataframe(In real this is a very large Dataframe)
>>>>>
>>>>>
>>>>>
>>>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>>>> 101        | 1      | 10/3/15       | 4/20/13  |

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
@ayan,

Thanks for your response

I would like to have functions in this case  calculateIncome and the reason
why I need function is to reuse in other parts of the application ..that's
the reason I'm planning for mapgroups with function as argument which takes
rowiterator ..but not sure if this is the best to implement as my initial
dataframe is very large

On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> the tool you are looking for is window function.  Example:
>
> >>> df.show()
> +++---+--+-+
> |JoinDate|dept| id|income|income_age_ts|
> +++---+--+-+
> | 4/20/13|  ES|101| 19000|  4/20/17|
> | 4/20/13|  OS|101| 1|  10/3/15|
> | 4/20/12|  DS|102| 13000|   5/9/17|
> | 4/20/12|  CS|102| 12000|   5/8/17|
> | 4/20/10|  EQ|103| 1|   5/9/17|
> | 4/20/10|  MD|103|  9000|   5/8/17|
> +++---+--+-+
>
> >>> res = spark.sql("select *, row_number() over (partition by id order by
> income_age_ts desc) r from t")
> >>> res.show()
> +++---+--+-+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +++---+--+-+---+
> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
> | 4/20/10|  MD|103|  9000|   5/8/17|  2|
> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
> | 4/20/13|  OS|101| 1|  10/3/15|  2|
> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
> | 4/20/12|  CS|102| 12000|   5/8/17|  2|
> +++---+--+-+---+
>
> >>> res = spark.sql("select * from (select *, row_number() over (partition
> by id order by income_age_ts desc) r from t) x where r=1")
> >>> res.show()
> +++---+--+-+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +++---+--+-+---+
> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
> +++---+--+-+---+
>
> This should be better because it uses all in-built optimizations in Spark.
>
> Best
> Ayan
>
> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Please click on unnamed text/html  link for better view
>>
>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>>
>>> -- Forwarded message -
>>> From: Mamillapalli, Purna Pradeep <
>>> purnapradeep.mamillapa...@capitalone.com>
>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>> Subject: Spark question
>>> To: purna pradeep <purna2prad...@gmail.com>
>>>
>>> Below is the input Dataframe(In real this is a very large Dataframe)
>>>
>>>
>>>
>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>> 101        | 1      | 10/3/15       | 4/20/13  | OS
>>> 102        | 13000  | 5/9/17        | 4/20/12  | DS
>>> 102        | 12000  | 5/8/17        | 4/20/12  | CS
>>> 103        | 1      | 5/9/17        | 4/20/10  | EQ
>>> 103        | 9000   | 5/8/15        | 4/20/10  | MD
>>>
>>> Get the latest income of an employee which has  Income_age ts <10 months
>>>
>>> Expected output Dataframe
>>>
>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>> 102        | 13000  | 5/9/17        | 4/20/12  | DS
>>> 103        | 1      | 5/9/17        | 4/20/10  | EQ
>>>
>

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
Please click on unnamed text/html  link for better view

On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
wrote:

>
> -- Forwarded message -
> From: Mamillapalli, Purna Pradeep <
> purnapradeep.mamillapa...@capitalone.com>
> Date: Tue, Aug 29, 2017 at 8:08 PM
> Subject: Spark question
> To: purna pradeep <purna2prad...@gmail.com>
>
> Below is the input Dataframe(In real this is a very large Dataframe)
>
>
>
> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
> 101        | 19000  | 4/20/17       | 4/20/13  | ES
> 101        | 1      | 10/3/15       | 4/20/13  | OS
> 102        | 13000  | 5/9/17        | 4/20/12  | DS
> 102        | 12000  | 5/8/17        | 4/20/12  | CS
> 103        | 1      | 5/9/17        | 4/20/10  | EQ
> 103        | 9000   | 5/8/15        | 4/20/10  | MD
>
> Get the latest income of an employee which has  Income_age ts <10 months
>
> Expected output Dataframe
>
> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
> 101        | 19000  | 4/20/17       | 4/20/13  | ES
> 102        | 13000  | 5/9/17        | 4/20/12  | DS
> 103        | 1      | 5/9/17        | 4/20/10  | EQ
>
>
>





Below is what I'm planning to implement:
>
>
>
> case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: java.sql.Date,
>   JOINDATE: java.sql.Date, DEPT: String)
>
> val empSchema = new StructType()
>   .add("EmployeeID", "Int")
>   .add("INCOME", "Int")
>   .add("INCOMEAGE", "Date")
>   .add("JOINDATE", "Date")
>   .add("DEPT", "String")
>
> // Reading from the file
> import sparkSession.implicits._
>
> val readEmpFile = sparkSession.read
>   .option("sep", ",")
>   .schema(empSchema)
>   .csv(INPUT_DIRECTORY)
>
> // Create the employee Dataset
> val custDf = readEmpFile.as[employee]
>
> // Group by employee id so the latest income can be computed per employee
> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>
> val k = groupByDf.mapGroups((key, value) => performETL(value))
>
> def performETL(empData: Iterator[employee]): employee = {
>   val empList = empData.toList
>
>   // calculateIncome has the logic to figure out the latest income for an
>   // account and returns that latest income
>   val income = calculateIncome(empList)
>
>   // build the output row for this employee from the first record plus the
>   // calculated income
>   val row = empList.head
>   employee(row.EmployeeID, income, row.INCOMEAGE, row.JOINDATE, row.DEPT)
> }
>
>
>
> Is this a better approach or even the right approach to implement the
> same.If not please suggest a better way to implement the same?
>
>
>
> --
>
>


Re: use WithColumn with external function in a java jar

2017-08-29 Thread purna pradeep
Thanks, I'll check it out.
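
For anyone following along, this is the shape of what I'm going to try based
on the suggestion; a sketch, assuming the columns parse cleanly to Double and
that MyJava can be instantiated inside the lambda (so it is constructed on the
executors rather than serialized):

import org.apache.spark.sql.functions.{col, udf}

// wrap the external Java method in a UDF
val calculateExpense = udf { (pexpense: String, cexpense: String) =>
  new MyJava().calculateExpense(pexpense.toDouble, cexpense.toDouble)
}

val withExpense = df.withColumn("expense",
  calculateExpense(col("pexpense"), col("cexpense")))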

On Mon, Aug 28, 2017 at 10:22 PM Praneeth Gayam <praneeth.ga...@gmail.com>
wrote:

> You can create a UDF which will invoke your java lib
>
> def calculateExpense: UserDefinedFunction = udf((pexpense: String, cexpense: 
> String) => new MyJava().calculateExpense(pexpense.toDouble, 
> cexpense.toDouble))
>
>
>
>
>
> On Tue, Aug 29, 2017 at 6:53 AM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> I have data in a DataFrame with below columns
>>
>> 1)Fileformat is csv
>> 2)All below column datatypes are String
>>
>> employeeid,pexpense,cexpense
>>
>> Now I need to create a new DataFrame which has new column called
>> `expense`, which is calculated based on columns `pexpense`, `cexpense`.
>>
>> The tricky part is the calculation algorithm is not an **UDF** function
>> which I created, but it's an external function that needs to be imported
>> from a Java library which takes primitive types as arguments - in this case
>> `pexpense`, `cexpense` - to calculate the value required for new column.
>>
>> The external function signature
>>
>> public class MyJava
>>
>> {
>>
>> public Double calculateExpense(Double pexpense, Double cexpense) {
>>// calculation
>> }
>>
>> }
>>
>> So how can I invoke that external function to create a new calculated
>> column. Can I register that external function as UDF in my Spark
>> application?
>>
>> Stackoverflow reference
>>
>>
>> https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function
>>
>>
>>
>>
>>
>>
>


Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
-- Forwarded message -
From: Mamillapalli, Purna Pradeep <purnapradeep.mamillapa...@capitalone.com>
Date: Tue, Aug 29, 2017 at 8:08 PM
Subject: Spark question
To: purna pradeep <purna2prad...@gmail.com>

Below is the input Dataframe(In real this is a very large Dataframe)



EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
101        | 19000  | 4/20/17       | 4/20/13  | ES
101        | 1      | 10/3/15       | 4/20/13  | OS
102        | 13000  | 5/9/17        | 4/20/12  | DS
102        | 12000  | 5/8/17        | 4/20/12  | CS
103        | 1      | 5/9/17        | 4/20/10  | EQ
103        | 9000   | 5/8/15        | 4/20/10  | MD

Get the latest income of an employee which has  Income_age ts <10 months

Expected output Dataframe

EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
101        | 19000  | 4/20/17       | 4/20/13  | ES
102        | 13000  | 5/9/17        | 4/20/12  | DS
103        | 1      | 5/9/17        | 4/20/10  | EQ


Below is what I'm planning to implement:

case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: java.sql.Date,
  JOINDATE: java.sql.Date, DEPT: String)

val empSchema = new StructType()
  .add("EmployeeID", "Int")
  .add("INCOME", "Int")
  .add("INCOMEAGE", "Date")
  .add("JOINDATE", "Date")
  .add("DEPT", "String")

// Reading from the file
import sparkSession.implicits._

val readEmpFile = sparkSession.read
  .option("sep", ",")
  .schema(empSchema)
  .csv(INPUT_DIRECTORY)

// Create the employee Dataset
val custDf = readEmpFile.as[employee]

// Group by employee id so the latest income can be computed per employee
val groupByDf = custDf.groupByKey(a => a.EmployeeID)

val k = groupByDf.mapGroups((key, value) => performETL(value))

def performETL(empData: Iterator[employee]): employee = {
  val empList = empData.toList

  // calculateIncome has the logic to figure out the latest income for an
  // account and returns that latest income
  val income = calculateIncome(empList)

  // build the output row for this employee from the first record plus the
  // calculated income
  val row = empList.head
  employee(row.EmployeeID, income, row.INCOMEAGE, row.JOINDATE, row.DEPT)
}

Is this a good approach, or even the right approach, to implement this? If
not, please suggest a better way to implement it.



--



use WithColumn with external function in a java jar

2017-08-28 Thread purna pradeep
I have data in a DataFrame with below columns

1)Fileformat is csv
2)All below column datatypes are String

employeeid,pexpense,cexpense

Now I need to create a new DataFrame which has new column called `expense`,
which is calculated based on columns `pexpense`, `cexpense`.

The tricky part is that the calculation algorithm is not a UDF that I created;
it's an external function that needs to be imported from a Java library and
takes primitive types as arguments - in this case `pexpense` and `cexpense` -
to calculate the value required for the new column.

The external function signature

public class MyJava {

    public Double calculateExpense(Double pexpense, Double cexpense) {
        // calculation
    }
}

So how can I invoke that external function to create a new calculated
column. Can I register that external function as UDF in my Spark
application?

Stackoverflow reference

https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function


Re: Restart streaming query spark 2.1 structured streaming

2017-08-16 Thread purna pradeep
Also, is query.stop() a graceful stop operation? What happens to data that has
already been received: will it still be processed?

On Tue, Aug 15, 2017 at 7:21 PM purna pradeep <purna2prad...@gmail.com>
wrote:

> Ok thanks
>
> Few more
>
> 1.when I looked into the documentation it says onQueryprogress is not
> threadsafe ,So Is this method would be the right place to refresh cache?and
> no need to restart query if I choose listener ?
>
> The methods are not thread-safe as they may be called from different
> threads.
>
>
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
>
>
>
> 2.if I use streamingquerylistner onqueryprogress my understanding is
> method will be executed only when the query is in progress so if I refresh
> data frame here without restarting  query will it impact application ?
>
> 3.should I use unpersist (Boolean) blocking method or async method
> unpersist() as the data size is big.
>
> I feel your solution is better as it stops query --> refresh cache -->
> starts query if I compromise on little downtime even cached dataframe is
> huge .I'm not sure how listener behaves as it's asynchronous, correct me if
> I'm wrong.
>
> On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> Both works. The asynchronous method with listener will have less of down
>> time, just that the first trigger/batch after the asynchronous
>> unpersist+persist will probably take longer as it has to reload the data.
>>
>>
>> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Thanks tathagata das actually I'm planning to something like this
>>>
>>> activeQuery.stop()
>>>
>>> //unpersist and persist cached data frame
>>>
>>> df.unpersist()
>>>
>>> //read the updated data //data size of df is around 100gb
>>>
>>> df.persist()
>>>
>>>  activeQuery = startQuery()
>>>
>>>
>>> the cached data frame size around 100gb ,so the question is this the
>>> right place to refresh this huge cached data frame ?
>>>
>>> I'm also trying to refresh cached data frame in onqueryprogress() method
>>> in a class which extends StreamingQuerylistner
>>>
>>> Would like to know which is the best place to refresh cached data frame
>>> and why
>>>
>>> Thanks again for the below response
>>>
>>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> You can do something like this.
>>>>
>>>>
>>>> def startQuery(): StreamingQuery = {
>>>>// create your streaming dataframes
>>>>// start the query with the same checkpoint directory}
>>>>
>>>> // handle to the active queryvar activeQuery: StreamingQuery = null
>>>> while(!stopped) {
>>>>
>>>>    if (activeQuery = null) { // if query not active, start query
>>>>  activeQuery = startQuery()
>>>>
>>>>} else if (shouldRestartQuery())  {  // check your condition and 
>>>> restart query
>>>>  activeQuery.stop()
>>>>  activeQuery = startQuery()
>>>>}
>>>>
>>>>activeQuery.awaitTermination(100)   // wait for 100 ms.
>>>>// if there is any error it will throw exception and quit the loop
>>>>// otherwise it will keep checking the condition every 100ms}
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com
>>>> > wrote:
>>>>
>>>>> Thanks Michael
>>>>>
>>>>> I guess my question is little confusing ..let me try again
>>>>>
>>>>>
>>>>> I would like to restart streaming query programmatically while my
>>>>> streaming application is running based on a condition and why I want to do
>>>>> this
>>>>>
>>>>> I want to refresh a cached data frame based on a condition and the
>>>>> best way to do this restart streaming query suggested by Tdas below for
>>>>> similar problem
>>>>>
>>>>>
>>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>>
>>&g

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Ok, thanks.

A few more questions:

1. When I looked into the documentation it says onQueryProgress is not
thread-safe ("The methods are not thread-safe as they may be called from
different threads"). So would this method be the right place to refresh the
cache, and is there no need to restart the query if I choose the listener?

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala

2. If I use StreamingQueryListener's onQueryProgress, my understanding is that
the method is executed only while the query is in progress, so if I refresh
the data frame there without restarting the query, will it impact the
application?

3. Should I use the blocking unpersist(true) method or the async unpersist(),
given that the data size is big?

I feel your solution is better, as it stops the query --> refreshes the cache
--> starts the query, even if I compromise on a little downtime while the
cached dataframe is huge. I'm not sure how the listener behaves since it's
asynchronous; correct me if I'm wrong.

On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Both works. The asynchronous method with listener will have less of down
> time, just that the first trigger/batch after the asynchronous
> unpersist+persist will probably take longer as it has to reload the data.
>
>
> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Thanks tathagata das actually I'm planning to something like this
>>
>> activeQuery.stop()
>>
>> //unpersist and persist cached data frame
>>
>> df.unpersist()
>>
>> //read the updated data //data size of df is around 100gb
>>
>> df.persist()
>>
>>  activeQuery = startQuery()
>>
>>
>> the cached data frame size around 100gb ,so the question is this the
>> right place to refresh this huge cached data frame ?
>>
>> I'm also trying to refresh cached data frame in onqueryprogress() method
>> in a class which extends StreamingQuerylistner
>>
>> Would like to know which is the best place to refresh cached data frame
>> and why
>>
>> Thanks again for the below response
>>
>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> You can do something like this.
>>>
>>>
>>> def startQuery(): StreamingQuery = {
>>>// create your streaming dataframes
>>>// start the query with the same checkpoint directory}
>>>
>>> // handle to the active queryvar activeQuery: StreamingQuery = null
>>> while(!stopped) {
>>>
>>>if (activeQuery = null) { // if query not active, start query
>>>  activeQuery = startQuery()
>>>
>>>} else if (shouldRestartQuery())  {  // check your condition and 
>>> restart query
>>>  activeQuery.stop()
>>>  activeQuery = startQuery()
>>>}
>>>
>>>activeQuery.awaitTermination(100)   // wait for 100 ms.
>>>// if there is any error it will throw exception and quit the loop
>>>// otherwise it will keep checking the condition every 100ms}
>>>
>>>
>>>
>>>
>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Michael
>>>>
>>>> I guess my question is little confusing ..let me try again
>>>>
>>>>
>>>> I would like to restart streaming query programmatically while my
>>>> streaming application is running based on a condition and why I want to do
>>>> this
>>>>
>>>> I want to refresh a cached data frame based on a condition and the best
>>>> way to do this restart streaming query suggested by Tdas below for similar
>>>> problem
>>>>
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>
>>>> I do understand that checkpoint if helps in recovery and failures but I
>>>> would like to know "how to restart streaming query programmatically without
>>>> stopping my streaming application"
>>>>
>>>> In place of query.awaittermination should I need to have an logic to
>>>> restart query? Please suggest
>>>>
>>>>
>>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> See
>>>>> https://spark.apache.org/docs/latest/structur

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Thanks Tathagata. Actually I'm planning to do something like this:

activeQuery.stop()

// unpersist and re-persist the cached data frame
df.unpersist()

// re-read the updated data (the data size of df is around 100gb)
df.persist()

activeQuery = startQuery()

The cached data frame is around 100gb in size, so the question is whether this
is the right place to refresh such a huge cached data frame.

I'm also trying to refresh the cached data frame in the onQueryProgress()
method of a class which extends StreamingQueryListener.

I would like to know which is the best place to refresh the cached data frame,
and why.

Thanks again for the below response

On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> You can do something like this.
>
>
> def startQuery(): StreamingQuery = {
>// create your streaming dataframes
>// start the query with the same checkpoint directory}
>
> // handle to the active queryvar activeQuery: StreamingQuery = null
> while(!stopped) {
>
>if (activeQuery = null) { // if query not active, start query
>  activeQuery = startQuery()
>
>} else if (shouldRestartQuery())  {  // check your condition and 
> restart query
>  activeQuery.stop()
>  activeQuery = startQuery()
>}
>
>activeQuery.awaitTermination(100)   // wait for 100 ms.
>// if there is any error it will throw exception and quit the loop
>// otherwise it will keep checking the condition every 100ms}
>
>
>
>
> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Thanks Michael
>>
>> I guess my question is little confusing ..let me try again
>>
>>
>> I would like to restart streaming query programmatically while my
>> streaming application is running based on a condition and why I want to do
>> this
>>
>> I want to refresh a cached data frame based on a condition and the best
>> way to do this restart streaming query suggested by Tdas below for similar
>> problem
>>
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>
>> I do understand that checkpoint if helps in recovery and failures but I
>> would like to know "how to restart streaming query programmatically without
>> stopping my streaming application"
>>
>> In place of query.awaittermination should I need to have an logic to
>> restart query? Please suggest
>>
>>
>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> See
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>
>>> Though I think that this currently doesn't work with the console sink.
>>>
>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>>
>>>>> I'm trying to restart a streaming query to refresh cached data frame
>>>>>
>>>>> Where and how should I restart streaming query
>>>>>
>>>>
>>>>
>>>> val sparkSes = SparkSession
>>>>
>>>>   .builder
>>>>
>>>>   .config("spark.master", "local")
>>>>
>>>>   .appName("StreamingCahcePoc")
>>>>
>>>>   .getOrCreate()
>>>>
>>>>
>>>>
>>>> import sparkSes.implicits._
>>>>
>>>>
>>>>
>>>> val dataDF = sparkSes.readStream
>>>>
>>>>   .schema(streamSchema)
>>>>
>>>>   .csv("testData")
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>val query = counts.writeStream
>>>>
>>>>   .outputMode("complete")
>>>>
>>>>   .format("console")
>>>>
>>>>   .start()
>>>>
>>>>
>>>> query.awaittermination()
>>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>>
>>>
>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Thanks Michael

I guess my question is a little confusing; let me try again.

I would like to restart the streaming query programmatically, based on a
condition, while my streaming application is running. The reason I want to do
this:

I want to refresh a cached data frame based on a condition, and the best way
to do this is to restart the streaming query, as suggested by TD below for a
similar problem:

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

I do understand that checkpointing helps with recovery and failures, but I
would like to know how to restart the streaming query programmatically without
stopping my streaming application.

In place of query.awaitTermination(), should I have logic to restart the
query? Please suggest.


On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com>
wrote:

> See
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Though I think that this currently doesn't work with the console sink.
>
> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Hi,
>>
>>>
>>> I'm trying to restart a streaming query to refresh cached data frame
>>>
>>> Where and how should I restart streaming query
>>>
>>
>>
>> val sparkSes = SparkSession
>>   .builder
>>   .config("spark.master", "local")
>>   .appName("StreamingCahcePoc")
>>   .getOrCreate()
>>
>> import sparkSes.implicits._
>>
>> val dataDF = sparkSes.readStream
>>   .schema(streamSchema)
>>   .csv("testData")
>>
>> // "counts" is presumably an aggregation derived from dataDF
>> // (an aggregation is required for "complete" output mode)
>> val query = counts.writeStream
>>   .outputMode("complete")
>>   .format("console")
>>   .start()
>>
>> query.awaitTermination()
>>
>>
>>
>>>
>>>
>>>
>


Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Hi,

>
> I'm trying to restart a streaming query to refresh a cached data frame
>
> Where and how should I restart the streaming query?
>


val sparkSes = SparkSession
  .builder
  .config("spark.master", "local")
  .appName("StreamingCahcePoc")
  .getOrCreate()

import sparkSes.implicits._

val dataDF = sparkSes.readStream
  .schema(streamSchema)
  .csv("testData")

// "counts" is presumably an aggregation derived from dataDF
// (an aggregation is required for "complete" output mode)
val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()



>
>
>


StreamingQueryListener spark structured Streaming

2017-08-09 Thread purna pradeep
I'm working on a structured streaming application in which I read from Kafka
as a stream, and for each batch of the stream I need to perform a lookup
against a file on S3 (which is nearly 200 GB) to fetch some attributes. So
I'm using df.persist() (basically caching the lookup), but I need to refresh
the dataframe as the S3 lookup data changes frequently. I'm using the code
below:

class RefreshCachedDF(sparkSession: SparkSession) extends StreamingQueryListener {

  override def onQueryStarted(
      event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryTerminated(
      event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val currTime = System.currentTimeMillis()
    if (currTime > latestRefreshTime) {  // latestRefreshTime is kept in a global temp view
      // oldDF is a cached DataFrame created from a global temp view, roughly 150 GB in size.
      oldDF.unpersist()  // I guess this is an async call; should I use the blocking
                         // unpersist(true) instead, and is that safe?
      val inputDf: DataFrame = readFile(spec, sparkSession)
      val recreatedDf = inputDf.persist()
      val count = recreatedDf.count()  // force materialization of the refreshed cache
    }
  }
}
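
For reference, a listener like this would typically be registered on the
session before the streaming query is started; a minimal sketch, assuming
sparkSession is the active SparkSession:

// Attach the listener to the session's StreamingQueryManager
// (sketch only; RefreshCachedDF is the class defined above).
sparkSession.streams.addListener(new RefreshCachedDF(sparkSession))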


Is the above approach a good solution for refreshing the cached dataframe?
The trigger for this refresh is an expiry date for the S3 cache that I store
in a global temp view.

Note: S3 is one lookup source, but I also have other sources with data sizes
of 20 to 30 GB.

 - So the question is: is this the right place to refresh the cached df?
 - If yes, should I use the blocking or non-blocking unpersist method, given
   that the data is huge (15 GB)?
 - For a similar issue I see the response below from Tdas, with the subject
   "Re: Refreshing a persisted RDD":

"Yes, you will have to recreate the streaming Dataframe along with the
static Dataframe, and restart the query. There isnt a currently
feasible to
do this without a query restart. But restarting a query WITHOUT
restarting
the whole application + spark cluster, is reasonably fast. If your
applicatoin can tolerate 10 second latencies, then stopping and
restarting
a query within the same Spark application is a reasonable solution."

[http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/browser]



So if that's the better solution, should I restart the query as below?

query.processAllAvailable()
query.stop()
df.unpersist()
val inputDf: DataFrame = readFile(spec, sparkSession)  // read file from S3 or any other source
val recreatedDf = inputDf.persist()
query.start()   // note: StreamingQuery has no start() method; the query would
                // have to be started again via writeStream ... start()


When I looked into the Spark documentation for the above methods:

void processAllAvailable()   // the documentation says this method is intended for testing
Blocks until all available data in the source has been processed and
committed to the sink. This method is intended for testing. Note that in the
case of continually arriving data, this method may block forever.
Additionally, this method is only guaranteed to block until data that has
been synchronously appended to a Source prior to invocation (i.e. getOffset
must immediately reflect the addition).

stop()
Stops the execution of this query if it is running. This method blocks
until the threads performing execution has stopped.

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQuery.html#processAllAvailable()

Please suggest a better approach to refresh the cache.
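
For reference, a rough sketch of the restart-based refresh that Tdas
describes might be wired together as below; this is only an illustration,
where buildQuery() and cacheExpired() are assumed helpers, not Spark APIs:

// Sketch of a restart-based refresh loop: stop the query, swap the cached
// lookup, then start a new query. buildQuery() and cacheExpired() are
// assumed helper functions, not part of the Spark API.
var lookupDF = readFile(spec, sparkSession).persist()
lookupDF.count()                                 // materialize the cache
var query = buildQuery(sparkSession, lookupDF)   // df.writeStream ... start()

while (true) {
  query.awaitTermination(30000)                  // wake up periodically
  if (cacheExpired()) {
    query.stop()                                 // blocks until execution threads stop
    lookupDF.unpersist(blocking = true)          // free the old cache before rebuilding
    lookupDF = readFile(spec, sparkSession).persist()
    lookupDF.count()                             // materialize the refreshed cache
    query = buildQuery(sparkSession, lookupDF)   // start a new query on the new cache
  }
}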