[ 
https://issues.apache.org/jira/browse/BEAM-313?focusedWorklogId=138351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-138351
 ]

ASF GitHub Bot logged work on BEAM-313:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Aug/18 11:39
            Start Date: 27/Aug/18 11:39
    Worklog Time Spent: 10m 
      Work Description: amarouni commented on issue #401: [BEAM-313] Enable the 
use of an existing spark context with the SparkPipelineRunner
URL: https://github.com/apache/beam/pull/401#issuecomment-416199029
 
 
   @kohlerm 
   
   Here's an old Scala snippet that shows how to use Beam & SJS, it's based on 
old versions of SJS & Beam so it probably won't compile with new SJS/Beam 
versions but you'll get the idea : 
   
   ```scala
   import com.typesafe.config.Config
   import org.apache.beam.runners.spark.{ SparkContextOptions, SparkRunner }
   import org.apache.beam.sdk.Pipeline
   import org.apache.beam.sdk.coders.StringUtf8Coder
   import org.apache.beam.sdk.options.PipelineOptionsFactory
   import org.apache.beam.sdk.transforms.Create
   import org.apache.spark.SparkContext
   import org.apache.spark.api.java.JavaSparkContext
   import spark.jobserver.{ SparkJob, SparkJobInvalid, SparkJobValid, 
SparkJobValidation }
   
   import scala.collection.JavaConversions
   import scala.util.Try
   
   /**
    * Beam wordcount test. Returns the word count of a fixed String seq.
    */
   object BeamWordCount extends SparkJob {
     override def validate(sc: SparkContext, config: Config): 
SparkJobValidation = {
   
       Try(config.getStringList("wordList"))
         .map(x => SparkJobValid)
         .getOrElse(SparkJobInvalid("No wordList in context config"))
     }
   
     override def runJob(sc: SparkContext, jobConfig: Config): Any = {
   
       // Input test list
       val inputBuffer = 
scala.collection.JavaConversions.asScalaBuffer(jobConfig.getStringList("wordList"))
       val WORDS = inputBuffer.toList
   
       // Pipeline options
       val sparkPipelineOptions = 
PipelineOptionsFactory.as(classOf[SparkContextOptions])
       sparkPipelineOptions.setAppName("Beam WordCount test")
       sparkPipelineOptions.setRunner(classOf[SparkRunner])
       sparkPipelineOptions.setUsesProvidedSparkContext(true)
       sparkPipelineOptions.setProvidedSparkContext(new JavaSparkContext(sc))
   
       // Pipeline
       val pipeline = Pipeline.create(sparkPipelineOptions)
   
       // Input + processing + Output
       val output = pipeline
         
.apply(Create.of(JavaConversions.seqAsJavaList(WORDS)).withCoder(StringUtf8Coder.of()))
         .apply(new CountWords())
   
       // Result
       // val result: EvaluationResult = 
pipeline.run().asInstanceOf[EvaluationResult]
   
       // Run job & wait until finish
       pipeline.run().waitUntilFinish()
     }
   }
   ```
   
   It'd be nice to contribute this as new documentation if you manage to get it 
to work. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 138351)
    Time Spent: 1h 10m  (was: 1h)

> Enable the use of an existing spark context with the SparkPipelineRunner
> ------------------------------------------------------------------------
>
>                 Key: BEAM-313
>                 URL: https://issues.apache.org/jira/browse/BEAM-313
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-spark
>            Reporter: Abbass Marouni
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>             Fix For: 0.3.0-incubating
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The general use case is that the SparkPipelineRunner creates its own Spark 
> context and uses it for the pipeline execution.
> Another alternative is to provide the SparkPipelineRunner with an existing 
> spark context. This can be interesting for a lot of use cases where the Spark 
> context is managed outside of beam (context reuse, advanced context 
> management, spark job server, ...).
> code sample : 
> https://github.com/amarouni/incubator-beam/commit/fe0bb517bf0ccde07ef5a61f3e44df695b75f076



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to