This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 80452f19e1712fd09d3b6b9a99c82d954f471a88 Author: Stephan Ewen <[email protected]> AuthorDate: Tue Jul 30 12:00:07 2019 +0200 [FLINK-13499][maprfs] Handle MapR dependency purely through reflection. This allows us to remove the MapR dependency from the module. The MapR Maven dependency has frequently caused issues. --- flink-filesystems/flink-mapr-fs/pom.xml | 38 +---- .../flink/runtime/fs/maprfs/MapRFileSystem.java | 181 --------------------- .../flink/runtime/fs/maprfs/MapRFsFactory.java | 170 ++++++++++++++++++- .../src/test/java/com/mapr/fs/MapRFileSystem.java | 90 ++++++++++ tools/travis_controller.sh | 3 +- 5 files changed, 263 insertions(+), 219 deletions(-) diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml index 2683a8e..955f4d0 100644 --- a/flink-filesystems/flink-mapr-fs/pom.xml +++ b/flink-filesystems/flink-mapr-fs/pom.xml @@ -32,35 +32,6 @@ under the License. <packaging>jar</packaging> - <repositories> - <repository> - <id>mapr-releases</id> - <url>https://repository.mapr.com/maven/</url> - <snapshots><enabled>false</enabled></snapshots> - <releases><enabled>true</enabled></releases> - </repository> - </repositories> - - <profiles> - <profile> - <id>unsafe-mapr-repo</id> - <activation> - <property> - <name>unsafe-mapr-repo</name> - </property> - </activation> - <repositories> - <!-- MapR --> - <repository> - <id>mapr-releases</id> - <url>http://repository.mapr.com/maven/</url> - <snapshots><enabled>false</enabled></snapshots> - <releases><enabled>true</enabled></releases> - </repository> - </repositories> - </profile> - </profiles> - <dependencies> <dependency> @@ -75,13 +46,10 @@ under the License. 
<version>${project.version}</version> </dependency> - <!-- MapR dependencies as optional dependency, so we can hard depend on this without --> - <!-- pulling in MapR libraries by default --> - <dependency> - <groupId>com.mapr.hadoop</groupId> - <artifactId>maprfs</artifactId> - <version>5.2.1-mapr</version> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop2</artifactId> + <version>${hadoop.version}-${project.version}</version> <optional>true</optional> </dependency> diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java deleted file mode 100644 index 5aec4a4..0000000 --- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.fs.maprfs; - -import org.apache.flink.core.fs.FileSystemKind; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A MapR file system client for Flink. - * - * <p>Internally, this class wraps the {@link org.apache.hadoop.fs.FileSystem} implementation - * of the MapR file system client. - */ -public class MapRFileSystem extends HadoopFileSystem { - - private static final Logger LOG = LoggerFactory.getLogger(MapRFileSystem.class); - - /** Name of the environment variable to determine the location of the MapR - * installation. */ - private static final String MAPR_HOME_ENV = "MAPR_HOME"; - - /** The default location of the MapR installation. */ - private static final String DEFAULT_MAPR_HOME = "/opt/mapr/"; - - /** The path relative to the MAPR_HOME where MapR stores how to access the - * configured clusters. */ - private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf"; - - // ------------------------------------------------------------------------ - - /** - * Creates a MapRFileSystem for the given URI. - * - * @param fsUri The URI describing the file system - * @throws IOException Thrown if the file system could not be initialized. 
- */ - public MapRFileSystem(URI fsUri) throws IOException { - super(instantiateMapRFileSystem(fsUri)); - } - - private static org.apache.hadoop.fs.FileSystem instantiateMapRFileSystem(URI fsUri) throws IOException { - checkNotNull(fsUri, "fsUri"); - - final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - final com.mapr.fs.MapRFileSystem fs; - - final String authority = fsUri.getAuthority(); - if (authority == null || authority.isEmpty()) { - - // Use the default constructor to instantiate MapR file system object - fs = new com.mapr.fs.MapRFileSystem(); - } - else { - // We have an authority, check the MapR cluster configuration to - // find the CLDB locations. - final String[] cldbLocations = getCLDBLocations(authority); - fs = new com.mapr.fs.MapRFileSystem(authority, cldbLocations); - } - - // now initialize the Hadoop File System object - fs.initialize(fsUri, conf); - - return fs; - } - - /** - * Retrieves the CLDB locations for the given MapR cluster name. 
- * - * @param authority - * the name of the MapR cluster - * @return a list of CLDB locations - * @throws IOException - * thrown if the CLDB locations for the given MapR cluster name - * cannot be determined - */ - private static String[] getCLDBLocations(String authority) throws IOException { - - // Determine the MapR home - String maprHome = System.getenv(MAPR_HOME_ENV); - if (maprHome == null) { - maprHome = DEFAULT_MAPR_HOME; - } - - final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE); - - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Trying to retrieve MapR cluster configuration from %s", - maprClusterConf)); - } - - if (!maprClusterConf.exists()) { - throw new IOException("Could not find CLDB configuration '" + maprClusterConf.getAbsolutePath() + - "', assuming MapR home is '" + maprHome + "'."); - } - - // Read the cluster configuration file, format is specified at - // http://doc.mapr.com/display/MapR/mapr-clusters.conf - - try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) { - - String line; - while ((line = br.readLine()) != null) { - - // Normalize the string - line = line.trim(); - line = line.replace('\t', ' '); - - final String[] fields = line.split(" "); - if (fields.length < 1) { - continue; - } - - final String clusterName = fields[0]; - - if (!clusterName.equals(authority)) { - continue; - } - - final List<String> cldbLocations = new ArrayList<>(); - - for (int i = 1; i < fields.length; ++i) { - - // Make sure this is not a key-value pair MapR recently - // introduced in the file format along with their security - // features. 
- if (!fields[i].isEmpty() && !fields[i].contains("=")) { - cldbLocations.add(fields[i]); - } - } - - if (cldbLocations.isEmpty()) { - throw new IOException( - String.format( - "%s contains entry for cluster %s but no CLDB locations.", - maprClusterConf, authority)); - } - - return cldbLocations.toArray(new String[cldbLocations.size()]); - } - - } - - throw new IOException(String.format( - "Unable to find CLDB locations for cluster %s", authority)); - } - - @Override - public FileSystemKind getKind() { - return FileSystemKind.FILE_SYSTEM; - } -} diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java index e163f63..f6939e7 100644 --- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java +++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java @@ -18,15 +18,24 @@ package org.apache.flink.runtime.fs.maprfs; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -40,6 +49,20 @@ public class MapRFsFactory implements FileSystemFactory { private static final Logger LOG = LoggerFactory.getLogger(MapRFsFactory.class); + /** Name of the environment variable to determine the location of the MapR + * installation. 
*/ + private static final String MAPR_HOME_ENV = "MAPR_HOME"; + + /** The default location of the MapR installation. */ + private static final String DEFAULT_MAPR_HOME = "/opt/mapr/"; + + /** The path relative to the MAPR_HOME where MapR stores how to access the + * configured clusters. */ + private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf"; + + /** Name of the class implementing the MapRFileSystem. */ + private static final String MAPR_FS_CLASS_NAME = "com.mapr.fs.MapRFileSystem"; + // ------------------------------------------------------------------------ @Override @@ -56,10 +79,31 @@ public class MapRFsFactory implements FileSystemFactory { public FileSystem create(URI fsUri) throws IOException { checkNotNull(fsUri, "fsUri"); + checkMaprFsClassInClassPath(); + try { LOG.info("Trying to load and instantiate MapR File System"); - return new MapRFileSystem(fsUri); + final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + final org.apache.hadoop.fs.FileSystem fs; + + final String authority = fsUri.getAuthority(); + if (authority == null || authority.isEmpty()) { + + // Use the default constructor to instantiate MapR file system object + fs = instantiateMapRFsClass(); + } + else { + // We have an authority, check the MapR cluster configuration to + // find the CLDB locations. + final String[] cldbLocations = getCLDBLocations(authority); + fs = instantiateMapRFsClass(authority, cldbLocations); + } + + // now initialize the Hadoop File System object + fs.initialize(fsUri, conf); + + return new HadoopFileSystem(fs); } catch (LinkageError e) { throw new IOException("Could not load MapR file system. 
" + @@ -72,4 +116,128 @@ public class MapRFsFactory implements FileSystemFactory { throw new IOException("Could not instantiate MapR file system.", t); } } + + // ------------------------------------------------------------------------ + // MapR Config Loading + // ------------------------------------------------------------------------ + + /** + * Retrieves the CLDB locations for the given MapR cluster name. + * + * @param authority + * the name of the MapR cluster + * @return a list of CLDB locations + * @throws IOException + * thrown if the CLDB locations for the given MapR cluster name + * cannot be determined + */ + private static String[] getCLDBLocations(String authority) throws IOException { + + // Determine the MapR home + String maprHome = System.getenv(MAPR_HOME_ENV); + if (maprHome == null) { + maprHome = DEFAULT_MAPR_HOME; + } + + final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE); + + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Trying to retrieve MapR cluster configuration from %s", + maprClusterConf)); + } + + if (!maprClusterConf.exists()) { + throw new IOException("Could not find CLDB configuration '" + maprClusterConf.getAbsolutePath() + + "', assuming MapR home is '" + maprHome + "'."); + } + + // Read the cluster configuration file, format is specified at + // http://doc.mapr.com/display/MapR/mapr-clusters.conf + + try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) { + + String line; + while ((line = br.readLine()) != null) { + + // Normalize the string + line = line.trim(); + line = line.replace('\t', ' '); + + final String[] fields = line.split(" "); + if (fields.length < 1) { + continue; + } + + final String clusterName = fields[0]; + + if (!clusterName.equals(authority)) { + continue; + } + + final List<String> cldbLocations = new ArrayList<>(); + + for (int i = 1; i < fields.length; ++i) { + + // Make sure this is not a key-value pair MapR recently + // introduced in the file 
format along with their security + // features. + if (!fields[i].isEmpty() && !fields[i].contains("=")) { + cldbLocations.add(fields[i]); + } + } + + if (cldbLocations.isEmpty()) { + throw new IOException( + String.format( + "%s contains entry for cluster %s but no CLDB locations.", + maprClusterConf, authority)); + } + + return cldbLocations.toArray(new String[cldbLocations.size()]); + } + + } + + throw new IOException(String.format( + "Unable to find CLDB locations for cluster %s", authority)); + } + + // ------------------------------------------------------------------------ + // Reflective FS Instantiation + // ------------------------------------------------------------------------ + + private static void checkMaprFsClassInClassPath() throws IOException { + try { + Class.forName(MAPR_FS_CLASS_NAME, false, MapRFsFactory.class.getClassLoader()); + } + catch (ClassNotFoundException e) { + throw new IOException("Cannot find MapR FS in classpath: " + MAPR_FS_CLASS_NAME, e); + } + } + + @VisibleForTesting + static org.apache.hadoop.fs.FileSystem instantiateMapRFsClass(Object... args) throws IOException { + final Class<? extends org.apache.hadoop.fs.FileSystem> fsClazz; + + try { + fsClazz = Class + .forName(MAPR_FS_CLASS_NAME) + .asSubclass(org.apache.hadoop.fs.FileSystem.class); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot load MapR FS. Class missing in classpath", e); + } catch (ClassCastException e) { + throw new IOException("Class '" + MAPR_FS_CLASS_NAME + "' is not a subclass of org.apache.hadoop.fs.FileSystem"); + } + + final Class<?>[] constructorArgs = Arrays.stream(args).map(Object::getClass).toArray(Class[]::new); + try { + final Constructor<? 
extends org.apache.hadoop.fs.FileSystem> ctor = + fsClazz.getConstructor(constructorArgs); + + return ctor.newInstance(args); + } catch (Exception e) { + throw new IOException("Cannot instantiate MapR FS class", e); + } + } } diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java new file mode 100644 index 0000000..b027487 --- /dev/null +++ b/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mapr.fs; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import java.io.IOException; +import java.net.URI; + +/** + * Test class that mocks the MapRFileSystem. 
+ */ +public class MapRFileSystem extends org.apache.hadoop.fs.FileSystem { + + @Override + public URI getUri() { + return URI.create("maprfs:/"); + } + + @Override + public FSDataInputStream open(Path path, int i) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, Progressable progressable) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean rename(Path path, Path path1) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean delete(Path path, boolean b) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void setWorkingDirectory(Path path) { + throw new UnsupportedOperationException(); + } + + @Override + public Path getWorkingDirectory() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/tools/travis_controller.sh b/tools/travis_controller.sh index 18e6b1b..c0f70a5 100755 --- a/tools/travis_controller.sh +++ b/tools/travis_controller.sh @@ -59,8 +59,7 @@ EXIT_CODE=0 # Run actual compile&test steps if [ $STAGE == "$STAGE_COMPILE" ]; then - # We use -Punsafe-mapr-repo since the https version fails on Travis for some reason. 
- MVN="mvn clean install -nsu -Punsafe-mapr-repo -Dflink.convergence.phase=install -Pcheck-convergence -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B -DskipTests $PROFILE" + MVN="mvn clean install -nsu -Dflink.convergence.phase=install -Pcheck-convergence -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B -DskipTests $PROFILE" $MVN EXIT_CODE=$?
