Repository: kylin
Updated Branches:
  refs/heads/2.0-rc 5f7f3a750 -> 62b00f9ea


KYLIN-1280 convert other hdfs path to master name node
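
For context: the path transform below is opt-in. A minimal sketch of enabling it,
assuming a standard kylin.properties deployment (the property name comes from the
KylinConfigBase change in this diff; "true" is only an illustrative value):

    # hypothetical kylin.properties entry enabling the HDFS path transform
    kylin.transform.hdfs.path.enable=true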


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/62b00f9e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/62b00f9e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/62b00f9e

Branch: refs/heads/2.0-rc
Commit: 62b00f9eaac63d5c18158d9c1e98ebbd857b5266
Parents: 5f7f3a7
Author: fengyu <[email protected]>
Authored: Wed Jan 20 10:22:38 2016 +0800
Committer: fengyu <[email protected]>
Committed: Wed Jan 20 16:43:07 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 +
 .../org/apache/kylin/engine/mr/HadoopUtil.java  | 82 ++++++++++++++++++++
 .../apache/kylin/engine/spark/SparkCubing.java  |  4 +
 .../kylin/storage/hbase/steps/CubeHFileJob.java |  4 +
 4 files changed, 94 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/62b00f9e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4c640f3..5f91d53 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -482,6 +482,10 @@ public class KylinConfigBase implements Serializable {
     public boolean isGetJobStatusWithKerberos() {
         return Boolean.valueOf(this.getOptional("kylin.job.status.with.kerberos", "false"));
     }
+
+    public boolean isTransformPathToMasterNN() {
+        return Boolean.valueOf(this.getOptional("kylin.transform.hdfs.path.enable", "false"));
+    }
 
     public String getKylinOwner() {
         return this.getOptional("kylin.owner", "");

http://git-wip-us.apache.org/repos/asf/kylin/blob/62b00f9e/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 9ce2bab..f8d469e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -21,18 +21,25 @@ package org.apache.kylin.engine.mr;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.io.Writable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.log4j.Logger;
 
 public class HadoopUtil {
     private static final ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
+    private static final Logger logger = Logger.getLogger(HadoopUtil.class);
 
     public static void setCurrentConfiguration(Configuration conf) {
         hadoopConfig.set(conf);
@@ -115,5 +122,80 @@ public class HadoopUtil {
             throw new RuntimeException(e);
         }
     }
+    
+    public static Configuration convertCurrentConfig(String path) {
+        Configuration currentConfig = getCurrentConfiguration();
+        if (path == null)
+            return currentConfig;
+        String nameService = currentConfig.get(FileSystem.FS_DEFAULT_NAME_KEY);
+        logger.debug("Current convert path " + path);
+        logger.debug("Current default name service " + nameService);
+        try {
+            URI pathUri = new URI(path);
+            String host = pathUri.getHost();
+            // do not transform a path that already uses the default name service
+            if (nameService != null) {
+                URI defaultUri = new URI(nameService);
+                if (pathUri.getScheme().equalsIgnoreCase(defaultUri.getScheme()) && host.equalsIgnoreCase(defaultUri.getHost())) {
+                    return currentConfig;
+                }
+            }
+            // map each HA nameservice to its real name node addresses
+            Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(currentConfig);
+            Map<String, InetSocketAddress> addressesInNN = map.get(host);
+            // the host is not a known nameservice, e.g. the path already points at a real name node
+            if (addressesInNN == null)
+                return currentConfig;
+            for (InetSocketAddress addr : addressesInNN.values()) {
+                String name = addr.getHostName();
+                int port = addr.getPort();
+                String target = String.format("%s://%s:%d/", HdfsConstants.HDFS_URI_SCHEME, name, port);
+                Configuration tmpConfig = new Configuration();
+                tmpConfig.set(FileSystem.FS_DEFAULT_NAME_KEY, target);
+                FileSystem tmpFs = FileSystem.get(tmpConfig);
+                try {
+                    // probe each name node address to find out whether it is the active or a standby server
+                    tmpFs.listFiles(new Path("/"), false);
+                    logger.debug("Transform path nameservice " + host + " to real name node " + target);
+                    return tmpConfig;
+                } catch (Exception e) {
+                    logger.warn(String.format("Transforming hadoop namenode %s: real host %s is a standby server!",
+                            nameService, target));
+                    continue;
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Cannot create FileSystem from current HBase cluster conf", e);
+        } catch (URISyntaxException e1) {
+            throw new IllegalArgumentException("Cannot convert path to URI", e1);
+        }
+        return currentConfig;
+    }
 
+    /**
+     * Transforms a path that carries no default name service (such as a path on the
+     * HBase HDFS cluster, e.g. an HFile location) into a path pointing at the master name node.
+     * @param path the path to transform, such as an HBase location.
+     * @return an equivalent path whose host is the physical domain name of the master name node.
+     */
+    public static Path transformPathToNN(Path path) {
+        // no hdfs scheme: keep the path and let the default config resolve it
+        if (path == null || !path.toString().startsWith(HdfsConstants.HDFS_URI_SCHEME))
+            return path;
+
+        try {
+            URI uri = new URI(path.toString());
+            String rawPath = uri.getRawPath();
+            // try to point the default filesystem at the master name node
+            Configuration newConfig = convertCurrentConfig(path.toString());
+            if (newConfig == null) {
+                newConfig = getCurrentConfiguration();
+            }
+            FileSystem fs = FileSystem.get(newConfig);
+            return fs.makeQualified(new Path(rawPath));
+        } catch (Exception e) {
+            logger.warn("Transform path " + path + " error!", e);
+            return null;
+        }
+    }
 }
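
A usage sketch (not part of this commit): callers guard the transform behind the
new config flag, as the SparkCubing and CubeHFileJob hunks below do. Assuming the
Kylin classes from this diff are on the classpath, a hypothetical caller could
look like this; note that transformPathToNN() returns null on failure, so the
sketch falls back to the original path:

    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.engine.mr.HadoopUtil;

    public class OutputPathResolver {
        // Hypothetical helper: rewrite an output path so it targets the active
        // master name node, but only when the feature flag is switched on.
        public static Path resolve(String rawLocation) {
            Path output = new Path(rawLocation);
            if (KylinConfig.getInstanceFromEnv().isTransformPathToMasterNN()) {
                Path transformed = HadoopUtil.transformPathToNN(output);
                if (transformed != null) {
                    output = transformed;
                }
            }
            return output;
        }
    }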

http://git-wip-us.apache.org/repos/asf/kylin/blob/62b00f9e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 488945f..25e1057 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -78,6 +78,7 @@ import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
 import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
@@ -389,6 +390,9 @@ public class SparkCubing extends AbstractApplication {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
         Path path = new Path(kylinConfig.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString());
+        if (kylinConfig.isTransformPathToMasterNN()) {
+            path = HadoopUtil.transformPathToNN(path);
+        }
         Preconditions.checkArgument(!FileSystem.get(conf).exists(path));
         String url = conf.get("fs.defaultFS") + path.toString();
         System.out.println("use " + url + " as hfile");

http://git-wip-us.apache.org/repos/asf/kylin/blob/62b00f9e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 1f0b1a0..596bbff 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
@@ -57,6 +58,9 @@ public class CubeHFileJob extends AbstractHadoopJob {
             parseOptions(options, args);
 
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            if(KylinConfig.getInstanceFromEnv().isTransformPathToMasterNN()) {
+                output = HadoopUtil.transformPathToNN(output);
+            }
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
