[
https://issues.apache.org/jira/browse/BEAM-313?focusedWorklogId=140053&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140053
]
ASF GitHub Bot logged work on BEAM-313:
---------------------------------------
Author: ASF GitHub Bot
Created on: 31/Aug/18 11:07
Start Date: 31/Aug/18 11:07
Worklog Time Spent: 10m
Work Description: kohlerm removed a comment on issue #401: [BEAM-313]
Enable the use of an existing spark context with the SparkPipelineRunner
URL: https://github.com/apache/beam/pull/401#issuecomment-417631312
`
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;
import java.io.Serializable;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.spark.api.java.JavaSparkContext;
import spark.jobserver.api.JobEnvironment;
import spark.jobserver.japi.JSparkJob;
import org.apache.beam.runners.spark.SparkContextOptions;
import com.typesafe.config.Config;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//
//
//
/**
 * A word-count example that runs as a Spark Job Server job and executes a Beam pipeline on the
 * Spark context supplied by the job server, instead of letting the {@link SparkRunner} create its
 * own context.
 *
 * <p>This class is adapted from {@link WordCount}, the second in a series of four successively
 * more detailed 'word count' examples. You may first want to take a look at {@link
 * MinimalWordCount}. After you've looked at this example, see the {@link DebuggingWordCount}
 * pipeline for an introduction to additional concepts.
 *
 * <p>For a detailed walkthrough of the original example, see
 * <a href="https://beam.apache.org/get-started/wordcount-example/">
 * https://beam.apache.org/get-started/wordcount-example/
 * </a>
 *
 * <p>Unlike {@link WordCount}, this job neither reads nor writes files. It counts a fixed set of
 * sample lines, plus one optional line passed in the job config under {@code input.string}, and
 * writes each formatted count to the log.
 *
 * <p>Concepts:
 * <pre>
 *   1. Running a pipeline on an externally managed Spark context
 *   2. Using ParDo with static DoFns defined out-of-line
 *   3. Building a composite transform
 *   4. Defining your own pipeline options
 * </pre>
 *
 * <p>Concept #1: the job server passes its {@link JavaSparkContext} to {@link #run}. The pipeline
 * always uses the {@link SparkRunner}, configured through {@link SparkContextOptions} to reuse that
 * context rather than create a new one.
 */
public class WordCountJS implements JSparkJob<String>, Serializable {

  private static final Logger LOG = LoggerFactory.getLogger(WordCountJS.class);

  /** Job config key holding an optional extra line of text to include in the count. */
  private static final String INPUT_CONFIG_KEY = "input.string";

  /**
   * Sample lines that every run counts. {@link #run} copies this list instead of appending to it,
   * so text supplied to one invocation does not leak into later invocations.
   */
  static List<String> lines = new ArrayList<String>(Arrays.asList(
      "It does not matter",
      "what temperature the room is",
      "is always room temperature"));

  /**
   * Counts the words in {@link #lines} plus the optional {@code input.string} config entry. The
   * pipeline runs on the Spark context provided by the caller.
   *
   * @param context the Spark context to run the pipeline on; the runner is told to reuse it
   * @param runtime the job environment (not used)
   * @param config the job config; may contain {@code input.string}; may be {@code null}
   * @return always {@code null}; the counts are only written to the log
   */
  public String run(JavaSparkContext context, JobEnvironment runtime, Config config) {
    WordCountOptions options = PipelineOptionsFactory.create().as(WordCountOptions.class);

    // Work on a per-run copy. Appending to the static sample list would make every later run also
    // count the text supplied to all earlier runs.
    List<String> input = new ArrayList<String>(lines);
    String text = readInputText(config);
    if (text != null) {
      input.add(text);
    }
    options.setText(input);

    // Reuse the caller's Spark context instead of letting the SparkRunner create its own.
    options.setProvidedSparkContext(context);
    options.setUsesProvidedSparkContext(true);

    runWordCount(options);
    return null;
  }

  /**
   * Reads the optional extra input line from the job config.
   *
   * @param config the job config, possibly {@code null}
   * @return the value stored under {@code input.string}, or {@code null} when {@code config} is
   *     {@code null} or the key is absent or null-valued
   */
  private static String readInputText(Config config) {
    if (config == null || !config.hasPath(INPUT_CONFIG_KEY)) {
      LOG.info("could not read config");
      return null;
    }
    String text = config.getString(INPUT_CONFIG_KEY);
    LOG.info("config {}: {}", INPUT_CONFIG_KEY, text);
    return text;
  }

  /**
   * Accepts any config. {@code input.string} is optional, so there is nothing to validate.
   *
   * @return {@code config}, unchanged
   */
  public Config verify(JavaSparkContext context, JobEnvironment runtime, Config config) {
    return config;
  }

  /**
   * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
   * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to
   * a ParDo in the pipeline.
   *
   * <p>It also records, as Beam metrics, the number of blank lines and the distribution of line
   * lengths.
   */
  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());
      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }

      // Split the line into words. The -1 limit keeps trailing empty tokens, which the loop
      // below skips.
      String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);

      // Output each non-empty word into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      }
    }
  }

  /** A SimpleFunction that converts a word and its count into a printable "word: count" string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  /** A SimpleFunction that logs each formatted count and passes it through unchanged. */
  public static class LogCounts extends SimpleFunction<String, String> {
    @Override
    public String apply(String input) {
      LOG.info("Count result: {}", input);
      return input;
    }
  }

  /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * (word, count) pairs.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> textLines) {
      // Convert lines of text into individual words.
      PCollection<String> words = textLines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      return words.apply(Count.perElement());
    }
  }

  /**
   * Options supported by {@link WordCountJS}.
   *
   * <p>Concept #4: Defining your own pipeline options. Custom getters/setters declared here become
   * options that pipeline code can read. In this job they are populated programmatically in
   * {@link #run} rather than parsed from command-line arguments.
   *
   * <p>Extends {@link SparkContextOptions} so that an existing Spark context can be passed to the
   * {@link SparkRunner}.
   */
  public interface WordCountOptions extends SparkContextOptions {
    /** Lines of text whose words are counted. */
    @Description("Sample text")
    @Required
    List<String> getText();

    void setText(List<String> value);
  }

  /**
   * Builds and runs the word-count pipeline on the {@link SparkRunner}, then blocks until it
   * finishes.
   *
   * @param options pipeline options. The words in {@link WordCountOptions#getText()} are counted;
   *     if that option was never set, the static sample {@link #lines} are used instead.
   */
  static void runWordCount(WordCountOptions options) {
    options.setRunner(SparkRunner.class);

    List<String> text = options.getText();
    List<String> input = text != null ? text : lines;

    Pipeline p = Pipeline.create(options);

    // Concepts #2 and #3: the pipeline applies the composite CountWords transform and passes the
    // static FormatAsTextFn to MapElements. The coder is given explicitly so that an empty input
    // list does not fail coder inference.
    p.apply("ReadLines", Create.of(input).withCoder(StringUtf8Coder.of()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply(MapElements.via(new LogCounts()));

    p.run().waitUntilFinish();
  }
}`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 140053)
Time Spent: 1h 40m (was: 1.5h)
> Enable the use of an existing spark context with the SparkPipelineRunner
> ------------------------------------------------------------------------
>
> Key: BEAM-313
> URL: https://issues.apache.org/jira/browse/BEAM-313
> Project: Beam
> Issue Type: New Feature
> Components: runner-spark
> Reporter: Abbass Marouni
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Fix For: 0.3.0-incubating
>
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> The general use case is that the SparkPipelineRunner creates its own Spark
> context and uses it for the pipeline execution.
> Another alternative is to provide the SparkPipelineRunner with an existing
> spark context. This can be interesting for a lot of use cases where the Spark
> context is managed outside of beam (context reuse, advanced context
> management, spark job server, ...).
> Code sample:
> https://github.com/amarouni/incubator-beam/commit/fe0bb517bf0ccde07ef5a61f3e44df695b75f076
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)