mr-runner: support SourceMetrics; this fixes MetricsTest.testBoundedSourceMetrics().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c62b3ad4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c62b3ad4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c62b3ad4 Branch: refs/heads/mr-runner Commit: c62b3ad462c2c07ce36cce025dc52204e7eb87d2 Parents: 5248ce4 Author: Pei He <[email protected]> Authored: Fri Sep 1 16:55:19 2017 +0800 Committer: Pei He <[email protected]> Committed: Fri Sep 1 17:15:08 2017 +0800 ---------------------------------------------------------------------- .../mapreduce/translation/BeamInputFormat.java | 39 +++++++++++++++----- .../translation/FileReadOperation.java | 4 ++ .../translation/FlattenTranslator.java | 12 +++--- .../mapreduce/translation/GraphPlanner.java | 3 +- .../mapreduce/translation/JobPrototype.java | 4 +- .../translation/ReadBoundedTranslator.java | 7 ++-- .../mapreduce/translation/ReadOperation.java | 5 ++- .../translation/SourceReadOperation.java | 6 ++- 8 files changed, 56 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java index 10d9ada..9dc3396 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java @@ -26,12 +26,15 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; 
+import java.io.Closeable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -87,7 +90,8 @@ public class BeamInputFormat<T> extends InputFormat { .transform(new Function<BoundedSource<?>, ReadOperation.TaggedSource>() { @Override public ReadOperation.TaggedSource apply(BoundedSource<?> input) { - return ReadOperation.TaggedSource.of(input, taggedSource.getTag()); + return ReadOperation.TaggedSource.of( + taggedSource.getStepName(), input, taggedSource.getTag()); }}); } catch (Exception e) { Throwables.throwIfUnchecked(e); @@ -98,7 +102,8 @@ public class BeamInputFormat<T> extends InputFormat { .transform(new Function<ReadOperation.TaggedSource, InputSplit>() { @Override public InputSplit apply(ReadOperation.TaggedSource taggedSource) { - return new BeamInputSplit(taggedSource.getSource(), options, taggedSource.getTag()); + return new BeamInputSplit(taggedSource.getStepName(), taggedSource.getSource(), + options, taggedSource.getTag()); }}) .toList(); } catch (Exception e) { @@ -113,6 +118,7 @@ public class BeamInputFormat<T> extends InputFormat { } public static class BeamInputSplit<T> extends InputSplit implements Writable { + private String stepName; private BoundedSource<T> boundedSource; private SerializedPipelineOptions options; private TupleTag<?> tupleTag; @@ -121,9 +127,11 @@ public class BeamInputFormat<T> extends InputFormat { } public BeamInputSplit( + String stepName, BoundedSource<T> boundedSource, SerializedPipelineOptions options, TupleTag<?> tupleTag) { + this.stepName = checkNotNull(stepName, "stepName"); this.boundedSource = 
checkNotNull(boundedSource, "boundedSources"); this.options = checkNotNull(options, "options"); this.tupleTag = checkNotNull(tupleTag, "tupleTag"); @@ -131,7 +139,7 @@ public class BeamInputFormat<T> extends InputFormat { public BeamRecordReader<T> createReader() throws IOException { return new BeamRecordReader<>( - boundedSource.createReader(options.getPipelineOptions()), tupleTag); + stepName, boundedSource.createReader(options.getPipelineOptions()), tupleTag); } @Override @@ -154,6 +162,7 @@ public class BeamInputFormat<T> extends InputFormat { @Override public void write(DataOutput out) throws IOException { ByteArrayOutputStream stream = new ByteArrayOutputStream(); + StringUtf8Coder.of().encode(stepName, stream); SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream); SerializableCoder.of(SerializedPipelineOptions.class).encode(options, stream); SerializableCoder.of(TupleTag.class).encode(tupleTag, stream); @@ -170,6 +179,7 @@ public class BeamInputFormat<T> extends InputFormat { in.readFully(bytes); ByteArrayInputStream inStream = new ByteArrayInputStream(bytes); + stepName = StringUtf8Coder.of().decode(inStream); boundedSource = SerializableCoder.of(BoundedSource.class).decode(inStream); options = SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream); tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream); @@ -178,11 +188,15 @@ public class BeamInputFormat<T> extends InputFormat { private static class BeamRecordReader<T> extends RecordReader { + private final String stepName; private final BoundedSource.BoundedReader<T> reader; - private TupleTag<?> tupleTag; + private final TupleTag<?> tupleTag; + private MetricsReporter metricsReporter; private boolean started; - public BeamRecordReader(BoundedSource.BoundedReader<T> reader, TupleTag<?> tupleTag) { + public BeamRecordReader( + String stepName, BoundedSource.BoundedReader<T> reader, TupleTag<?> tupleTag) { + this.stepName = checkNotNull(stepName, "stepName"); 
this.reader = checkNotNull(reader, "reader"); this.tupleTag = checkNotNull(tupleTag, "tupleTag"); this.started = false; @@ -191,15 +205,19 @@ public class BeamInputFormat<T> extends InputFormat { @Override public void initialize( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + this.metricsReporter = new MetricsReporter(context); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { - if (!started) { - started = true; - return reader.start(); - } else { - return reader.advance(); + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { + if (!started) { + started = true; + return reader.start(); + } else { + return reader.advance(); + } } } @@ -233,6 +251,7 @@ public class BeamInputFormat<T> extends InputFormat { @Override public void close() throws IOException { reader.close(); + metricsReporter.updateMetrics(); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java index cbbfbd2..f212252 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java @@ -46,15 +46,18 @@ import org.apache.hadoop.io.SequenceFile; */ public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>> { + private final String stepName; private final String fileName; private final Coder<WindowedValue<T>> coder; private final TupleTag<?> tupleTag; public FileReadOperation( + String stepName, 
String fileName, Coder<WindowedValue<T>> coder, TupleTag<?> tupleTag) { super(); + this.stepName = checkNotNull(stepName, "stepName"); this.fileName = checkNotNull(fileName, "fileName"); this.coder = checkNotNull(coder, "coder"); this.tupleTag = checkNotNull(tupleTag, "tupleTag"); @@ -63,6 +66,7 @@ public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>> { @Override TaggedSource getTaggedSource(Configuration conf) { return TaggedSource.of( + stepName, new FileBoundedSource<>(fileName, coder, new SerializableConfiguration(conf)), tupleTag); } http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java index 817f2bf..06ad367 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java @@ -55,18 +55,20 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC } } + String stepName = userGraphContext.getStepName(); if (inputTagToCount.isEmpty()) { // Create a empty source Operation<?> operation = - new SourceReadOperation(new EmptySource(), userGraphContext.getOnlyOutputTag()); + new SourceReadOperation( + stepName, new EmptySource(), userGraphContext.getOnlyOutputTag()); context.addInitStep( - Graphs.Step.of(userGraphContext.getStepName(), operation), + Graphs.Step.of(stepName, operation), userGraphContext.getInputTags(), userGraphContext.getOutputTags()); } else if (!containsDuplicates) { Operation<?> operation = new FlattenOperation(1); context.addInitStep( - 
Graphs.Step.of(userGraphContext.getStepName(), operation), + Graphs.Step.of(stepName, operation), userGraphContext.getInputTags(), userGraphContext.getOutputTags()); } else { @@ -79,7 +81,7 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC if (dupFactor == 1) { intermediateTags.add(inTag); } else { - String dupStepName = userGraphContext.getStepName() + "/Dup-" + dupFactor; + String dupStepName = stepName + "/Dup-" + dupFactor; Graphs.Tag outTag = Graphs.Tag.of( dupStepName + ".out", new TupleTag<T>(), @@ -93,7 +95,7 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC } } context.addInitStep( - Graphs.Step.of(userGraphContext.getStepName(), new FlattenOperation(1)), + Graphs.Step.of(stepName, new FlattenOperation(1)), intermediateTags, userGraphContext.getOutputTags()); } http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index 6c79277..09998ea 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -83,7 +83,8 @@ public class GraphPlanner { consumer.addStep( Graphs.Step.of( readStepName, - new FileReadOperation(filePath, writeValueCoder, tag.getTupleTag())), + new FileReadOperation( + readStepName, filePath, writeValueCoder, tag.getTupleTag())), ImmutableList.<Graphs.Tag>of(), ImmutableList.of(readOutput)); } http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java 
---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index 44f279b..e8e6eab 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.counters.Limits; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; @@ -77,8 +78,9 @@ public class JobPrototype { "io.serializations", "org.apache.hadoop.io.serializer.WritableSerialization," + "org.apache.hadoop.io.serializer.JavaSerialization"); + conf.set("mapreduce.job.counters.group.name.max", "512"); + Limits.init(conf); - //TODO: config out dir with PipelineOptions. 
conf.set( FileOutputFormat.OUTDIR, configUtils.getFileOutputDir(fusedStep.getStageId())); http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java index 138c00e..5e5c99b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java @@ -27,10 +27,11 @@ class ReadBoundedTranslator<T> extends TransformTranslator.Default<Read.Bounded< public void translateNode(Read.Bounded transform, TranslationContext context) { TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - ReadOperation operation = - new SourceReadOperation(transform.getSource(), userGraphContext.getOnlyOutputTag()); + String stepName = userGraphContext.getStepName(); + ReadOperation operation = new SourceReadOperation( + stepName, transform.getSource(), userGraphContext.getOnlyOutputTag()); context.addInitStep( - Graphs.Step.of(userGraphContext.getStepName(), operation), + Graphs.Step.of(stepName, operation), userGraphContext.getInputTags(), userGraphContext.getOutputTags()); } http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java 
index cb8b00e..a3e1d77 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java @@ -46,12 +46,13 @@ abstract class ReadOperation<T> extends Operation<T> { @AutoValue abstract static class TaggedSource implements Serializable { + abstract String getStepName(); abstract BoundedSource<?> getSource(); abstract TupleTag<?> getTag(); - static TaggedSource of(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { + static TaggedSource of(String stepName, BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { return new org.apache.beam.runners.mapreduce.translation - .AutoValue_ReadOperation_TaggedSource(boundedSource, tupleTag); + .AutoValue_ReadOperation_TaggedSource(stepName, boundedSource, tupleTag); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java index 19b0320..55b46a4 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java @@ -27,12 +27,14 @@ import org.apache.hadoop.conf.Configuration; * Operation that reads from {@link BoundedSource}. 
*/ public class SourceReadOperation extends ReadOperation { + private final String stepName; private final TaggedSource source; - SourceReadOperation(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { + SourceReadOperation(String stepName, BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { + this.stepName = checkNotNull(stepName, "stepName"); checkNotNull(boundedSource, "boundedSource"); checkNotNull(tupleTag, "tupleTag"); - this.source = TaggedSource.of(boundedSource, tupleTag); + this.source = TaggedSource.of(stepName, boundedSource, tupleTag); } @Override
