[ https://issues.apache.org/jira/browse/HAMA-983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15730450#comment-15730450 ]
Edward J. Yoon commented on HAMA-983: ------------------------------------- Here's my skeleton code, with an example that counts words. You should implement the HamaPipelineRunner: just translate the pipeline into a Hama program and execute it as a batch job. I think you can see how to translate the transforms from Flink's code: https://github.com/dataArtisans/flink-dataflow/blob/aad5d936abd41240f3e15d294ea181fb9cca05e0/runner/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java#L410 {code} public class WordCountTest { static final String[] WORDS_ARRAY = new String[] { "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi" }; static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); static final String[] COUNTS_ARRAY = new String[] { "hi: 5", "there: 1", "sue: 2", "bob: 2" }; /** * Example test that tests a PTransform by using an in-memory input and * inspecting the output. */ @Test @Category(RunnableOnService.class) public void testCountWords() throws Exception { HamaOptions options = PipelineOptionsFactory.as(HamaOptions.class); options.setRunner(HamaPipelineRunner.class); Pipeline p = Pipeline.create(options); PCollection<String> input = p.apply(Create.of(WORDS).withCoder( StringUtf8Coder.of())); PCollection<String> output = input .apply(new WordCount()) .apply(MapElements.via(new FormatAsTextFn())); //.apply(TextIO.Write.to("/tmp/result")); PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY); p.run().waitUntilFinish(); } public static class WordCount extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { private static final long serialVersionUID = 1L; @Override public PCollection<KV<String, Long>> apply(PCollection<String> lines) { // Convert lines of text into individual words. 
PCollection<String> words = lines.apply(ParDo.of(new DoFn<String, String>() { private static final long serialVersionUID = 1L; private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { emptyLines.addValue(1L); } // Split the line into words. String[] words = c.element().split("[^a-zA-Z']+"); // Output each word encountered into the output PCollection. for (String word : words) { if (!word.isEmpty()) { c.output(word); } } } })); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count .<String> perElement()); return wordCounts; } } // ///// TODO public static class HamaPipelineRunner extends PipelineRunner<HamaPipelineResult> { public static HamaPipelineRunner fromOptions(PipelineOptions x) { return new HamaPipelineRunner(); } @Override public <Output extends POutput, Input extends PInput> Output apply( PTransform<Input, Output> transform, Input input) { return super.apply(transform, input); } @Override public HamaPipelineResult run(Pipeline pipeline) { // TODO Auto-generated method stub System.out.println("Executing pipeline using HamaPipelineRunner."); // TODO you need to translate pipeline to Hama program // and execute pipeline // return the result return null; } } public class HamaPipelineResult implements PipelineResult { @Override public State getState() { // TODO Auto-generated method stub return null; } @Override public State cancel() throws IOException { // TODO Auto-generated method stub return null; } @Override public State waitUntilFinish(Duration duration) { // TODO Auto-generated method stub return null; } @Override public State waitUntilFinish() { // TODO Auto-generated method stub return null; } @Override public <T> AggregatorValues<T> getAggregatorValues( Aggregator<?, T> aggregator) throws AggregatorRetrievalException { // TODO Auto-generated method stub 
return null; } @Override public MetricResults metrics() { // TODO Auto-generated method stub return null; } } public static interface HamaOptions extends PipelineOptions { } } {code} > Hama runner for DataFlow > ------------------------ > > Key: HAMA-983 > URL: https://issues.apache.org/jira/browse/HAMA-983 > Project: Hama > Issue Type: Bug > Reporter: Edward J. Yoon > Labels: gsoc2016 > > As you already know, Apache Beam provides a unified programming model for both > batch and streaming inputs. > The APIs are generally associated with data filtering and transformation. So > we'll need to implement a data processing runner like > https://github.com/dapurv5/MapReduce-BSP-Adapter/blob/master/src/main/java/org/apache/hama/mapreduce/examples/WordCount.java > Also, implementing similarity join can be fun. According to > http://www.ruizhang.info/publications/TPDS2015-Heads_Join.pdf, Apache Hama is > clearly the winner among Apache Hadoop and Apache Spark. > Since it consists of transformation, aggregation, and partition computations, > I think it's possible to implement it using Apache Beam APIs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)