This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e86951f  SAMZA-2514: Refactor codes to make runWithClassLoader method 
more generic (#1349)
e86951f is described below

commit e86951f102c7a854d343b1f80dc2796be29c9c81
Author: Alan Zhang <[email protected]>
AuthorDate: Fri Apr 24 09:40:25 2020 -0700

    SAMZA-2514: Refactor codes to make runWithClassLoader method more generic 
(#1349)
    
    API/Usage changes: None
---
 .../clustermanager/ClusterBasedJobCoordinator.java | 65 +---------------
 .../org/apache/samza/util/SplitDeploymentUtil.java | 88 ++++++++++++++++++++++
 .../TestClusterBasedJobCoordinator.java            | 34 ---------
 .../apache/samza/util/TestSplitDeploymentUtil.java | 71 +++++++++++++++++
 4 files changed, 163 insertions(+), 95 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 0bd66dc..ed99ad4 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -71,6 +70,7 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.ConfigUtil;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.SplitDeploymentUtil;
 import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -464,78 +464,21 @@ public class ClusterBasedJobCoordinator {
    * The entry point for the {@link ClusterBasedJobCoordinator}.
    */
   public static void main(String[] args) {
-    boolean dependencyIsolationEnabled = Boolean.parseBoolean(
-        System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
     Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
         LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. 
Exiting job coordinator", exception);
         System.exit(1);
       });
-    if (!dependencyIsolationEnabled) {
+    if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
       // no isolation enabled, so can just execute 
runClusterBasedJobCoordinator directly
       runClusterBasedJobCoordinator(args);
     } else {
-      runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(), 
args);
+      SplitDeploymentUtil.runWithClassLoader(new 
IsolatingClassLoaderFactory().buildClassLoader(),
+          ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", 
args);
     }
     System.exit(0);
   }
 
   /**
-   * Execute the coordinator using a separate isolated classloader.
-   * @param classLoader {@link ClassLoader} to use to load the {@link 
ClusterBasedJobCoordinator} which will run
-   * @param args arguments to pass when running the {@link 
ClusterBasedJobCoordinator}
-   */
-  @VisibleForTesting
-  static void runWithClassLoader(ClassLoader classLoader, String[] args) {
-    // need to use the isolated classloader to load ClusterBasedJobCoordinator 
and then run using that new class
-    Class<?> clusterBasedJobCoordinatorClass;
-    try {
-      clusterBasedJobCoordinatorClass = 
classLoader.loadClass(ClusterBasedJobCoordinator.class.getName());
-    } catch (ClassNotFoundException e) {
-      throw new SamzaException(
-          "Isolation was enabled, but unable to find 
ClusterBasedJobCoordinator in isolated classloader", e);
-    }
-
-    // save the current context classloader so it can be reset after finishing 
the call to runClusterBasedJobCoordinator
-    ClassLoader previousContextClassLoader = 
Thread.currentThread().getContextClassLoader();
-    // this is needed because certain libraries (e.g. log4j) use the context 
classloader
-    Thread.currentThread().setContextClassLoader(classLoader);
-
-    try {
-      
executeRunClusterBasedJobCoordinatorForClass(clusterBasedJobCoordinatorClass, 
args);
-    } finally {
-      // reset the context class loader; it's good practice, and could be 
important when running a test suite
-      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
-    }
-  }
-
-  /**
-   * Runs the {@link 
ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])} method of 
the given
-   * {@code clusterBasedJobCoordinatorClass} using reflection.
-   * @param clusterBasedJobCoordinatorClass {@link ClusterBasedJobCoordinator} 
{@link Class} for which to execute
-   * {@link ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
-   * @param args arguments to pass to {@link 
ClusterBasedJobCoordinator#runClusterBasedJobCoordinator(String[])}
-   */
-  private static void executeRunClusterBasedJobCoordinatorForClass(Class<?> 
clusterBasedJobCoordinatorClass,
-      String[] args) {
-    Method runClusterBasedJobCoordinatorMethod;
-    try {
-      runClusterBasedJobCoordinatorMethod =
-          
clusterBasedJobCoordinatorClass.getDeclaredMethod("runClusterBasedJobCoordinator",
 String[].class);
-    } catch (NoSuchMethodException e) {
-      throw new SamzaException("Isolation was enabled, but unable to find 
runClusterBasedJobCoordinator method", e);
-    }
-    // only sets accessible flag for this Method instance, not other Method 
instances for runClusterBasedJobCoordinator
-    runClusterBasedJobCoordinatorMethod.setAccessible(true);
-
-    try {
-      // wrapping args in object array so that args is passed as a single 
argument to the method
-      runClusterBasedJobCoordinatorMethod.invoke(null, new Object[]{args});
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      throw new SamzaException("Exception while executing 
runClusterBasedJobCoordinator method", e);
-    }
-  }
-
-  /**
    * This is the actual execution for the {@link ClusterBasedJobCoordinator}. 
This is separated out from
    * {@link #main(String[])} so that it can be executed directly or from a 
separate classloader.
    */
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java 
b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
new file mode 100644
index 0000000..200cd3c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ShellCommandConfig;
+
+
+public final class SplitDeploymentUtil {
+
+  /**
+   * The split deployment feature uses system env {@code 
ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED} to represent
+   * if the user chooses to enable it.
+   * This function helps to detect if the split deployment feature is enabled.
+   *
+   * @return true if split deployment is enabled; vice versa
+   */
+  public static boolean isSplitDeploymentEnabled() {
+    return 
Boolean.parseBoolean(System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
+  }
+
+  /**
+   * Execute the runner class using a separate isolated classloader.
+   * @param classLoader {@link ClassLoader} to use to load the runner class 
which will run
+   * @param originalRunnerClass {@link Class} for which will be executed with 
the new class loader.
+   * @param runMethodName run method name of runner class
+   * @param runMethodArgs arguments to pass to run method
+   */
+  public static void runWithClassLoader(ClassLoader classLoader, Class<?> 
originalRunnerClass, String runMethodName,
+      String[] runMethodArgs) {
+    // need to use the isolated classloader to load run method and then 
execute using that new class
+    Class<?> runnerClass;
+    try {
+      runnerClass = classLoader.loadClass(originalRunnerClass.getName());
+    } catch (ClassNotFoundException e) {
+      throw new SamzaException(String.format(
+          "Isolation was enabled, but unable to find %s in isolated 
classloader", originalRunnerClass.getName()), e);
+    }
+
+    // save the current context classloader so it can be reset after finishing 
the call to run method
+    ClassLoader previousContextClassLoader = 
Thread.currentThread().getContextClassLoader();
+    // this is needed because certain libraries (e.g. log4j) use the context 
classloader
+    Thread.currentThread().setContextClassLoader(classLoader);
+
+    try {
+      executeRunForRunnerClass(runnerClass, runMethodName, runMethodArgs);
+    } finally {
+      // reset the context class loader; it's good practice, and could be 
important when running a test suite
+      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+    }
+  }
+
+  private static void executeRunForRunnerClass(Class<?> runnerClass, String 
runMethodName, String[] runMethodArgs) {
+    Method runMethod;
+    try {
+      runMethod = runnerClass.getDeclaredMethod(runMethodName, String[].class);
+    } catch (NoSuchMethodException e) {
+      throw new SamzaException(String.format("Isolation was enabled, but 
unable to find %s method", runMethodName), e);
+    }
+    // only sets accessible flag for this method instance
+    runMethod.setAccessible(true);
+
+    try {
+      // wrapping args in object array so that args is passed as a single 
argument to the method
+      runMethod.invoke(null, new Object[]{runMethodArgs});
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      throw new SamzaException(String.format("Exception while executing %s 
method", runMethodName), e);
+    }
+  }
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 967bc81..6444451 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -54,7 +54,6 @@ import 
org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.*;
-import static org.mockito.AdditionalMatchers.aryEq;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
@@ -62,7 +61,6 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
 
 
 /**
@@ -176,38 +174,6 @@ public class TestClusterBasedJobCoordinator {
   }
 
   @Test
-  public void testRunWithClassLoader() throws Exception {
-    // partially mock ClusterBasedJobCoordinator (mock 
runClusterBasedJobCoordinator method only)
-    PowerMockito.spy(ClusterBasedJobCoordinator.class);
-    // save the context classloader to make sure that it gets set properly 
once the test is finished
-    ClassLoader previousContextClassLoader = 
Thread.currentThread().getContextClassLoader();
-    ClassLoader classLoader = mock(ClassLoader.class);
-    String[] args = new String[]{"arg0", "arg1"};
-    
doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
-
-    // stub the private static method which is called by reflection
-    PowerMockito.doAnswer(invocation -> {
-        // make sure the only calls to this method has the expected arguments
-        assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
-        // checks that the context classloader is set correctly
-        assertEquals(classLoader, 
Thread.currentThread().getContextClassLoader());
-        return null;
-      }).when(ClusterBasedJobCoordinator.class, 
"runClusterBasedJobCoordinator", any());
-
-    try {
-      ClusterBasedJobCoordinator.runWithClassLoader(classLoader, args);
-      assertEquals(previousContextClassLoader, 
Thread.currentThread().getContextClassLoader());
-    } finally {
-      // reset it explicitly just in case runWithClassLoader throws an 
exception
-      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
-    }
-    // make sure that the classloader got used
-    verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
-    // make sure runClusterBasedJobCoordinator only got called once
-    
verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator",
 new Object[]{aryEq(args)});
-  }
-
-  @Test
   public void testToArgs() {
     ApplicationConfig appConfig = new ApplicationConfig(new 
MapConfig(ImmutableMap.of(
         JobConfig.JOB_NAME, "test1",
diff --git 
a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java 
b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
new file mode 100644
index 0000000..1336190
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import org.apache.samza.clustermanager.ClusterBasedJobCoordinator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.AdditionalMatchers.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ClusterBasedJobCoordinator.class})
+public class TestSplitDeploymentUtil {
+
+  @Test
+  public void testRunWithIsolatingClassLoader() throws Exception {
+    // partially mock ClusterBasedJobCoordinator (mock 
runClusterBasedJobCoordinator method only)
+    PowerMockito.spy(ClusterBasedJobCoordinator.class);
+    // save the context classloader to make sure that it gets set properly 
once the test is finished
+    ClassLoader previousContextClassLoader = 
Thread.currentThread().getContextClassLoader();
+    ClassLoader classLoader = mock(ClassLoader.class);
+    String[] args = new String[]{"arg0", "arg1"};
+    
doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+
+    // stub the private static method which is called by reflection
+    PowerMockito.doAnswer(invocation -> {
+        // make sure the only calls to this method has the expected arguments
+        assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
+        // checks that the context classloader is set correctly
+        assertEquals(classLoader, 
Thread.currentThread().getContextClassLoader());
+        return null;
+      }).when(ClusterBasedJobCoordinator.class, 
"runClusterBasedJobCoordinator", any());
+
+    try {
+      SplitDeploymentUtil.runWithClassLoader(classLoader,
+          ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", 
args);
+      assertEquals(previousContextClassLoader, 
Thread.currentThread().getContextClassLoader());
+    } finally {
+      // reset it explicitly just in case runWithClassLoader throws an 
exception
+      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
+    }
+    // make sure that the classloader got used
+    verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+    // make sure runClusterBasedJobCoordinator only got called once
+    
verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator",
 new Object[]{aryEq(args)});
+  }
+}

Reply via email to