@kohlerm 

Here's an old Scala snippet that shows how to use Beam with Spark Job Server 
(SJS). It's based on old versions of SJS and Beam, so it probably won't compile 
with newer SJS/Beam versions, but you'll get the idea:

```scala
import com.typesafe.config.Config
import org.apache.beam.runners.spark.{ SparkContextOptions, SparkRunner }
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Create
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import spark.jobserver.{ SparkJob, SparkJobInvalid, SparkJobValid, 
SparkJobValidation }

import scala.collection.JavaConversions
import scala.util.Try

/**
 * Beam wordcount test job for Spark Job Server.
 *
 * Reads the `wordList` string list from the job configuration, builds a Beam
 * pipeline that runs on the SparkRunner using the SparkContext handed in by
 * the job server, and applies `CountWords` to the words.
 */
object BeamWordCount extends SparkJob {

  /**
   * Checks that the job configuration contains a readable `wordList` entry.
   *
   * @param sc     Spark context supplied by the job server (not used here)
   * @param config job configuration
   * @return `SparkJobValid` when `config.getStringList("wordList")` succeeds,
   *         otherwise `SparkJobInvalid` with an explanatory message
   */
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    Try(config.getStringList("wordList"))
      .map(_ => SparkJobValid)
      .getOrElse(SparkJobInvalid("No wordList in context config"))

  /**
   * Builds the Beam pipeline on top of the provided SparkContext, runs it once
   * and blocks until it has finished.
   *
   * @param sc        Spark context supplied by the job server; wrapped in a
   *                  JavaSparkContext and handed to Beam as the provided context
   * @param jobConfig job configuration; must contain the `wordList` string list
   * @return the value returned by `pipeline.run().waitUntilFinish()`
   */
  override def runJob(sc: SparkContext, jobConfig: Config): Any = {

    // Input test list. getStringList already yields a java.util.List, which is
    // the type passed to Create.of below, so no Scala round trip is needed.
    val words: java.util.List[String] = jobConfig.getStringList("wordList")

    // Pipeline options: run on Spark, reusing the job server's context instead
    // of letting Beam create its own.
    val sparkPipelineOptions = PipelineOptionsFactory.as(classOf[SparkContextOptions])
    sparkPipelineOptions.setAppName("Beam WordCount test")
    sparkPipelineOptions.setRunner(classOf[SparkRunner])
    sparkPipelineOptions.setUsesProvidedSparkContext(true)
    sparkPipelineOptions.setProvidedSparkContext(new JavaSparkContext(sc))

    // Pipeline
    val pipeline = Pipeline.create(sparkPipelineOptions)

    // Input + processing. The resulting collection is not consumed further here.
    // NOTE(review): CountWords is neither defined nor imported in this snippet;
    // presumably it is a word-counting PTransform defined elsewhere - confirm
    // and import it.
    pipeline
      .apply(Create.of(words).withCoder(StringUtf8Coder.of()))
      .apply(new CountWords())

    // Run job & wait until finish. run() is called exactly once: every call
    // launches the pipeline again.
    pipeline.run().waitUntilFinish()
  }
}
```

It'd be nice to contribute this as new documentation if you manage to get it to 
work. 

[ Full content available at: https://github.com/apache/beam/pull/401 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to