This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 33b182cfa946cfe98459207d0c428afbfd041cca Author: Aleksey Pak <[email protected]> AuthorDate: Fri May 10 14:30:45 2019 +0200 [FLINK-12143] PluginLoader: use system level always parents first patterns in class loading --- .../apache/flink/configuration/CoreOptions.java | 8 +--- .../org/apache/flink/core/plugin/PluginConfig.java | 28 ++++++++---- .../org/apache/flink/core/plugin/PluginLoader.java | 5 +- .../apache/flink/core/plugin/PluginManager.java | 14 ++++-- .../org/apache/flink/core/plugin/PluginUtils.java | 4 +- .../java/org/apache/flink/util/ArrayUtils.java | 41 +++++++++++++++++ .../java/org/apache/flink/util/ArrayUtilsTest.java | 53 ++++++++++++++++++++++ .../apache/flink/test/plugin/PluginLoaderTest.java | 6 +-- .../flink/test/plugin/PluginManagerTest.java | 2 +- 9 files changed, 134 insertions(+), 27 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index a70c020..6132284 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.docs.ConfigGroup; import org.apache.flink.annotation.docs.ConfigGroups; import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.description.Description; +import org.apache.flink.util.ArrayUtils; import static org.apache.flink.configuration.ConfigOptions.key; @@ -116,12 +117,7 @@ public class CoreOptions { if (append.isEmpty()) { return basePatterns; } else { - String[] appendPatterns = append.split(";"); - - String[] joinedPatterns = new String[basePatterns.length + appendPatterns.length]; - System.arraycopy(basePatterns, 0, joinedPatterns, 0, basePatterns.length); - System.arraycopy(appendPatterns, 0, joinedPatterns, basePatterns.length, appendPatterns.length); - return joinedPatterns; + return ArrayUtils.concat(basePatterns, append.split(";")); } } diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java index c4a3b2b..b90c87c 100644 --- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java @@ -20,6 +20,7 @@ package org.apache.flink.core.plugin; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import java.io.File; import java.nio.file.Path; @@ -28,31 +29,40 @@ import java.util.Optional; /** * Stores the configuration for plugins mechanism. */ +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") public class PluginConfig { private final Optional<Path> pluginsPath; - private PluginConfig() { - this.pluginsPath = Optional.empty(); - } + private final String[] alwaysParentFirstPatterns; - private PluginConfig(Path pluginsPath) { - this.pluginsPath = Optional.of(pluginsPath); + private PluginConfig(Optional<Path> pluginsPath, String[] alwaysParentFirstPatterns) { + this.pluginsPath = pluginsPath; + this.alwaysParentFirstPatterns = alwaysParentFirstPatterns; } public Optional<Path> getPluginsPath() { return pluginsPath; } + public String[] getAlwaysParentFirstPatterns() { + return alwaysParentFirstPatterns; + } + public static PluginConfig fromConfiguration(Configuration configuration) { + return new PluginConfig( + getPluginsDirPath(configuration), + CoreOptions.getParentFirstLoaderPatterns(configuration)); + } + + private static Optional<Path> getPluginsDirPath(Configuration configuration) { String pluginsDir = configuration.getString(ConfigConstants.ENV_FLINK_PLUGINS_DIR, null); if (pluginsDir == null) { - return new PluginConfig(); + return Optional.empty(); } - File pluginsDirFile = new File(pluginsDir); if (!pluginsDirFile.isDirectory()) { - return new PluginConfig(); + return Optional.empty(); } - return new PluginConfig(pluginsDirFile.toPath()); + return Optional.of(pluginsDirFile.toPath()); } } diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java index 70468bf..f90bda8 100644 --- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java @@ -19,6 +19,7 @@ package org.apache.flink.core.plugin; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.ArrayUtils; import org.apache.flink.util.ChildFirstClassLoader; import javax.annotation.concurrent.ThreadSafe; @@ -39,12 +40,12 @@ public class PluginLoader { private final ClassLoader pluginClassLoader; @VisibleForTesting - public PluginLoader(PluginDescriptor pluginDescriptor, ClassLoader parentClassLoader) { + public PluginLoader(PluginDescriptor pluginDescriptor, ClassLoader parentClassLoader, String[] alwaysParentFirstPatterns) { this.pluginClassLoader = new ChildFirstClassLoader( pluginDescriptor.getPluginResourceURLs(), parentClassLoader, - pluginDescriptor.getLoaderExcludePatterns()); + ArrayUtils.concat(alwaysParentFirstPatterns, pluginDescriptor.getLoaderExcludePatterns())); } /** diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java index 00971fa..c6835a2 100644 --- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java @@ -25,6 +25,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; import javax.annotation.concurrent.ThreadSafe; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -41,13 +42,17 @@ public class PluginManager { /** A collection of descriptions of all plugins known to this plugin manager. */ private final Collection<PluginDescriptor> pluginDescriptors; - public PluginManager(Collection<PluginDescriptor> pluginDescriptors) { - this(pluginDescriptors, PluginManager.class.getClassLoader()); + /** List of patterns for classes that should always be resolved from the parent ClassLoader. */ + private final String[] alwaysParentFirstPatterns; + + public PluginManager(Collection<PluginDescriptor> pluginDescriptors, String[] alwaysParentFirstPatterns) { + this(pluginDescriptors, PluginManager.class.getClassLoader(), alwaysParentFirstPatterns); } - public PluginManager(Collection<PluginDescriptor> pluginDescriptors, ClassLoader parentClassLoader) { + public PluginManager(Collection<PluginDescriptor> pluginDescriptors, ClassLoader parentClassLoader, String[] alwaysParentFirstPatterns) { this.pluginDescriptors = pluginDescriptors; this.parentClassLoader = parentClassLoader; + this.alwaysParentFirstPatterns = alwaysParentFirstPatterns; } /** @@ -61,7 +66,7 @@ public class PluginManager { public <P extends Plugin> Iterator<P> load(Class<P> service) { ArrayList<Iterator<P>> combinedIterators = new ArrayList<>(pluginDescriptors.size()); for (PluginDescriptor pluginDescriptor : pluginDescriptors) { - PluginLoader pluginLoader = new PluginLoader(pluginDescriptor, parentClassLoader); + PluginLoader pluginLoader = new PluginLoader(pluginDescriptor, parentClassLoader, alwaysParentFirstPatterns); combinedIterators.add(pluginLoader.load(service)); } return Iterators.concat(combinedIterators.iterator()); @@ -72,6 +77,7 @@ public class PluginManager { return "PluginManager{" + "parentClassLoader=" + parentClassLoader + ", pluginDescriptors=" + pluginDescriptors + + ", alwaysParentFirstPatterns=" + Arrays.toString(alwaysParentFirstPatterns) + '}'; } } diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java index 0f9f0d3..b6500bf 100644 --- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java @@ -43,13 +43,13 @@ public final class PluginUtils { try { Collection<PluginDescriptor> pluginDescriptors = new DirectoryBasedPluginFinder(pluginConfig.getPluginsPath().get()).findPlugins(); - return new PluginManager(pluginDescriptors); + return new PluginManager(pluginDescriptors, pluginConfig.getAlwaysParentFirstPatterns()); } catch (IOException e) { throw new FlinkRuntimeException("Exception when trying to initialize plugin system.", e); } } else { - return new PluginManager(Collections.emptyList()); + return new PluginManager(Collections.emptyList(), pluginConfig.getAlwaysParentFirstPatterns()); } } } diff --git a/flink-core/src/main/java/org/apache/flink/util/ArrayUtils.java b/flink-core/src/main/java/org/apache/flink/util/ArrayUtils.java new file mode 100644 index 0000000..9877dba --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/ArrayUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Internal; + +/** + * Utility class for Java arrays. + */ +@Internal +public final class ArrayUtils { + + public static String[] concat(String[] array1, String[] array2) { + if (array1.length == 0) { + return array2; + } + if (array2.length == 0) { + return array1; + } + String[] resultArray = new String[array1.length + array2.length]; + System.arraycopy(array1, 0, resultArray, 0, array1.length); + System.arraycopy(array2, 0, resultArray, array1.length, array2.length); + return resultArray; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/util/ArrayUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/ArrayUtilsTest.java new file mode 100644 index 0000000..775bae5 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/ArrayUtilsTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link ArrayUtils}. + */ +public class ArrayUtilsTest extends TestLogger { + + @Test + public void concatWithEmptyArray() { + String[] emptyArray = new String[] {}; + String[] nonEmptyArray = new String[] {"some value"}; + + assertThat("Should return the non empty array", + ArrayUtils.concat(emptyArray, nonEmptyArray), sameInstance(nonEmptyArray)); + + assertThat("Should return the non empty array", + ArrayUtils.concat(nonEmptyArray, emptyArray), sameInstance(nonEmptyArray)); + } + + @Test + public void concatArrays() { + String[] array1 = new String[] {"A", "B", "C", "D", "E", "F", "G"}; + String[] array2 = new String[] {"1", "2", "3"}; + + assertThat(ArrayUtils.concat(array1, array2), is(new String[]{"A", "B", "C", "D", "E", "F", "G", "1", "2", "3"})); + + assertThat(ArrayUtils.concat(array2, array1), is(new String[]{"1", "2", "3", "A", "B", "C", "D", "E", "F", "G"})); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java index 2460ad9..52bad19 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java @@ -31,7 +31,7 @@ import java.util.Iterator; /** * Test for {@link PluginLoader}. */ -public class PluginLoaderTest extends PluginTestBase{ +public class PluginLoaderTest extends PluginTestBase { @Test public void testPluginLoading() throws Exception { @@ -39,7 +39,7 @@ public class PluginLoaderTest extends PluginTestBase{ final URL classpathA = createPluginJarURLFromString(PLUGIN_A); PluginDescriptor pluginDescriptorA = new PluginDescriptor("A", new URL[]{classpathA}, new String[0]); - final PluginLoader pluginLoaderA = new PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER); + final PluginLoader pluginLoaderA = new PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER, new String[0]); Iterator<TestSpi> testSpiIteratorA = pluginLoaderA.load(TestSpi.class); @@ -59,7 +59,7 @@ public class PluginLoaderTest extends PluginTestBase{ Assert.assertFalse(testSpiA instanceof TestServiceA); // In the following we check for isolation of classes between different plugin loaders. - final PluginLoader secondPluginLoaderA = new PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER); + final PluginLoader secondPluginLoaderA = new PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER, new String[0]); TestSpi secondTestSpiA = secondPluginLoaderA.load(TestSpi.class).next(); Assert.assertNotNull(secondTestSpiA.testMethod()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java index af5667c..7439da1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java @@ -83,7 +83,7 @@ public class PluginManagerTest extends PluginTestBase { @Test public void testPluginLoading() { - final PluginManager pluginManager = new PluginManager(descriptors, PARENT_CLASS_LOADER); + final PluginManager pluginManager = new PluginManager(descriptors, PARENT_CLASS_LOADER, new String[0]); final List<TestSpi> serviceImplList = Lists.newArrayList(pluginManager.load(TestSpi.class)); Assert.assertEquals(2, serviceImplList.size());
