Repository: kylin Updated Branches: refs/heads/2.0-rc 5f7f3a750 -> 62b00f9ea
KYLIN-1280 convert other hdfs path to master name node Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/62b00f9e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/62b00f9e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/62b00f9e Branch: refs/heads/2.0-rc Commit: 62b00f9eaac63d5c18158d9c1e98ebbd857b5266 Parents: 5f7f3a7 Author: fengyu <[email protected]> Authored: Wed Jan 20 10:22:38 2016 +0800 Committer: fengyu <[email protected]> Committed: Wed Jan 20 16:43:07 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../org/apache/kylin/engine/mr/HadoopUtil.java | 82 ++++++++++++++++++++ .../apache/kylin/engine/spark/SparkCubing.java | 4 + .../kylin/storage/hbase/steps/CubeHFileJob.java | 4 + 4 files changed, 94 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/62b00f9e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 4c640f3..5f91d53 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -482,6 +482,10 @@ public class KylinConfigBase implements Serializable { public boolean isGetJobStatusWithKerberos() { return Boolean.valueOf(this.getOptional("kylin.job.status.with.kerberos", "false")); } + + public boolean isTransformPathToMasterNN() { + return Boolean.valueOf(this.getOptional("kylin.transform.hdfs.path.enable", "false")); + } public String getKylinOwner() { return this.getOptional("kylin.owner", ""); http://git-wip-us.apache.org/repos/asf/kylin/blob/62b00f9e/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java index 9ce2bab..f8d469e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java @@ -21,18 +21,25 @@ package org.apache.kylin.engine.mr; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.io.Writable; +import org.apache.kylin.common.KylinConfig; +import org.apache.log4j.Logger; public class HadoopUtil { private static final ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>(); + private static final Logger logger = Logger.getLogger(HadoopUtil.class); public static void setCurrentConfiguration(Configuration conf) { hadoopConfig.set(conf); @@ -115,5 +122,80 @@ public class HadoopUtil { throw new RuntimeException(e); } } + + public static Configuration convertCurrentConfig(String path) { + Configuration currentConfig = getCurrentConfiguration(); + if(path == null) + return currentConfig; + String nameService = currentConfig.get(FileSystem.FS_DEFAULT_NAME_KEY); + logger.debug("Current convert path " + path); + logger.debug("Current default name service " + nameService); + try { + URI pathUri = new URI(path); + String host = pathUri.getHost(); + //do not transform path with default name service. + if(nameService != null) { + URI defaultUri = new URI(nameService); + if(pathUri.getScheme().equalsIgnoreCase(defaultUri.getScheme()) && host.equalsIgnoreCase(defaultUri.getHost())) { + return currentConfig; + } + } + //get namespace to real name node map.. + Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(currentConfig); + Map<String, InetSocketAddress> addressesInNN = map.get(host); + //if do not exist this namespace, such as we use real name node + if(addressesInNN == null) + return currentConfig; + for(InetSocketAddress addr : addressesInNN.values()) { + String name = addr.getHostName(); + int port = addr.getPort(); + String target = String.format("%s://%s:%d/", HdfsConstants.HDFS_URI_SCHEME, name, port); + Configuration tmpConfig = new Configuration(); + tmpConfig.set(FileSystem.FS_DEFAULT_NAME_KEY, target); + FileSystem tmpFs = FileSystem.get(tmpConfig); + try { + //try every name node address test whether it is standby server. + tmpFs.listFiles(new Path("/"), false); + logger.debug("Transform path nameservice " + host + " to real name node " + target); + return tmpConfig; + } catch (Exception e) { + logger.warn(String.format("Transforming hadoop namenode %s, real host %s is standby server !", + nameService, target)); + continue; + } + } + } catch (IOException e) { + throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e); + } catch (URISyntaxException e1) { + throw new IllegalArgumentException("Cannot create path to URI", e1); + } + return currentConfig; + } + /* + * transform a path without default name service(such as path in hbase hdfs, hfile location) + * to path point to master name node. + * @param path to transform, such as hbase location. + * @return a equal path with physical domain name which is master namenode. + */ + public static Path transformPathToNN(Path path) { + //without schema, using default config + if(path == null || !path.toString().startsWith(HdfsConstants.HDFS_URI_SCHEME)) + return path; + + try { + URI uri = new URI(path.toString()); + String rawPath = uri.getRawPath(); + //try to convert default filesystem to master nn. + Configuration newConfig = convertCurrentConfig(path.toString()); + if(newConfig == null) { + newConfig = getCurrentConfiguration(); + } + FileSystem fs = FileSystem.get(newConfig); + return fs.makeQualified(new Path(rawPath)); + } catch (Exception e) { + logger.warn("Transform path " + path + " error !", e); + return null; + } + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/62b00f9e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 488945f..25e1057 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -78,6 +78,7 @@ import org.apache.kylin.cube.util.CubingUtils; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.IterableDictionaryValueEnumerator; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.CubeStatsReader; import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter; import org.apache.kylin.engine.spark.cube.DefaultTupleConverter; @@ -389,6 +390,9 @@ public class SparkCubing extends AbstractApplication { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier()); Path path = new Path(kylinConfig.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString()); + if(kylinConfig.isTransformPathToMasterNN()) { + path = HadoopUtil.transformPathToNN(path); + } Preconditions.checkArgument(!FileSystem.get(conf).exists(path)); String url = conf.get("fs.defaultFS") + path.toString(); System.out.println("use " + url + " as hfile"); http://git-wip-us.apache.org/repos/asf/kylin/blob/62b00f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index 1f0b1a0..596bbff 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -33,6 +33,7 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.slf4j.Logger; @@ -57,6 +58,9 @@ public class CubeHFileJob extends AbstractHadoopJob { parseOptions(options, args); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + if(KylinConfig.getInstanceFromEnv().isTransformPathToMasterNN()) { + output = HadoopUtil.transformPathToNN(output); + } String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
