@kohlerm
Here's an old Scala snippet that shows how to use Beam with Spark JobServer (SJS).
It's based on old versions of SJS and Beam, so it probably won't compile against
current versions, but you'll get the idea:
```scala
import com.typesafe.config.Config
import org.apache.beam.runners.spark.{ SparkContextOptions, SparkRunner }
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Create
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import spark.jobserver.{ SparkJob, SparkJobInvalid, SparkJobValid,
SparkJobValidation }
import scala.collection.JavaConversions
import scala.util.Try
/**
 * Beam word-count test job for Spark JobServer.
 *
 * Reads the `wordList` string list from the job configuration, turns it into the
 * input of a Beam pipeline that runs on the SparkContext handed in by the job
 * server, and applies `CountWords` to it. The transform's output is not collected
 * here; `runJob` returns whatever `waitUntilFinish()` returns.
 */
object BeamWordCount extends SparkJob {

  /**
   * Accepts the job only when `wordList` can be read from the config as a string list.
   *
   * @param sc     Spark context supplied by the caller (not used by the check)
   * @param config job configuration to inspect
   * @return `SparkJobValid` when `config.getStringList("wordList")` succeeds,
   *         otherwise `SparkJobInvalid` with an explanatory message
   */
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    Try(config.getStringList("wordList"))
      .map(_ => SparkJobValid)
      .getOrElse(SparkJobInvalid("No wordList in context config"))

  /**
   * Builds the Beam pipeline on top of the provided SparkContext and runs it once.
   *
   * @param sc        Spark context to reuse; wrapped in a `JavaSparkContext` for Beam
   * @param jobConfig job configuration; `wordList` is read as the pipeline input
   * @return the result of `pipeline.run().waitUntilFinish()`
   */
  override def runJob(sc: SparkContext, jobConfig: Config): Any = {
    // Explicit converters (.asScala / .asJava) instead of the deprecated JavaConversions.
    import scala.collection.JavaConverters._

    // Input test list, copied into an immutable Scala list.
    val words: List[String] = jobConfig.getStringList("wordList").asScala.toList

    // Pipeline options: run on the Spark runner and reuse the caller's context
    // rather than letting Beam create its own.
    val sparkPipelineOptions = PipelineOptionsFactory.as(classOf[SparkContextOptions])
    sparkPipelineOptions.setAppName("Beam WordCount test")
    sparkPipelineOptions.setRunner(classOf[SparkRunner])
    sparkPipelineOptions.setUsesProvidedSparkContext(true)
    sparkPipelineOptions.setProvidedSparkContext(new JavaSparkContext(sc))

    // Pipeline
    val pipeline = Pipeline.create(sparkPipelineOptions)

    // Input + processing.
    // NOTE(review): `CountWords` is neither defined nor imported in this snippet;
    // presumably it is the transform from Beam's WordCount example. Confirm and
    // import it before compiling.
    pipeline
      .apply(Create.of(words.asJava).withCoder(StringUtf8Coder.of()))
      .apply(new CountWords())

    // Run the job exactly once and wait until it finishes. Calling `run()` a second
    // time would execute the whole pipeline again.
    pipeline.run().waitUntilFinish()
  }
}
```
It'd be nice to contribute this as new documentation if you manage to get it to
work.
[ Full content available at: https://github.com/apache/beam/pull/401 ]
This message was relayed via gitbox.apache.org for [email protected]