Repository: kylin
Updated Branches:
  refs/heads/master-hbase0.98 9323c7c2b -> 4e41c3637 (forced update)


KYLIN-2374 code review


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5eae37ef
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5eae37ef
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5eae37ef

Branch: refs/heads/master-hbase0.98
Commit: 5eae37ef18ca51027c6bb2cfd3410fefc7982f2a
Parents: a2a59c4
Author: shaofengshi <shaofeng...@apache.org>
Authored: Thu Jan 26 09:55:48 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Thu Jan 26 09:55:48 2017 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |  3 +-
 build/deploy/spark-defaults.conf                |  1 -
 .../apache/kylin/common/KylinConfigBase.java    |  8 --
 .../kylin/common/persistence/ResourceStore.java |  3 +
 .../org/apache/kylin/cube/model/CubeDesc.java   |  2 +-
 .../ExtendedColumnMeasureType.java              |  8 +-
 .../storage/hdfs/ITHDFSResourceStoreTest.java   | 36 +++++++-
 .../kylin/storage/hbase/HBaseResourceStore.java |  3 +-
 .../kylin/storage/hdfs/HDFSResourceStore.java   | 90 +++++++++++---------
 9 files changed, 97 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index eceb886..43ea17d 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -211,8 +211,9 @@ kylin.engine.spark-conf.spark.executor.memory=4G
 kylin.engine.spark-conf.spark.executor.cores=4
 kylin.engine.spark-conf.spark.executor.instances=8
 kylin.engine.spark-conf.spark.storage.memoryFraction=0.3
-kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 ## manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
 #kylin.engine.spark-conf.spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
 #kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/build/deploy/spark-defaults.conf
----------------------------------------------------------------------
diff --git a/build/deploy/spark-defaults.conf b/build/deploy/spark-defaults.conf
index 36c0ab3..78a4bc9 100644
--- a/build/deploy/spark-defaults.conf
+++ b/build/deploy/spark-defaults.conf
@@ -1,5 +1,4 @@
 spark.yarn.submit.file.replication=1
-spark.eventLog.enabled=true
 spark.yarn.max.executor.failures=3
 spark.driver.extraJavaOptions=-Dhdp.version=current
 spark.yarn.am.extraJavaOptions=-Dhdp.version=current

http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5932197..b1acbbf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -193,14 +193,6 @@ abstract public class KylinConfigBase implements Serializable {
         return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString();
     }
 
-    public String getRawHdfsWorkingDirectory() {
-        String root = getRequired("kylin.env.hdfs-working-dir");
-        if (!root.endsWith("/")) {
-            root += "/";
-        }
-        return root;
-    }
-
     // ============================================================================
     // METADATA
     // ============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 25a0801..c441618 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -63,6 +63,9 @@ abstract public class ResourceStore {
     public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
     public static final String BAD_QUERY_RESOURCE_ROOT = "/bad_query";
 
+
+    protected static final String DEFAULT_STORE_NAME = "kylin_metadata";
+
     private static final ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
 
     private static final ArrayList<Class<? extends ResourceStore>> knownImpl = new ArrayList<Class<? extends ResourceStore>>();

