[ https://issues.apache.org/jira/browse/BEAM-313?focusedWorklogId=138351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-138351 ]
ASF GitHub Bot logged work on BEAM-313:
---------------------------------------
Author: ASF GitHub Bot
Created on: 27/Aug/18 11:39
Start Date: 27/Aug/18 11:39
Worklog Time Spent: 10m
Work Description: amarouni commented on issue #401: [BEAM-313] Enable the
use of an existing spark context with the SparkPipelineRunner
URL: https://github.com/apache/beam/pull/401#issuecomment-416199029
@kohlerm
Here's an old Scala snippet that shows how to use Beam with Spark Job Server (SJS). It's based on
old versions of SJS and Beam, so it probably won't compile against newer versions, but you'll
get the idea:
```scala
import com.typesafe.config.Config
import org.apache.beam.runners.spark.{ SparkContextOptions, SparkRunner }
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Create
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import spark.jobserver.{ SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation }

import scala.collection.JavaConverters._
import scala.util.Try

/**
 * Beam wordcount test. Returns the word count of a fixed String seq.
 */
object BeamWordCount extends SparkJob {

  // Accept the job only if the job config contains a "wordList" string list.
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    Try(config.getStringList("wordList"))
      .map(_ => SparkJobValid)
      .getOrElse(SparkJobInvalid("No wordList in context config"))

  override def runJob(sc: SparkContext, jobConfig: Config): Any = {
    // Input test list
    val words = jobConfig.getStringList("wordList").asScala.toList

    // Pipeline options: tell the SparkRunner to reuse the SparkContext
    // managed by the job server instead of creating its own.
    val sparkPipelineOptions =
      PipelineOptionsFactory.as(classOf[SparkContextOptions])
    sparkPipelineOptions.setAppName("Beam WordCount test")
    sparkPipelineOptions.setRunner(classOf[SparkRunner])
    sparkPipelineOptions.setUsesProvidedSparkContext(true)
    sparkPipelineOptions.setProvidedSparkContext(new JavaSparkContext(sc))

    // Pipeline
    val pipeline = Pipeline.create(sparkPipelineOptions)

    // Input + processing. CountWords is not defined in this snippet; it is
    // assumed to be a composite transform like the one in Beam's WordCount example.
    pipeline
      .apply(Create.of(words.asJava).withCoder(StringUtf8Coder.of()))
      .apply(new CountWords())

    // Run the pipeline exactly once and wait until it finishes; the final
    // PipelineResult.State is returned as the job result.
    pipeline.run().waitUntilFinish()
  }
}
```
It'd be nice to contribute this as new documentation if you manage to get it
to work.
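To check what the job above returns, it can help to compute the expected counts for the same `wordList` in plain Scala and compare them with the pipeline's output. This is a hypothetical sketch: `ExpectedWordCounts` and `expectedCounts` are illustrative names, not part of Beam or SJS, and it assumes `CountWords` splits lines on runs of non-letter characters. Check the tokenizer of the `CountWords` version you actually use and adjust the regex if it differs.

```scala
// Hypothetical helper (not part of Beam or SJS): computes in plain Scala the
// word counts a CountWords-style pipeline is expected to produce.
// Assumption: lines are split on runs of non-letter characters and empty
// tokens are dropped; this may not match every version of Beam's example.
object ExpectedWordCounts {
  def expectedCounts(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split("[^\\p{L}]+")) // tokenize each line
      .filter(_.nonEmpty)             // drop the empty token left by a leading delimiter
      .groupBy(identity)              // word -> all its occurrences
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  def main(args: Array[String]): Unit = {
    val counts = expectedCounts(Seq("hi there", "hi sue", "hi bob"))
    // prints "(bob,1), (hi,3), (sue,1), (there,1)"
    println(counts.toSeq.sortBy(_._1).mkString(", "))
  }
}
```

The counts are `Long` so they line up with the `KV<String, Long>` elements that a `Count.perElement()`-based transform emits.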
Issue Time Tracking
-------------------
Worklog Id: (was: 138351)
Time Spent: 1h 10m (was: 1h)
> Enable the use of an existing spark context with the SparkPipelineRunner
> ------------------------------------------------------------------------
>
> Key: BEAM-313
> URL: https://issues.apache.org/jira/browse/BEAM-313
> Project: Beam
> Issue Type: New Feature
> Components: runner-spark
> Reporter: Abbass Marouni
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Fix For: 0.3.0-incubating
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> In the general case, the SparkPipelineRunner creates its own Spark
> context and uses it for the pipeline execution.
> An alternative is to provide the SparkPipelineRunner with an existing
> Spark context. This is useful in many cases where the Spark
> context is managed outside of Beam (context reuse, advanced context
> management, Spark job server, ...).
> Code sample:
> https://github.com/amarouni/incubator-beam/commit/fe0bb517bf0ccde07ef5a61f3e44df695b75f076
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)