mr-runner: add JarClassInstanceFactory to run ValidatesRunner tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1cce92 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1cce92 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1cce92 Branch: refs/heads/mr-runner Commit: ee1cce92d620c78b0243ee0b9c7c0f6ae232b0cb Parents: c6a3a18 Author: Pei He <p...@apache.org> Authored: Thu Jul 27 15:05:06 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 14:13:48 2017 +0800 ---------------------------------------------------------------------- runners/map-reduce/pom.xml | 2 +- .../mapreduce/MapReducePipelineOptions.java | 51 ++++++++++++++++++++ runners/pom.xml | 1 + 3 files changed, 53 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ee1cce92/runners/map-reduce/pom.xml ---------------------------------------------------------------------- diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml index 226c5c0..06e5227 100644 --- a/runners/map-reduce/pom.xml +++ b/runners/map-reduce/pom.xml @@ -39,7 +39,7 @@ <!-- This profile adds execution of ValidatesRunner integration tests against a hadoop local cluster. 
--> <id>local-validates-runner-tests</id> - <activation><activeByDefault>false</activeByDefault></activation> + <activation><activeByDefault>true</activeByDefault></activation> <build> <plugins> <plugin> http://git-wip-us.apache.org/repos/asf/beam/blob/ee1cce92/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java index ce8f937..7fe66ba 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java @@ -1,14 +1,65 @@ package org.apache.beam.runners.mapreduce; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; +import java.util.Iterator; +import java.util.Set; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; /** * {@link PipelineOptions} for {@link MapReduceRunner}. */ public interface MapReducePipelineOptions extends PipelineOptions { + /** Classes that are used as the boundary in the stack trace to find the callers class name. 
*/ + Set<String> PIPELINE_OPTIONS_FACTORY_CLASSES = ImmutableSet.of( + PipelineOptionsFactory.class.getName(), + PipelineOptionsFactory.Builder.class.getName(), + "org.apache.beam.sdk.options.ProxyInvocationHandler"); + + @Description("The jar class of the user Beam program.") + @Default.InstanceFactory(JarClassInstanceFactory.class) Class<?> getJarClass(); void setJarClass(Class<?> jarClass); + + class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> { + @Override + public Class<?> create(PipelineOptions options) { + return findCallersClassName(options); + } + + /** + * Returns the calling class, located via the current thread's stack trace, or null if none is found. + */ + private static Class<?> findCallersClassName(PipelineOptions options) { + Iterator<StackTraceElement> elements = + Iterators.forArray(Thread.currentThread().getStackTrace()); + // First find the PipelineOptionsFactory/Builder class in the stack trace. + while (elements.hasNext()) { + StackTraceElement next = elements.next(); + if (PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName())) { + break; + } + } + // Then find the first element after that which is not one of the PipelineOptionsFactory/Builder classes.
+ while (elements.hasNext()) { + StackTraceElement next = elements.next(); + if (!PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName()) + && !next.getClassName().contains("com.sun.proxy.$Proxy") + && !next.getClassName().equals(options.getRunner().getName())) { + try { + return Class.forName(next.getClassName()); + } catch (ClassNotFoundException e) { + break; + } + } + } + return null; + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/ee1cce92/runners/pom.xml ---------------------------------------------------------------------- diff --git a/runners/pom.xml b/runners/pom.xml index b00ba9c..4cba41a 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -35,6 +35,7 @@ <modules> <module>core-construction-java</module> <module>core-java</module> + <module>map-reduce</module> <module>direct-java</module> <module>flink</module> <module>google-cloud-dataflow-java</module>