http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 7e599da..5e970bf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -891,7 +891,7 @@ public class CubeDesc extends RootPersistentEntity 
implements IEngineAware {
             func.init(model);
             allColumns.addAll(func.getParameter().getColRefs());
 
-            if (ExtendedColumnMeasureType.FUNC_RAW.equalsIgnoreCase(m.getFunction().getExpression())) {
+            if (ExtendedColumnMeasureType.FUNC_EXTENDED_COLUMN.equalsIgnoreCase(m.getFunction().getExpression())) {
                 FunctionDesc functionDesc = m.getFunction();
 
                 List<TblColRef> hosts = ExtendedColumnMeasureType.getExtendedColumnHosts(functionDesc);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
index 1b2cda3..de5ee25 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
@@ -47,8 +47,8 @@ public class ExtendedColumnMeasureType extends 
MeasureType<ByteArray> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ExtendedColumnMeasureType.class);
 
-    public static final String FUNC_RAW = "EXTENDED_COLUMN";
-    public static final String DATATYPE_RAW = "extendedcolumn";
+    public static final String FUNC_EXTENDED_COLUMN = "EXTENDED_COLUMN";
+    public static final String DATATYPE_EXTENDED_COLUMN = "extendedcolumn";
     private final DataType dataType;
 
     public static class Factory extends MeasureTypeFactory<ByteArray> {
@@ -60,12 +60,12 @@ public class ExtendedColumnMeasureType extends 
MeasureType<ByteArray> {
 
         @Override
         public String getAggrFunctionName() {
-            return FUNC_RAW;
+            return FUNC_EXTENDED_COLUMN;
         }
 
         @Override
         public String getAggrDataTypeName() {
-            return DATATYPE_RAW;
+            return DATATYPE_EXTENDED_COLUMN;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
 
b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
index ff66048..ec12722 100644
--- 
a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
+++ 
b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -18,21 +18,28 @@
 
 package org.apache.kylin.storage.hdfs;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStoreTest;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static junit.framework.TestCase.assertTrue;
+
 public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
 
     KylinConfig kylinConfig;
+    FileSystem fs;
 
     @Before
     public void setup() throws Exception {
         this.createTestMetadata();
         kylinConfig = KylinConfig.getInstanceFromEnv();
+        fs = HadoopUtil.getWorkingFileSystem();
     }
 
     @After
@@ -41,12 +48,37 @@ public class ITHDFSResourceStoreTest extends 
HBaseMetadataTestCase {
     }
 
     @Test
-    public void testResourceStoreBasic() throws Exception {
+    public void testBasic() throws Exception {
+        String oldUrl = kylinConfig.getMetadataUrl();
+        String path = "/kylin/kylin_metadata/metadata";
+        kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs");
+        HDFSResourceStore store = new HDFSResourceStore(kylinConfig);
+        ResourceStoreTest.testAStore(store);
+        kylinConfig.setProperty("kylin.metadata.url", oldUrl);
+        assertTrue(fs.exists(new Path(path)));
+    }
+
+    @Test
+    public void testQalifiedName() throws Exception {
         String oldUrl = kylinConfig.getMetadataUrl();
-        kylinConfig.setProperty("kylin.metadata.url", "kylin_metadata@hdfs");
+        String path = "hdfs:///kylin/kylin_metadata/metadata_test1";
+        kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs");
         HDFSResourceStore store = new HDFSResourceStore(kylinConfig);
         ResourceStoreTest.testAStore(store);
         kylinConfig.setProperty("kylin.metadata.url", oldUrl);
+        assertTrue(fs.exists(new Path(path)));
     }
 
+    @Test
+    public void testFullQalifiedName() throws Exception {
+        String oldUrl = kylinConfig.getMetadataUrl();
+        String path = "hdfs://sandbox.hortonworks.com:8020/kylin/kylin_metadata/metadata_test2";
+        kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs");
+        HDFSResourceStore store = new HDFSResourceStore(kylinConfig);
+        ResourceStoreTest.testAStore(store);
+        kylinConfig.setProperty("kylin.metadata.url", oldUrl);
+        assertTrue(fs.exists(new Path(path)));
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 0901b54..501f1e4 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -59,7 +59,6 @@ public class HBaseResourceStore extends ResourceStore {
 
     private static final Logger logger = 
LoggerFactory.getLogger(HBaseResourceStore.class);
 
-    private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
     private static final String FAMILY = "f";
     private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY);
     private static final String COLUMN = "c";
@@ -80,7 +79,7 @@ public class HBaseResourceStore extends ResourceStore {
         String metadataUrl = kylinConfig.getMetadataUrl();
         // split TABLE@HBASE_URL
         int cut = metadataUrl.indexOf('@');
-        tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+        tableNameBase = cut < 0 ? DEFAULT_STORE_NAME : metadataUrl.substring(0, cut);
         hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
         if (!hbaseUrl.equals("hbase"))
             throw new IOException("Can not create HBaseResourceStore. Url not 
match. Url:" + hbaseUrl);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index 38acfb0..d24d3b4 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -46,11 +46,7 @@ public class HDFSResourceStore extends ResourceStore {
 
     private static final Logger logger = 
LoggerFactory.getLogger(HDFSResourceStore.class);
 
-    private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 10;
-
-    private static final String DEFAULT_FOLDER_NAME = "kylin_metadata";
-
-    private static final String DEFAULT_METADATA_FOLDER_NAME = "hdfs_metadata";
+    private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 2;
 
     private Path hdfsMetaPath;
 
@@ -62,42 +58,43 @@ public class HDFSResourceStore extends ResourceStore {
         super(kylinConfig);
         String metadataUrl = kylinConfig.getMetadataUrl();
         int cut = metadataUrl.indexOf('@');
-        String metaDirName = cut < 0 ? DEFAULT_FOLDER_NAME : 
metadataUrl.substring(0, cut);
-        String hdfsUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 
1);
-        if (!hdfsUrl.equals("hdfs"))
-            throw new IOException("Can not create HDFSResourceStore. Url not 
match. Url:" + hdfsUrl);
-        metaDirName += "/" + DEFAULT_METADATA_FOLDER_NAME;
-        logger.info("meta dir name :" + metaDirName);
-        createMetaFolder(metaDirName, kylinConfig);
-    }
-
-    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) 
throws Exception {
-        String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
-        fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
-        logger.info("hdfs working dir : " + hdfsWorkingDir);
-        Path hdfsWorkingPath = new Path(hdfsWorkingDir);
-        if (!fs.exists(hdfsWorkingPath)) {
-            throw new IOException("HDFS working dir not exist");
+        if (cut < 0) {
+            throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore: " + metadataUrl);
         }
+        String suffix = metadataUrl.substring(cut + 1);
+        if (!suffix.equals("hdfs"))
+            throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore:" + suffix);
+
+        String path = metadataUrl.substring(0, cut);
+        fs = HadoopUtil.getFileSystem(path);
+        Path metadataPath = new Path(path);
         //creat lock manager
-        this.lockManager = new LockManager(kylinConfig, 
kylinConfig.getRawHdfsWorkingDirectory() + metaDirName);
+        this.lockManager = new LockManager(kylinConfig, getRelativePath(metadataPath));
+        if (fs.exists(metadataPath) == false) {
+            logger.warn("Path not exist in HDFS, create it: " + path);
+            createMetaFolder(metadataPath, kylinConfig);
+        }
+
+        hdfsMetaPath = metadataPath;
+        logger.info("hdfs meta path : " + hdfsMetaPath.toString());
+
+    }
+
+
+
+    private void createMetaFolder(Path metaDirName, KylinConfig kylinConfig) throws Exception {
         //create hdfs meta path
-        hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
-        if (!fs.exists(hdfsMetaPath)) {
-            ResourceLock lock = 
lockManager.getLock(lockManager.getLockPath("/"));
-            try {
-                if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, 
TimeUnit.MINUTES)) {
-                    logger.info("get root lock successfully");
-                    if (!fs.exists(hdfsMetaPath)) {
-                        fs.mkdirs(hdfsMetaPath);
-                        logger.info("create hdfs meta path");
-                    }
+        ResourceLock lock = lockManager.getLock(getRelativePath(metaDirName));
+        try {
+            if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS)) {
+                if (!fs.exists(metaDirName)) {
+                    fs.mkdirs(metaDirName);
                 }
-            } finally {
-                lockManager.releaseLock(lock);
             }
+        } finally {
+            lockManager.releaseLock(lock);
         }
-        logger.info("hdfs meta path : " + hdfsMetaPath.toString());
+        logger.info("hdfs meta path created: " + metaDirName.toString());
     }
 
     @Override
@@ -170,7 +167,7 @@ public class HDFSResourceStore extends ResourceStore {
         ResourceLock lock = null;
         try {
             lock = lockManager.getLock(resPath);
-            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS);
             in = fs.open(p);
             long t = in.readLong();
             return t;
@@ -192,7 +189,7 @@ public class HDFSResourceStore extends ResourceStore {
         ResourceLock lock = null;
         try {
             lock = lockManager.getLock(resPath);
-            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS);
             out = fs.create(p, true);
             out.writeLong(ts);
             IOUtils.copy(content, out);
@@ -228,7 +225,7 @@ public class HDFSResourceStore extends ResourceStore {
         ResourceLock lock = null;
         try {
             lock = lockManager.getLock(resPath);
-            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+            lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS);
             Path p = getRealHDFSPath(resPath);
             if (fs.exists(p)) {
                 fs.delete(p, true);
@@ -253,4 +250,21 @@ public class HDFSResourceStore extends ResourceStore {
         return new Path(this.hdfsMetaPath, resourcePath);
     }
 
+    private static String getRelativePath(Path hdfsPath) {
+        String path = hdfsPath.toString();
+        int index = path.indexOf("://");
+        if (index > 0) {
+            path = path.substring(index + 3);
+        }
+
+        if (path.startsWith("/") == false) {
+            if (path.indexOf("/") > 0) {
+                path = path.substring(path.indexOf("/"));
+            } else {
+                path = "/" + path;
+            }
+        }
+        return path;
+    }
+
 }

Reply via email to