This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33b182cfa946cfe98459207d0c428afbfd041cca
Author: Aleksey Pak <[email protected]>
AuthorDate: Fri May 10 14:30:45 2019 +0200

    [FLINK-12143] PluginLoader: use system-level always-parent-first patterns in class loading
---
 .../apache/flink/configuration/CoreOptions.java    |  8 +---
 .../org/apache/flink/core/plugin/PluginConfig.java | 28 ++++++++----
 .../org/apache/flink/core/plugin/PluginLoader.java |  5 +-
 .../apache/flink/core/plugin/PluginManager.java    | 14 ++++--
 .../org/apache/flink/core/plugin/PluginUtils.java  |  4 +-
 .../java/org/apache/flink/util/ArrayUtils.java     | 41 +++++++++++++++++
 .../java/org/apache/flink/util/ArrayUtilsTest.java | 53 ++++++++++++++++++++++
 .../apache/flink/test/plugin/PluginLoaderTest.java |  6 +--
 .../flink/test/plugin/PluginManagerTest.java       |  2 +-
 9 files changed, 134 insertions(+), 27 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index a70c020..6132284 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.docs.ConfigGroup;
 import org.apache.flink.annotation.docs.ConfigGroups;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
+import org.apache.flink.util.ArrayUtils;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -116,12 +117,7 @@ public class CoreOptions {
                if (append.isEmpty()) {
                        return basePatterns;
                } else {
-                       String[] appendPatterns = append.split(";");
-
-                       String[] joinedPatterns = new 
String[basePatterns.length + appendPatterns.length];
-                       System.arraycopy(basePatterns, 0, joinedPatterns, 0, 
basePatterns.length);
-                       System.arraycopy(appendPatterns, 0, joinedPatterns, 
basePatterns.length, appendPatterns.length);
-                       return joinedPatterns;
+                       return ArrayUtils.concat(basePatterns, 
append.split(";"));
                }
        }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java
index c4a3b2b..b90c87c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java
@@ -20,6 +20,7 @@ package org.apache.flink.core.plugin;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 
 import java.io.File;
 import java.nio.file.Path;
@@ -28,31 +29,40 @@ import java.util.Optional;
 /**
  * Stores the configuration for plugins mechanism.
  */
