This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80452f19e1712fd09d3b6b9a99c82d954f471a88
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Jul 30 12:00:07 2019 +0200

    [FLINK-13499][maprfs] Handle MapR dependency purely through reflection
    
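    This allows us to remove the MapR dependency from the module.
    The MapR Maven dependency has frequently caused issues: it had to be
    fetched from the MapR Maven repository, whose HTTPS endpoint failed on
    Travis and required an unsafe plain-HTTP fallback profile.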
---
 flink-filesystems/flink-mapr-fs/pom.xml            |  38 +----
 .../flink/runtime/fs/maprfs/MapRFileSystem.java    | 181 ---------------------
 .../flink/runtime/fs/maprfs/MapRFsFactory.java     | 170 ++++++++++++++++++-
 .../src/test/java/com/mapr/fs/MapRFileSystem.java  |  90 ++++++++++
 tools/travis_controller.sh                         |   3 +-
 5 files changed, 263 insertions(+), 219 deletions(-)

diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
index 2683a8e..955f4d0 100644
--- a/flink-filesystems/flink-mapr-fs/pom.xml
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -32,35 +32,6 @@ under the License.
 
        <packaging>jar</packaging>
 
-       <repositories>
-               <repository>
-                       <id>mapr-releases</id>
-                       <url>https://repository.mapr.com/maven/</url>
-                       <snapshots><enabled>false</enabled></snapshots>
-                       <releases><enabled>true</enabled></releases>
-               </repository>
-       </repositories>
-
-       <profiles>
-               <profile>
-                       <id>unsafe-mapr-repo</id>
-                       <activation>
-                               <property>
-                                       <name>unsafe-mapr-repo</name>
-                               </property>
-                       </activation>
-                       <repositories>
-                               <!-- MapR -->
-                               <repository>
-                                       <id>mapr-releases</id>
-                                       
<url>http://repository.mapr.com/maven/</url>
-                                       
<snapshots><enabled>false</enabled></snapshots>
-                                       
<releases><enabled>true</enabled></releases>
-                               </repository>
-                       </repositories>
-               </profile>
-       </profiles>
-
        <dependencies>
 
                <dependency>
@@ -75,13 +46,10 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
-               <!-- MapR dependencies as optional dependency, so we can hard 
depend on this without -->
-               <!-- pulling in MapR libraries by default -->
-
                <dependency>
-                       <groupId>com.mapr.hadoop</groupId>
-                       <artifactId>maprfs</artifactId>
-                       <version>5.2.1-mapr</version>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-hadoop2</artifactId>
+                       <version>${hadoop.version}-${project.version}</version>
                        <optional>true</optional>
                </dependency>
 
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
deleted file mode 100644
index 5aec4a4..0000000
--- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.maprfs;
-
-import org.apache.flink.core.fs.FileSystemKind;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A MapR file system client for Flink.
- *
- * <p>Internally, this class wraps the {@link org.apache.hadoop.fs.FileSystem} 
implementation
- * of the MapR file system client.
- */
-public class MapRFileSystem extends HadoopFileSystem {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(MapRFileSystem.class);
-
-       /** Name of the environment variable to determine the location of the 
MapR
-        * installation. */
-       private static final String MAPR_HOME_ENV = "MAPR_HOME";
-
-       /** The default location of the MapR installation. */
-       private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
-
-       /** The path relative to the MAPR_HOME where MapR stores how to access 
the
-        * configured clusters. */
-       private static final String MAPR_CLUSTER_CONF_FILE = 
"/conf/mapr-clusters.conf";
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a MapRFileSystem for the given URI.
-        *
-        * @param fsUri The URI describing the file system
-        * @throws IOException Thrown if the file system could not be 
initialized.
-        */
-       public MapRFileSystem(URI fsUri) throws IOException {
-               super(instantiateMapRFileSystem(fsUri));
-       }
-
-       private static org.apache.hadoop.fs.FileSystem 
instantiateMapRFileSystem(URI fsUri) throws IOException {
-               checkNotNull(fsUri, "fsUri");
-
-               final org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
-               final com.mapr.fs.MapRFileSystem fs;
-
-               final String authority = fsUri.getAuthority();
-               if (authority == null || authority.isEmpty()) {
-
-                       // Use the default constructor to instantiate MapR file 
system object
-                       fs = new com.mapr.fs.MapRFileSystem();
-               }
-               else {
-                       // We have an authority, check the MapR cluster 
configuration to
-                       // find the CLDB locations.
-                       final String[] cldbLocations = 
getCLDBLocations(authority);
-                       fs = new com.mapr.fs.MapRFileSystem(authority, 
cldbLocations);
-               }
-
-               // now initialize the Hadoop File System object
-               fs.initialize(fsUri, conf);
-
-               return fs;
-       }
-
-       /**
-        * Retrieves the CLDB locations for the given MapR cluster name.
-        *
-        * @param authority
-        *            the name of the MapR cluster
-        * @return a list of CLDB locations
-        * @throws IOException
-        *             thrown if the CLDB locations for the given MapR cluster 
name
-        *             cannot be determined
-        */
-       private static String[] getCLDBLocations(String authority) throws 
IOException {
-
-               // Determine the MapR home
-               String maprHome = System.getenv(MAPR_HOME_ENV);
-               if (maprHome == null) {
-                       maprHome = DEFAULT_MAPR_HOME;
-               }
-
-               final File maprClusterConf = new File(maprHome, 
MAPR_CLUSTER_CONF_FILE);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug(String.format(
-                                       "Trying to retrieve MapR cluster 
configuration from %s",
-                                       maprClusterConf));
-               }
-
-               if (!maprClusterConf.exists()) {
-                       throw new IOException("Could not find CLDB 
configuration '" + maprClusterConf.getAbsolutePath() +
-                                       "', assuming MapR home is '" + maprHome 
+ "'.");
-               }
-
-               // Read the cluster configuration file, format is specified at
-               // http://doc.mapr.com/display/MapR/mapr-clusters.conf
-
-               try (BufferedReader br = new BufferedReader(new 
FileReader(maprClusterConf))) {
-
-                       String line;
-                       while ((line = br.readLine()) != null) {
-
-                               // Normalize the string
-                               line = line.trim();
-                               line = line.replace('\t', ' ');
-
-                               final String[] fields = line.split(" ");
-                               if (fields.length < 1) {
-                                       continue;
-                               }
-
-                               final String clusterName = fields[0];
-
-                               if (!clusterName.equals(authority)) {
-                                       continue;
-                               }
-
-                               final List<String> cldbLocations = new 
ArrayList<>();
-
-                               for (int i = 1; i < fields.length; ++i) {
-
-                                       // Make sure this is not a key-value 
pair MapR recently
-                                       // introduced in the file format along 
with their security
-                                       // features.
-                                       if (!fields[i].isEmpty() && 
!fields[i].contains("=")) {
-                                               cldbLocations.add(fields[i]);
-                                       }
-                               }
-
-                               if (cldbLocations.isEmpty()) {
-                                       throw new IOException(
-                                                       String.format(
-                                                                       "%s 
contains entry for cluster %s but no CLDB locations.",
-                                                                       
maprClusterConf, authority));
-                               }
-
-                               return cldbLocations.toArray(new 
String[cldbLocations.size()]);
-                       }
-
-               }
-
-               throw new IOException(String.format(
-                               "Unable to find CLDB locations for cluster %s", 
authority));
-       }
-
-       @Override
-       public FileSystemKind getKind() {
-               return FileSystemKind.FILE_SYSTEM;
-       }
-}
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
index e163f63..f6939e7 100644
--- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
+++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
@@ -18,15 +18,24 @@
 
 package org.apache.flink.runtime.fs.maprfs;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,6 +49,20 @@ public class MapRFsFactory implements FileSystemFactory {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(MapRFsFactory.class);
 
+       /** Name of the environment variable to determine the location of the 
MapR
+        * installation. */
+       private static final String MAPR_HOME_ENV = "MAPR_HOME";
+
+       /** The default location of the MapR installation. */
+       private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
+
+       /** The path relative to the MAPR_HOME where MapR stores how to access 
the
+        * configured clusters. */
+       private static final String MAPR_CLUSTER_CONF_FILE = 
"/conf/mapr-clusters.conf";
+
+       /** Name of the class implementing the MapRFileSystem. */
+       private static final String MAPR_FS_CLASS_NAME = 
"com.mapr.fs.MapRFileSystem";
+
        // 
------------------------------------------------------------------------
 
        @Override
@@ -56,10 +79,31 @@ public class MapRFsFactory implements FileSystemFactory {
        public FileSystem create(URI fsUri) throws IOException {
                checkNotNull(fsUri, "fsUri");
 
+               checkMaprFsClassInClassPath();
+
                try {
                        LOG.info("Trying to load and instantiate MapR File 
System");
 
-                       return new MapRFileSystem(fsUri);
+                       final org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
+                       final org.apache.hadoop.fs.FileSystem fs;
+
+                       final String authority = fsUri.getAuthority();
+                       if (authority == null || authority.isEmpty()) {
+
+                               // Use the default constructor to instantiate 
MapR file system object
+                               fs = instantiateMapRFsClass();
+                       }
+                       else {
+                               // We have an authority, check the MapR cluster 
configuration to
+                               // find the CLDB locations.
+                               final String[] cldbLocations = 
getCLDBLocations(authority);
+                               fs = instantiateMapRFsClass(authority, 
cldbLocations);
+                       }
+
+                       // now initialize the Hadoop File System object
+                       fs.initialize(fsUri, conf);
+
+                       return new HadoopFileSystem(fs);
                }
                catch (LinkageError e) {
                        throw new IOException("Could not load MapR file system. 
"  +
@@ -72,4 +116,128 @@ public class MapRFsFactory implements FileSystemFactory {
                        throw new IOException("Could not instantiate MapR file 
system.", t);
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  MapR Config Loading
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Retrieves the CLDB locations for the given MapR cluster name.
+        *
+        * @param authority
+        *            the name of the MapR cluster
+        * @return a list of CLDB locations
+        * @throws IOException
+        *             thrown if the CLDB locations for the given MapR cluster 
name
+        *             cannot be determined
+        */
+       private static String[] getCLDBLocations(String authority) throws 
IOException {
+
+               // Determine the MapR home
+               String maprHome = System.getenv(MAPR_HOME_ENV);
+               if (maprHome == null) {
+                       maprHome = DEFAULT_MAPR_HOME;
+               }
+
+               final File maprClusterConf = new File(maprHome, 
MAPR_CLUSTER_CONF_FILE);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(String.format(
+                               "Trying to retrieve MapR cluster configuration 
from %s",
+                               maprClusterConf));
+               }
+
+               if (!maprClusterConf.exists()) {
+                       throw new IOException("Could not find CLDB 
configuration '" + maprClusterConf.getAbsolutePath() +
+                               "', assuming MapR home is '" + maprHome + "'.");
+               }
+
+               // Read the cluster configuration file, format is specified at
+               // http://doc.mapr.com/display/MapR/mapr-clusters.conf
+
+               try (BufferedReader br = new BufferedReader(new 
FileReader(maprClusterConf))) {
+
+                       String line;
+                       while ((line = br.readLine()) != null) {
+
+                               // Normalize the string
+                               line = line.trim();
+                               line = line.replace('\t', ' ');
+
+                               final String[] fields = line.split(" ");
+                               if (fields.length < 1) {
+                                       continue;
+                               }
+
+                               final String clusterName = fields[0];
+
+                               if (!clusterName.equals(authority)) {
+                                       continue;
+                               }
+
+                               final List<String> cldbLocations = new 
ArrayList<>();
+
+                               for (int i = 1; i < fields.length; ++i) {
+
+                                       // Make sure this is not a key-value 
pair MapR recently
+                                       // introduced in the file format along 
with their security
+                                       // features.
+                                       if (!fields[i].isEmpty() && 
!fields[i].contains("=")) {
+                                               cldbLocations.add(fields[i]);
+                                       }
+                               }
+
+                               if (cldbLocations.isEmpty()) {
+                                       throw new IOException(
+                                               String.format(
+                                                       "%s contains entry for 
cluster %s but no CLDB locations.",
+                                                       maprClusterConf, 
authority));
+                               }
+
+                               return cldbLocations.toArray(new 
String[cldbLocations.size()]);
+                       }
+
+               }
+
+               throw new IOException(String.format(
+                       "Unable to find CLDB locations for cluster %s", 
authority));
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Reflective FS Instantiation
+       // 
------------------------------------------------------------------------
+
+       private static void checkMaprFsClassInClassPath() throws IOException {
+               try {
+                       Class.forName(MAPR_FS_CLASS_NAME, false, 
MapRFsFactory.class.getClassLoader());
+               }
+               catch (ClassNotFoundException e) {
+                       throw new IOException("Cannot find MapR FS in 
classpath: " + MAPR_FS_CLASS_NAME, e);
+               }
+       }
+
+       @VisibleForTesting
+       static org.apache.hadoop.fs.FileSystem instantiateMapRFsClass(Object... 
args) throws IOException {
+               final Class<? extends org.apache.hadoop.fs.FileSystem> fsClazz;
+
+               try {
+                       fsClazz = Class
+                               .forName(MAPR_FS_CLASS_NAME)
+                               
.asSubclass(org.apache.hadoop.fs.FileSystem.class);
+               } catch (ClassNotFoundException e) {
+                       throw new IOException("Cannot load MapR FS. Class 
missing in classpath", e);
+               } catch (ClassCastException e) {
+                       throw new IOException("Class '" + MAPR_FS_CLASS_NAME + 
"' is not a subclass of org.apache.hadoop.fs.FileSystem");
+               }
+
+               final Class<?>[] constructorArgs = 
Arrays.stream(args).map(Object::getClass).toArray(Class[]::new);
+               try {
+                       final Constructor<? extends 
org.apache.hadoop.fs.FileSystem> ctor =
+                               fsClazz.getConstructor(constructorArgs);
+
+                       return ctor.newInstance(args);
+               } catch (Exception e) {
+                       throw new IOException("Cannot instantiate MapR FS 
class", e);
+               }
+       }
 }
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java
new file mode 100644
index 0000000..b027487
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mapr.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Test class that mocks the MapRFileSystem.
+ */
+public class MapRFileSystem extends org.apache.hadoop.fs.FileSystem {
+
+       @Override
+       public URI getUri() {
+               return URI.create("maprfs:/");
+       }
+
+       @Override
+       public FSDataInputStream open(Path path, int i) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public FSDataOutputStream create(Path path, FsPermission fsPermission, 
boolean b, int i, short i1, long l, Progressable progressable) throws 
IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public FSDataOutputStream append(Path path, int i, Progressable 
progressable) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean rename(Path path, Path path1) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean delete(Path path, boolean b) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public FileStatus[] listStatus(Path path) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setWorkingDirectory(Path path) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Path getWorkingDirectory() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean mkdirs(Path path, FsPermission fsPermission) throws 
IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public FileStatus getFileStatus(Path path) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+}
diff --git a/tools/travis_controller.sh b/tools/travis_controller.sh
index 18e6b1b..c0f70a5 100755
--- a/tools/travis_controller.sh
+++ b/tools/travis_controller.sh
@@ -59,8 +59,7 @@ EXIT_CODE=0
 
 # Run actual compile&test steps
 if [ $STAGE == "$STAGE_COMPILE" ]; then
-    # We use -Punsafe-mapr-repo since the https version fails on Travis for 
some reason.
-       MVN="mvn clean install -nsu -Punsafe-mapr-repo 
-Dflink.convergence.phase=install -Pcheck-convergence -Dflink.forkCount=2 
-Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B -DskipTests 
$PROFILE"
+       MVN="mvn clean install -nsu -Dflink.convergence.phase=install 
-Pcheck-convergence -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 
-Dmaven.javadoc.skip=true -B -DskipTests $PROFILE"
        $MVN
        EXIT_CODE=$?
 

Reply via email to