Dynamic executor scaling spark/Kubernetes
Hello,

Is dynamic executor scaling for Spark on Kubernetes available in the latest release of Spark? I mean scaling the executors based on the workload, rather than preallocating a fixed number of executors for a Spark job.

Thanks,
Purna
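For readers unsure what such a request looks like on the command line, here is a minimal sketch that assembles the relevant `--conf` arguments. The property names are standard Spark configuration keys; the helper name `dynamic_allocation_conf` and the executor counts are made up for illustration. Whether the settings take effect on Kubernetes depends on the Spark version: as far as I know, the 2.3/2.4 Kubernetes backend did not support dynamic allocation, and the shuffle-tracking option used here only arrived in Spark 3.0, so treat this as an assumption to check against the documentation of the release in use.

```python
def dynamic_allocation_conf(min_executors, max_executors, shuffle_tracking=True):
    """Return spark-submit arguments that request dynamic executor allocation.

    Hypothetical helper for illustration; it only builds strings and does not
    check what the target Spark version supports.
    """
    if min_executors < 0 or max_executors < min_executors:
        raise ValueError("need 0 <= min_executors <= max_executors")
    conf = {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
    }
    if shuffle_tracking:
        # Assumption: the cluster runs a Spark version with shuffle tracking (3.0+).
        conf["spark.dynamicAllocation.shuffleTracking.enabled"] = "true"
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Print the flags so they can be pasted into a spark-submit command.
print(" ".join(dynamic_allocation_conf(1, 10)))
```

Note that these settings change how many executors run, not how large each one is.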
Re: [ANNOUNCE] Announcing Apache Spark 2.4.0
Thanks, this is great news.

Can you please let me know if dynamic resource allocation is available in Spark 2.4?

I'm using Spark 2.3.2 on Kubernetes. Do I still need to provide executor memory options as part of the spark-submit command, or will Spark manage the required executor memory based on the size of the Spark job?

On Thu, Nov 8, 2018 at 2:18 PM Marcelo Vanzin wrote:

> +user@
>
>> ---------- Forwarded message ---------
>> From: Wenchen Fan
>> Date: Thu, Nov 8, 2018 at 10:55 PM
>> Subject: [ANNOUNCE] Announcing Apache Spark 2.4.0
>> To: Spark dev list
>>
>> Hi all,
>>
>> Apache Spark 2.4.0 is the fifth release in the 2.x line. This release adds Barrier Execution Mode for better integration with deep learning frameworks, introduces 30+ built-in and higher-order functions to deal with complex data type easier, improves the K8s integration, along with experimental Scala 2.12 support. Other major updates include the built-in Avro data source, Image data source, flexible streaming sinks, elimination of the 2GB block size limitation during transfer, Pandas UDF improvements. In addition, this release continues to focus on usability, stability, and polish while resolving around 1100 tickets.
>>
>> We'd like to thank our contributors and users for their contributions and early feedback to this release. This release would not have been possible without you.
>>
>> To download Spark 2.4.0, head over to the download page: http://spark.apache.org/downloads.html
>>
>> To view the release notes: https://spark.apache.org/releases/spark-release-2-4-0.html
>>
>> Thanks,
>> Wenchen
>>
>> PS: If you see any issues with the release notes, webpage or published artifacts, please contact me directly off-list.
>
> --
> Marcelo
Spark 2.3.1: k8s driver pods stuck in Initializing state
Hello,

We're running Spark 2.3.1 on Kubernetes v1.11.0, and our driver pods are getting stuck in the initializing state, like so:

NAME                                            READY  STATUS    RESTARTS  AGE
my-pod-fd79926b819d3b34b05250e23347d0e7-driver  0/1    Init:0/1  0         18h

And from *kubectl describe pod*:

Warning  FailedMount  9m (x128 over 4h)  kubelet, 10.47.96.167  Unable to mount volumes for pod "my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)": timeout expired waiting for volumes to attach or mount for pod "spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted volumes=[spark-init-properties]. list of unattached volumes=[spark-init-properties download-jars-volume download-files-volume spark-token-tfpvp]
Warning  FailedMount  4m (x153 over 4h)  kubelet, 10.47.96.167  MountVolume.SetUp failed for volume "spark-init-properties" : configmaps "my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in *kubectl get configmap*, the init config map for the driver pod isn't there. Am I correct in assuming that, since the configmap isn't being created, the driver pod will never start (hence stuck in init)?

Where does the init config map come from? Why would it not be created?

Please suggest.

Thanks,
Purna
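When many driver pods are stuck this way, it can help to pull the missing ConfigMap names out of the event text and compare them with the output of `kubectl get configmap`. The sketch below does only the text-extraction part with a regular expression matched against the kubelet message quoted above; the function name `missing_configmaps` is hypothetical, and the pattern assumes the message wording stays as shown in this thread.

```python
import re

# Matches the kubelet wording seen in the events above, for example:
#   configmaps "some-name" not found
MISSING_CONFIGMAP = re.compile(r'configmaps? "([^"]+)" not found')


def missing_configmaps(describe_output):
    """Return the distinct ConfigMap names reported as not found, in first-seen order."""
    seen = []
    for name in MISSING_CONFIGMAP.findall(describe_output):
        if name not in seen:
            seen.append(name)
    return seen


sample = (
    'MountVolume.SetUp failed for volume "spark-init-properties" : '
    'configmaps "my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found'
)
print(missing_configmaps(sample))
```

The returned names can then be checked by hand against the namespace the driver pod runs in.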
Spark 2.3.1: k8s driver pods stuck in Initializing state
We're running Spark 2.3.1 on Kubernetes v1.11.0, and our driver pods are getting stuck in the initializing state, like so:

NAME                                            READY  STATUS    RESTARTS  AGE
my-pod-fd79926b819d3b34b05250e23347d0e7-driver  0/1    Init:0/1  0         18h

And from *kubectl describe pod*:

Warning  FailedMount  9m (x128 over 4h)  kubelet, 10.47.96.167  Unable to mount volumes for pod "my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)": timeout expired waiting for volumes to attach or mount for pod "spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted volumes=[spark-init-properties]. list of unattached volumes=[spark-init-properties download-jars-volume download-files-volume spark-token-tfpvp]
Warning  FailedMount  4m (x153 over 4h)  kubelet, 10.47.96.167  MountVolume.SetUp failed for volume "spark-init-properties" : configmaps "my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in *kubectl get configmap*, the init config map for the driver pod isn't there. Am I correct in assuming that, since the configmap isn't being created, the driver pod will never start (hence stuck in init)?

Where does the init config map come from? Why would it not be created?

Thanks,
Christopher Carney

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes
Resurfacing the question to get more attention.

>> when i ran spark submit on k8s master the driver pod is stuck in Waiting: PodInitializing state.
>> I had to manually kill the driver pod and submit new job in this case, then it works. How this can be handled in production?
>
> This happens with executor pods as well
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128
Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes
Hello,

> im running Spark 2.3 job on kubernetes cluster
>
> when i ran spark submit on k8s master the driver pod is stuck in Waiting: PodInitializing state.
> I had to manually kill the driver pod and submit new job in this case, then it works. How this can be handled in production?
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128
spark driver pod stuck in Waiting: PodInitializing state in Kubernetes
I'm running a Spark 2.3 job on a Kubernetes cluster.

kubectl version

Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z", GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"8", GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}

When I ran spark-submit against the k8s master, the driver pod got stuck in the Waiting: PodInitializing state. I had to manually kill the driver pod and submit a new job; then it works.

This happens when I submit jobs almost in parallel, i.e. 5 jobs one right after the other.

I'm running Spark jobs on 20 nodes, each with the configuration below.

I ran kubectl describe node on the node where the driver pod is running, and this is what I got. I do see that resources are overcommitted, but I expected the Kubernetes scheduler not to schedule pods if a node's resources are overcommitted or the node is in the Not Ready state. In this case the node is in the Ready state, but I observe the same behaviour when a node is in the "Not Ready" state.

Name:               **
Roles:              worker
Labels:             beta.kubernetes.io/arch=amd64
                    beta.kubernetes.io/os=linux
                    kubernetes.io/hostname=
                    node-role.kubernetes.io/worker=true
Annotations:        node.alpha.kubernetes.io/ttl=0
                    volumes.kubernetes.io/controller-managed-attach-detach=true
Taints:
CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400
Conditions:
  Type            Status  LastHeartbeatTime                LastTransitionTime               Reason                      Message
  ----            ------  -----------------                ------------------               ------                      -------
  OutOfDisk       False   Tue, 14 Aug 2018 09:31:20 -0400  Tue, 31 Jul 2018 09:59:24 -0400  KubeletHasSufficientDisk    kubelet has sufficient disk space available
  MemoryPressure  False   Tue, 14 Aug 2018 09:31:20 -0400  Tue, 31 Jul 2018 09:59:24 -0400  KubeletHasSufficientMemory  kubelet has sufficient memory available
  DiskPressure    False   Tue, 14 Aug 2018 09:31:20 -0400  Tue, 31 Jul 2018 09:59:24 -0400  KubeletHasNoDiskPressure    kubelet has no disk pressure
  Ready           True    Tue, 14 Aug 2018 09:31:20 -0400  Sat, 11 Aug 2018 00:41:27 -0400  KubeletReady                kubelet is posting ready status. AppArmor enabled
Addresses:
  InternalIP:  *
  Hostname:    **
Capacity:
  cpu:     16
  memory:  125827288Ki
  pods:    110
Allocatable:
  cpu:     16
  memory:  125724888Ki
  pods:    110
System Info:
  Machine ID:                 *
  System UUID:                **
  Boot ID:                    1493028d-0a80-4f2f-b0f1-48d9b8910e9f
  Kernel Version:             4.4.0-1062-aws
  OS Image:                   Ubuntu 16.04.4 LTS
  Operating System:           linux
  Architecture:               amd64
  Container Runtime Version:  docker://Unknown
  Kubelet Version:            v1.8.3
  Kube-Proxy Version:         v1.8.3
PodCIDR:     **
ExternalID:  **
Non-terminated Pods:  (11 in total)
  Namespace    Name                                                          CPU Requests  CPU Limits  Memory Requests  Memory Limits
  ---------    ----                                                          ------------  ----------  ---------------  -------------
  kube-system  calico-node-gj5mb                                             250m (1%)     0 (0%)      0 (0%)           0 (0%)
  kube-system  kube-proxy-                                                   100m (0%)     0 (0%)      0 (0%)           0 (0%)
  kube-system  prometheus-prometheus-node-exporter-9cntq                     100m (0%)     200m (1%)   30Mi (0%)        50Mi (0%)
  logging      elasticsearch-elasticsearch-data-69df997486-gqcwg             400m (2%)     1 (6%)      8Gi (6%)         16Gi (13%)
  logging      fluentd-fluentd-elasticsearch-tj7nd                           200m (1%)     0 (0%)      612Mi (0%)       0 (0%)
  rook         rook-agent-6jtzm                                              0 (0%)        0 (0%)      0 (0%)           0 (0%)
  rook         rook-ceph-osd-10-6-42-250.accel.aws-cardda.cb4good.com-gwb8j  0 (0%)        0 (0%)      0 (0%)           0 (0%)
  spark        accelerate-test-5-a3bfb8a597e83d459193a183e17f13b5-exec-1     2 (12%)       0 (0%)      10Gi (8%)        12Gi (10%)
  spark        accelerate-testing-1-8ed0482f3bfb3c0a83da30bb7d433dff-exec-5  2 (12%)       0 (0%)      10Gi (8%)        12Gi
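On the overcommit point: the Kubernetes scheduler places pods according to their resource requests, not their limits, so limits adding up to more than a node's allocatable capacity do not by themselves stop scheduling. A rough way to see how much of the node is actually requested is to add up the request columns. The sketch below does that for the nine pods visible in the listing above (the listing is cut off, so two of the eleven pods are missing from the totals). The helper names are hypothetical, and only the quantity suffixes that appear in this thread are handled.

```python
def cpu_millicores(quantity):
    """Convert a Kubernetes CPU quantity such as '250m' or '2' to millicores."""
    if quantity.endswith("m"):
        return int(quantity[:-1])
    return int(float(quantity) * 1000)


# Binary suffixes only; other suffixes (K, M, G, Ti, ...) are not handled here.
_MEMORY_UNITS = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3}


def memory_bytes(quantity):
    """Convert a memory quantity such as '612Mi' or '8Gi' to bytes."""
    for suffix, factor in _MEMORY_UNITS.items():
        if quantity.endswith(suffix):
            return int(quantity[:-2]) * factor
    return int(quantity)  # plain byte count, e.g. '0'


# Request columns copied from the nine pods shown in the node description.
cpu_requests = ["250m", "100m", "100m", "400m", "200m", "0", "0", "2", "2"]
memory_requests = ["0", "0", "30Mi", "8Gi", "612Mi", "0", "0", "10Gi", "10Gi"]

allocatable_cpu = cpu_millicores("16")
allocatable_memory = memory_bytes("125724888Ki")

requested_cpu = sum(cpu_millicores(q) for q in cpu_requests)
requested_memory = sum(memory_bytes(q) for q in memory_requests)

gib = 1024**3
print(f"cpu requested: {requested_cpu}m of {allocatable_cpu}m")
print(f"memory requested: {requested_memory / gib:.1f}Gi of {allocatable_memory / gib:.1f}Gi")
```

If the requested totals stay below the allocatable figures, the scheduler has no reason to refuse the node, whatever the sum of the limits is.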
Re: Executor lost for unknown reasons error Spark 2.3 on kubernetes
$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
2018-07-30 19:58:42 INFO BlockManagerMasterEndpoint:54 - Trying to remove executor 7 from BlockManagerMaster.
2018-07-30 19:58:42 WARN BlockManagerMasterEndpoint:66 - No more replicas available for rdd_9_37 !
MasterEndpoint:54 - Removing block manager BlockManagerId(7, 10.*.*.*.*, 43888, None)
2018-07-30 19:58:42 INFO BlockManagerMaster:54 - Removed 7 successfully in removeExecutor
2018-07-30 19:58:42 INFO DAGScheduler:54 - Shuffle files lost for executor: 7 (epoch 1)
2018-07-30 19:58:42 ERROR ContextCleaner:91 - Error cleaning broadcast 11
org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:155)
    at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:321)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
    at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
    at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:238)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:194)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:185)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:185)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
    at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
    at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
Caused by: java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:

On Tue, Jul 31, 2018 at 8:32 AM purna pradeep wrote:

>> Hello,
>>
>> I’m getting below error in spark driver pod logs and executor pods are getting killed midway through while the job is running and even driver pod Terminated with below intermittent error, this happens if I run multiple jobs in parallel.
>>
>> Not able to see executor logs as executor pods are killed
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor lost for unknown reasons.
>>
>> Driver stacktrace:
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
>>     at org.apache.spark.scheduler
Executor lost for unknown reasons error Spark 2.3 on kubernetes
Hello,

I'm getting the error below in the Spark driver pod logs. Executor pods are getting killed midway through while the job is running, and even the driver pod terminated with the intermittent error below. This happens if I run multiple jobs in parallel.

I'm not able to see executor logs, as the executor pods are killed.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor lost for unknown reasons.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
    ... 42 more

Thanks,
Purna
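Since the executor logs disappear with the pods, the driver log is the main record of which executors were lost and where they ran. As a small aid, the sketch below extracts the executor id and host from "Lost task" entries like the one quoted above, so the matching pods and nodes can be inspected while they still exist. The function name `lost_executors` is hypothetical, and the pattern assumes the message format shown in this thread (Spark 2.3).

```python
import re

# Pattern written against the driver log line quoted in this thread.
LOST_TASK = re.compile(
    r"Lost task (?P<task>[\d.]+) in stage (?P<stage>[\d.]+) "
    r"\(TID (?P<tid>\d+), (?P<host>[^,]+), executor (?P<executor>\d+)\)"
)


def lost_executors(driver_log):
    """Return (executor id, host) pairs for every 'Lost task' entry found."""
    return [(m.group("executor"), m.group("host")) for m in LOST_TASK.finditer(driver_log)]


sample = (
    "Job aborted due to stage failure: Task 23 in stage 36.0 failed 4 times, "
    "most recent failure: Lost task 23.3 in stage 36.0 "
    "(TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure"
)
print(lost_executors(sample))
```

The host and executor id narrow down which pod to look at with kubectl before it is cleaned up.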
Spark 2.3 Kubernetes error
> Hello,
>
> When I’m trying to set below options to spark-submit command on k8s Master getting below error in spark-driver pod logs
>
> --conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
> --conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
>
> But when I tried to set these extraJavaoptions as system.properties in the spark application jar everything works fine.
>
> 2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing SparkContext.
> org.apache.spark.SparkException: External scheduler cannot be instantiated
>     at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:492)
>     at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
>     at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
>     at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [test-657e2f715ada3f91ae32c588aa178f63-driver] in namespace: [test] failed.
>     at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
>     at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
>     at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
>     at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
>     at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.<init>(KubernetesClusterSchedulerBackend.scala:70)
>     at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
>     at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
>     ... 12 more
> Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>     at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)
>     at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
>     at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>     at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>     at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>     at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>     at sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
>     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
>     at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
>     at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
>     at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
>     at okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)
>     at okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)
>     at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)
>     at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)
>     at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
>     at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
>     at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
>     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
>     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
>     at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
>     at
Spark 2.3 Kubernetes error
Hello,

When I set the options below on the spark-submit command against the k8s master, I get the error below in the spark-driver pod logs:

--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
--conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \

But when I set these extraJavaOptions as system properties inside the Spark application jar, everything works fine.

2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing SparkContext.
org.apache.spark.SparkException: External scheduler cannot be instantiated
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
    at org.apache.spark.SparkContext.init(SparkContext.scala:492)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [test-657e2f715ada3f91ae32c588aa178f63-driver] in namespace: [test] failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.init(KubernetesClusterSchedulerBackend.scala:70)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
    ... 12 more
Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
    at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
    at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
    at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
    at sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
    at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
    at okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)
    at okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)
    at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)
    at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)
    at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
    at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at
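One detail in the command as posted: the executor value starts its first property with -D, while the driver value starts with --D. The thread does not establish whether that is related to the PKIX error, so the following is only a minimal sketch of how the two --conf values could be generated from one dictionary so that both end up with the same single-dash form. The helper names (java_opts, proxy_conf_args, malformed_tokens) are hypothetical and not part of Spark.

```python
import shlex


def java_opts(props):
    """Render a dict of JVM system properties as '-Dkey=value' tokens."""
    tokens = []
    for key, value in props.items():
        # Each system property takes exactly one leading dash.
        tokens.append("-D{}={}".format(key, value))
    return " ".join(tokens)


def proxy_conf_args(props):
    """Build the two --conf arguments for driver and executor JVM options."""
    opts = java_opts(props)
    return [
        "--conf", "spark.driver.extraJavaOptions=" + opts,
        "--conf", "spark.executor.extraJavaOptions=" + opts,
    ]


def malformed_tokens(option_string):
    """Return tokens that do not start with a single-dash '-D' prefix."""
    return [t for t in option_string.split() if not t.startswith("-D")]


# Values copied from the post; "myhost" is the poster's placeholder.
PROXY_PROPS = {
    "https.proxyHost": "myhost",
    "https.proxyPort": "8099",
    "http.useproxy": "true",
    "https.protocols": "TLSv1.2",
}

if __name__ == "__main__":
    # Shell-quoted arguments that can be appended to a spark-submit command.
    print(" ".join(shlex.quote(a) for a in proxy_conf_args(PROXY_PROPS)))
    # Checks the beginning of the driver value as it appears in the post.
    print(malformed_tokens("--Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099"))
```

Running malformed_tokens over an option string before submitting is a cheap way to catch a stray extra dash; it says nothing about certificate trust, which is what the final "Caused by" in the trace reports.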
Spark 2.3 driver pod stuck in Running state — Kubernetes
Hello,

When I run spark-submit on a k8s cluster, the driver pod gets stuck in the Running state. When I pulled the driver pod logs, I saw the warnings below. I understand that this warning may be caused by a lack of CPU or memory, but I would expect the driver pod to be in the "Pending" state rather than "Running", since it is not actually running anything. I had to kill the driver pod and resubmit the job. Please suggest!

2018-06-08 14:38:01 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:38:16 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:38:31 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:38:46 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:39:01 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
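A note on the observation above: a pod's phase describes its containers, so a driver whose JVM has started and is waiting for executors is reported as Running even though no tasks are being executed. One way to notice this condition from outside is to watch the driver log for the repeating warning. The sketch below is only an illustration under that assumption; the function names and the one-minute threshold are hypothetical choices, and how the log lines are fetched is left out.

```python
from datetime import datetime, timedelta

WARNING_TEXT = "Initial job has not accepted any resources"


def warning_times(log_lines):
    """Extract timestamps of the scheduler warning from driver log lines."""
    times = []
    for line in log_lines:
        if WARNING_TEXT in line:
            # The first 19 characters hold 'YYYY-MM-DD HH:MM:SS'.
            times.append(datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S"))
    return times


def is_stalled(log_lines, threshold=timedelta(minutes=1)):
    """True when the warning has kept repeating for at least `threshold`."""
    times = warning_times(log_lines)
    return len(times) >= 2 and times[-1] - times[0] >= threshold


if __name__ == "__main__":
    # First and last warning lines from the post, shortened after the message.
    sample = [
        "2018-06-08 14:38:01 WARN TaskSchedulerImpl:66 - " + WARNING_TEXT,
        "2018-06-08 14:39:01 WARN TaskSchedulerImpl:66 - " + WARNING_TEXT,
    ]
    print(is_stalled(sample))
```

A check like this only reports that the driver has been waiting; deciding whether to kill and resubmit, as described in the post, remains a separate step.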
spark partitionBy with partitioned column in json output
I'm reading the JSON below in Spark:

{"bucket": "B01", "actionType": "A1", "preaction": "NULL", "postaction": "NULL"}
{"bucket": "B02", "actionType": "A2", "preaction": "NULL", "postaction": "NULL"}
{"bucket": "B03", "actionType": "A3", "preaction": "NULL", "postaction": "NULL"}

val df = spark.read.json("actions.json").toDF()

Now I'm writing the same data to JSON output as below:

df.write.
  format("json").
  mode("append").
  partitionBy("bucket", "actionType").
  save("output.json")

and the output.json is as below:

{"preaction":"NULL","postaction":"NULL"}

The bucket and actionType columns are missing from the JSON output; I need the partitionBy columns in the output as well.
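For context, partitionBy moves the partition columns out of the data files and into the directory names (for example bucket=B01/actionType=A1), so the values are still present in the output path. Reading the output root back with Spark recovers them through partition discovery. If the files are consumed outside Spark, the columns can be restored from the path, as in this minimal sketch. The function names and the part file name are hypothetical, and escaped characters in partition values are not handled.

```python
import json
import os


def partition_values(path):
    """Parse 'col=value' directory segments from a partitioned output path."""
    values = {}
    for segment in path.replace(os.sep, "/").split("/"):
        if "=" in segment:
            column, value = segment.split("=", 1)
            values[column] = value
    return values


def restore_record(path, json_line):
    """Merge the partition columns encoded in `path` back into one record."""
    record = json.loads(json_line)
    record.update(partition_values(path))
    return record


if __name__ == "__main__":
    # Hypothetical part file name; real names carry a generated suffix.
    path = "output.json/bucket=B01/actionType=A1/part-00000.json"
    line = '{"preaction":"NULL","postaction":"NULL"}'
    print(restore_record(path, line))
```

If the columns must appear inside the written files themselves, a common workaround is to add a copy of each column under a different name before the write (for example with withColumn) and pass the copies to partitionBy, so the original columns stay in the data.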
Re: Spark 2.3 error on Kubernetes
Abirudh, Thanks for your response I’m running k8s cluster on AWS and kub-dns pods are running fine and also as I mentioned only 1 executor pod is running though I requested for 5 and rest 4 were killed with below error and I do have enough resources available. On Tue, May 29, 2018 at 6:28 PM Anirudh Ramanathan wrote: > This looks to me like a kube-dns error that's causing the driver DNS > address to not resolve. > It would be worth double checking that kube-dns is indeed running (in the > kube-system namespace). > Often, with environments like minikube, kube-dns may exit/crashloop due to > lack of resource. > > On Tue, May 29, 2018 at 3:18 PM, purna pradeep > wrote: > >> Hello, >> >> I’m getting below error when I spark-submit a Spark 2.3 app on >> Kubernetes *v1.8.3* , some of the executor pods were killed with below >> error as soon as they come up >> >> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException >> >> at >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713) >> >> at >> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64) >> >> at >> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188) >> >> at >> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293) >> >> at >> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) >> >> Caused by: org.apache.spark.SparkException: Exception thrown in >> awaitResult: >> >> at >> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) >> >> at >> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) >> >> at >> org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101) >> >> at >> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201) >> >> at >> 
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65) >> >> at >> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64) >> >> at java.security.AccessController.doPrivileged(Native >> Method) >> >> at javax.security.auth.Subject.doAs(Subject.java:422) >> >> at >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) >> >> ... 4 more >> >> Caused by: java.io.IOException: Failed to connect to >> spark-1527629824987-driver-svc.spark.svc:7078 >> >> at >> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245) >> >> at >> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187) >> >> at >> org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198) >> >> at >> org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194) >> >> at >> org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190) >> >> at >> java.util.concurrent.FutureTask.run(FutureTask.java:266) >> >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> >> at java.lang.Thread.run(Thread.java:748) >> >> Caused by: java.net.UnknownHostException: >> spark-1527629824987-driver-svc.spark.svc >> >> at >> java.net.InetAddress.getAllByName0(InetAddress.java:1280) >> >> at >> java.net.InetAddress.getAllByName(InetAddress.java:1192) >> >> at >> java.net.InetAddress.getAllByName(InetAddress.java:1126) >> >> at java.net.InetAddress.getByName(InetAddress.java:1076) >> >> at >> io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146) >> >> at >> io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143) >> >> at java.security.AccessController.doPri
Spark 2.3 error on Kubernetes
Hello, I’m getting below error when I spark-submit a Spark 2.3 app on Kubernetes *v1.8.3* , some of the executor pods were killed with below error as soon as they come up Exception in thread "main" java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293) at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201) at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65) at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) ... 
4 more Caused by: java.io.IOException: Failed to connect to spark-1527629824987-driver-svc.spark.svc:7078 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187) at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198) at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194) at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.UnknownHostException: spark-1527629824987-driver-svc.spark.svc at java.net.InetAddress.getAllByName0(InetAddress.java:1280) at java.net.InetAddress.getAllByName(InetAddress.java:1192) at java.net.InetAddress.getAllByName(InetAddress.java:1126) at java.net.InetAddress.getByName(InetAddress.java:1076) at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146) at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143) at java.security.AccessController.doPrivileged(Native Method) at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143) at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43) at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63) at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55) at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57) at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32) at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108) at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208) at 
io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49) at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188) at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174) at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507) at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481) at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420) at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82) at
Spark 2.3 error on kubernetes
Hello, I’m getting below intermittent error when I spark-submit a Spark 2.3 app on Kubernetes v1.8.3 , some of the executor pods were killed with below error as soon as they come up Exception in thread "main" java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293) at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201) at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65) at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) ... 
4 more Caused by: java.io.IOException: Failed to connect to spark-1527629824987-driver-svc.spark.svc:7078 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187) at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198) at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194) at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.UnknownHostException: spark-1527629824987-driver-svc.spark.svc at java.net.InetAddress.getAllByName0(InetAddress.java:1280) at java.net.InetAddress.getAllByName(InetAddress.java:1192) at java.net.InetAddress.getAllByName(InetAddress.java:1126) at java.net.InetAddress.getByName(InetAddress.java:1076) at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146) at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143) at java.security.AccessController.doPrivileged(Native Method) at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143) at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43) at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63) at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55) at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57) at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32) at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108) at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208) at 
io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49) at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188) at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174) at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507) at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481) at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420) at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82) at
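The innermost cause in these traces is an UnknownHostException for the driver service name, which is consistent with the kube-dns suggestion made in the reply in this thread. As a diagnostic only, the following sketch retries a name lookup a few times so it can be run from a pod in the same namespace to see whether the name resolves at all or only after a delay. The function name and defaults are hypothetical, and the resolver and sleep arguments exist so the logic can be exercised without a network.

```python
import socket
import time


def resolve_with_retry(host, attempts=5, delay=1.0,
                       resolver=socket.gethostbyname, sleep=time.sleep):
    """Try to resolve `host` up to `attempts` times; return the IP or None."""
    for attempt in range(1, attempts + 1):
        try:
            return resolver(host)
        except OSError:
            # socket.gaierror is a subclass of OSError; wait, then try again.
            if attempt < attempts:
                sleep(delay)
    return None


if __name__ == "__main__":
    # Service name taken from the stack trace in the post.
    name = "spark-1527629824987-driver-svc.spark.svc"
    print(resolve_with_retry(name, attempts=3, delay=2.0))
```

A name that resolves only on a later attempt would point toward timing or DNS load rather than a missing service, but the thread itself does not confirm either.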
Spark driver pod garbage collection
Hello,

Currently I observe that dead pods (i.e. Spark driver pods which have completed execution) are not getting garbage collected, so pods could potentially sit in the namespace for weeks. This makes listing, parsing, and reading pods slower, as well as leaving junk on the cluster. I believe the minimum-container-ttl-duration kubelet flag is set to 0 minutes by default, but I don’t see the completed Spark driver pods being garbage collected. Do I need to set any flag explicitly at the kubelet level?
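As far as I know, the kubelet flag mentioned above governs dead containers rather than Pod objects, and terminated pods are only removed by the control plane once their count exceeds the kube-controller-manager's terminated-pod-gc-threshold. A periodic cleanup job is therefore a common alternative. The sketch below only selects candidates from a pod list shaped like the items of `kubectl get pods -o json`; it assumes driver pods carry the label spark-role=driver, the one-day TTL is an arbitrary choice, the function names are hypothetical, and the actual deletion is left out.

```python
from datetime import datetime, timedelta, timezone


def finished_at(pod):
    """Latest container finish time of a pod, or None if none terminated."""
    stamps = []
    for status in pod.get("status", {}).get("containerStatuses", []):
        terminated = status.get("state", {}).get("terminated")
        if terminated and terminated.get("finishedAt"):
            stamp = datetime.strptime(
                terminated["finishedAt"], "%Y-%m-%dT%H:%M:%SZ")
            stamps.append(stamp.replace(tzinfo=timezone.utc))
    return max(stamps) if stamps else None


def expired_driver_pods(pods, now, ttl=timedelta(days=1)):
    """Names of finished Spark driver pods that ended at least `ttl` ago."""
    names = []
    for pod in pods:
        labels = pod["metadata"].get("labels", {})
        phase = pod.get("status", {}).get("phase")
        done = finished_at(pod)
        if (labels.get("spark-role") == "driver"
                and phase in ("Succeeded", "Failed")
                and done is not None
                and now - done >= ttl):
            names.append(pod["metadata"]["name"])
    return names
```

The returned names could then be handed to whatever deletion mechanism the cluster already uses; keeping selection and deletion separate makes a dry run easy.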
Spark driver pod eviction Kubernetes
Hi,

What would be the recommended approach to let the Spark driver pod complete its currently running job before it gets evicted to a new node while maintenance on the current node is going on (kernel upgrade, hardware maintenance, etc.) using the drain command? I don’t think I can use a PodDisruptionBudget, as the Spark pods' deployment YAML is handled by Kubernetes. Please suggest!
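On the PodDisruptionBudget point: a budget selects pods by label and is created as its own object, so it does not require access to the pod's own YAML. For pods without a managing controller, my understanding is that only an integer minAvailable is supported. The sketch below merely builds such a manifest as a dictionary; the object name, namespace, and label selector are hypothetical, the apiVersion default is an assumption that depends on the cluster version (older clusters used policy/v1beta1), and whether this gives the desired drain behaviour for a given Spark version has not been verified here.

```python
import json


def driver_pdb(name, namespace, match_labels, min_available=1,
               api_version="policy/v1"):
    """Build a PodDisruptionBudget manifest that selects pods by label."""
    return {
        "apiVersion": api_version,
        "kind": "PodDisruptionBudget",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            # Integer form only; no percentage and no maxUnavailable.
            "minAvailable": min_available,
            "selector": {"matchLabels": dict(match_labels)},
        },
    }


if __name__ == "__main__":
    # Hypothetical names; the selector should match exactly one driver pod.
    manifest = driver_pdb("spark-driver-pdb", "spark",
                          {"spark-role": "driver"})
    print(json.dumps(manifest, indent=2))
```

With a selector that matches a single running driver and minAvailable set to 1, an eviction request for that pod would be refused while it is running, which is the effect asked about; a selector matching several drivers would only protect that many of them.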
Re: S3KeySensor
+ Joe On Mon, May 21, 2018 at 2:56 PM purna pradeep <purna2prad...@gmail.com> wrote: > I do know only to some extent , I mean If you see my sample s3 locations > > s3a://mybucket/20180425_111447_data1/_SUCCESS > > s3a://mybucket/20180424_111241_data1/_SUCCESS > > > > The only values which are static in above location are > > s3a://mybucket/ > > data1/_SUCCESS > > Now I want to configure tolerance for _SUCCESS file as latest or 1 day > older based on this configuration it should pick the right time stamp > folder which has _SUCCESS file > > On Mon, May 21, 2018 at 2:35 PM Joe Napolitano <joe.napolit...@wework.com> > wrote: > >> Purna, with regards to "this path is not completely static," can you >> clarify what you mean? >> >> Do you mean that you don't know the actual key name beforehand? E.g. >> pertaining to "111447", "111241", and "111035" in your example? >> >> On Mon, May 21, 2018 at 2:23 PM, Brian Greene < >> br...@heisenbergwoodworking.com> wrote: >> >> > I suggest it’ll work for your needs. >> > >> > Sent from a device with less than stellar autocorrect >> > >> > > On May 21, 2018, at 10:16 AM, purna pradeep <purna2prad...@gmail.com> >> > wrote: >> > > >> > > Hi , >> > > >> > > I’m trying to evaluate airflow to see if it suits my needs. 
>> > > >> > > Basically i can have below steps in a DAG >> > > >> > > >> > > >> > > 1)Look for a file arrival on given s3 location (this path is not >> > completely >> > > static) (i can use S3Keysensor in this step) >> > > >> > > i should be able to specify to look either for latest folder or >> 24hrs or >> > > n number of days older folder which has _SUCCESS file as mentioned >> below >> > > >> > > sample file location(s): >> > > >> > > s3a://mybucket/20180425_111447_data1/_SUCCESS >> > > >> > > > > > s3a://mybucket/20180424_111241_data1/_SUCCESS >> > > >> > > s3a://mybucket/20180424_111035_data1/_SUCCESS >> > > >> > > >> > > >> > > 2)invoke a simple restapi using HttpSimpleOperator once the above >> > > dependency is met ,i can set upstream for step2 as step1 >> > > >> > > >> > > >> > > Does S3keysensor supports step1 out of the box? >> > > >> > > Also in some cases i may to have a DAG without start date & end date >> it >> > > just needs to be triggered once file is available in a given s3 >> location >> > > >> > > >> > > >> > > *Please suggest !* >> > >> >
Re: S3KeySensor
I know it only to some extent. If you look at my sample S3 locations:

s3a://mybucket/20180425_111447_data1/_SUCCESS
s3a://mybucket/20180424_111241_data1/_SUCCESS

the only values that are static in the above locations are:

s3a://mybucket/ data1/_SUCCESS

Now I want to configure a tolerance for the _SUCCESS file, either "latest" or "1 day older". Based on this configuration, it should pick the right timestamped folder that contains a _SUCCESS file.

On Mon, May 21, 2018 at 2:35 PM Joe Napolitano <joe.napolit...@wework.com> wrote:

> Purna, with regards to "this path is not completely static," can you
> clarify what you mean?
>
> Do you mean that you don't know the actual key name beforehand? E.g.
> pertaining to "111447", "111241", and "111035" in your example?
>
> On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
> br...@heisenbergwoodworking.com> wrote:
>
> > I suggest it’ll work for your needs.
> >
> > Sent from a device with less than stellar autocorrect
> >
> > > On May 21, 2018, at 10:16 AM, purna pradeep <purna2prad...@gmail.com>
> > > wrote:
> > >
> > > Hi ,
> > >
> > > I’m trying to evaluate airflow to see if it suits my needs.
> > >
> > > Basically i can have below steps in a DAG
> > >
> > > 1)Look for a file arrival on given s3 location (this path is not
> > > completely static) (i can use S3Keysensor in this step)
> > >
> > > i should be able to specify to look either for latest folder or 24hrs
> > > or n number of days older folder which has _SUCCESS file as mentioned
> > > below
> > >
> > > sample file location(s):
> > >
> > > s3a://mybucket/20180425_111447_data1/_SUCCESS
> > >
> > > s3a://mybucket/20180424_111241_data1/_SUCCESS
> > >
> > > s3a://mybucket/20180424_111035_data1/_SUCCESS
> > >
> > > 2)invoke a simple restapi using HttpSimpleOperator once the above
> > > dependency is met ,i can set upstream for step2 as step1
> > >
> > > Does S3keysensor supports step1 out of the box?
> > >
> > > Also in some cases i may to have a DAG without start date & end date it
> > > just needs to be triggered once file is available in a given s3
> > > location
> > >
> > > *Please suggest !*
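The "latest, or at most n days old" tolerance described in this message can be sketched in plain Python, independent of any particular Airflow sensor. This is only an illustration under stated assumptions: the function name `pick_success_key` and its parameters are hypothetical, the keys are assumed to have already been listed from the bucket by some other call, and the folder name is assumed to start with a `YYYYMMDD_HHMMSS` timestamp as in the sample locations.

```python
from datetime import datetime, timedelta
from typing import Iterable, Optional

# Assumed layout, taken from the sample keys: 20180425_111447_data1/_SUCCESS
STAMP_FORMAT = "%Y%m%d_%H%M%S"
STAMP_LENGTH = 15  # length of "20180425_111447"


def pick_success_key(keys: Iterable[str], now: datetime,
                     max_age: Optional[timedelta] = None,
                     suffix: str = "_data1/_SUCCESS") -> Optional[str]:
    """Return the newest marker key not newer than `now` and, if `max_age`
    is given, not older than `max_age`. Hypothetical helper, not an Airflow API."""
    best_key, best_stamp = None, None
    for key in keys:
        if not key.endswith(suffix):
            continue  # not a _SUCCESS marker for this dataset
        try:
            stamp = datetime.strptime(key[:STAMP_LENGTH], STAMP_FORMAT)
        except ValueError:
            continue  # folder name does not start with a timestamp
        if stamp > now:
            continue  # ignore folders stamped in the future
        if max_age is not None and now - stamp > max_age:
            continue  # outside the configured tolerance
        if best_stamp is None or stamp > best_stamp:
            best_key, best_stamp = key, stamp
    return best_key
```

With `max_age=None` the function behaves as "latest"; passing `timedelta(days=1)` or `timedelta(days=n)` gives the 24-hour or n-day tolerance. A custom sensor could call such a helper on each poll and succeed once it returns a key.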
Re: Oozie for spark jobs without Hadoop
Here you go!

- Set oozie.service.HadoopAccessorService.supported.filesystems to * in oozie-site.xml
- Include hadoop-aws-2.8.3.jar
- Rebuild Oozie with -Dhttpclient.version=4.5.5 -Dhttpcore.version=4.4.9
- Set jetty_opts with proxy values

On Sat, May 19, 2018 at 2:17 AM Peter Cseh <gezap...@cloudera.com> wrote:

> Wow, great work!
> Can you please summarize the required steps? This would be useful for
> others so we probably should add it to our documentation.
> Thanks in advance!
> Peter
>
> On Fri, May 18, 2018 at 11:33 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> I got this fixed by setting jetty_opts with proxy values.
>>
>> Thanks Peter!!
>>
>> On Thu, May 17, 2018 at 4:05 PM purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Ok I fixed this by adding aws keys in oozie
>>>
>>> But I’m getting below error
>>>
>>> I have tried setting proxy in core-site.xml but no luck
>>>
>>> 2018-05-17 15:39:20,602 ERROR CoordInputLogicEvaluatorPhaseOne:517 -
>>> SERVER[localhost] USER[-] GROUP[-] TOKEN[-] APP[-]
>>> JOB[000-180517144113498-oozie-xjt0-C]
>>> ACTION[000-180517144113498-oozie-xjt0-C@2]
>>> org.apache.oozie.service.HadoopAccessorException:
>>> E0902: Exception occurred: [doesBucketExist on cmsegmentation-qa:
>>> com.amazonaws.SdkClientException: Unable to execute HTTP request:
>>> Connect to mybucket.s3.amazonaws.com:443
>>> [mybucket.s3.amazonaws.com/52.216.165.155] failed:
>>> connect timed out]
>>>
>>> org.apache.oozie.service.HadoopAccessorException: E0902: Exception
>>> occurred: [doesBucketExist on cmsegmentation-qa:
>>> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect
>>> to mybucket.s3.amazonaws.com:443
>>> [mybucket.s3.amazonaws.com/52.216.165.155] failed:
>>> connect timed out]
>>>
>>> at org.apache.oozie.service.HadoopAccessorService.createFileSystem(HadoopAccessorService.java:630)
>>> at org.apache.oozie.service.HadoopAccessorService.createFileSystem(HadoopAccessorService.java:594)
>>> at org.apache.oozie.dependency.FSURIHandler.getFileSystem(FSURIHandler.java:184)-env.sh
>>>
>>> But now I’m getting this error
>>>
>>> On Thu, May 17, 2018 at 2:53 PM purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> Ok I got passed this error
>>>>
>>>> By rebuilding oozie with -Dhttpclient.version=4.5.5
>>>> -Dhttpcore.version=4.4.9
>>>>
>>>> now getting this error
>>>>
>>>> ACTION[000-180517144113498-oozie-xjt0-C@1]
>>>> org.apache.oozie.service.HadoopAccessorException: E0902: Exception
>>>> occurred: [doesBucketExist on mybucket:
>>>> com.amazonaws.AmazonClientException:
>>>> No AWS Credentials provided by BasicAWSCredentialsProvider
>>>> EnvironmentVariableCredentialsProvider
>>>> SharedInstanceProfileCredentialsProvider :
>>>> com.amazonaws.SdkClientException: Unable to load credentials from service
>>>> endpoint]
>>>>
>>>> org.apache.oozie.service.HadoopAccessorException: E0902: Exception
>>>> occurred: [doesBucketExist on cmsegmentation-qa:
>>>> com.amazonaws.AmazonClientException: No AWS Credentials provided by
>>>> BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider
>>>> SharedInstanceProfileCredentialsProvider :
>>>> com.amazonaws.SdkClientException: Unable to load credentials from service
>>>> endpoint]
>>>>
>>>> On Thu, May 17, 2018 at 12:24 PM purna pradeep <purna2prad...@gmail.com>
>>>> wrote:
>>>>
>>>>> Peter,
>>>>>
>>>>> Also When I submit a job with new http client jar, I get
>>>>>
>>>>> ```Error: IO_ERROR : java.io.IOException: Error while connecting Oozie
>>>>> server. No of retries = 1. Exception = Could not authenticate,
>>>>> Authentication failed, status: 500, message: Server Error```
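The first step in the summary above concerns which URI schemes Oozie accepts; elsewhere in this thread the property is described as a comma-separated list in which a wildcard "*" allows all schemes. The sketch below shows one way to sanity-check an oozie-site.xml against a dataset URI before deploying it. It is not Oozie's own code: the helper names `supported_schemes` and `scheme_allowed` are hypothetical, and returning an empty set when the property is absent is an assumption made for this example rather than Oozie's actual default.

```python
import xml.etree.ElementTree as ET
from urllib.parse import urlparse

PROPERTY = "oozie.service.HadoopAccessorService.supported.filesystems"


def supported_schemes(oozie_site_xml: str) -> set:
    """Read the comma-separated scheme list from a Hadoop-style site file.
    Hypothetical helper; returns an empty set if the property is missing."""
    root = ET.fromstring(oozie_site_xml)
    for prop in root.iter("property"):
        if prop.findtext("name", "").strip() == PROPERTY:
            value = prop.findtext("value", "")
            return {s.strip().lower() for s in value.split(",") if s.strip()}
    return set()


def scheme_allowed(uri: str, schemes: set) -> bool:
    """True if the wildcard is configured or the URI scheme is listed."""
    return "*" in schemes or urlparse(uri).scheme.lower() in schemes
```

Note that listing a scheme only gets past the scheme check. As the rest of the thread shows, the matching filesystem class (from hadoop-aws) and credentials still have to be available to the Oozie server.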
Event trigger Oozie datasets
Hello,

Event trigger Oozie datasets

1) Does Oozie support event triggers, that is, triggering a workflow based on a file arriving on AWS S3?

As I understand it, based on the start date set on the coordinator, Oozie can poll for a file on S3 and execute an action/SparkAction once the dependency is met. My requirement is different: trigger the workflow on file arrival, compare the current date with the start time (if a start time is configured; otherwise execute the action based on the event alone), and execute the action/SparkAction if it is time to run it.

2) I also see that datasets require an initial-instance, and that the dataset location is derived from the initial-instance value. For example:

s3a://app/logs/${YEAR}_${MONTH}_${DAY}_${HOUR}

${coord:latest(0)}

Then the dataset instances for the input events of the coordinator action will be:

s3a://app/logs/2009_01_10

However, I do not know the dataset generation timestamp in advance, and I am also not sure how frequently the dataset is generated. The dataset location could be s3a://app/logs/2018_02_10 (i.e. it may be generated every day), and when I run my job on 2018/02/11 I should be able to specify whether to use the latest dataset, or one that is 24 hours or n days old (counted from the day I run the workflow), as the dependency for the action/SparkAction I am trying to execute.

Please suggest!
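To make the requested "latest, or up to n days old" dependency concrete, here is a minimal Python sketch of the lookup being asked for. It does not reproduce how Oozie resolves `${coord:latest(0)}`; it only illustrates the desired behaviour. The names `candidate_uris` and `latest_available` are hypothetical, the `exists` callback stands in for whatever check is used against S3, and the template omits `${HOUR}` to match the day-level sample instance in the question.

```python
from datetime import date, timedelta
from string import Template
from typing import Callable, List, Optional

# Day-level template modelled on the example in the question (assumption: no HOUR part).
URI_TEMPLATE = Template("s3a://app/logs/${YEAR}_${MONTH}_${DAY}")


def candidate_uris(run_day: date, max_days_old: int) -> List[str]:
    """Dataset URIs from the run day back to `max_days_old` days earlier, newest first."""
    uris = []
    for age in range(max_days_old + 1):
        day = run_day - timedelta(days=age)
        uris.append(URI_TEMPLATE.substitute(
            YEAR=f"{day.year:04d}", MONTH=f"{day.month:02d}", DAY=f"{day.day:02d}"))
    return uris


def latest_available(run_day: date, max_days_old: int,
                     exists: Callable[[str], bool]) -> Optional[str]:
    """Return the newest candidate for which `exists` is true, else None."""
    for uri in candidate_uris(run_day, max_days_old):
        if exists(uri):
            return uri
    return None
```

For a run on 2018/02/11 with a one-day tolerance, the candidates are the 2018_02_11 and 2018_02_10 folders, checked in that order.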
Re: Oozie for spark jobs without Hadoop
Ok I fixed this by adding aws keys in oozie But I’m getting below error I have tried setting proxy in core-site.xml but no luck 2018-05-17 15:39:20,602 ERROR CoordInputLogicEvaluatorPhaseOne:517 - SERVER[localhost] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[000-180517144113498-oozie-xjt0-C] ACTION[000- 180517144113498-oozie-xjt0-C@2] org.apache.oozie.service.HadoopAccessorException: E0902: Exception occurred: [doesBucketExist on cmsegmentation-qa: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to mybucket.s3.amazonaws.com:443 <http://cmsegmentation-qa.s3.amazonaws.com:443/> [mybucket. s3.amazonaws.com/52.216.165.155 <http://cmsegmentation-qa.s3.amazonaws.com/52.216.165.155>] failed: connect timed out] org.apache.oozie.service.HadoopAccessorException: E0902: Exception occurred: [doesBucketExist on cmsegmentation-qa: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to mybucket.s3.amazonaws.com:443 <http://cmsegmentation-qa.s3.amazonaws.com:443/> [mybucket.s3.amazonaws.com <http://cmsegmentation-qa.s3.amazonaws.com/52.216.165.155> failed: connect timed out] at org.apache.oozie.service.HadoopAccessorService.createFileSystem(HadoopAccessorService.java:630) at org.apache.oozie.service.HadoopAccessorService.createFileSystem(HadoopAccessorService.java:594) at org.apache.oozie.dependency.FSURIHandler.getFileSystem( FSURIHandler.java:184)-env.sh But now I’m getting this error On Thu, May 17, 2018 at 2:53 PM purna pradeep <purna2prad...@gmail.com> wrote: > Ok I got passed this error > > By rebuilding oozie with Dhttpclient.version=4.5.5 -Dhttpcore.version=4.4.9 > > now getting this error > > > > ACTION[000-180517144113498-oozie-xjt0-C@1] > org.apache.oozie.service.HadoopAccessorException: E0902: Exception > occurred: [doesBucketExist on mybucketcom.amazonaws.AmazonClientException: > No AWS Credentials provided by BasicAWSCredentialsProvider > EnvironmentVariableCredentialsProvider > SharedInstanceProfileCredentialsProvider : > 
com.amazonaws.SdkClientException: Unable to load credentials from service > endpoint] > > org.apache.oozie.service.HadoopAccessorException: E0902: Exception > occurred: [doesBucketExist on cmsegmentation-qa: > com.amazonaws.AmazonClientException: No AWS Credentials provided by > BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider > SharedInstanceProfileCredentialsProvider : > com.amazonaws.SdkClientException: Unable to load credentials from service > endpoint] > > On Thu, May 17, 2018 at 12:24 PM purna pradeep <purna2prad...@gmail.com> > wrote: > >> >> Peter, >> >> Also When I submit a job with new http client jar, I get >> >> ```Error: IO_ERROR : java.io.IOException: Error while connecting Oozie >> server. No of retries = 1. Exception = Could not authenticate, >> Authentication failed, status: 500, message: Server Error``` >> >> >> On Thu, May 17, 2018 at 12:14 PM purna pradeep <purna2prad...@gmail.com> >> wrote: >> >>> Ok I have tried this >>> >>> It appears that s3a support requires httpclient 4.4.x and oozie is >>> bundled with httpclient 4.3.6. When httpclient is upgraded, the ext UI >>> stops loading. >>> >>> >>> >>> On Thu, May 17, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com> >>> wrote: >>> >>>> Purna, >>>> >>>> Based on >>>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3 >>>> you should try to go for s3a. >>>> You'll have to include the aws-jdk as well if I see it correctly: >>>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A >>>> Also, the property names are slightly different so you'll have to >>>> change the example I've given. >>>> >>>> >>>> >>>> On Thu, May 17, 2018 at 4:16 PM, purna pradeep <purna2prad...@gmail.com >>>> > wrote: >>>> >>>>> Peter, >>>>> >>>>> I’m using latest oozie 5.0.0 and I have tried below changes but no >>>>> luck >>>>> >>>>> Is this for s3 or s3a ? 
>>>>> >>>>> I’m using s3 but if this is for s3a do you know which jar I need to >>>>> include I mean Hadoop-aws jar or any other jar if required >>>>> >>>>> Hadoop-aws-2.8.3.jar is what I’m using >>>>> >>>>> On Wed, May 16, 2018 at 5:19 PM Peter Cseh <gezap...@cloudera.com> >>>>>
Re: Oozie for spark jobs without Hadoop
Ok I got passed this error By rebuilding oozie with Dhttpclient.version=4.5.5 -Dhttpcore.version=4.4.9 now getting this error ACTION[000-180517144113498-oozie-xjt0-C@1] org.apache.oozie.service.HadoopAccessorException: E0902: Exception occurred: [doesBucketExist on mybucketcom.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider SharedInstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint] org.apache.oozie.service.HadoopAccessorException: E0902: Exception occurred: [doesBucketExist on cmsegmentation-qa: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider SharedInstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint] On Thu, May 17, 2018 at 12:24 PM purna pradeep <purna2prad...@gmail.com> wrote: > > Peter, > > Also When I submit a job with new http client jar, I get > > ```Error: IO_ERROR : java.io.IOException: Error while connecting Oozie > server. No of retries = 1. Exception = Could not authenticate, > Authentication failed, status: 500, message: Server Error``` > > > On Thu, May 17, 2018 at 12:14 PM purna pradeep <purna2prad...@gmail.com> > wrote: > >> Ok I have tried this >> >> It appears that s3a support requires httpclient 4.4.x and oozie is >> bundled with httpclient 4.3.6. When httpclient is upgraded, the ext UI >> stops loading. >> >> >> >> On Thu, May 17, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com> >> wrote: >> >>> Purna, >>> >>> Based on >>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3 >>> you should try to go for s3a. 
>>> You'll have to include the aws-jdk as well if I see it correctly: >>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A >>> Also, the property names are slightly different so you'll have to change >>> the example I've given. >>> >>> >>> >>> On Thu, May 17, 2018 at 4:16 PM, purna pradeep <purna2prad...@gmail.com> >>> wrote: >>> >>>> Peter, >>>> >>>> I’m using latest oozie 5.0.0 and I have tried below changes but no luck >>>> >>>> Is this for s3 or s3a ? >>>> >>>> I’m using s3 but if this is for s3a do you know which jar I need to >>>> include I mean Hadoop-aws jar or any other jar if required >>>> >>>> Hadoop-aws-2.8.3.jar is what I’m using >>>> >>>> On Wed, May 16, 2018 at 5:19 PM Peter Cseh <gezap...@cloudera.com> >>>> wrote: >>>> >>>>> Ok, I've found it: >>>>> >>>>> If you are using 4.3.0 or newer this is the part which checks for >>>>> dependencies: >>>>> >>>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java#L914-L926 >>>>> It passes the coordinator action's configuration and even does >>>>> impersonation to check for the dependencies: >>>>> >>>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java#L159 >>>>> >>>>> Have you tried the following in the coordinator xml: >>>>> >>>>> >>>>> >>>>> hdfs://bar:9000/usr/joe/logsprocessor-wf >>>>> >>>>> >>>>> fs.s3.awsAccessKeyId >>>>> [YOURKEYID] >>>>> >>>>> >>>>> fs.s3.awsSecretAccessKey >>>>> [YOURKEY] >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> Based on the source this should be able to poll s3 periodically. >>>>> >>>>> On Wed, May 16, 2018 at 10:57 PM, purna pradeep < >>>>> purna2prad...@gmail.com> wrote: >>>>> >>>>>> >>>>>> I have tried with coordinator's configuration too but no luck ☹️ >>>>>> >>>>>> On Wed, May 16, 2018 at 3:54 PM Peter Cseh <gezap...@cloudera.com> >>>>>> wrote: >>>>>> >>>>
Re: Oozie for spark jobs without Hadoop
Peter, Also When I submit a job with new http client jar, I get ```Error: IO_ERROR : java.io.IOException: Error while connecting Oozie server. No of retries = 1. Exception = Could not authenticate, Authentication failed, status: 500, message: Server Error``` On Thu, May 17, 2018 at 12:14 PM purna pradeep <purna2prad...@gmail.com> wrote: > Ok I have tried this > > It appears that s3a support requires httpclient 4.4.x and oozie is bundled > with httpclient 4.3.6. When httpclient is upgraded, the ext UI stops > loading. > > > > On Thu, May 17, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com> wrote: > >> Purna, >> >> Based on >> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3 >> you should try to go for s3a. >> You'll have to include the aws-jdk as well if I see it correctly: >> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A >> Also, the property names are slightly different so you'll have to change >> the example I've given. >> >> >> >> On Thu, May 17, 2018 at 4:16 PM, purna pradeep <purna2prad...@gmail.com> >> wrote: >> >>> Peter, >>> >>> I’m using latest oozie 5.0.0 and I have tried below changes but no luck >>> >>> Is this for s3 or s3a ? 
>>> >>> I’m using s3 but if this is for s3a do you know which jar I need to >>> include I mean Hadoop-aws jar or any other jar if required >>> >>> Hadoop-aws-2.8.3.jar is what I’m using >>> >>> On Wed, May 16, 2018 at 5:19 PM Peter Cseh <gezap...@cloudera.com> >>> wrote: >>> >>>> Ok, I've found it: >>>> >>>> If you are using 4.3.0 or newer this is the part which checks for >>>> dependencies: >>>> >>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java#L914-L926 >>>> It passes the coordinator action's configuration and even does >>>> impersonation to check for the dependencies: >>>> >>>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java#L159 >>>> >>>> Have you tried the following in the coordinator xml: >>>> >>>> >>>> >>>> hdfs://bar:9000/usr/joe/logsprocessor-wf >>>> >>>> >>>> fs.s3.awsAccessKeyId >>>> [YOURKEYID] >>>> >>>> >>>> fs.s3.awsSecretAccessKey >>>> [YOURKEY] >>>> >>>> >>>> >>>> >>>> >>>> Based on the source this should be able to poll s3 periodically. >>>> >>>> On Wed, May 16, 2018 at 10:57 PM, purna pradeep < >>>> purna2prad...@gmail.com> wrote: >>>> >>>>> >>>>> I have tried with coordinator's configuration too but no luck ☹️ >>>>> >>>>> On Wed, May 16, 2018 at 3:54 PM Peter Cseh <gezap...@cloudera.com> >>>>> wrote: >>>>> >>>>>> Great progress there purna! :) >>>>>> >>>>>> Have you tried adding these properites to the coordinator's >>>>>> configuration? we usually use the action config to build up connection to >>>>>> the distributed file system. >>>>>> Although I'm not sure we're using these when polling the dependencies >>>>>> for coordinators, but I'm excited about you trying to make it work! >>>>>> >>>>>> I'll get back with a - hopefully - more helpful answer soon, I have >>>>>> to check the code in more depth first. 
>>>>>> gp >>>>>> >>>>>> On Wed, May 16, 2018 at 9:45 PM, purna pradeep < >>>>>> purna2prad...@gmail.com> wrote: >>>>>> >>>>>>> Peter, >>>>>>> >>>>>>> I got rid of this error by adding >>>>>>> hadoop-aws-2.8.3.jar and jets3t-0.9.4.jar >>>>>>> >>>>>>> But I’m getting below error now >>>>>>> >>>>>>> java.lang.IllegalArgumentException: AWS Access Key ID and Secret >>>>>>> Access Key must be specified by setting the fs.s3.awsAccessKeyI
Re: Oozie for spark jobs without Hadoop
Ok I have tried this It appears that s3a support requires httpclient 4.4.x and oozie is bundled with httpclient 4.3.6. When httpclient is upgraded, the ext UI stops loading. On Thu, May 17, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com> wrote: > Purna, > > Based on > https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3 > you should try to go for s3a. > You'll have to include the aws-jdk as well if I see it correctly: > https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A > Also, the property names are slightly different so you'll have to change > the example I've given. > > > > On Thu, May 17, 2018 at 4:16 PM, purna pradeep <purna2prad...@gmail.com> > wrote: > >> Peter, >> >> I’m using latest oozie 5.0.0 and I have tried below changes but no luck >> >> Is this for s3 or s3a ? >> >> I’m using s3 but if this is for s3a do you know which jar I need to >> include I mean Hadoop-aws jar or any other jar if required >> >> Hadoop-aws-2.8.3.jar is what I’m using >> >> On Wed, May 16, 2018 at 5:19 PM Peter Cseh <gezap...@cloudera.com> wrote: >> >>> Ok, I've found it: >>> >>> If you are using 4.3.0 or newer this is the part which checks for >>> dependencies: >>> >>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java#L914-L926 >>> It passes the coordinator action's configuration and even does >>> impersonation to check for the dependencies: >>> >>> https://github.com/apache/oozie/blob/master/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java#L159 >>> >>> Have you tried the following in the coordinator xml: >>> >>> >>> >>> hdfs://bar:9000/usr/joe/logsprocessor-wf >>> >>> >>> fs.s3.awsAccessKeyId >>> [YOURKEYID] >>> >>> >>> fs.s3.awsSecretAccessKey >>> [YOURKEY] >>> >>> >>> >>> >>> >>> Based on the source this should be able to poll s3 periodically. 
>>> >>> On Wed, May 16, 2018 at 10:57 PM, purna pradeep <purna2prad...@gmail.com >>> > wrote: >>> >>>> >>>> I have tried with coordinator's configuration too but no luck ☹️ >>>> >>>> On Wed, May 16, 2018 at 3:54 PM Peter Cseh <gezap...@cloudera.com> >>>> wrote: >>>> >>>>> Great progress there purna! :) >>>>> >>>>> Have you tried adding these properites to the coordinator's >>>>> configuration? we usually use the action config to build up connection to >>>>> the distributed file system. >>>>> Although I'm not sure we're using these when polling the dependencies >>>>> for coordinators, but I'm excited about you trying to make it work! >>>>> >>>>> I'll get back with a - hopefully - more helpful answer soon, I have to >>>>> check the code in more depth first. >>>>> gp >>>>> >>>>> On Wed, May 16, 2018 at 9:45 PM, purna pradeep < >>>>> purna2prad...@gmail.com> wrote: >>>>> >>>>>> Peter, >>>>>> >>>>>> I got rid of this error by adding >>>>>> hadoop-aws-2.8.3.jar and jets3t-0.9.4.jar >>>>>> >>>>>> But I’m getting below error now >>>>>> >>>>>> java.lang.IllegalArgumentException: AWS Access Key ID and Secret >>>>>> Access Key must be specified by setting the fs.s3.awsAccessKeyId and >>>>>> fs.s3.awsSecretAccessKey properties (respectively) >>>>>> >>>>>> I have tried adding AWS access ,secret keys in >>>>>> >>>>>> oozie-site.xml and hadoop core-site.xml , and hadoop-config.xml >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On Wed, May 16, 2018 at 2:30 PM purna pradeep < >>>>>> purna2prad...@gmail.com> wrote: >>>>>> >>>>>>> >>>>>>> I have tried this ,just added s3 instead of * >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>&g
Re: Spark 2.3 in oozie
Thanks Peter!

I’m able to run the Spark Pi example on a Kubernetes cluster from Oozie after this change.

On Wed, May 16, 2018 at 10:27 AM Peter Cseh <gezap...@cloudera.com> wrote:

> The version of the xml schema has nothing to do with the version of the
> component you're using.
>
> Thanks for verifying that -Dspark.scala.binary.version=2.11 is required
> for compilation with Spark 2.3.0
>
> Oozie does not pull in Spark's Kubernetes artifact.
> To make it part of the Oozie Spark sharelib you'll have to include the
> spark-kubernetes.jar
> <https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-kubernetes_2.11%7C2.3.0%7Cjar>
> in the sharelib/spark/pom.xml as a compile-time dependency.
>
> gp
>
> On Tue, May 15, 2018 at 9:04 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
> > I’m able to compile successfully with after adding this override option
> >
> > -Dspark.scala.binary.version=2.11
> >
> > -Dspark.version=2.3.0
> >
> > But when I’m running a spark action with spark-pi example jar against
> > Kubernetes master I’m getting below error in stderr log
> >
> > Error: Could not load KUBERNETES classes. This copy of spark may not have
> > been compiled with Kubernetes support
> >
> > Below is my workflow.xml
> >
> > <spark xmlns="uri:oozie:spark-action:1.0">
> >
> > ${resourceManager}
> >
> > ${nameNode}
> >
> > k8s://<***.com>
> >
> > Python-Spark-Pi
> >
> > spark-examples_2.11-2.3.0.jar
> >
> > --class org.apache.spark.examples.SparkPi --conf
> > spark.executor.instances=2 --conf spark.kubernetes.namespace=spark --conf
> > spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf
> > spark.kubernetes.container.image=artifactory.cloud.capitalone.com/kubespark/spark-quantum:v2.3.0
> > --conf spark.kubernetes.node.selector.node-role.kubernetes.io/worker=true
> > --conf spark.kubernetes.driver.label.application=is1-driver --conf
> > spark.kubernetes.executor.label.application=is1-executor
> >
> > local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
> >
> > Is this because of uri:oozie:spark-action:1.0 in spark xml tag? Does it
> > needs to be spark-action:2.0 as I’m using spark 2.3?
> >
> > Please suggest!
> >
> > On Tue, May 15, 2018 at 12:43 PM Peter Cseh <gezap...@cloudera.com>
> > wrote:
> >
> > > I think the error is related to the Scala version being present in the
> > > artifact name.
> > > I'll take a look at this tomorrow.
> > > Gp
> > >
> > > On Tue, May 15, 2018, 18:28 Artem Ervits <artemerv...@gmail.com>
> > > wrote:
> > >
> > > > Did you run
> > > > mvn clean install first on the parent directory?
> > > >
> > > > On Tue, May 15, 2018, 11:35 AM purna pradeep <
> > > > purna2prad...@gmail.com> wrote:
> > > >
> > > > > Thanks peter,
> > > > >
> > > > > I have tried changing -Dspark.version to 2.3.0 and compiled oozie
> > > > > I’m getting below error from oozie examples
> > > > >
> > > > > [ERROR] Failed to execute goal on project oozie-examples: Could not
> > > > > resolve dependencies for project
> > > > > org.apache.oozie:oozie-examples:jar:5.0.0: Could not find artifact
> > > > > org.apache.spark:spark-core_2.10:jar:2.3.0 in resolution
> > > > >
> > > > > On Tue, May 15, 2018 at 11:14 AM Peter Cseh <gezap...@cloudera.com>
> > > > > wrote:
> > > > >
> > > > > > Oozie has a spark-2 profile that is currently hard-coded to Spark
> > > > > > 2.1:
> > > > > > https://github.com/apache/oozie/blob/master/pom.xml#L1983
> > > > > > I'm sure if you overwrite the -Dspark.version and compile Oozie
> > > > > > that way it will work.
> > > > > > gp
Re: Oozie for spark jobs without Hadoop
Peter,

I got rid of this error by adding hadoop-aws-2.8.3.jar and jets3t-0.9.4.jar.

But I’m getting the error below now:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified by setting the fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey properties (respectively)

I have tried adding the AWS access and secret keys in oozie-site.xml, hadoop core-site.xml, and hadoop-config.xml.

On Wed, May 16, 2018 at 2:30 PM purna pradeep <purna2prad...@gmail.com> wrote:

> I have tried this ,just added s3 instead of *
>
> <property>
> <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
> <value>hdfs,hftp,webhdfs,s3</value>
> </property>
>
> Getting below error
>
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3a.S3AFileSystem not found
> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369)
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
> at org.apache.oozie.service.HadoopAccessorService$5.run(HadoopAccessorService.java:625)
> at org.apache.oozie.service.HadoopAccessorService$5.run(HadoopAccessorService.java:623)
>
> On Wed, May 16, 2018 at 2:19 PM purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> This is what is in the logs
>>
>> 2018-05-16 14:06:13,500 INFO URIHandlerService:520 - SERVER[localhost]
>> Loaded urihandlers [org.apache.oozie.dependency.FSURIHandler]
>>
>> 2018-05-16 14:06:13,501 INFO URIHandlerService:520 - SERVER[localhost]
>> Loaded default urihandler org.apache.oozie.dependency.FSURIHandler
>>
>> On Wed, May 16, 2018 at 12:27 PM Peter Cseh <gezap...@cloudera.com>
>> wrote:
>>
>>> That's strange, this exception should not happen in that case.
>>> Can you check the server logs for messages like this?
>>> LOG.info("Loaded urihandlers {0}", Arrays.toString(classes));
>>> LOG.info("Loaded default urihandler {0}",
>>> defaultHandler.getClass().getName());
>>> Thanks
>>>
>>> On Wed, May 16, 2018 at 5:47 PM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> This is what I already have in my oozie-site.xml
>>>>
>>>> <property>
>>>> <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
>>>> <value>*</value>
>>>> </property>
>>>>
>>>> On Wed, May 16, 2018 at 11:37 AM Peter Cseh <gezap...@cloudera.com>
>>>> wrote:
>>>>
>>>>> You'll have to configure
>>>>>
>>>>> <property>
>>>>> <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
>>>>> <value>hdfs,hftp,webhdfs</value>
>>>>> <description>Enlist the different filesystems supported for
>>>>> federation. If wildcard "*" is specified, then ALL file schemes
>>>>> will be allowed.</description>
>>>>> </property>
>>>>>
>>>>> properly.
>>>>>
>>>>> For testing purposes it's ok to put * in there in oozie-site.xml
>>>>>
>>>>> On Wed, May 16, 2018 at 5:29 PM, purna pradeep <
>>>>> purna2prad...@gmail.com> wrote:
>>>>>
>>>>> > Peter,
>>>>> >
>>>>> > I have tried to specify dataset with uri starting with s3://, s3a://
>>>>> > and s3n:// and I am getting exception
>>>>> >
>>>>> > Exception occurred:E0904: Scheme [s3] not supported in uri
>>>>> > [s3://mybucket/input.data] Making the job failed
>>>>> >
>>>>> > org.apache.oozie.dependency.URIHandlerException: E0904: Scheme [s3]
>>>>> > not supported in uri [s3://mybucket/input.data]
>>>>> >
>>>>> > at org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:185)
>>>>> >
>>>>> > at org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:168)
Re: Oozie for spark jobs without Hadoop
I have tried this ,just added s3 instead of * oozie.service.HadoopAccessorService.supported.filesystems hdfs,hftp,webhdfs,s3 Getting below error java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389) at org.apache.oozie.service.HadoopAccessorService$5.run(HadoopAccessorService.java:625) at org.apache.oozie.service.HadoopAccessorService$5.run(HadoopAccessorService.java:623 On Wed, May 16, 2018 at 2:19 PM purna pradeep <purna2prad...@gmail.com> wrote: > This is what is in the logs > > 2018-05-16 14:06:13,500 INFO URIHandlerService:520 - SERVER[localhost] > Loaded urihandlers [org.apache.oozie.dependency.FSURIHandler] > > 2018-05-16 14:06:13,501 INFO URIHandlerService:520 - SERVER[localhost] > Loaded default urihandler org.apache.oozie.dependency.FSURIHandler > > > On Wed, May 16, 2018 at 12:27 PM Peter Cseh <gezap...@cloudera.com> wrote: > >> That's strange, this exception should not happen in that case. >> Can you check the server logs for messages like this? 
>> LOG.info("Loaded urihandlers {0}", Arrays.toString(classes)); >> LOG.info("Loaded default urihandler {0}", >> defaultHandler.getClass().getName()); >> Thanks >> >> On Wed, May 16, 2018 at 5:47 PM, purna pradeep <purna2prad...@gmail.com> >> wrote: >> >>> This is what I already have in my oozie-site.xml >>> >>> >>> >>> >>> oozie.service.HadoopAccessorService.supported.filesystems >>> >>> * >>> >>> >>> >>> On Wed, May 16, 2018 at 11:37 AM Peter Cseh <gezap...@cloudera.com> >>> wrote: >>> >>>> You'll have to configure >>>> oozie.service.HadoopAccessorService.supported.filesystems >>>> hdfs,hftp,webhdfs Enlist >>>> the different filesystems supported for federation. If wildcard "*" is >>>> specified, then ALL file schemes will be allowed.properly. >>>> >>>> For testing purposes it's ok to put * in there in oozie-site.xml >>>> >>>> On Wed, May 16, 2018 at 5:29 PM, purna pradeep <purna2prad...@gmail.com >>>> > >>>> wrote: >>>> >>>> > Peter, >>>> > >>>> > I have tried to specify dataset with uri starting with s3://, s3a:// >>>> and >>>> > s3n:// and I am getting exception >>>> > >>>> > >>>> > >>>> > Exception occurred:E0904: Scheme [s3] not supported in uri >>>> > [s3://mybucket/input.data] Making the job failed >>>> > >>>> > org.apache.oozie.dependency.URIHandlerException: E0904: Scheme [s3] >>>> not >>>> > supported in uri [s3:// mybucket /input.data] >>>> > >>>> > at >>>> > org.apache.oozie.service.URIHandlerService.getURIHandler( >>>> > URIHandlerService.java:185) >>>> > >>>> > at >>>> > org.apache.oozie.service.URIHandlerService.getURIHandler( >>>> > URIHandlerService.java:168) >>>> > >>>> > at >>>> > org.apache.oozie.service.URIHandlerService.getURIHandler( >>>> > URIHandlerService.java:160) >>>> > >>>> > at >>>> > org.apache.oozie.command.coord.CoordCommandUtils.createEarlyURIs( >>>> > CoordCommandUtils.java:465) >>>> > >>>> > at >>>> > org.apache.oozie.command.coord.CoordCommandUtils. 
>>>> > separateResolvedAndUnresolved(CoordCommandUtils.java:404) >>>> > >>>> > at >>>> > org.apache.oozie.command.coord.CoordCommandUtils. >>>> > materializeInputDataEvents(CoordCommandUtils.java:731) >>>> > >>>> > at >>>> > >>>> org.apache.oozie.command.coord.CoordCommandUtils.materializeOneInstance( >>>> > CoordCommandUtils.java:546) >>>> > >
Re: Oozie for spark jobs without Hadoop
This is what is in the logs:

2018-05-16 14:06:13,500 INFO URIHandlerService:520 - SERVER[localhost] Loaded urihandlers [org.apache.oozie.dependency.FSURIHandler]
2018-05-16 14:06:13,501 INFO URIHandlerService:520 - SERVER[localhost] Loaded default urihandler org.apache.oozie.dependency.FSURIHandler

On Wed, May 16, 2018 at 12:27 PM Peter Cseh <gezap...@cloudera.com> wrote:
> That's strange, this exception should not happen in that case.
> Can you check the server logs for messages like this?
> LOG.info("Loaded urihandlers {0}", Arrays.toString(classes));
> LOG.info("Loaded default urihandler {0}", defaultHandler.getClass().getName());
> Thanks
Re: Oozie for spark jobs without Hadoop
This is what I already have in my oozie-site.xml:

<property>
  <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
  <value>*</value>
</property>

On Wed, May 16, 2018 at 11:37 AM Peter Cseh <gezap...@cloudera.com> wrote:
> You'll have to configure
> <property>
>   <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
>   <value>hdfs,hftp,webhdfs</value>
>   <description>Enlist the different filesystems supported for federation. If wildcard "*" is specified, then ALL file schemes will be allowed.</description>
> </property>
> properly.
>
> For testing purposes it's ok to put * in there in oozie-site.xml
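The wildcard rule in the quoted property description can be written down as a small check. This is only a simplified model of that description in Python, not Oozie's own code; the function name `scheme_allowed` is made up for illustration. Note that in this thread the E0904 error was still reported with `*` configured, so the real server behaviour evidently involves more than this one rule.

```python
from urllib.parse import urlparse


def scheme_allowed(uri: str, supported: str) -> bool:
    """Return True if the URI's scheme passes a comma-separated allow-list.

    `supported` mimics the property value, e.g. "hdfs,hftp,webhdfs" or "*".
    Hypothetical helper: it models the description text only.
    """
    allowed = {s.strip().lower() for s in supported.split(",") if s.strip()}
    if "*" in allowed:
        # Wildcard: every scheme is accepted.
        return True
    return urlparse(uri).scheme.lower() in allowed


if __name__ == "__main__":
    for value in ("hdfs,hftp,webhdfs", "*"):
        print(value, scheme_allowed("s3://mybucket/input.data", value))
```

With the `hdfs,hftp,webhdfs` value an `s3://` URI is rejected by this model, and with `*` it is accepted.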
Re: Oozie for spark jobs without Hadoop
+Peter
Re: Oozie for spark jobs without Hadoop
Peter,

I have tried to specify the dataset with a URI starting with s3://, s3a:// and s3n://, and I am getting this exception:

Exception occurred:E0904: Scheme [s3] not supported in uri [s3://mybucket/input.data] Making the job failed
org.apache.oozie.dependency.URIHandlerException: E0904: Scheme [s3] not supported in uri [s3:// mybucket /input.data]
    at org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:185)
    at org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:168)
    at org.apache.oozie.service.URIHandlerService.getURIHandler(URIHandlerService.java:160)
    at org.apache.oozie.command.coord.CoordCommandUtils.createEarlyURIs(CoordCommandUtils.java:465)
    at org.apache.oozie.command.coord.CoordCommandUtils.separateResolvedAndUnresolved(CoordCommandUtils.java:404)
    at org.apache.oozie.command.coord.CoordCommandUtils.materializeInputDataEvents(CoordCommandUtils.java:731)
    at org.apache.oozie.command.coord.CoordCommandUtils.materializeOneInstance(CoordCommandUtils.java:546)
    at org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand.materializeActions(CoordMaterializeTransitionXCommand.java:492)
    at org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand.materialize(CoordMaterializeTransitionXCommand.java:362)
    at org.apache.oozie.command.MaterializeTransitionXCommand.execute(MaterializeTransitionXCommand.java:73)
    at org.apache.oozie.command.MaterializeTransitionXCommand.execute(MaterializeTransitionXCommand.java:29)
    at org.apache.oozie.command.XCommand.call(XCommand.java:290)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:181)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Is S3 support specific to the CDH distribution, or should it work in Apache Oozie as well? I'm not using CDH yet.

On Wed, May 16, 2018 at 10:28 AM Peter Cseh <gezap...@cloudera.com> wrote:
> I think it should be possible for Oozie to poll S3. Check out this description
> <https://www.cloudera.com/documentation/enterprise/5-9-x/topics/admin_oozie_s3.html>
> on how to make it work in jobs, something similar should work on the server side as well
Re: Spark 2.3 in oozie
I'm able to compile successfully after adding these override options:

-Dspark.scala.binary.version=2.11 -Dspark.version=2.3.0

But when I run a Spark action with the spark-pi example jar against a Kubernetes master, I get the error below in the stderr log:

Error: Could not load KUBERNETES classes. This copy of spark may not have been compiled with Kubernetes support

Below is my workflow.xml:

<spark xmlns="uri:oozie:spark-action:1.0">
${resourceManager}
${nameNode}
k8s://<***.com>
Python-Spark-Pi
spark-examples_2.11-2.3.0.jar
--class org.apache.spark.examples.SparkPi
--conf spark.executor.instances=2
--conf spark.kubernetes.namespace=spark
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
--conf spark.kubernetes.container.image=artifactory.cloud.capitalone.com/kubespark/spark-quantum:v2.3.0
--conf spark.kubernetes.node.selector.node-role.kubernetes.io/worker=true
--conf spark.kubernetes.driver.label.application=is1-driver
--conf spark.kubernetes.executor.label.application=is1-executor
local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar

Is this because of uri:oozie:spark-action:1.0 in the spark XML tag? Does it need to be spark-action:2.0 since I'm using Spark 2.3? Please suggest!

On Tue, May 15, 2018 at 12:43 PM Peter Cseh <gezap...@cloudera.com> wrote:
> I think the error is related to the Scala version being present in the artifact name.
> I'll take a look at this tomorrow.
> Gp
>
> On Tue, May 15, 2018, 18:28 Artem Ervits <artemerv...@gmail.com> wrote:
> > Did you run
> > mvn clean install first on the parent directory?
Re: Spark 2.3 in oozie
Thanks Peter,

I have tried changing -Dspark.version to 2.3.0 and compiled Oozie. I'm getting the error below from oozie-examples:

[ERROR] Failed to execute goal on project oozie-examples: Could not resolve dependencies for project org.apache.oozie:oozie-examples:jar:5.0.0: Could not find artifact org.apache.spark:spark-core_2.10:jar:2.3.0 in resolution

On Tue, May 15, 2018 at 11:14 AM Peter Cseh <gezap...@cloudera.com> wrote:
> Oozie has a spark-2 profile that is currently hard-coded to Spark 2.1:
> https://github.com/apache/oozie/blob/master/pom.xml#L1983
> I'm sure if you overwrite the -Dspark.version and compile Oozie that way it will work.
> gp
Spark 2.3 in oozie
Hello, Does Oozie support Spark 2.3? Or does it even care about the Spark version? I want to use the Spark action. Thanks, Purna
Re: Oozie for spark jobs without Hadoop
Thanks Andras,

I would also like to know whether Oozie supports AWS S3 as an input event, polling for a dependency file before kicking off a Spark action.

For example: I don't want to kick off a Spark action until a file has arrived at a given AWS S3 location.

On Tue, May 15, 2018 at 10:17 AM Andras Piros <andras.pi...@cloudera.com> wrote:
> Hi,
>
> Oozie needs HDFS to store workflow, coordinator, or bundle definitions, as well as sharelib files in a safe, distributed and scalable way. Oozie needs YARN to run almost all of its actions, Spark action being no exception.
>
> At the moment it's not feasible to install Oozie without those Hadoop components. How to install Oozie please find here <https://oozie.apache.org/docs/5.0.0/AG_Install.html>.
>
> Regards,
>
> Andras
>
> On Tue, May 15, 2018 at 4:11 PM, purna pradeep <purna2prad...@gmail.com> wrote:
> > Hi,
> >
> > Would like to know if I can use sparkaction in oozie without having Hadoop cluster?
> >
> > I want to use oozie to schedule spark jobs on Kubernetes cluster
> >
> > I'm a beginner in oozie
> >
> > Thanks
Workflow S3 listener
Hi, I'm very new to Oozie. I would like to run Spark 2.3 jobs from Oozie based on file arrival on AWS S3, where the file is a dependency for the job. I see some examples which use S3 as input event datasets, as below: s3n://mybucket/a/b/${YEAR}/${MONTH}/${DAY} So my question is: does Oozie listen for file arrival on AWS S3 to check the dependency before kicking off the Spark job? Thanks
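To make the dataset template in the question concrete, here is a minimal Python sketch of how a URI template such as s3n://mybucket/a/b/${YEAR}/${MONTH}/${DAY} expands to one concrete path per day. It only illustrates the substitution; it is not Oozie code, the helper name `resolve_dataset_uri` is made up, and zero-padded month and day values are an assumption. Whether the Oozie server can actually poll an S3 path is a separate question that this sketch does not answer.

```python
from datetime import date
from string import Template


def resolve_dataset_uri(uri_template: str, nominal: date) -> str:
    """Fill ${YEAR}, ${MONTH} and ${DAY} for one dataset instance.

    Hypothetical helper; month and day are zero-padded by assumption.
    """
    return Template(uri_template).substitute(
        YEAR=f"{nominal.year:04d}",
        MONTH=f"{nominal.month:02d}",
        DAY=f"{nominal.day:02d}",
    )


if __name__ == "__main__":
    template = "s3n://mybucket/a/b/${YEAR}/${MONTH}/${DAY}"
    print(resolve_dataset_uri(template, date(2018, 5, 16)))
```

A coordinator-style scheduler would compute one such path per nominal time and wait until it exists before starting the dependent action.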
Oozie with spark 2.3 in Kubernetes
Hello, I would like to know if anyone has tried Oozie with Spark 2.3 actions on Kubernetes for scheduling Spark jobs. Thanks, Purna
Re: Scala program to spark-submit on k8 cluster
Yes, a "REST application that submits a Spark job to a k8s cluster by running spark-submit programmatically". I would also like to expose it as a Kubernetes service so that clients can access it like any other REST API.

On Wed, Apr 4, 2018 at 12:25 PM Yinan Li wrote:
> Hi Kittu,
>
> What do you mean by "a Scala program"? Do you mean a program that submits a Spark job to a k8s cluster by running spark-submit programmatically, or some example Scala application that is to run on the cluster?
>
> On Wed, Apr 4, 2018 at 4:45 AM, Kittu M wrote:
> > Hi,
> >
> > I'm looking for a Scala program to spark submit a Scala application (spark 2.3 job) on k8 cluster.
> >
> > Any help would be much appreciated. Thanks
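As a rough illustration of "running spark-submit programmatically", the sketch below builds the spark-submit argument list for a Kubernetes master and hands it to a subprocess. It is written in Python rather than the Scala the poster asked for, and it is only a sketch: the helper names, the API server address and the image name are placeholders, and it assumes a Spark 2.3 distribution with `spark-submit` on the PATH. A REST service would call something like this from its request handler.

```python
import subprocess
from typing import Dict, List, Optional


def build_spark_submit(master_url: str, app_name: str, main_class: str,
                       app_jar: str, conf: Dict[str, str],
                       app_args: Optional[List[str]] = None) -> List[str]:
    """Assemble a spark-submit command line for cluster mode on Kubernetes."""
    cmd = [
        "spark-submit",
        "--master", f"k8s://{master_url}",  # Kubernetes API server URL
        "--deploy-mode", "cluster",
        "--name", app_name,
        "--class", main_class,
    ]
    for key, value in sorted(conf.items()):
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app_jar)
    cmd += list(app_args or [])
    return cmd


def run_spark_submit(cmd: List[str]) -> int:
    """Run the command and return its exit code (needs spark-submit installed)."""
    return subprocess.run(cmd, check=False).returncode


if __name__ == "__main__":
    command = build_spark_submit(
        master_url="https://k8s-apiserver.example.invalid:6443",  # placeholder
        app_name="spark-pi",
        main_class="org.apache.spark.examples.SparkPi",
        app_jar="local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar",
        conf={
            "spark.executor.instances": "2",
            "spark.kubernetes.namespace": "spark",
            "spark.kubernetes.container.image": "registry.example.invalid/spark:v2.3.0",  # placeholder
        },
    )
    print(" ".join(command))  # only prints; call run_spark_submit(command) to submit
```

Keeping command construction separate from execution makes the builder easy to test without a cluster.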
Re: Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster
Thanks Yinan,

Looks like this is still in alpha. I would like to know if there is any REST interface for Spark 2.3 job submission similar to the one in Spark 2.2, as I need to submit Spark applications to the k8s master based on different events (cron or S3-file-based triggers).

On Tue, Mar 20, 2018 at 11:50 PM Yinan Li <liyinan...@gmail.com> wrote:
> One option is the Spark Operator <https://github.com/GoogleCloudPlatform/spark-on-k8s-operator>. It allows specifying and running Spark applications on Kubernetes using Kubernetes custom resources objects. It takes SparkApplication CRD objects and automatically submits the applications to run on a Kubernetes cluster.
>
> Yinan
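For readers unfamiliar with the custom-resource approach Yinan describes, the sketch below builds a SparkApplication object as a plain Python dict and prints it as JSON, which `kubectl apply -f` accepts alongside YAML. Treat every detail as an assumption to check against the installed operator: the `apiVersion` shown is from later operator releases (the alpha version discussed in this thread used a different one), the field names follow the operator's published examples as best I recall them, and the helper name and image are placeholders.

```python
import json


def spark_application(name: str, namespace: str, image: str,
                      main_class: str, app_file: str, executors: int) -> dict:
    """Build a SparkApplication custom resource (field names are assumptions)."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",  # assumption: varies by operator release
        "kind": "SparkApplication",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "type": "Scala",
            "mode": "cluster",
            "image": image,
            "mainClass": main_class,
            "mainApplicationFile": app_file,
            "sparkVersion": "2.3.0",
            "driver": {"cores": 1, "memory": "512m", "serviceAccount": "spark"},
            "executor": {"cores": 1, "instances": executors, "memory": "512m"},
        },
    }


if __name__ == "__main__":
    manifest = spark_application(
        name="spark-pi",
        namespace="spark",
        image="registry.example.invalid/spark:v2.3.0",  # placeholder image
        main_class="org.apache.spark.examples.SparkPi",
        app_file="local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar",
        executors=2,
    )
    print(json.dumps(manifest, indent=2))
```

An event-driven caller (a cron job or an S3-triggered function) would create such an object through the Kubernetes API instead of invoking spark-submit itself.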
Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster
I'm using a Kubernetes cluster on AWS to run Spark jobs with Spark 2.3. Now I want to run spark-submit from an AWS Lambda function against the k8s master. I would like to know if there is any REST interface to run spark-submit on the k8s master.
[kubernetes-users] Cluster-autoscaler v1.0.4 error
Hello, I'm using CA v1.0.4 from Kubernetes for AWS node autoscaling (Kubernetes version 1.8.3): https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler/cloudprovider/aws I'm getting the error below in the logs of the CA pod: node registry: RequestError: send request failed caused by: Post https://autoscaling.us-west-2.amazonaws.com/: dial tcp: i/o timeout I have tried deploying kube-dns and CA on master nodes, as suggested in this GitHub issue, still with no luck: https://github.com/kubernetes/autoscaler/issues/113 I also have autoscaling:* in the IAM role policy. Please suggest!
Re: Spark 2.3 submit on Kubernetes error
Thanks Yinan,

I'm able to get the kube-dns endpoints when I run this command:

kubectl get ep kube-dns --namespace=kube-system

Do I need to deploy under kube-system instead of the default namespace? And please let me know if you have any insights on Error 1.

On Sun, Mar 11, 2018 at 8:26 PM Yinan Li <liyinan...@gmail.com> wrote:
> Spark on Kubernetes requires the presence of the kube-dns add-on properly configured. The executors connect to the driver through a headless Kubernetes service using the DNS name of the service. Can you check if you have the add-on installed in your cluster? This issue https://github.com/apache-spark-on-k8s/spark/issues/558 might help.
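Since the failure in this thread comes down to a DNS name not resolving inside a pod, one quick check is to try the lookup directly from a container in the same namespace. The Python sketch below is a generic resolver test, not anything Spark-specific; the function name `resolve` is made up, and it assumes a Python interpreter is available in the pod image.

```python
import socket
from typing import List


def resolve(hostname: str) -> List[str]:
    """Return the sorted unique IP addresses for hostname, or [] on DNS failure."""
    try:
        infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror:
        # Lookup failed: the name is unknown or the resolver is unreachable.
        return []
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP.
    return sorted({info[4][0] for info in infos})
```

Calling `resolve("kubernetes.default.svc")` from inside a pod should return the API server's cluster IP when cluster DNS is working; an empty list matches the UnknownHostException reported in the original message.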
Spark 2.3 submit on Kubernetes error
I am getting the errors below when I run spark-submit against a Kubernetes cluster.

Error 1: This looks like a warning. It does not interrupt the application running inside the executor pod, but it keeps recurring.

2018-03-09 11:15:21 WARN WatchConnectionManager:192 - Exec Failure
java.io.EOFException
    at okio.RealBufferedSource.require(RealBufferedSource.java:60)
    at okio.RealBufferedSource.readByte(RealBufferedSource.java:73)
    at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:113)
    at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:97)
    at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262)
    at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201)
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)
    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Error 2: This is an intermittent error that prevents the executor pod from running.

org.apache.spark.SparkException: External scheduler cannot be instantiated
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:492)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
    at com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)
    at com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)
    at com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)
    at com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)
    at com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver] in namespace: [default] failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.<init>(KubernetesClusterSchedulerBackend.scala:70)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
    ... 11 more
Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again
    at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
    at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
    at java.net.InetAddress.getAllByName(InetAddress.java:1192)
    at java.net.InetAddress.getAllByName(InetAddress.java:1126)
    at okhttp3.Dns$1.lookup(Dns.java:39)
    at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
    at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)
    at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)
    at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)
    at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
    at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
handling Remote dependencies for spark-submit in spark 2.3 with kubernetes
I'm trying to run spark-submit against a Kubernetes cluster using the Spark 2.3 Docker container image.

The challenge I'm facing is that the application has a mainapplication.jar plus other dependency files and jars located in a remote location such as AWS S3. The Spark 2.3 documentation describes a Kubernetes init-container that downloads remote dependencies, but I'm not creating any Podspec in which to include init-containers; as per the documentation, Spark 2.3 on Kubernetes creates the pods (driver, executor) internally. So I'm not sure how to use an init-container with spark-submit when there are remote dependencies.

https://spark.apache.org/docs/latest/running-on-kubernetes.html#using-remote-dependencies

Please suggest.
Unsubscribe
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Executor not getting added SparkUI & Spark Eventlog in deploymode:cluster
Hi all,

I'm submitting a Spark job through the Spark REST API (a POST on port 6066) with the configuration below.

> Launch Command:
> "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.141-1.b16.el7_3.x86_64/jre/bin/java"
> "-cp" "/usr/local/spark/conf/:/usr/local/spark/jars/*" "-Xmx4096M"
> "-Dspark.eventLog.enabled=true"
> "-Dspark.app.name=WorkflowApp"
> "-Dspark.submit.deployMode=cluster"
> "-Dspark.local.dir=/data0,/data1,/data2,/data3"
> "-Dspark.executor.cores=2" "-Dspark.master=spark://:7077"
> "-Dspark.serializer=org.apache.spark.serializer.KryoSerializer"
> "-Dspark.jars=s3a://<***>.jar" "-Dspark.driver.supervise=false"
> "-Dspark.history.fs.logDirectory=s3a://<*>/"
> "-Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256"
> "-Dspark.driver.memory=4G" "-Dspark.executor.memory=4G"
> "-Dspark.eventLog.dir=s3a://<*>/"
> "org.apache.spark.deploy.worker.DriverWrapper" "spark://Worker@<***>"
> "/usr/local/spark/work/driver-<***>.jar" "MyApp" "-c" "s3a://<***>"

When I looked into the Spark event log, this is what I observed:

{"Event":"SparkListenerExecutorAdded","Timestamp":1510633498623,"Executor ID":"driver","Executor Info":{"Host":"localhost","Total Cores":2,"Log Urls":{}}}
"spark.master":"local[*]"

Although I ran with deployMode set to cluster, the slave IP is not shown in the Host field and spark.master is shown as local[*]. Because of this, the job runs only on the driver, so after submission it does not show up at http://:8080 under Running and Completed Applications; it shows only under Running Drivers and Completed Drivers.

Please suggest.

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed.
If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Spark http: Not showing completed apps
Hi, I'm using Spark standalone on AWS EC2, and I'm using the Spark REST API (http::8080/Json) to get the completed apps, but the JSON returns the completed apps as an empty array even though the job ran successfully.
Re: Select entire row based on a logic applied on 2 columns across multiple rows
@Andres, I need the latest income, but its income_age_ts must be less than 10 months old, and I don't want to use SQL here.

On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi <iaiva...@gmail.com> wrote:

> Hi, if you need the last value from income in a window function you can use
> last_value.
> Not tested, but maybe with @ayan's SQL:
>
> spark.sql("select *, row_number(), last_value(income) over (partition by
> id order by income_age_ts desc) r from t")
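For the no-SQL requirement in the reply above, the same window logic can be expressed with the DataFrame API. The following is only a minimal sketch and has not been run against the poster's data. It assumes the column names from ayan's example (id, income_age_ts), assumes the dates are strings in M/d/yy form, and measures the 10 months against the current date. The object and method names are made up for illustration, and to_date with a format argument needs Spark 2.2 or later.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, current_date, months_between, row_number, to_date}

// Hypothetical helper, not part of Spark.
object LatestIncome {
  // Keeps, per id, the newest row whose income_age_ts is under 10 months old.
  def latestRecentIncome(df: DataFrame): DataFrame = {
    // Assumption: income_age_ts is a string such as "4/20/17".
    val withTs = df.withColumn("ts", to_date(col("income_age_ts"), "M/d/yy"))
    val newestFirstPerId = Window.partitionBy("id").orderBy(col("ts").desc)

    withTs
      .filter(months_between(current_date(), col("ts")) < 10) // age filter first
      .withColumn("r", row_number().over(newestFirstPerId))   // rank the survivors
      .filter(col("r") === 1)                                 // keep the newest one
      .drop("r", "ts")
  }
}
```

Parsing the string into a date before ordering matters: ordering the raw strings would compare "10/3/15" and "4/20/17" character by character rather than chronologically.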
Re: Select entire row based on a logic applied on 2 columns across multiple rows
@ayan,

Thanks for your response.

I would like to have a function here, calculateIncome, and the reason I need a function is to reuse it in other parts of the application. That's why I'm planning to use mapGroups with a function as the argument that takes a row iterator, but I'm not sure this is the best way to implement it, as my initial DataFrame is very large.

On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> The tool you are looking for is a window function. Example:
>
> >>> df.show()
> +--------+----+---+------+-------------+
> |JoinDate|dept| id|income|income_age_ts|
> +--------+----+---+------+-------------+
> | 4/20/13|  ES|101| 19000|      4/20/17|
> | 4/20/13|  OS|101|     1|      10/3/15|
> | 4/20/12|  DS|102| 13000|       5/9/17|
> | 4/20/12|  CS|102| 12000|       5/8/17|
> | 4/20/10|  EQ|103|     1|       5/9/17|
> | 4/20/10|  MD|103|  9000|       5/8/17|
> +--------+----+---+------+-------------+
>
> >>> res = spark.sql("select *, row_number() over (partition by id order by
> income_age_ts desc) r from t")
> >>> res.show()
> +--------+----+---+------+-------------+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +--------+----+---+------+-------------+---+
> | 4/20/10|  EQ|103|     1|       5/9/17|  1|
> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
> | 4/20/13|  OS|101|     1|      10/3/15|  2|
> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
> +--------+----+---+------+-------------+---+
>
> >>> res = spark.sql("select * from (select *, row_number() over (partition
> by id order by income_age_ts desc) r from t) x where r=1")
> >>> res.show()
> +--------+----+---+------+-------------+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +--------+----+---+------+-------------+---+
> | 4/20/10|  EQ|103|     1|       5/9/17|  1|
> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
> +--------+----+---+------+-------------+---+
>
> This should be better because it uses all the built-in optimizations in Spark.
>
> Best
> Ayan
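If a reusable function is still wanted for the mapGroups route, the per-group logic can be kept free of Spark so it can be called from anywhere in the application. The sketch below is one possible shape, not the poster's calculateIncome: the Employee type, the IncomeLogic object, and the asOf parameter are all hypothetical names, and "less than 10 months" is interpreted as fewer than 10 whole calendar months. It consumes the iterator in a single pass instead of calling toList, which matters when groups are large.

```scala
import java.sql.Date
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Hypothetical record type mirroring the columns in the question.
final case class Employee(employeeId: Int, income: Int, incomeAgeTs: Date,
                          joinDate: Date, dept: String)

object IncomeLogic {
  // Returns the newest row whose incomeAgeTs lies fewer than `maxAgeMonths`
  // whole months before `asOf`, or None when no row qualifies.
  def latestRecent(rows: Iterator[Employee],
                   asOf: LocalDate,
                   maxAgeMonths: Long = 10): Option[Employee] =
    rows
      .filter { e =>
        ChronoUnit.MONTHS.between(e.incomeAgeTs.toLocalDate, asOf) < maxAgeMonths
      }
      // Single pass: keep whichever of the two rows is newer.
      .reduceOption((a, b) => if (b.incomeAgeTs.after(a.incomeAgeTs)) b else a)
}
```

On a typed Dataset this could presumably be plugged in with groupByKey(_.employeeId) followed by flatMapGroups, so that employees with no qualifying row simply produce no output. Even so, the window-function approach suggested in the quoted reply is likely to scale better, because mapGroups-style operations hand Spark an opaque function it cannot optimize.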
Re: Select entire row based on a logic applied on 2 columns across multiple rows
Please click on the unnamed text/html link for a better view.
Re: use WithColumn with external function in a java jar
Thanks, I'll check it out.

On Mon, Aug 28, 2017 at 10:22 PM Praneeth Gayam <praneeth.ga...@gmail.com> wrote:

> You can create a UDF which will invoke your Java lib:
>
> def calculateExpense: UserDefinedFunction = udf((pexpense: String, cexpense: String) =>
>   new MyJava().calculateExpense(pexpense.toDouble, cexpense.toDouble))
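To show how the suggested UDF would be applied with withColumn, here is a slightly fuller sketch. The MyJava class below is only a Scala stand-in with a placeholder calculation, so that the example is self-contained; in the real application the class would come from the Java jar on the classpath. The ExpenseUdf object and the withExpense method are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

// Stand-in for the external Java class from the question (assumption:
// the real one is provided by the Java library, with the same signature).
class MyJava {
  def calculateExpense(pexpense: java.lang.Double,
                       cexpense: java.lang.Double): java.lang.Double =
    pexpense.doubleValue + cexpense.doubleValue // placeholder, not the real algorithm
}

object ExpenseUdf {
  // Parses the two string columns and delegates to the Java method.
  // Note: null or non-numeric input would throw inside the UDF.
  val calculateExpense: UserDefinedFunction =
    udf((pexpense: String, cexpense: String) =>
      new MyJava().calculateExpense(pexpense.toDouble, cexpense.toDouble).doubleValue)

  // Adds the calculated `expense` column to a DataFrame that has
  // string columns `pexpense` and `cexpense`.
  def withExpense(df: DataFrame): DataFrame =
    df.withColumn("expense", calculateExpense(col("pexpense"), col("cexpense")))
}
```

Creating the MyJava instance inside the lambda avoids having to serialize it with the closure; if construction is expensive, that trade-off would need revisiting.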
Select entire row based on a logic applied on 2 columns across multiple rows
---------- Forwarded message ---------
From: Mamillapalli, Purna Pradeep <purnapradeep.mamillapa...@capitalone.com>
Date: Tue, Aug 29, 2017 at 8:08 PM
Subject: Spark question
To: purna pradeep <purna2prad...@gmail.com>

Below is the input DataFrame (in reality this is a very large DataFrame):

EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
101         19000   4/20/17        4/20/13   ES
101         1       10/3/15        4/20/13   OS
102         13000   5/9/17         4/20/12   DS
102         12000   5/8/17         4/20/12   CS
103         1       5/9/17         4/20/10   EQ
103         9000    5/8/15         4/20/10   MD

Goal: get the latest income of each employee whose INCOME AGE TS is less than 10 months old.

Expected output DataFrame:

EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
101         19000   4/20/17        4/20/13   ES
102         13000   5/9/17         4/20/12   DS
103         1       5/9/17         4/20/10   EQ

Below is what I'm planning to implement:

    import org.apache.spark.sql.types.StructType

    // Field types follow the schema below (dates rather than Int)
    case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: java.sql.Date,
                        JOINDATE: java.sql.Date, DEPT: String)

    val empSchema = new StructType()
      .add("EmployeeID", "Int")
      .add("INCOME", "Int")
      .add("INCOMEAGE", "Date")
      .add("JOINDATE", "Date")
      .add("DEPT", "String")

    // Reading from the file
    import sparkSession.implicits._
    val readEmpFile = sparkSession.read
      .option("sep", ",")
      .schema(empSchema)
      .csv(INPUT_DIRECTORY)

    // Create the typed employee Dataset
    val custDf = readEmpFile.as[employee]

    // Group the rows by employee
    val groupByDf = custDf.groupByKey(a => a.EmployeeID)

    val k = groupByDf.mapGroups((key, value) => performETL(value))

    def performETL(empData: Iterator[employee]): employee = {
      val empList = empData.toList
      // calculateIncome has the logic to figure out the latest income
      // for an account and returns the latest income
      val income = calculateIncome(empList)
      // return the first row of the group, carrying the calculated income
      empList.head.copy(INCOME = income)
    }

Is this a good approach, or even the right approach, to implement this? If not, please suggest a better way.
use WithColumn with external function in a java jar
I have data in a DataFrame with the columns below:

1) File format is CSV
2) All column datatypes below are String

employeeid,pexpense,cexpense

Now I need to create a new DataFrame which has a new column called `expense`, calculated from the columns `pexpense` and `cexpense`.

The tricky part is that the calculation algorithm is not a **UDF** that I created; it's an external function that needs to be imported from a Java library and takes primitive types as arguments (in this case `pexpense`, `cexpense`) to calculate the value required for the new column.

The external function signature:

    public class MyJava {
        public Double calculateExpense(Double pexpense, Double cexpense) {
            // calculation
        }
    }

So how can I invoke that external function to create the new calculated column? Can I register that external function as a UDF in my Spark application?

Stack Overflow reference:

https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function
Re: Restart streaming query spark 2.1 structured streaming
Also, is query.stop() a graceful stop operation? What happens to data that has already been received; will it be processed?
Re: Restart streaming query spark 2.1 structured streaming
Ok, thanks. A few more questions:

1. The documentation says onQueryProgress is not thread-safe. So is this method the right place to refresh the cache? And is there no need to restart the query if I choose the listener?

"The methods are not thread-safe as they may be called from different threads."

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala

2. If I use StreamingQueryListener.onQueryProgress, my understanding is that the method is executed only when the query is in progress. So if I refresh the DataFrame here without restarting the query, will it impact the application?

3. Should I use the blocking unpersist(Boolean) method or the async unpersist() method, given that the data size is big?

I feel your solution is better, as it stops the query --> refreshes the cache --> starts the query, provided I can compromise on a little downtime even though the cached DataFrame is huge. I'm not sure how the listener behaves, as it's asynchronous; correct me if I'm wrong.

On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Both work. The asynchronous method with the listener will have less
> downtime, just that the first trigger/batch after the asynchronous
> unpersist+persist will probably take longer as it has to reload the data.
Re: Restart streaming query spark 2.1 structured streaming
Thanks Tathagata Das. I'm actually planning to do something like this:

    activeQuery.stop()
    // unpersist and persist cached data frame
    df.unpersist()
    // read the updated data
    // data size of df is around 100 GB
    df.persist()
    activeQuery = startQuery()

The cached data frame is around 100 GB, so my question is: is this the right place to refresh such a large cached data frame?

I'm also trying to refresh the cached data frame in the onQueryProgress() method of a class that extends StreamingQueryListener.

I would like to know which is the best place to refresh the cached data frame, and why.

Thanks again for the response below.

On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <tathagata.das1...@gmail.com> wrote:

> You can do something like this.
>
> def startQuery(): StreamingQuery = {
>   // create your streaming dataframes
>   // start the query with the same checkpoint directory
> }
>
> // handle to the active query
> var activeQuery: StreamingQuery = null
>
> while (!stopped) {
>   if (activeQuery == null) {           // if query not active, start query
>     activeQuery = startQuery()
>   } else if (shouldRestartQuery()) {   // check your condition and restart query
>     activeQuery.stop()
>     activeQuery = startQuery()
>   }
>   activeQuery.awaitTermination(100)    // wait for 100 ms.
>   // if there is any error it will throw exception and quit the loop
>   // otherwise it will keep checking the condition every 100ms
> }
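
For readers following the thread, here is a minimal sketch of how the restart loop and the cache refresh discussed in this message might fit together. It is not code from any of the posters. To keep it runnable without a Spark cluster, it uses a hypothetical `QueryHandle` trait that mirrors only `stop()` and `awaitTermination(timeoutMs)` from Spark's `StreamingQuery`; `RestartLoop`, `run`, and the function parameters are illustrative names, not Spark APIs.

```scala
object RestartLoop {

  /** Hypothetical stand-in for the two StreamingQuery methods the loop needs. */
  trait QueryHandle {
    def stop(): Unit
    def awaitTermination(timeoutMs: Long): Boolean
  }

  /**
    * Polls the active query and restarts it whenever `shouldRestart` is true.
    * Runs until the active query terminates on its own and returns the
    * number of restarts performed.
    */
  def run(
      startQuery: () => QueryHandle, // builds and starts a new query (must re-read the cached data frame)
      shouldRestart: () => Boolean,  // the refresh condition
      refreshCache: () => Unit,      // unpersist, reload, persist, materialize
      pollMs: Long
  ): Int = {
    var active = startQuery()
    var restarts = 0
    var terminated = false
    while (!terminated) {
      if (shouldRestart()) {
        active.stop()         // stop first, so no query is reading the old cache
        refreshCache()        // the refresh happens between stop and restart
        active = startQuery() // the new query is built against the refreshed data
        restarts += 1
      }
      // True once the query has terminated; otherwise waits up to pollMs.
      terminated = active.awaitTermination(pollMs)
    }
    restarts
  }
}
```

With Spark, `startQuery` would wrap the `StreamingQuery` returned by `writeStream...start()` in a `QueryHandle`, and `refreshCache` would hold the `df.unpersist()` / reload / `df.persist()` steps from the message above. Doing the refresh between `stop()` and the next `startQuery()` is a design assumption of this sketch: it means the 100 GB reload never competes with a running query, at the cost of pausing the stream while the data loads.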
Re: Restart streaming query spark 2.1 structured streaming
Thanks Michael.

I guess my question was a little confusing, so let me try again.

I would like to restart a streaming query programmatically, based on a condition, while my streaming application is running. The reason is that I want to refresh a cached data frame based on a condition, and the best way to do this seems to be restarting the streaming query, as suggested by TD for a similar problem in the thread below:

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

I do understand that checkpointing helps with recovery from failures, but I would like to know how to restart a streaming query programmatically without stopping my streaming application.

In place of query.awaitTermination(), do I need some logic to restart the query? Please suggest.

On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com> wrote:

> See
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Though I think that this currently doesn't work with the console sink.
Restart streaming query spark 2.1 structured streaming
Hi,

I'm trying to restart a streaming query to refresh a cached data frame.

Where and how should I restart the streaming query?

    import org.apache.spark.sql.SparkSession

    val sparkSes = SparkSession
      .builder
      .config("spark.master", "local")
      .appName("StreamingCahcePoc")
      .getOrCreate()

    import sparkSes.implicits._

    val dataDF = sparkSes.readStream
      .schema(streamSchema)
      .csv("testData")

    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
StreamingQueryListener Spark Structured Streaming
I'm working on a Structured Streaming application that reads from Kafka as a stream. For each batch I need to look up some attributes in an S3 file (nearly 200 GB), so I'm caching the lookup with df.persist(). However, I need to refresh the dataframe because the S3 lookup data changes frequently. I'm using the code below:

    class RefreshcachedDF(sparkSession: SparkSession) extends StreamingQueryListener {

      override def onQueryStarted(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent): Unit = {}

      override def onQueryTerminated(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent): Unit = {}

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        val currTime = System.currentTimeMillis()
        if (currTime > (latestrefreshtime mentioned in a globaltempview)) {
          // oldDF is a cached DataFrame created from a GlobalTempView; its size is 150 GB.
          // I guess unpersist() is an async call. Should I use unpersist(true), which is blocking? And is it safe?
          oldDF.unpersist()
          val inputDf: DataFrame = readFile(spec, sparkSession)
          val recreateddf = inputDf.persist()
          val count = recreateddf.count()
        }
      }
    }

Is the above approach a good way to refresh the cached dataframe? The trigger for the refresh is the cache expiry date for S3, which I will store in a globaltempview.

Note: S3 is one lookup source, but I also have other sources with data sizes of 20 to 30 GB.

 - So the question is: is this the right place to refresh the cached df?
 - If yes, should I use the blocking or the non-blocking unpersist method, as the data is huge (15GB)?
 - For a similar issue, I see the response below from TD under the subject "Re: Refreshing a persisted RDD":

"Yes, you will have to recreate the streaming Dataframe along with the static Dataframe, and restart the query. There isn't currently a feasible way to do this without a query restart. But restarting a query WITHOUT restarting the whole application + spark cluster, is reasonably fast. If your application can tolerate 10 second latencies, then stopping and restarting a query within the same Spark application is a reasonable solution."

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/browser

So if that is the better solution, should I restart the query as below?

    query.processAllAvailable()
    query.stop()
    df.unpersist()
    val inputDf: DataFrame = readFile(spec, sparkSession) // read file from S3 or any other source
    val recreateddf = inputDf.persist()
    query.start()

When I looked into the Spark documentation for the above methods, I found the following (note that the documentation says processAllAvailable() is intended for testing):

void processAllAvailable()
  Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. Note that in the case of continually arriving data, this method may block forever. Additionally, this method is only guaranteed to block until data that has been synchronously appended data to a Source prior to invocation. (i.e. getOffset must immediately reflect the addition).

stop()
  Stops the execution of this query if it is running. This method blocks until the threads performing execution has stopped.

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQuery.html#processAllAvailable()

Please suggest a better approach to refresh the cache.
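
One possible way to combine the two ideas in this message, offered only as a sketch: let onQueryProgress do nothing more than record that the cache has expired, and let the driver thread carry out the stop / unpersist / reload / restart sequence. This assumes the heavy reload should not run inside the listener callback. `RefreshSignal` and its methods are hypothetical names, not part of Spark, and the expiry time is passed in as a plain timestamp rather than read from a globaltempview. Note also that `StreamingQuery` provides `stop()` but no `start()`, so "restarting" means creating a new query with `writeStream...start()`.

```scala
import java.util.concurrent.atomic.AtomicBoolean

/** Hypothetical helper (not a Spark class): tracks cache expiry and carries a
  * refresh request from the listener thread to the driver loop. */
final class RefreshSignal(initialExpiryMs: Long) {
  @volatile private var expiryMs: Long = initialExpiryMs
  private val pending = new AtomicBoolean(false)

  /** Cheap check intended to be called from onQueryProgress. */
  def noteProgress(nowMs: Long): Unit =
    if (nowMs > expiryMs) pending.set(true)

  /** Called by the driver loop; returns true at most once per request. */
  def takeRequest(): Boolean = pending.getAndSet(false)

  /** Called after the reload, with the new expiry time; clears stale requests. */
  def renew(newExpiryMs: Long): Unit = {
    expiryMs = newExpiryMs
    pending.set(false)
  }
}
```

In this arrangement the listener would call `noteProgress(System.currentTimeMillis())`, and the driver loop would use `takeRequest()` as its restart condition. Because the query is stopped before the old data frame is released, a blocking `unpersist(true)` at that point does not stall any running batch. Whether to block is still a trade-off between freeing memory before the reload and a longer pause in the stream.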