mr-runner: call MetricsEnvironment.setMetricsSupported(true) so that ValidatesRunner tests can run with TestPipeline.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d3386d4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d3386d4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d3386d4 Branch: refs/heads/mr-runner Commit: 8d3386d479b5704fa9448c7a9b1eab9c66e75549 Parents: 4e7062c Author: Pei He <[email protected]> Authored: Wed Aug 30 19:40:24 2017 +0800 Committer: Pei He <[email protected]> Committed: Fri Sep 1 17:13:39 2017 +0800 ---------------------------------------------------------------------- runners/map-reduce/pom.xml | 14 ++++++++++++ .../beam/runners/mapreduce/MapReduceRunner.java | 5 +++++ .../translation/ConfigurationUtils.java | 23 +++++++++++++++----- .../mapreduce/translation/GraphPlanner.java | 11 +++++----- .../mapreduce/translation/JobPrototype.java | 4 +++- 5 files changed, 44 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/pom.xml ---------------------------------------------------------------------- diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml index e858031..d65bb34 100644 --- a/runners/map-reduce/pom.xml +++ b/runners/map-reduce/pom.xml @@ -56,6 +56,20 @@ <groups> org.apache.beam.sdk.testing.ValidatesRunner </groups> + <excludes> + <exclude>org.apache.beam.sdk.testing.PAssertTest.java</exclude> + </excludes> + <excludedGroups> + org.apache.beam.sdk.testing.UsesSetState, + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesDistributionMetrics, + org.apache.beam.sdk.testing.UsesGaugeMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics, + org.apache.beam.sdk.testing.LargeKeys$Above10MB, + org.apache.beam.sdk.testing.UsesTimersInParDo, + org.apache.beam.sdk.testing.UsesStatefulParDo, + org.apache.beam.sdk.testing.UsesTestStream + </excludedGroups> <parallel>none</parallel> 
<failIfNoTests>true</failIfNoTests> <dependenciesToScan> http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index 88ed01e..71edf1a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -31,9 +31,11 @@ import org.apache.beam.runners.mapreduce.translation.TranslationContext; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; +import org.apache.log4j.BasicConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +64,9 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> { @Override public PipelineResult run(Pipeline pipeline) { + BasicConfigurator.configure(); + MetricsEnvironment.setMetricsSupported(true); + TranslationContext context = new TranslationContext(options); GraphConverter graphConverter = new GraphConverter(context); pipeline.traverseTopologically(graphConverter); http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java index 6d7a81a..4ec50bd 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java @@ -17,9 +17,13 @@ */ package org.apache.beam.runners.mapreduce.translation; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -28,21 +32,28 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; */ public class ConfigurationUtils { + private final MapReducePipelineOptions options; + + public ConfigurationUtils(MapReducePipelineOptions options) { + this.options = checkNotNull(options, "options"); + } + public static ResourceId getResourceIdForOutput(String fileName, Configuration conf) { ResourceId outDir = FileSystems.matchNewResource(conf.get(FileOutputFormat.OUTDIR), true); return outDir.resolve(fileName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE); } - public static String getFileOutputDir(String baseFileOutputDir, int stageId) { - if (baseFileOutputDir.endsWith("/")) { - return String.format("%sstage-%d", baseFileOutputDir, stageId); + public String getFileOutputDir(int stageId) { + String fileOutputDir = options.getFileOutputDir(); + if (fileOutputDir.endsWith("/")) { + return String.format("%s%s/stage-%d", fileOutputDir, options.getJobName(), stageId); } else { - return String.format("%s/stage-%d", baseFileOutputDir, stageId); + return String.format("%s/%s/stage-%d", fileOutputDir, options.getJobName(), 
stageId); } } - public static String getFileOutputPath(String baseFileOutputDir, int stageId, String fileName) { - return String.format("%s/%s", getFileOutputDir(baseFileOutputDir, stageId), fileName); + public String getFileOutputPath(int stageId, String fileName) { + return String.format("%s/%s", getFileOutputDir(stageId), fileName); } public static String toFileName(String tagName) { http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index b6e134e..608b304 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -36,10 +36,11 @@ import org.apache.beam.sdk.values.WindowingStrategy; */ public class GraphPlanner { - private final MapReducePipelineOptions options; + private final ConfigurationUtils configUtils; public GraphPlanner(MapReducePipelineOptions options) { - this.options = checkNotNull(options, "options"); + checkNotNull(options, "options"); + this.configUtils = new ConfigurationUtils(options); } public Graphs.FusedGraph plan(Graphs.FusedGraph fusedGraph) { @@ -79,8 +80,7 @@ public class GraphPlanner { } consumer.removeTag(tag); - String filePath = ConfigurationUtils.getFileOutputPath( - options.getFileOutputDir(), fusedStep.getStageId(), fileName); + String filePath = configUtils.getFileOutputPath(fusedStep.getStageId(), fileName); consumer.addStep( Graphs.Step.of( readStepName, @@ -133,8 +133,7 @@ public class GraphPlanner { for (Graphs.Tag sideInTag : sideInputTags) { tupleTagToFilePath.put( sideInTag.getTupleTag(), - 
ConfigurationUtils.getFileOutputPath( - options.getFileOutputDir(), + configUtils.getFileOutputPath( fusedGraph.getProducer(sideInTag).getStageId(), ConfigurationUtils.toFileName(sideInTag.getName()))); } http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index a0c6626..93ae33a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -60,11 +60,13 @@ public class JobPrototype { private final int stageId; private final Graphs.FusedStep fusedStep; private final MapReducePipelineOptions options; + private final ConfigurationUtils configUtils; private JobPrototype(int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) { this.stageId = stageId; this.fusedStep = checkNotNull(fusedStep, "fusedStep"); this.options = checkNotNull(options, "options"); + this.configUtils = new ConfigurationUtils(options); } public Job build(Class<?> jarClass, Configuration initConf) throws IOException { @@ -79,7 +81,7 @@ public class JobPrototype { //TODO: config out dir with PipelineOptions. conf.set( FileOutputFormat.OUTDIR, - ConfigurationUtils.getFileOutputDir(options.getFileOutputDir(), fusedStep.getStageId())); + configUtils.getFileOutputDir(fusedStep.getStageId())); // Setup BoundedSources in BeamInputFormat. Graphs.Step startStep = Iterables.getOnlyElement(fusedStep.getStartSteps());
