[ https://issues.apache.org/jira/browse/HAMA-983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15730450#comment-15730450 ]

Edward J. Yoon commented on HAMA-983:
-------------------------------------

Here's my skeleton code, with an example that counts words. You should 
implement the HamaPipelineRunner: it just needs to translate the pipeline 
into a Hama batch job and execute it. I think you can see how to translate 
the transforms from Flink's code: 
https://github.com/dataArtisans/flink-dataflow/blob/aad5d936abd41240f3e15d294ea181fb9cca05e0/runner/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java#L410

{code}
// NOTE: these imports assume an early (0.x) Apache Beam SDK; a few package
// locations (e.g. AggregatorValues) moved between releases.
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class WordCountTest {

  static final String[] WORDS_ARRAY = new String[] { "hi there", "hi",
      "hi sue bob", "hi sue", "", "bob hi" };

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

  static final String[] COUNTS_ARRAY = new String[] { "hi: 5", "there: 1",
      "sue: 2", "bob: 2" };

  /**
   * Example test that tests a PTransform by using an in-memory input and
   * inspecting the output.
   */
  @Test
  @Category(RunnableOnService.class)
  public void testCountWords() throws Exception {
    HamaOptions options = PipelineOptionsFactory.as(HamaOptions.class);
    options.setRunner(HamaPipelineRunner.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> input = p.apply(Create.of(WORDS).withCoder(
        StringUtf8Coder.of()));

    PCollection<String> output = input
        .apply(new WordCount())
        .apply(MapElements.via(new FormatAsTextFn()));
        //.apply(TextIO.Write.to("/tmp/result"));

    PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
    p.run().waitUntilFinish();
  }

  public static class WordCount extends
      PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    
    private static final long serialVersionUID = 1L;

    @Override
    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new DoFn<String, String>() {
        private static final long serialVersionUID = 1L;
        private final Aggregator<Long, Long> emptyLines =
            createAggregator("emptyLines", new Sum.SumLongFn());

        @ProcessElement
        public void processElement(ProcessContext c) {
          if (c.element().trim().isEmpty()) {
            emptyLines.addValue(1L);
          }

          // Split the line into words.
          String[] tokens = c.element().split("[^a-zA-Z']+");

          // Output each word encountered into the output PCollection.
          for (String word : tokens) {
            if (!word.isEmpty()) {
              c.output(word);
            }
          }
        }
      }));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

  /**
   * Formats a word count as "word: count". This is the standard helper from
   * Beam's WordCount example, added here so the pipeline above compiles.
   */
  public static class FormatAsTextFn extends
      SimpleFunction<KV<String, Long>, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  // TODO: the actual runner implementation starts here.
  public static class HamaPipelineRunner extends
      PipelineRunner<HamaPipelineResult> {

    public static HamaPipelineRunner fromOptions(PipelineOptions x) {
      return new HamaPipelineRunner();
    }

    @Override
    public <Output extends POutput, Input extends PInput> Output apply(
        PTransform<Input, Output> transform, Input input) {
      return super.apply(transform, input);
    }
    
    @Override
    public HamaPipelineResult run(Pipeline pipeline) {
      System.out.println("Executing pipeline using HamaPipelineRunner.");

      // TODO: translate the pipeline into a Hama BSP job, execute it,
      // and return its result (a rough sketch follows after this skeleton).
      return null;
    }

  }

  public static class HamaPipelineResult implements PipelineResult {

    @Override
    public State getState() {
      // TODO Auto-generated method stub
      return null;
    }

    @Override
    public State cancel() throws IOException {
      // TODO Auto-generated method stub
      return null;
    }

    @Override
    public State waitUntilFinish(Duration duration) {
      // TODO Auto-generated method stub
      return null;
    }

    @Override
    public State waitUntilFinish() {
      // TODO Auto-generated method stub
      return null;
    }

    @Override
    public <T> AggregatorValues<T> getAggregatorValues(
        Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
      // TODO Auto-generated method stub
      return null;
    }

    @Override
    public MetricResults metrics() {
      // TODO Auto-generated method stub
      return null;
    }

  }

  public static interface HamaOptions extends PipelineOptions {

  }

}
{code}
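
For the translation step, the Flink runner linked above keeps a registry 
that maps each Beam primitive to a small translator. Below is a minimal 
sketch of that pattern adapted to Hama. Everything in it is hypothetical 
scaffolding (HamaBatchTransformTranslators, TransformTranslator, 
TranslationContext, and the stub translators are not existing Hama or Beam 
classes), and the transform names Create.Values and ParDo.Bound assume an 
early Beam 0.x SDK:

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;

/** Hypothetical registry mapping Beam primitives to Hama translators. */
public class HamaBatchTransformTranslators {

  /** Translates one Beam primitive into a stage of the Hama BSP job. */
  public interface TransformTranslator<T extends PTransform<?, ?>> {
    void translate(T transform, TranslationContext context);
  }

  /** Hypothetical holder for the Hama job under construction. */
  public static class TranslationContext {
    // e.g. wrap an org.apache.hama.bsp.BSPJob plus per-PCollection state
  }

  @SuppressWarnings("rawtypes")
  private static final Map<Class<? extends PTransform>, TransformTranslator>
      TRANSLATORS = new HashMap<>();

  static {
    // One translator per supported primitive, as in the Flink runner's
    // FlinkBatchTransformTranslators.
    TRANSLATORS.put(Create.Values.class, new CreateTranslator());
    TRANSLATORS.put(ParDo.Bound.class, new ParDoTranslator());
    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
  }

  @SuppressWarnings("rawtypes")
  public static TransformTranslator getTranslator(PTransform<?, ?> transform) {
    return TRANSLATORS.get(transform.getClass());
  }

  // Stub translators; each would emit the matching piece of the BSP job.
  static class CreateTranslator
      implements TransformTranslator<Create.Values<?>> {
    @Override
    public void translate(Create.Values<?> transform,
        TranslationContext context) {
      // Materialize the in-memory elements as the job's input.
    }
  }

  static class ParDoTranslator
      implements TransformTranslator<ParDo.Bound<?, ?>> {
    @Override
    public void translate(ParDo.Bound<?, ?> transform,
        TranslationContext context) {
      // Invoke the user's DoFn over the input inside a BSP superstep.
    }
  }

  static class GroupByKeyTranslator
      implements TransformTranslator<GroupByKey<?, ?>> {
    @Override
    public void translate(GroupByKey<?, ?> transform,
        TranslationContext context) {
      // Shuffle by key via BSP messaging, then group values per key.
    }
  }
}
{code}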

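run() could then drive that registry by walking the pipeline graph 
topologically, the same way the Flink runner's FlinkBatchPipelineTranslator 
does. Again only a sketch: the PipelineVisitor callback and node type names 
changed across Beam releases (the ones below match more recent SDKs), and 
job submission is left as a comment:

{code}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;

// Inside HamaPipelineRunner:
@Override
public HamaPipelineResult run(Pipeline pipeline) {
  final HamaBatchTransformTranslators.TranslationContext context =
      new HamaBatchTransformTranslators.TranslationContext();

  // Visit each primitive transform in topological order and translate it
  // into the corresponding piece of the Hama BSP job.
  pipeline.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
      PTransform<?, ?> transform = node.getTransform();
      HamaBatchTransformTranslators.TransformTranslator translator =
          HamaBatchTransformTranslators.getTranslator(transform);
      if (translator == null) {
        throw new UnsupportedOperationException(
            "No translator registered for " + transform.getName());
      }
      translator.translate(transform, context);
    }
  });

  // Submit the assembled job (e.g. BSPJob.waitForCompletion(true)) and
  // wrap its final status in the HamaPipelineResult.
  return new HamaPipelineResult();
}
{code}

Of the three primitives, GroupByKey is the interesting one for Hama: it maps 
naturally onto BSP messaging (send each KV to the peer that owns its key), a 
superstep barrier, and per-key grouping on the receiving side.
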
> Hama runner for DataFlow
> ------------------------
>
>                 Key: HAMA-983
>                 URL: https://issues.apache.org/jira/browse/HAMA-983
>             Project: Hama
>          Issue Type: Bug
>            Reporter: Edward J. Yoon
>              Labels: gsoc2016
>
> As you already know, Apache Beam provides a unified programming model for 
> both batch and streaming inputs.
> The APIs are generally associated with data filtering and transforming, so 
> we'll need to implement a data processing runner like 
> https://github.com/dapurv5/MapReduce-BSP-Adapter/blob/master/src/main/java/org/apache/hama/mapreduce/examples/WordCount.java
> Also, implementing similarity join could be fun. According to 
> http://www.ruizhang.info/publications/TPDS2015-Heads_Join.pdf, Apache Hama is 
> the clear winner over Apache Hadoop and Apache Spark.
> Since it consists of transformation, aggregation, and partition computations, 
> I think it's possible to implement it using the Apache Beam APIs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
