Repository: kylin Updated Branches: refs/heads/master-hbase0.98 9323c7c2b -> 4e41c3637 (forced update)
KYLIN-2374 code review Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5eae37ef Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5eae37ef Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5eae37ef Branch: refs/heads/master-hbase0.98 Commit: 5eae37ef18ca51027c6bb2cfd3410fefc7982f2a Parents: a2a59c4 Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Jan 26 09:55:48 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Thu Jan 26 09:55:48 2017 +0800 ---------------------------------------------------------------------- build/conf/kylin.properties | 3 +- build/deploy/spark-defaults.conf | 1 - .../apache/kylin/common/KylinConfigBase.java | 8 -- .../kylin/common/persistence/ResourceStore.java | 3 + .../org/apache/kylin/cube/model/CubeDesc.java | 2 +- .../ExtendedColumnMeasureType.java | 8 +- .../storage/hdfs/ITHDFSResourceStoreTest.java | 36 +++++++- .../kylin/storage/hbase/HBaseResourceStore.java | 3 +- .../kylin/storage/hdfs/HDFSResourceStore.java | 90 +++++++++++--------- 9 files changed, 97 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index eceb886..43ea17d 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -211,8 +211,9 @@ kylin.engine.spark-conf.spark.executor.memory=4G kylin.engine.spark-conf.spark.executor.cores=4 kylin.engine.spark-conf.spark.executor.instances=8 kylin.engine.spark-conf.spark.storage.memoryFraction=0.3 -kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history +kylin.engine.spark-conf.spark.eventLog.enabled=true kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history +kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history ## manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime #kylin.engine.spark-conf.spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar #kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/build/deploy/spark-defaults.conf ---------------------------------------------------------------------- diff --git a/build/deploy/spark-defaults.conf b/build/deploy/spark-defaults.conf index 36c0ab3..78a4bc9 100644 --- a/build/deploy/spark-defaults.conf +++ b/build/deploy/spark-defaults.conf @@ -1,5 +1,4 @@ spark.yarn.submit.file.replication=1 -spark.eventLog.enabled=true spark.yarn.max.executor.failures=3 spark.driver.extraJavaOptions=-Dhdp.version=current spark.yarn.am.extraJavaOptions=-Dhdp.version=current http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 5932197..b1acbbf 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -193,14 +193,6 @@ abstract public class KylinConfigBase implements Serializable { return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString(); } - public String getRawHdfsWorkingDirectory() { - String root = getRequired("kylin.env.hdfs-working-dir"); - if (!root.endsWith("/")) { - root += "/"; - } - return root; - } - // ============================================================================ // METADATA // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index 25a0801..c441618 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -63,6 +63,9 @@ abstract public class ResourceStore { public static final String CUBE_STATISTICS_ROOT = "/cube_statistics"; public static final String BAD_QUERY_RESOURCE_ROOT = "/bad_query"; + + protected static final String DEFAULT_STORE_NAME = "kylin_metadata"; + private static final ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>(); private static final ArrayList<Class<? extends ResourceStore>> knownImpl = new ArrayList<Class<? extends ResourceStore>>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 7e599da..5e970bf 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -891,7 +891,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { func.init(model); allColumns.addAll(func.getParameter().getColRefs()); - if (ExtendedColumnMeasureType.FUNC_RAW.equalsIgnoreCase(m.getFunction().getExpression())) { + if (ExtendedColumnMeasureType.FUNC_EXTENDED_COLUMN.equalsIgnoreCase(m.getFunction().getExpression())) { FunctionDesc functionDesc = m.getFunction(); List<TblColRef> hosts = ExtendedColumnMeasureType.getExtendedColumnHosts(functionDesc); http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java index 1b2cda3..de5ee25 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java @@ -47,8 +47,8 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> { private static final Logger logger = LoggerFactory.getLogger(ExtendedColumnMeasureType.class); - public static final String FUNC_RAW = "EXTENDED_COLUMN"; - public static final String DATATYPE_RAW = "extendedcolumn"; + public static final String FUNC_EXTENDED_COLUMN = "EXTENDED_COLUMN"; + public static final String DATATYPE_EXTENDED_COLUMN = "extendedcolumn"; private final DataType dataType; public static class Factory extends MeasureTypeFactory<ByteArray> { @@ -60,12 +60,12 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> { @Override public String getAggrFunctionName() { - return FUNC_RAW; + return FUNC_EXTENDED_COLUMN; } @Override public String getAggrDataTypeName() { - return DATATYPE_RAW; + return DATATYPE_EXTENDED_COLUMN; } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java index ff66048..ec12722 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java @@ -18,21 +18,28 @@ package org.apache.kylin.storage.hdfs; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStoreTest; import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.common.util.HadoopUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; +import static junit.framework.TestCase.assertTrue; + public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase { KylinConfig kylinConfig; + FileSystem fs; @Before public void setup() throws Exception { this.createTestMetadata(); kylinConfig = KylinConfig.getInstanceFromEnv(); + fs = HadoopUtil.getWorkingFileSystem(); } @After @@ -41,12 +48,37 @@ public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase { } @Test - public void testResourceStoreBasic() throws Exception { + public void testBasic() throws Exception { + String oldUrl = kylinConfig.getMetadataUrl(); + String path = "/kylin/kylin_metadata/metadata"; + kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs"); + HDFSResourceStore store = new HDFSResourceStore(kylinConfig); + ResourceStoreTest.testAStore(store); + kylinConfig.setProperty("kylin.metadata.url", oldUrl); + assertTrue(fs.exists(new Path(path))); + } + + @Test + public void testQalifiedName() throws Exception { String oldUrl = kylinConfig.getMetadataUrl(); - kylinConfig.setProperty("kylin.metadata.url", "kylin_metadata@hdfs"); + String path = "hdfs:///kylin/kylin_metadata/metadata_test1"; + kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs"); HDFSResourceStore store = new HDFSResourceStore(kylinConfig); ResourceStoreTest.testAStore(store); kylinConfig.setProperty("kylin.metadata.url", oldUrl); + assertTrue(fs.exists(new Path(path))); } + @Test + public void testFullQalifiedName() throws Exception { + String oldUrl = kylinConfig.getMetadataUrl(); + String path = "hdfs://sandbox.hortonworks.com:8020/kylin/kylin_metadata/metadata_test2"; + kylinConfig.setProperty("kylin.metadata.url", path + "@hdfs"); + HDFSResourceStore store = new HDFSResourceStore(kylinConfig); + ResourceStoreTest.testAStore(store); + kylinConfig.setProperty("kylin.metadata.url", oldUrl); + assertTrue(fs.exists(new Path(path))); + } + + } http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 0901b54..501f1e4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -59,7 +59,6 @@ public class HBaseResourceStore extends ResourceStore { private static final Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class); - private static final String DEFAULT_TABLE_NAME = "kylin_metadata"; private static final String FAMILY = "f"; private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY); private static final String COLUMN = "c"; @@ -80,7 +79,7 @@ public class HBaseResourceStore extends ResourceStore { String metadataUrl = kylinConfig.getMetadataUrl(); // split TABLE@HBASE_URL int cut = metadataUrl.indexOf('@'); - tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut); + tableNameBase = cut < 0 ? DEFAULT_STORE_NAME : metadataUrl.substring(0, cut); hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); if (!hbaseUrl.equals("hbase")) throw new IOException("Can not create HBaseResourceStore. Url not match. Url:" + hbaseUrl); http://git-wip-us.apache.org/repos/asf/kylin/blob/5eae37ef/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java index 38acfb0..d24d3b4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java @@ -46,11 +46,7 @@ public class HDFSResourceStore extends ResourceStore { private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class); - private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 10; - - private static final String DEFAULT_FOLDER_NAME = "kylin_metadata"; - - private static final String DEFAULT_METADATA_FOLDER_NAME = "hdfs_metadata"; + private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 2; private Path hdfsMetaPath; @@ -62,42 +58,43 @@ public class HDFSResourceStore extends ResourceStore { super(kylinConfig); String metadataUrl = kylinConfig.getMetadataUrl(); int cut = metadataUrl.indexOf('@'); - String metaDirName = cut < 0 ? DEFAULT_FOLDER_NAME : metadataUrl.substring(0, cut); - String hdfsUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); - if (!hdfsUrl.equals("hdfs")) - throw new IOException("Can not create HDFSResourceStore. Url not match. Url:" + hdfsUrl); - metaDirName += "/" + DEFAULT_METADATA_FOLDER_NAME; - logger.info("meta dir name :" + metaDirName); - createMetaFolder(metaDirName, kylinConfig); - } - - private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws Exception { - String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory(); - fs = HadoopUtil.getFileSystem(hdfsWorkingDir); - logger.info("hdfs working dir : " + hdfsWorkingDir); - Path hdfsWorkingPath = new Path(hdfsWorkingDir); - if (!fs.exists(hdfsWorkingPath)) { - throw new IOException("HDFS working dir not exist"); + if (cut < 0) { + throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore: " + metadataUrl); } + String suffix = metadataUrl.substring(cut + 1); + if (!suffix.equals("hdfs")) + throw new IOException("kylin.metadata.url not recognized for HDFSResourceStore:" + suffix); + + String path = metadataUrl.substring(0, cut); + fs = HadoopUtil.getFileSystem(path); + Path metadataPath = new Path(path); //creat lock manager - this.lockManager = new LockManager(kylinConfig, kylinConfig.getRawHdfsWorkingDirectory() + metaDirName); + this.lockManager = new LockManager(kylinConfig, getRelativePath(metadataPath)); + if (fs.exists(metadataPath) == false) { + logger.warn("Path not exist in HDFS, create it: " + path); + createMetaFolder(metadataPath, kylinConfig); + } + + hdfsMetaPath = metadataPath; + logger.info("hdfs meta path : " + hdfsMetaPath.toString()); + + } + + + + private void createMetaFolder(Path metaDirName, KylinConfig kylinConfig) throws Exception { //create hdfs meta path - hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName); - if (!fs.exists(hdfsMetaPath)) { - ResourceLock lock = lockManager.getLock(lockManager.getLockPath("/")); - try { - if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES)) { - logger.info("get root lock successfully"); - if (!fs.exists(hdfsMetaPath)) { - fs.mkdirs(hdfsMetaPath); - logger.info("create hdfs meta path"); - } + ResourceLock lock = lockManager.getLock(getRelativePath(metaDirName)); + try { + if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS)) { + if (!fs.exists(metaDirName)) { + fs.mkdirs(metaDirName); } - } finally { - lockManager.releaseLock(lock); } + } finally { + lockManager.releaseLock(lock); } - logger.info("hdfs meta path : " + hdfsMetaPath.toString()); + logger.info("hdfs meta path created: " + metaDirName.toString()); } @Override @@ -170,7 +167,7 @@ public class HDFSResourceStore extends ResourceStore { ResourceLock lock = null; try { lock = lockManager.getLock(resPath); - lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES); + lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS); in = fs.open(p); long t = in.readLong(); return t; @@ -192,7 +189,7 @@ public class HDFSResourceStore extends ResourceStore { ResourceLock lock = null; try { lock = lockManager.getLock(resPath); - lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES); + lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS); out = fs.create(p, true); out.writeLong(ts); IOUtils.copy(content, out); @@ -228,7 +225,7 @@ public class HDFSResourceStore extends ResourceStore { ResourceLock lock = null; try { lock = lockManager.getLock(resPath); - lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES); + lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.SECONDS); Path p = getRealHDFSPath(resPath); if (fs.exists(p)) { fs.delete(p, true); @@ -253,4 +250,21 @@ public class HDFSResourceStore extends ResourceStore { return new Path(this.hdfsMetaPath, resourcePath); } + private static String getRelativePath(Path hdfsPath) { + String path = hdfsPath.toString(); + int index = path.indexOf("://"); + if (index > 0) { + path = path.substring(index + 3); + } + + if (path.startsWith("/") == false) { + if (path.indexOf("/") > 0) { + path = path.substring(path.indexOf("/")); + } else { + path = "/" + path; + } + } + return path; + } + }