+@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
 public class PluginConfig {
        private final Optional<Path> pluginsPath;
 
-       private PluginConfig() {
-               this.pluginsPath = Optional.empty();
-       }
+       private final String[] alwaysParentFirstPatterns;
 
-       private PluginConfig(Path pluginsPath) {
-               this.pluginsPath = Optional.of(pluginsPath);
+       private PluginConfig(Optional<Path> pluginsPath, String[] 
alwaysParentFirstPatterns) {
+               this.pluginsPath = pluginsPath;
+               this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
        }
 
        public Optional<Path> getPluginsPath() {
                return pluginsPath;
        }
 
+       public String[] getAlwaysParentFirstPatterns() {
+               return alwaysParentFirstPatterns;
+       }
+
        public static PluginConfig fromConfiguration(Configuration 
configuration) {
+               return new PluginConfig(
+                       getPluginsDirPath(configuration),
+                       
CoreOptions.getParentFirstLoaderPatterns(configuration));
+       }
+
+       private static Optional<Path> getPluginsDirPath(Configuration 
configuration) {
                String pluginsDir = 
configuration.getString(ConfigConstants.ENV_FLINK_PLUGINS_DIR, null);
                if (pluginsDir == null) {
-                       return new PluginConfig();
+                       return Optional.empty();
                }
-
                File pluginsDirFile = new File(pluginsDir);
                if (!pluginsDirFile.isDirectory()) {
-                       return new PluginConfig();
+                       return Optional.empty();
                }
-               return new PluginConfig(pluginsDirFile.toPath());
+               return Optional.of(pluginsDirFile.toPath());
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
index 70468bf..f90bda8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.plugin;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ArrayUtils;
 import org.apache.flink.util.ChildFirstClassLoader;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -39,12 +40,12 @@ public class PluginLoader {
        private final ClassLoader pluginClassLoader;
 
        @VisibleForTesting
-       public PluginLoader(PluginDescriptor pluginDescriptor, ClassLoader 
parentClassLoader) {
+       public PluginLoader(PluginDescriptor pluginDescriptor, ClassLoader 
parentClassLoader, String[] alwaysParentFirstPatterns) {
                this.pluginClassLoader =
                        new ChildFirstClassLoader(
                                pluginDescriptor.getPluginResourceURLs(),
                                parentClassLoader,
-                               pluginDescriptor.getLoaderExcludePatterns());
+                               ArrayUtils.concat(alwaysParentFirstPatterns, 
pluginDescriptor.getLoaderExcludePatterns()));
        }
 
        /**
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java
index 00971fa..c6835a2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 
@@ -41,13 +42,17 @@ public class PluginManager {
        /** A collection of descriptions of all plugins known to this plugin 
manager. */
        private final Collection<PluginDescriptor> pluginDescriptors;
 
-       public PluginManager(Collection<PluginDescriptor> pluginDescriptors) {
-               this(pluginDescriptors, PluginManager.class.getClassLoader());
+       /** List of patterns for classes that should always be resolved from 
the parent ClassLoader. */
+       private final String[] alwaysParentFirstPatterns;
+
+       public PluginManager(Collection<PluginDescriptor> pluginDescriptors, 
String[] alwaysParentFirstPatterns) {
+               this(pluginDescriptors, PluginManager.class.getClassLoader(), 
alwaysParentFirstPatterns);
        }
 
-       public PluginManager(Collection<PluginDescriptor> pluginDescriptors, 
ClassLoader parentClassLoader) {
+       public PluginManager(Collection<PluginDescriptor> pluginDescriptors, 
ClassLoader parentClassLoader, String[] alwaysParentFirstPatterns) {
                this.pluginDescriptors = pluginDescriptors;
                this.parentClassLoader = parentClassLoader;
+               this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
        }
 
        /**
@@ -61,7 +66,7 @@ public class PluginManager {
        public <P extends Plugin> Iterator<P> load(Class<P> service) {
                ArrayList<Iterator<P>> combinedIterators = new 
ArrayList<>(pluginDescriptors.size());
                for (PluginDescriptor pluginDescriptor : pluginDescriptors) {
-                       PluginLoader pluginLoader = new 
PluginLoader(pluginDescriptor, parentClassLoader);
+                       PluginLoader pluginLoader = new 
PluginLoader(pluginDescriptor, parentClassLoader, alwaysParentFirstPatterns);
                        combinedIterators.add(pluginLoader.load(service));
                }
                return Iterators.concat(combinedIterators.iterator());
@@ -72,6 +77,7 @@ public class PluginManager {
                return "PluginManager{" +
                        "parentClassLoader=" + parentClassLoader +
                        ", pluginDescriptors=" + pluginDescriptors +
+                       ", alwaysParentFirstPatterns=" + 
Arrays.toString(alwaysParentFirstPatterns) +
                        '}';
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
index 0f9f0d3..b6500bf 100644
--- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
@@ -43,13 +43,13 @@ public final class PluginUtils {
                        try {
                                Collection<PluginDescriptor> pluginDescriptors =
                                        new 
DirectoryBasedPluginFinder(pluginConfig.getPluginsPath().get()).findPlugins();
-                               return new PluginManager(pluginDescriptors);
+                               return new PluginManager(pluginDescriptors, 
pluginConfig.getAlwaysParentFirstPatterns());
                        } catch (IOException e) {
                                throw new FlinkRuntimeException("Exception when 
trying to initialize plugin system.", e);
                        }
                }
                else {
-                       return new PluginManager(Collections.emptyList());
+                       return new PluginManager(Collections.emptyList(), 
pluginConfig.getAlwaysParentFirstPatterns());
                }
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/util/ArrayUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ArrayUtils.java
new file mode 100644
index 0000000..9877dba
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ArrayUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Utility class for Java arrays.
+ */
+@Internal
+public final class ArrayUtils {
+
+       public static String[] concat(String[] array1, String[] array2) {
+               if (array1.length == 0) {
+                       return array2;
+               }
+               if (array2.length == 0) {
+                       return array1;
+               }
+               String[] resultArray = new String[array1.length + 
array2.length];
+               System.arraycopy(array1, 0, resultArray, 0, array1.length);
+               System.arraycopy(array2, 0, resultArray, array1.length, 
array2.length);
+               return resultArray;
+       }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/ArrayUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/ArrayUtilsTest.java
new file mode 100644
index 0000000..775bae5
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/ArrayUtilsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link ArrayUtils}.
+ */
+public class ArrayUtilsTest extends TestLogger {
+
+       @Test
+       public void concatWithEmptyArray() {
+               String[] emptyArray = new String[] {};
+               String[] nonEmptyArray = new String[] {"some value"};
+
+               assertThat("Should return the non empty array",
+                       ArrayUtils.concat(emptyArray, nonEmptyArray), 
sameInstance(nonEmptyArray));
+
+               assertThat("Should return the non empty array",
+                       ArrayUtils.concat(nonEmptyArray, emptyArray), 
sameInstance(nonEmptyArray));
+       }
+
+       @Test
+       public void concatArrays() {
+               String[] array1 = new String[] {"A", "B", "C", "D", "E", "F", 
"G"};
+               String[] array2 = new String[] {"1", "2", "3"};
+
+               assertThat(ArrayUtils.concat(array1, array2), is(new 
String[]{"A", "B", "C", "D", "E", "F", "G", "1", "2", "3"}));
+
+               assertThat(ArrayUtils.concat(array2, array1), is(new 
String[]{"1", "2", "3", "A", "B", "C", "D", "E", "F", "G"}));
+       }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java
index 2460ad9..52bad19 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java
@@ -31,7 +31,7 @@ import java.util.Iterator;
 /**
  * Test for {@link PluginLoader}.
  */
-public class PluginLoaderTest extends PluginTestBase{
+public class PluginLoaderTest extends PluginTestBase {
 
        @Test
        public void testPluginLoading() throws Exception {
@@ -39,7 +39,7 @@ public class PluginLoaderTest extends PluginTestBase{
                final URL classpathA = createPluginJarURLFromString(PLUGIN_A);
 
                PluginDescriptor pluginDescriptorA = new PluginDescriptor("A", 
new URL[]{classpathA}, new String[0]);
-               final PluginLoader pluginLoaderA = new 
PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER);
+               final PluginLoader pluginLoaderA = new 
PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER, new String[0]);
 
                Iterator<TestSpi> testSpiIteratorA = 
pluginLoaderA.load(TestSpi.class);
 
@@ -59,7 +59,7 @@ public class PluginLoaderTest extends PluginTestBase{
                Assert.assertFalse(testSpiA instanceof TestServiceA);
 
                // In the following we check for isolation of classes between 
different plugin loaders.
-               final PluginLoader secondPluginLoaderA = new 
PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER);
+               final PluginLoader secondPluginLoaderA = new 
PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER, new String[0]);
 
                TestSpi secondTestSpiA = 
secondPluginLoaderA.load(TestSpi.class).next();
                Assert.assertNotNull(secondTestSpiA.testMethod());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java
index af5667c..7439da1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java
@@ -83,7 +83,7 @@ public class PluginManagerTest extends PluginTestBase {
        @Test
        public void testPluginLoading() {
 
-               final PluginManager pluginManager = new 
PluginManager(descriptors, PARENT_CLASS_LOADER);
+               final PluginManager pluginManager = new 
PluginManager(descriptors, PARENT_CLASS_LOADER, new String[0]);
                final List<TestSpi> serviceImplList = 
Lists.newArrayList(pluginManager.load(TestSpi.class));
                Assert.assertEquals(2, serviceImplList.size());
 

Reply via email to