mr-runner: add JarClassInstanceFactory to run ValidatesRunner tests.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1cce92
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1cce92
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1cce92

Branch: refs/heads/mr-runner
Commit: ee1cce92d620c78b0243ee0b9c7c0f6ae232b0cb
Parents: c6a3a18
Author: Pei He <p...@apache.org>
Authored: Thu Jul 27 15:05:06 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:48 2017 +0800

----------------------------------------------------------------------
 runners/map-reduce/pom.xml                      |  2 +-
 .../mapreduce/MapReducePipelineOptions.java     | 51 ++++++++++++++++++++
 runners/pom.xml                                 |  1 +
 3 files changed, 53 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee1cce92/runners/map-reduce/pom.xml
----------------------------------------------------------------------
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 226c5c0..06e5227 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -39,7 +39,7 @@
       <!-- This profile adds execution of ValidatesRunner integration tests
            against a hadoop local cluster. -->
       <id>local-validates-runner-tests</id>
-      <activation><activeByDefault>false</activeByDefault></activation>
+      <activation><activeByDefault>true</activeByDefault></activation>
       <build>
         <plugins>
           <plugin>

http://git-wip-us.apache.org/repos/asf/beam/blob/ee1cce92/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index ce8f937..7fe66ba 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -1,14 +1,65 @@
 package org.apache.beam.runners.mapreduce;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
 /**
  * {@link PipelineOptions} for {@link MapReduceRunner}.
  */
 public interface MapReducePipelineOptions extends PipelineOptions {
 
+  /** Classes that are used as the boundary in the stack trace to find the callers class name. */
+  Set<String> PIPELINE_OPTIONS_FACTORY_CLASSES = ImmutableSet.of(
+      PipelineOptionsFactory.class.getName(),
+      PipelineOptionsFactory.Builder.class.getName(),
+      "org.apache.beam.sdk.options.ProxyInvocationHandler");
+
+
   @Description("The jar class of the user Beam program.")
+  @Default.InstanceFactory(JarClassInstanceFactory.class)
   Class<?> getJarClass();
   void setJarClass(Class<?> jarClass);
+
+  class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> {
+    @Override
+    public Class<?> create(PipelineOptions options) {
+      return findCallersClassName(options);
+    }
+
+    /**
+     * Returns the simple name of the calling class using the current threads stack.
+     */
+    private static Class<?> findCallersClassName(PipelineOptions options) {
+      Iterator<StackTraceElement> elements =
+          Iterators.forArray(Thread.currentThread().getStackTrace());
+      // First find the PipelineOptionsFactory/Builder class in the stack trace.
+      while (elements.hasNext()) {
+        StackTraceElement next = elements.next();
+        if (PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName())) {
+          break;
+        }
+      }
+      // Then find the first instance after that is not the PipelineOptionsFactory/Builder class.
+      while (elements.hasNext()) {
+        StackTraceElement next = elements.next();
+        if (!PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName())
+            && !next.getClassName().contains("com.sun.proxy.$Proxy")
+            && !next.getClassName().equals(options.getRunner().getName())) {
+          try {
+            return Class.forName(next.getClassName());
+          } catch (ClassNotFoundException e) {
+            break;
+          }
+        }
+      }
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ee1cce92/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index b00ba9c..4cba41a 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
   <modules>
     <module>core-construction-java</module>
     <module>core-java</module>
+    <module>map-reduce</module>
     <module>direct-java</module>
     <module>flink</module>
     <module>google-cloud-dataflow-java</module>

Reply via email to