/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.examples;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.typesafe.config.Config;

import spark.jobserver.api.JobEnvironment;
import spark.jobserver.japi.JSparkJob;

/**
 * An example that counts words in Shakespeare and includes Beam best practices.
 *
 * <p>This class is based on {@link WordCount}, the second in a series of four successively more
 * detailed 'word count' examples, adapted here to run as a spark-jobserver job. You may first
 * want to take a look at {@link MinimalWordCount}. After you've looked at this example, see the
 * {@link DebuggingWordCount} pipeline for an introduction of additional concepts.
 *
 * <p>For a detailed walkthrough of this example, see
 * <a href="https://beam.apache.org/get-started/wordcount-example/">
 * https://beam.apache.org/get-started/wordcount-example/</a>
 *
 * <p>Basic concepts, also in the MinimalWordCount example:
 * creating a PCollection, counting its elements, and emitting the results (here, to the log).
 *
 * <p>New Concepts:
 * <pre>
 *   1. Executing a Pipeline both locally and using the selected runner
 *   2. Using ParDo with static DoFns defined out-of-line
 *   3. Building a composite transform
 *   4. Defining your own pipeline options
 * </pre>
 *
 * <p>Concept #1: you can execute this pipeline either locally or by selecting another runner.
 * These are now command-line options and not hard-coded, as they were in the MinimalWordCount
 * example. Note that this variant sets the {@code SparkRunner} programmatically in
 * {@code runWordCount}, since it is meant to be submitted through spark-jobserver.
 *
 * <p>To change the runner, specify:
 * <pre>{@code
 *   --runner=YOUR_SELECTED_RUNNER
 * }
 * </pre>
 *
 * <p>Unlike the standard WordCount, this variant does not take {@code --inputFile} or
 * {@code --output} arguments. Its input is a small built-in list of sample lines, optionally
 * extended with the {@code input.string} value from the spark-jobserver job configuration; the
 * formatted counts are written to the log instead of to an output file.
 *
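 * <p>For illustration only, a submission through the spark-jobserver REST API might look like
 * the following; the port, application name, and jar path are assumptions that depend on your
 * jobserver deployment:
 * <pre>{@code
 *   curl --data-binary @beam-examples.jar localhost:8090/jars/beam-wordcount
 *   curl -d "input.string = four score and seven years ago" \
 *     "localhost:8090/jobs?appName=beam-wordcount&classPath=org.apache.beam.examples.WordCountJS"
 * }</pre>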
 */
public class WordCountJS implements JSparkJob<String>, Serializable {

  private static final Logger LOG = LoggerFactory.getLogger(WordCountJS.class);

  // Built-in sample input; extended at run time with the jobserver-provided string, if any.
  static List<String> lines = new ArrayList<>(Arrays.asList(
      "It does not matter",
      "what temperature the room is",
      "is always room temperature"));

  @Override
  public String run(JavaSparkContext context, JobEnvironment runtime, Config config) {
    // Under spark-jobserver there are no command-line args to parse with
    // PipelineOptionsFactory.fromArgs(args), so create the options directly.
    WordCountOptions options = PipelineOptionsFactory.create().as(WordCountOptions.class);
    String text = null;
    try {
      // Get the jobserver config data, if any was supplied with the job request.
      text = config.getString("input.string");
      LOG.info("config input.string: " + text);
    } catch (com.typesafe.config.ConfigException.Missing ex) {
      LOG.info("could not read config");
    }
    if (text != null) {
      lines.add(text);
    }
    // Add the sample text to the options.
    options.setText(lines);
    // Pass in the jobserver-provided Spark context.
    options.setProvidedSparkContext(context);
    // Tell the SparkRunner to use the provided context rather than creating its own.
    options.setUsesProvidedSparkContext(true);
    runWordCount(options);
    return null;
  }

  @Override
  public Config verify(JavaSparkContext context, JobEnvironment runtime, Config config) {
    // The config is optional, so there is nothing to validate here.
    return config;
  }
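
  // The jobserver passes job configuration to run() as a Typesafe Config object. A sketch of
  // the HOCON payload this job understands; the key matches the config.getString lookup above:
  //
  //   input.string = "to be or not to be"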

  /**
   * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
   * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it
   * to a ParDo in the pipeline.
   */
  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());
      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }

      // Split the line into words.
      String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      }
    }
  }
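
  // The emptyLines counter and lineLenDist distribution above are reported through the
  // runner's metrics system. A minimal sketch of querying them after the pipeline finishes;
  // it assumes the MetricsFilter and MetricNameFilter classes from
  // org.apache.beam.sdk.metrics are imported:
  //
  //   PipelineResult result = p.run();
  //   result.waitUntilFinish();
  //   MetricQueryResults metrics = result.metrics().queryMetrics(
  //       MetricsFilter.builder()
  //           .addNameFilter(MetricNameFilter.named(ExtractWordsFn.class, "emptyLines"))
  //           .build());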

  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  /** A SimpleFunction that simply logs the string value without any other transformation. */
  public static class LogCounts extends SimpleFunction<String, String> {
    @Override
    public String apply(String input) {
      // simply log count result
      LOG.info("Count result: " + input);
      return input;
    }
  }

  /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing (a testing sketch follows this class), and an improved monitoring
   * experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
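
  // Concept #3 in practice: because CountWords is a self-contained PTransform, it can be
  // unit-tested in isolation. A minimal sketch, assuming the Beam test utilities
  // (TestPipeline, PAssert) are on the test classpath:
  //
  //   Pipeline p = TestPipeline.create();
  //   PCollection<KV<String, Long>> counts =
  //       p.apply(Create.of("hi there", "hi")).apply(new CountWords());
  //   PAssert.that(counts).containsInAnyOrder(KV.of("hi", 2L), KV.of("there", 1L));
  //   p.run();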

  /**
   * Options supported by {@link WordCountJS}.
   *
   * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments
   * to be processed by the command-line parser, and specify default values for them. You can
   * then access the options values in your pipeline code.
   *
   * <p>Inherits the standard Spark context configuration options.
   */
  public interface WordCountOptions extends SparkContextOptions {
    /** One property used to pass the static sample text into the word count. */
    @Description("Sample text")
    @Required
    List<String> getText();

    void setText(List<String> value);
  }

  static void runWordCount(WordCountOptions options) {
    // Force the SparkRunner: this job is meant to execute on a jobserver-provided Spark context.
    options.setRunner(SparkRunner.class);
    Pipeline p = Pipeline.create(options);

    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes
    // the static FormatAsTextFn() to a MapElements transform. Concept #4: the input lines come
    // from the pipeline options rather than from a hard-coded reference.
    p.apply("ReadLines", Create.of(options.getText())).setCoder(StringUtf8Coder.of())
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply(MapElements.via(new LogCounts()));

    p.run().waitUntilFinish();
  }
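
  /**
   * A minimal entry point for running this pipeline locally, outside spark-jobserver. This is
   * a sketch for experimentation: it relies on the SparkRunner creating its own local context,
   * since no jobserver-provided context is set here.
   */
  public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(WordCountOptions.class);
    options.setText(lines);
    runWordCount(options);
  }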

}
