Hi, I am trying to run a Spark processor on Spring XD for a streaming job.

The Spark processor module on Spring XD works when Spark is pointed at local
mode. It fails to run when we point Spark at a Spark standalone master (running
on the same machine) or at yarn-client. Is it possible to run a Spark processor
inside Spring XD against Spark standalone or YARN, or is local mode the only
option here?
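For context, the three modes mentioned above differ only in the spark.master
value returned from the @SparkConfig method. The sketch below assumes Spark 1.x
master-URL syntax (the log shows Spring XD 1.2.0). SparkMasterSketch and
propsFor are hypothetical names, not Spring XD API, and the standalone host is
simply the one that appears in the log.

```scala
import java.util.Properties

// Hypothetical helper (not part of Spring XD): builds the kind of Properties
// an @SparkConfig method returns, for a given spark.master value.
object SparkMasterSketch {
  def propsFor(master: String): Properties = {
    val props = new Properties()
    props.setProperty("spark.master", master)
    props
  }

  // Spark 1.x master URL forms for the three modes in the question (assumption).
  // Local mode: use at least 2 threads, because the streaming receiver occupies one.
  val Local = "local[2]"
  // Standalone: should match the spark:// URL shown on the master's web UI.
  val Standalone = "spark://abcd.hadoop.ambari:7077"
  // YARN client mode (Spark 1.x shorthand); the driver process also needs
  // HADOOP_CONF_DIR or YARN_CONF_DIR pointing at the cluster configuration.
  val YarnClient = "yarn-client"
}
```

In the module, the properties method would then return, for example,
SparkMasterSketch.propsFor(SparkMasterSketch.Standalone).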

The processor module is:

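import java.util.Properties

import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits for reduceByKey (Spark 1.2)
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.springframework.xd.spark.streaming.SparkConfig
import org.springframework.xd.spark.streaming.scala.Processor

class WordCount extends Processor[String, (String, Int)] {

  // Split each line into words, pair each word with 1, and sum the counts per batch.
  def process(input: ReceiverInputDStream[String]): DStream[(String, Int)] = {
    val words = input.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts
  }

  @SparkConfig
  def properties: Properties = {
    val props = new Properties()
    // Any specific Spark configuration properties would go here.
    // These properties always get the highest precedence.
    //props.setProperty("spark.master", "spark://a.b.c.d:7077")
    props.setProperty("spark.master", "spark://abcd.hadoop.ambari:7077")
    props
  }

}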
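The per-batch logic of process can be checked without Spark at all. The sketch
below (WordCountBatchSketch and countWords are hypothetical names) uses plain
Scala collections, with groupBy plus a sum standing in for reduceByKey(_ + _).
It only mirrors what happens within a single micro-batch, not DStream
semantics, but it helps separate module-logic issues from cluster-connectivity
issues.

```scala
// Plain-collections sketch of what WordCount.process does to one micro-batch.
// Not Spark code: groupBy + sum stands in for reduceByKey(_ + _).
object WordCountBatchSketch {
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // same tokenisation as input.flatMap(_.split(" "))
      .map(word => (word, 1))  // same pairing as words.map(word => (word, 1))
      .groupBy(_._1)           // group the pairs by word...
      .map { case (word, pairs) => word -> pairs.map(_._2).sum } // ...and sum the 1s
}
```

Note that split(" ") yields empty-string tokens for repeated or leading spaces,
in both this sketch and the Spark module.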

Below is the error log that I get:

// Error Log
====================================================================
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module 'log' for stream 'spark-streaming-word-count'
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module [ModuleDescriptor@6dbc4f81 moduleName = 'log', moduleLabel = 'log', group = 'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName = [null], index = 2, type = sink, parameters = map[[empty]], children = list[[empty]]]
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Path cache event: path=/deployments/modules/allocated/4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06/spark-streaming-word-count.processor.processor.1, type=CHILD_ADDED
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module 'processor' for stream 'spark-streaming-word-count'
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module [ModuleDescriptor@5e16dafb moduleName = 'scala-word-count', moduleLabel = 'processor', group = 'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName = [null], index = 1, type = processor, parameters = map[[empty]], children = list[[empty]]]
2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN DeploymentsPathChildrenCache-0 util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN sparkDriver-akka.actor.default-dispatcher-3 remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:09+0530 1.2.0.RELEASE WARN sparkDriver-akka.actor.default-dispatcher-4 remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Path cache event: path=/deployments/modules/allocated/8d07cdba-557e-458a-9225-b90e5a5778ce/spark-streaming-word-count.source.http.1, type=CHILD_ADDED
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module 'http' for stream 'spark-streaming-word-count'
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - Deploying module [ModuleDescriptor@610e43b0 moduleName = 'http', moduleLabel = 'http', group = 'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName = [null], index = 0, type = source, parameters = map[[empty]], children = list[[empty]]]
2015-09-16T14:29:19+0530 1.2.0.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler - Deployment status for stream 'spark-streaming-word-count': DeploymentStatus{state=failed,error(s)=Deployment of module 'ModuleDeploymentKey{stream='spark-streaming-word-count', type=processor, label='processor'}' to container '4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06' timed out after 30000 ms}
2015-09-16T14:29:29+0530 1.2.0.RELEASE WARN sparkDriver-akka.actor.default-dispatcher-4 remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:49+0530 1.2.0.RELEASE ERROR sparkDriver-akka.actor.default-dispatcher-3 cluster.SparkDeploySchedulerBackend - Application has been killed. Reason: All masters are unresponsive! Giving up.
2015-09-16T14:29:49+0530 1.2.0.RELEASE WARN DeploymentsPathChildrenCache-0 cluster.SparkDeploySchedulerBackend - Application ID is not initialized yet.
2015-09-16T14:29:49+0530 1.2.0.RELEASE ERROR sparkDriver-akka.actor.default-dispatcher-3 scheduler.TaskSchedulerImpl - Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.
2015-09-16T14:29:50+0530 1.2.0.RELEASE INFO DeploymentSupervisor-0 zk.ContainerListener - Path cache event: path=/containers/4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06, type=CHILD_REMOVED
2015-09-16T14:29:50+0530 1.2.0.RELEASE INFO DeploymentSupervisor-0 zk.ContainerListener - Container departed: Container{name='4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06', attributes={groups=, host=abcd.hadoop.ambari, id=4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06, managementPort=54998, ip=a.b.c.d, pid=4597}}
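The log shows the Spark driver inside the XD container repeatedly failing to
associate with akka.tcp://sparkMaster@abcd.hadoop.ambari:7077 until it gives up
with "All masters are unresponsive!", after which the processor deployment
times out. A first thing worth ruling out is plain TCP reachability of the
master port from the container host. The sketch below is a hypothetical
diagnostic (MasterPortCheck and canConnect are not part of Spark or Spring XD);
the host and port are taken from the log.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical diagnostic: can this host open a TCP connection to the
// Spark standalone master port within the given timeout?
object MasterPortCheck {
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers connection refused, timeouts and unknown hosts.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val ok = canConnect("abcd.hadoop.ambari", 7077)
    println(s"Spark master port reachable: $ok")
  }
}
```

A successful connect does not prove the Akka handshake will succeed. With
Spark 1.x, commonly reported causes of "Disassociated" at this stage are a
master URL that does not exactly match the one the master advertises (for
example hostname versus IP), and a Spark version mismatch between the client
libraries bundled with Spring XD and the running cluster.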
