[FLINK-6606] Hide WrappedMasterHook by making it private
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6a596fe Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6a596fe Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6a596fe Branch: refs/heads/release-1.3 Commit: f6a596fe3f8924602c34636959e3d7e48cb2262f Parents: 3dba48e Author: Till Rohrmann <trohrm...@apache.org> Authored: Fri May 19 15:06:35 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri May 19 15:21:24 2017 +0200 ---------------------------------------------------------------------- .../runtime/checkpoint/hooks/MasterHooks.java | 41 +++++++++++--------- .../executiongraph/ExecutionGraphBuilder.java | 5 ++- .../checkpoint/hooks/MasterHooksTest.java | 11 ++---- 3 files changed, 29 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java index 737e816..1851eb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.checkpoint.hooks; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.io.SimpleVersionedSerializer; @@ -28,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import 
org.apache.flink.util.Preconditions; import org.slf4j.Logger; import javax.annotation.Nullable; @@ -278,25 +278,25 @@ public class MasterHooks { * @param userClassLoader the classloader to use */ public static <T> MasterTriggerRestoreHook<T> wrapHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) { - return new WrappedMasterHook<T>(hook, userClassLoader); + return new WrappedMasterHook<>(hook, userClassLoader); } - @VisibleForTesting - static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> { + private static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> { private final MasterTriggerRestoreHook<T> hook; private final ClassLoader userClassLoader; WrappedMasterHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) { - this.hook = hook; - this.userClassLoader = userClassLoader; + this.hook = Preconditions.checkNotNull(hook); + this.userClassLoader = Preconditions.checkNotNull(userClassLoader); } @Override public String getIdentifier() { - Thread thread = Thread.currentThread(); - ClassLoader originalClassLoader = thread.getContextClassLoader(); + final Thread thread = Thread.currentThread(); + final ClassLoader originalClassLoader = thread.getContextClassLoader(); thread.setContextClassLoader(userClassLoader); + try { return hook.getIdentifier(); } @@ -315,9 +315,10 @@ public class MasterHooks { } }; - Thread thread = Thread.currentThread(); - ClassLoader originalClassLoader = thread.getContextClassLoader(); + final Thread thread = Thread.currentThread(); + final ClassLoader originalClassLoader = thread.getContextClassLoader(); thread.setContextClassLoader(userClassLoader); + try { return hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor); } @@ -328,9 +329,10 @@ public class MasterHooks { @Override public void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception { - Thread thread = Thread.currentThread(); - ClassLoader originalClassLoader = 
thread.getContextClassLoader(); + final Thread thread = Thread.currentThread(); + final ClassLoader originalClassLoader = thread.getContextClassLoader(); thread.setContextClassLoader(userClassLoader); + try { hook.restoreCheckpoint(checkpointId, checkpointData); } @@ -342,9 +344,10 @@ public class MasterHooks { @Nullable @Override public SimpleVersionedSerializer<T> createCheckpointDataSerializer() { - Thread thread = Thread.currentThread(); - ClassLoader originalClassLoader = thread.getContextClassLoader(); + final Thread thread = Thread.currentThread(); + final ClassLoader originalClassLoader = thread.getContextClassLoader(); thread.setContextClassLoader(userClassLoader); + try { return hook.createCheckpointDataSerializer(); } @@ -353,17 +356,19 @@ public class MasterHooks { } } - class WrappedCommand implements Runnable { + private class WrappedCommand implements Runnable { private final Runnable command; + WrappedCommand(Runnable command) { - this.command = command; + this.command = Preconditions.checkNotNull(command); } @Override public void run() { - Thread thread = Thread.currentThread(); - ClassLoader originalClassLoader = thread.getContextClassLoader(); + final Thread thread = Thread.currentThread(); + final ClassLoader originalClassLoader = thread.getContextClassLoader(); thread.setContextClassLoader(userClassLoader); + try { command.run(); } http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index ecac2e4..db22da6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -258,9 +258,10 @@ public class ExecutionGraphBuilder { throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e); } - Thread thread = Thread.currentThread(); - ClassLoader originalClassLoader = thread.getContextClassLoader(); + final Thread thread = Thread.currentThread(); + final ClassLoader originalClassLoader = thread.getContextClassLoader(); thread.setContextClassLoader(classLoader); + try { hooks = new ArrayList<>(hookFactories.length); for (MasterTriggerRestoreHook.Factory factory : hookFactories) { http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java index 3f8a48c..f4270dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint.hooks; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.util.TestLogger; import org.junit.Test; import javax.annotation.Nullable; @@ -31,7 +32,6 @@ import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -39,7 
+39,7 @@ import static org.mockito.Mockito.verify; /** * Tests for the MasterHooks utility class. */ -public class MasterHooksTest { +public class MasterHooksTest extends TestLogger { // ------------------------------------------------------------------------ // hook management @@ -117,15 +117,10 @@ public class MasterHooksTest { private static class TestExecutor implements Executor { Runnable command; + @Override public void execute(Runnable command) { this.command = command; } } - - private static <T> T mockGeneric(Class<?> clazz) { - @SuppressWarnings("unchecked") - Class<T> typedClass = (Class<T>) clazz; - return mock(typedClass); - } }