mr-runner: add JobPrototype and translate it to a MR job.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0cbdc5b7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0cbdc5b7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0cbdc5b7

Branch: refs/heads/mr-runner
Commit: 0cbdc5b75ed5581ffef8d129b4e61e339d459697
Parents: a884a2f
Author: Pei He <[email protected]>
Authored: Mon Jul 24 20:15:37 2017 +0800
Committer: Pei He <[email protected]>
Committed: Thu Aug 31 14:13:47 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/MapReducePipelineOptions.java     |  5 ++
 .../beam/runners/mapreduce/MapReduceRunner.java | 41 ++++++++-
 .../runners/mapreduce/MapReduceWordCount.java   |  2 +-
 .../mapreduce/translation/BeamInputFormat.java  | 44 ++++-----
 .../mapreduce/translation/BeamMapper.java       | 75 +++++++++++++---
 .../runners/mapreduce/translation/Graph.java    |  5 ++
 .../mapreduce/translation/JobPrototype.java     | 95 ++++++++++++++++++++
 .../beam/runners/mapreduce/WordCountTest.java   | 42 +++------
 8 files changed, 244 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index da29931..ce8f937 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -1,9 +1,14 @@
 package org.apache.beam.runners.mapreduce;
 
+import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * {@link PipelineOptions} for {@link MapReduceRunner}.
  */
 public interface MapReducePipelineOptions extends PipelineOptions {
+
+  @Description("The jar class of the user Beam program.")
+  Class<?> getJarClass();
+  void setJarClass(Class<?> jarClass);
 }
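
The new jarClass option feeds job.setJarByClass() in JobPrototype below, so Hadoop can ship the user's jar to the cluster. A minimal usage sketch, mirroring the WordCountTest change at the end of this commit:

    MapReducePipelineOptions options =
        PipelineOptionsFactory.as(MapReducePipelineOptions.class);
    options.setRunner(MapReduceRunner.class);
    options.setJarClass(WordCountTest.class); // any class packaged in the user's jar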

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 247a8e5..0e3142c 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -1,9 +1,21 @@
 package org.apache.beam.runners.mapreduce;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.IOException;
+import org.apache.beam.runners.mapreduce.translation.Graph;
+import org.apache.beam.runners.mapreduce.translation.GraphConverter;
+import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
+import org.apache.beam.runners.mapreduce.translation.JobPrototype;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
  * {@link PipelineRunner} for crunch.
@@ -17,11 +29,38 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> {
    * @return The newly created runner.
    */
   public static MapReduceRunner fromOptions(PipelineOptions options) {
-    return new MapReduceRunner();
+    return new MapReduceRunner(options.as(MapReducePipelineOptions.class));
+  }
+
+  private final MapReducePipelineOptions options;
+
+  MapReduceRunner(MapReducePipelineOptions options) {
+    this.options = checkNotNull(options, "options");
   }
 
   @Override
   public PipelineResult run(Pipeline pipeline) {
+    GraphConverter graphConverter = new GraphConverter();
+    pipeline.traverseTopologically(graphConverter);
+
+    Graph graph = graphConverter.getGraph();
+
+    GraphPlanner planner = new GraphPlanner();
+    Graph fusedGraph = planner.plan(graph);
+    for (Graph.Vertex vertex : fusedGraph.getAllVertices()) {
+      if (vertex.getTransform() instanceof GroupByKey
+          || vertex.getTransform() instanceof Read.Bounded) {
+        continue;
+      } else {
+        JobPrototype jobPrototype = JobPrototype.create(1, vertex);
+        try {
+          Job job = jobPrototype.build(options.getJarClass(), new Configuration());
+          job.waitForCompletion(true);
+        } catch (Exception e) {
+          Throwables.throwIfUnchecked(e);
+        }
+      }
+    }
     return null;
   }
 }
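
One caveat in run() above: Guava's Throwables.throwIfUnchecked(e) rethrows only RuntimeException and Error, so a checked exception from job submission (e.g. IOException) is silently dropped. The usual Guava idiom adds an unconditional rethrow; a suggested variant, not part of this commit:

    } catch (Exception e) {
      Throwables.throwIfUnchecked(e);
      throw new RuntimeException(e); // surface checked exceptions too
    }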

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
index 4ba3a29..d0c7b78 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
@@ -199,7 +199,7 @@ public class MapReduceWordCount {
         KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
 
     conf.set(
-        "source",
+        BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
         Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source)));
 
     Job job = Job.getInstance(conf, "word count");


http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 8c4155a..0cfb14b 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -3,17 +3,19 @@ package org.apache.beam.runners.mapreduce.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Function;
+import com.google.common.base.Strings;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -25,31 +27,30 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
  * Adaptor from Beam {@link BoundedSource} to MapReduce {@link InputFormat}.
  */
-public class BeamInputFormat<K, V> extends InputFormat {
+public class BeamInputFormat<T> extends InputFormat {
 
+  public static final String BEAM_SERIALIZED_BOUNDED_SOURCE = "beam-serialized-bounded-source";
   private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000;
 
-  private BoundedSource<KV<K, V>> source;
+  private BoundedSource<T> source;
   private PipelineOptions options;
 
   public BeamInputFormat() {
   }
 
-  public BeamInputFormat(BoundedSource<KV<K, V>> source, PipelineOptions options) {
-    this.source = checkNotNull(source, "source");
-    this.options = checkNotNull(options, "options");
-  }
-
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-    source = (BoundedSource<KV<K,V>>) SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(context.getConfiguration().get("source")),
-        "");
+    String serializedBoundedSource = context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE);
+    if (Strings.isNullOrEmpty(serializedBoundedSource)) {
+      return ImmutableList.of();
+    }
+    source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedBoundedSource), "BoundedSource");
     try {
       return FluentIterable.from(source.split(DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options))
-          .transform(new Function<BoundedSource<KV<K, V>>, InputSplit>() {
+          .transform(new Function<BoundedSource<T>, InputSplit>() {
             @Override
-            public InputSplit apply(BoundedSource<KV<K, V>> source) {
+            public InputSplit apply(BoundedSource<T> source) {
               try {
                 return new BeamInputSplit(source.getEstimatedSizeBytes(options));
               } catch (Exception e) {
@@ -65,8 +66,8 @@ public class BeamInputFormat<K, V> extends InputFormat {
   @Override
   public RecordReader createRecordReader(
       InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-    source = (BoundedSource<KV<K,V>>) SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(context.getConfiguration().get("source")),
+    source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE)),
         "");
     return new BeamRecordReader<>(source.createReader(options));
   }
@@ -102,12 +103,12 @@ public class BeamInputFormat<K, V> extends InputFormat {
     }
   }
 
-  private class BeamRecordReader<K, V> extends RecordReader {
+  private class BeamRecordReader<T> extends RecordReader {
 
-    private final BoundedSource.BoundedReader<KV<K, V>> reader;
+    private final BoundedSource.BoundedReader<T> reader;
     private boolean started;
 
-    public BeamRecordReader(BoundedSource.BoundedReader<KV<K, V>> reader) {
+    public BeamRecordReader(BoundedSource.BoundedReader<T> reader) {
       this.reader = checkNotNull(reader, "reader");
       this.started = false;
     }
@@ -128,12 +129,13 @@ public class BeamInputFormat<K, V> extends InputFormat {
 
     @Override
     public Object getCurrentKey() throws IOException, InterruptedException {
-      return reader.getCurrent().getKey();
+      return "global";
    }
 
     @Override
     public Object getCurrentValue() throws IOException, InterruptedException {
-      return reader.getCurrent().getValue();
+      return WindowedValue.timestampedValueInGlobalWindow(
+          reader.getCurrent(), reader.getCurrentTimestamp());
     }
 
     @Override
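
The producing side of the BEAM_SERIALIZED_BOUNDED_SOURCE contract is in the MapReduceWordCount change above and in JobPrototype below. Wiring a job against this InputFormat by hand looks roughly like the following sketch, where `source` stands for any serializable BoundedSource:

    Configuration conf = new Configuration();
    conf.set(
        BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source)));
    Job job = Job.getInstance(conf, "beam-mr-stage"); // job name is illustrative
    job.setInputFormatClass(BeamInputFormat.class);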

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index 88fc8d6..9d2f80d 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -1,30 +1,83 @@
 package org.apache.beam.runners.mapreduce.translation;
 
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.values.KV;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Created by peihe on 21/07/2017.
  */
-public class BeamMapper<KeyInT, ValueInT, KeyOutT, ValueOutT>
-    extends Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT> {
+public class BeamMapper<ValueInT, ValueOutT>
+    extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> {
+
+  public static final String BEAM_SERIALIZED_DO_FN = "beam-serialized-do-fn";
+  private static final Logger LOG = LoggerFactory.getLogger(BeamMapper.class);
 
-  private DoFnInvoker<KV<KeyInT, ValueInT>, KV<KeyOutT, ValueOutT>> doFnInvoker;
+  private DoFnRunner<ValueInT, ValueOutT> doFnRunner;
+  private PipelineOptions options;
 
   @Override
-  protected void setup(Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) {
+  protected void setup(
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) {
+    String serializedDoFn = checkNotNull(
+        context.getConfiguration().get(BEAM_SERIALIZED_DO_FN),
+        BEAM_SERIALIZED_DO_FN);
+    doFnRunner = DoFnRunners.simpleRunner(
+        options,
+        (DoFn<ValueInT, ValueOutT>) SerializableUtils
+            .deserializeFromByteArray(
+                Base64.decodeBase64(serializedDoFn), "DoFn"),
+        NullSideInputReader.empty(),
+        new MROutputManager(context),
+        null,
+        ImmutableList.<TupleTag<?>>of(),
+        null,
+        WindowingStrategy.globalDefault());
   }
 
   @Override
   protected void map(
-      KeyInT key,
-      ValueInT value,
-      Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) {
-    System.out.print(String.format("key: %s, value: %s", key, value));
+      Object key,
+      WindowedValue<ValueInT> value,
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) {
+    LOG.info("key: {}, value: {}.", key, value);
+    doFnRunner.processElement(value);
   }
 
   @Override
-  protected void cleanup(Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) {
+  protected void cleanup(
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) {
   }
+
+  class MROutputManager implements DoFnRunners.OutputManager {
+
+    private final Mapper<Object, Object, Object, Object>.Context context;
+
+    MROutputManager(Mapper<?, ?, ?, ?>.Context context) {
+      this.context = (Mapper<Object, Object, Object, Object>.Context) context;
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      try {
+        context.write("global", output);
+      } catch (Exception e) {
+        Throwables.throwIfUnchecked(e);
+      }
+    }
+  }
 }
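
BeamMapper consumes its DoFn from the configuration in the same Base64-over-Java-serialization style; the producer side is JobPrototype.build() below. Hand-wiring it would look roughly like this, where `doFn` stands for any serializable DoFn:

    conf.set(
        BeamMapper.BEAM_SERIALIZED_DO_FN,
        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(doFn)));
    job.setMapperClass(BeamMapper.class);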

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index 1ca5a05..da31f89 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -235,6 +236,10 @@ public class Graph {
       path.addLast(transform);
     }
 
+    public Iterable<PTransform<?, ?>> transforms() {
+      return path;
+    }
+
     @Override
     public boolean equals(Object obj) {
       if (obj == this) {


http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
new file mode 100644
index 0000000..bdbbe5d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -0,0 +1,95 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+
+/**
+ * Created by peihe on 24/07/2017.
+ */
+public class JobPrototype {
+
+  public static JobPrototype create(int stageId, Graph.Vertex vertex) {
+    return new JobPrototype(stageId, vertex);
+  }
+
+  private final int stageId;
+  private final Graph.Vertex vertex;
+  private final Set<JobPrototype> dependencies;
+
+  private JobPrototype(int stageId, Graph.Vertex vertex) {
+    this.stageId = stageId;
+    this.vertex = checkNotNull(vertex, "vertex");
+    this.dependencies = Sets.newHashSet();
+  }
+
+  public Job build(Class<?> jarClass, Configuration conf) throws IOException {
+    Job job = new Job(conf);
+    conf = job.getConfiguration();
+    job.setJarByClass(jarClass);
+
+    // Setup BoundedSources in BeamInputFormat.
+    // TODO: support more than one inputs
+    Graph.Vertex head = Iterables.getOnlyElement(vertex.getIncoming()).getHead();
+    checkState(head.getTransform() instanceof Read.Bounded);
+    Read.Bounded read = (Read.Bounded) head.getTransform();
+    conf.set(
+        BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
+        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(read.getSource())));
+    job.setInputFormatClass(BeamInputFormat.class);
+
+    // Setup DoFns in BeamMapper.
+    // TODO: support more than one out going edge.
+    Graph.Edge outEdge = Iterables.getOnlyElement(head.getOutgoing());
+    Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths());
+    List<DoFn> doFns = new ArrayList<>();
+    doFns.addAll(FluentIterable.from(outPath.transforms())
+        .filter(new Predicate<PTransform<?, ?>>() {
+          @Override
+          public boolean apply(PTransform<?, ?> input) {
+            return !(input instanceof Read.Bounded);
+          }
+        })
+        .transform(new Function<PTransform<?, ?>, DoFn>() {
+          @Override
+          public DoFn apply(PTransform<?, ?> input) {
+            checkArgument(
+                input instanceof ParDo.SingleOutput, "Only support ParDo.SingleOutput.");
+            ParDo.SingleOutput parDo = (ParDo.SingleOutput) input;
+            return parDo.getFn();
+          }})
+        .toList());
+    if (vertex.getTransform() instanceof ParDo.SingleOutput) {
+      doFns.add(((ParDo.SingleOutput) vertex.getTransform()).getFn());
+    } else if (vertex.getTransform() instanceof ParDo.MultiOutput) {
+      doFns.add(((ParDo.MultiOutput) vertex.getTransform()).getFn());
+    }
+    conf.set(
+        BeamMapper.BEAM_SERIALIZED_DO_FN,
+        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+            Iterables.getOnlyElement(doFns))));
+    job.setMapperClass(BeamMapper.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    return job;
+  }
+}
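
MapReduceRunner above drives JobPrototype with this sequence for each fused vertex (the stage id is currently hard-coded to 1):

    JobPrototype prototype = JobPrototype.create(1, vertex);
    Job job = prototype.build(options.getJarClass(), new Configuration());
    job.waitForCompletion(true); // blocks; `true` enables progress reporting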

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
index 51c26f2..80df3e1 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
@@ -17,6 +17,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.log4j.BasicConfigurator;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -63,46 +64,25 @@ public class WordCountTest {
     }
   }
 
-  /**
-   * A PTransform that converts a PCollection containing lines of text into a PCollection of
-   * formatted word counts.
-   *
-   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
-   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
-   * modular testing, and an improved monitoring experience.
-   */
-  public static class CountWords extends PTransform<PCollection<String>,
-      PCollection<KV<String, Long>>> {
-    @Override
-    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
-
-      // Convert lines of text into individual words.
-      PCollection<String> words = lines.apply(
-          ParDo.of(new ExtractWordsFn()));
-
-      // Count the number of times each word occurs.
-      PCollection<KV<String, Long>> wordCounts =
-          words.apply(Count.<String>perElement());
-
-      return wordCounts;
-    }
-  }
-
   @Test
   public void testWordCount() {
-    String input = "gs://apache-beam-samples/shakespeare/kinglear.txt";
+    BasicConfigurator.configure();
+
+    String input = "/Users/peihe/github/beam/LICENSE";
     String output = "./output";
-    PipelineOptions options = PipelineOptionsFactory.create();
+    MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class);
+    options.setJarClass(this.getClass());
     options.setRunner(MapReduceRunner.class);
     Pipeline p = Pipeline.create(options);
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
     p.apply("ReadLines", TextIO.read().from(input))
-        .apply(new CountWords())
-        .apply(MapElements.via(new FormatAsTextFn()))
-        .apply("WriteCounts", TextIO.write().to(output));
+        .apply(ParDo.of(new ExtractWordsFn()));
+//        .apply(Count.<String>perElement())
+//        .apply(MapElements.via(new FormatAsTextFn()))
+//        .apply("WriteCounts", TextIO.write().to(output));
 
-    p.run().waitUntilFinish();
+    p.run();
   }
 }
