KYLIN-2374 Allow kylin to store metadata in HDFS instead of HBase Signed-off-by: shaofengshi <shaofeng...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d23bf930
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d23bf930
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d23bf930

Branch: refs/heads/spark-it
Commit: d23bf930da0b542d0e6981917e6bde055839577a
Parents: db85d66
Author: xiefan46 <958034...@qq.com>
Authored: Wed Jan 11 10:00:19 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Jan 23 16:23:56 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   5 +
 .../common/persistence/ResourceStoreTest.java   |   2 +-
 .../test_case_data/sandbox/kylin.properties     |   1 +
 .../storage/hdfs/ITHDFSResourceStoreTest.java   | 117 +++++++++++
 .../org/apache/kylin/storage/hdfs/HDFSLock.java |  41 ++++
 .../kylin/storage/hdfs/HDFSLockManager.java     |  45 +++++
 .../kylin/storage/hdfs/HDFSResourceStore.java   | 198 +++++++++++++++++++
 7 files changed, 408 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 05df177..44d636d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -201,6 +201,11 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.metadata.url");
     }
 
+    // Metadata URL for HDFSResourceStore, "<dir-name>@hdfs"; the part before '@' names the metadata dir
+    public String getHDFSMetadataUrl() {
+        return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
+    }
+
     // for test only
     public void setMetadataUrl(String metadataUrl) {
         setProperty("kylin.metadata.url", metadataUrl);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
index 4c31a15..ddaf481 100644
--- a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -110,9 +110,9 @@ public class ResourceStoreTest {
         }
 
         // list
-        NavigableSet<String> list;
+        NavigableSet<String> list = null;
 
         list = store.listResources(dir1);
         assertTrue(list.contains(path1));
         assertTrue(list.contains(path2) == false);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 6c512dc..b01c377 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -41,6 +41,7 @@ kylin.source.hive.client=cli
 # The metadata store in hbase
 kylin.metadata.url=kylin_default_instance@hbase
 
+
 # The storage for final cube file in hbase
 kylin.storage.url=hbase
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
new file mode 100644
index 0000000..ef04957
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by xiefan on 17-1-10.
+ */
+public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
+
+    KylinConfig kylinConfig;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Ignore
+    @Test
+    public void testHDFSUrl() throws Exception {
+        assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
+        System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
+    }
+
+    /**
+     * Manual experiment: several threads use createNewFile()/delete() on one HDFS file as a
+     * spin lock while the main thread holds it first. It expects HDFS to be reachable at
+     * hdfs:///test123 and is @Ignore'd.
+     */
+    @Ignore
+    @Test
+    public void testMultiThreadWriteHDFS() throws Exception {
+        final Path testDir = new Path("hdfs:///test123");
+        final FileSystem fs = HadoopUtil.getFileSystem(testDir);
+        final String fileName = "test.json";
+        final Path lockFile = new Path(testDir, fileName);
+        int threadNum = 3;
+        ExecutorService service = Executors.newFixedThreadPool(threadNum);
+        final CountDownLatch latch = new CountDownLatch(threadNum);
+        fs.deleteOnExit(lockFile);
+        fs.createNewFile(lockFile);
+        try {
+            for (int i = 0; i < threadNum; i++) {
+                service.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        long id = Thread.currentThread().getId();
+                        try {
+                            while (!fs.createNewFile(lockFile)) {
+                                System.out.println("Thread id : " + id + " can not get lock,sleep a while");
+                                Thread.sleep(1000);
+                            }
+                            System.out.println("Thread id : " + id + " get lock, sleep a while");
+                            Thread.sleep(1000);
+                            fs.delete(lockFile, true);
+                            System.out.println("Thread id : " + id + " release lock");
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        } finally {
+                            // always count down: a failed worker must not leave latch.await() hanging
+                            latch.countDown();
+                        }
+                    }
+                });
+            }
+            Thread.sleep(1000);
+            fs.delete(lockFile, true);
+            System.out.println("main thread release lock.Waiting threads down");
+            System.out.println("file still exist : " + fs.exists(lockFile));
+            latch.await();
+        } finally {
+            service.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testHDFSStore() throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        ResourceStore store = new HDFSResourceStore(config);
+        ResourceStoreTest.testAStore(store);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
new file mode 100644
index 0000000..8710edf
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
@@ -0,0 +1,41 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLock {
+
+    private Path rawLock;
+
+    private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
+
+    protected HDFSLock(String resourceFullPath) {
+        this.rawLock = new Path(resourceFullPath);
+    }
+
+    /**
+     * Acquires the lock by creating the lock file, polling once a second while it already exists.
+     *
+     * @param fs file system holding the lock file
+     * @return true once this call has created the lock file; false if the lock path is a directory
+     */
+    public boolean init(FileSystem fs) throws IOException, InterruptedException {
+        // Only directories are rejected: a missing path is exactly the state in which the lock
+        // can be taken, and an existing file means another holder has it.
+        if (fs.isDirectory(rawLock)) {
+            logger.info("Not support directory lock yet");
+            return false;
+        }
+        // NOTE(review): FileSystem.createNewFile checks exists() and then calls create(), so a
+        // racing creator may surface as an IOException instead of a false return.
+        while (!fs.createNewFile(rawLock)) {
+            Thread.sleep(1000);
+        }
+        return true;
+    }
+
+    /**
+     * Releases the lock by deleting the lock file, retrying once a second while the delete fails.
+     *
+     * @param fs file system holding the lock file
+     * @return true once the lock file is deleted; false if it no longer exists
+     */
+    public boolean release(FileSystem fs) throws IOException, InterruptedException {
+        while (!fs.delete(rawLock, false)) {
+            if (!fs.exists(rawLock)) {
+                // the file is already gone, so retrying delete() could never succeed
+                return false;
+            }
+            Thread.sleep(1000);
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
new file mode 100644
index 0000000..1cd0800
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+
+import java.io.IOException;
+
+/**
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLockManager {
+
+    private static final String LOCK_HOME = "LOCK_HOME";
+
+    private Path lockPath;
+
+    private FileSystem fs;
+
+    /**
+     * Prepares the {@code LOCK_HOME} directory under {@code hdfsWorkingDir}.
+     *
+     * @throws IOException if the directory cannot be created or HDFS access fails
+     */
+    public HDFSLockManager(String hdfsWorkingDir) throws IOException {
+        this.lockPath = new Path(hdfsWorkingDir, LOCK_HOME);
+        this.fs = HadoopUtil.getFileSystem(lockPath);
+        // mkdirs, not create(): create() makes an empty file and returns an output stream
+        // that was never closed
+        if (!fs.exists(lockPath) && !fs.mkdirs(lockPath)) {
+            throw new IOException("Failed to create lock dir: " + lockPath);
+        }
+    }
+
+    /**
+     * Acquires the lock for {@code resourceFullPath} via {@link HDFSLock#init}.
+     *
+     * <p>NOTE(review): the lock file is created at {@code resourceFullPath} itself; {@code lockPath}
+     * (LOCK_HOME) is not used here. Confirm callers pass a dedicated lock path, not a resource file.
+     *
+     * @throws IllegalStateException if {@link HDFSLock#init} returns false
+     */
+    public HDFSLock getLock(String resourceFullPath) throws IOException, InterruptedException, IllegalStateException {
+        HDFSLock lock = new HDFSLock(resourceFullPath);
+        boolean success = lock.init(fs);
+        if (success) {
+            return lock;
+        } else {
+            throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath);
+        }
+    }
+
+    /**
+     * Releases a lock obtained from {@link #getLock(String)}.
+     *
+     * @throws IllegalStateException if {@link HDFSLock#release} returns false
+     */
+    public void releaseLock(HDFSLock lock) throws IOException, InterruptedException, IllegalStateException {
+        boolean success = lock.release(fs);
+        if (!success) {
+            throw new IllegalStateException("Release lock fail");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d23bf930/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
new file mode 100644
index 0000000..c7f0f25
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * A {@link ResourceStore} that keeps every metadata resource as a plain file in HDFS.
+ *
+ * <p>Resources live under {@code <hdfs-working-dir>/<dir-name>}, where {@code <dir-name>} is the
+ * part of {@link KylinConfig#getHDFSMetadataUrl()} before the '@' (or
+ * {@code kylin_default_instance_hdfs} when there is no '@'). A resource path such as
+ * {@code /cube/a.json} maps to the file {@code <dir-name>/cube/a.json}, and its timestamp is
+ * the HDFS modification time of that file.
+ */
+public class HDFSResourceStore extends ResourceStore {
+
+    private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+
+    private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+
+    /** Directory holding every resource of this store. */
+    private Path hdfsMetaPath;
+
+    private FileSystem fs;
+
+    // NOTE(review): created in the constructor but not used by any method of this store yet.
+    private HDFSLockManager lockManager;
+
+    /**
+     * Opens the HDFS metadata directory named by {@link KylinConfig#getHDFSMetadataUrl()},
+     * creating it when missing. Public for tests; it would normally be protected.
+     *
+     * @param kylinConfig supplies the metadata URL and the HDFS working directory
+     * @throws IOException if the HDFS working directory does not exist or HDFS access fails
+     */
+    public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+        super(kylinConfig);
+        String metadataUrl = kylinConfig.getHDFSMetadataUrl();
+        // split DIR_NAME@hdfs; without an '@' the default directory name is used
+        int cut = metadataUrl.indexOf('@');
+        String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+        createMetaFolder(metaDirName, kylinConfig);
+    }
+
+    /**
+     * Resolves {@code metaDirName} under the HDFS working directory, creates it as a directory
+     * when missing, and sets up the lock manager.
+     */
+    private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+        String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
+        fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+        Path hdfsWorkingPath = new Path(hdfsWorkingDir);
+        if (!fs.exists(hdfsWorkingPath)) {
+            throw new IOException("HDFS working dir not exist: " + hdfsWorkingPath);
+        }
+        hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
+        // mkdirs, not create(): create() would leave an empty *file* here plus an unclosed
+        // output stream, and no resource could then be written beneath it
+        if (!fs.exists(hdfsMetaPath) && !fs.mkdirs(hdfsMetaPath)) {
+            throw new IOException("Failed to create HDFS metadata dir: " + hdfsMetaPath);
+        }
+        lockManager = new HDFSLockManager(hdfsWorkingDir);
+    }
+
+    /**
+     * Lists the direct children (files and sub-folders) of {@code folderPath}.
+     *
+     * @return child paths prefixed with {@code folderPath}, or null if the folder does not
+     *         exist or is not a directory
+     */
+    @Override
+    protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException {
+        Path p = getRealHDFSPath(folderPath);
+        if (!fs.exists(p) || !fs.isDirectory(p)) {
+            return null;
+        }
+        TreeSet<String> r = new TreeSet<>();
+        FileStatus[] statuses = fs.listStatus(p);
+        String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+        for (FileStatus status : statuses) {
+            r.add(prefix + status.getPath().getName());
+        }
+        return r;
+    }
+
+    /** Returns true only if {@code resPath} exists and is a file; folders do not count. */
+    @Override
+    protected boolean existsImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        return fs.exists(p) && fs.isFile(p);
+    }
+
+    /**
+     * Reads every file directly under {@code folderPath} whose timestamp lies in
+     * [{@code timeStart}, {@code timeEndExclusive}). If reading fails, the streams opened so far
+     * are closed before the exception is rethrown.
+     */
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
+        NavigableSet<String> resources = listResources(folderPath);
+        if (resources == null)
+            return Collections.emptyList();
+        List<RawResource> result = Lists.newArrayListWithCapacity(resources.size());
+        try {
+            for (String res : resources) {
+                long ts = getResourceTimestampImpl(res);
+                if (timeStart <= ts && ts < timeEndExclusive) {
+                    RawResource resource = getResourceImpl(res);
+                    if (resource != null) // can be null if is a sub-folder
+                        result.add(resource);
+                }
+            }
+        } catch (IOException ex) {
+            for (RawResource rawResource : result) {
+                IOUtils.closeQuietly(rawResource.inputStream);
+            }
+            throw ex;
+        }
+        return result;
+    }
+
+    /**
+     * Opens {@code resPath} for reading.
+     *
+     * @return the resource and its HDFS modification time, or null if it is missing or a
+     *         folder; the caller is responsible for closing the returned stream
+     */
+    @Override
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p) || !fs.isFile(p)) {
+            return null;
+        }
+        FileStatus status = fs.getFileStatus(p);
+        if (status.getLen() == 0) {
+            logger.warn("Zero length file: " + p.toString());
+        }
+        // open exactly one stream and hand it to the caller; nothing else is left open
+        return new RawResource(fs.open(p), status.getModificationTime());
+    }
+
+    /** Returns the HDFS modification time of {@code resPath}, or 0 if it is missing or a folder. */
+    @Override
+    protected long getResourceTimestampImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p) || !fs.isFile(p)) {
+            return 0;
+        }
+        FileStatus status = fs.getFileStatus(p);
+        return status.getModificationTime();
+    }
+
+    /**
+     * Writes {@code content} to {@code resPath}, overwriting any existing file, then sets the
+     * file's modification time to {@code ts}.
+     */
+    @Override
+    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        // try-with-resources: an exception from close() propagates instead of being swallowed
+        try (FSDataOutputStream out = fs.create(p, true)) {
+            IOUtils.copy(content, out);
+        }
+        // timestamps are read back from the file's mtime; pin it to ts so that the ts returned
+        // by checkAndPutResourceImpl is what the next check-and-put compares against
+        fs.setTimes(p, ts, -1);
+    }
+
+    /**
+     * Writes {@code content} only if the stored timestamp still equals {@code oldTS}
+     * (an {@code oldTS} of 0 requires that the resource does not exist yet).
+     *
+     * <p>NOTE(review): the check and the write are separate HDFS calls, so this is not atomic
+     * across concurrent writers.
+     *
+     * @return {@code newTS}, which putResourceImpl also records as the file's modification time
+     * @throws IllegalStateException if the stored timestamp does not match {@code oldTS}
+     */
+    @Override
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
+        Path p = getRealHDFSPath(resPath);
+        if (!fs.exists(p)) {
+            if (oldTS != 0) {
+                throw new IllegalStateException("For not exist file. OldTS have to be 0. but Actual oldTS is : " + oldTS);
+            }
+        } else {
+            long realLastModify = getResourceTimestamp(resPath);
+            if (realLastModify != oldTS) {
+                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + realLastModify);
+            }
+        }
+        putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
+        return newTS;
+    }
+
+    /** Deletes {@code resPath} (folders recursively); a missing path is ignored. */
+    @Override
+    protected void deleteResourceImpl(String resPath) throws IOException {
+        Path p = getRealHDFSPath(resPath);
+        if (fs.exists(p)) {
+            fs.delete(p, true);
+        }
+    }
+
+    @Override
+    protected String getReadableResourcePathImpl(String resPath) {
+        return getRealHDFSPath(resPath).toString();
+    }
+
+    /**
+     * Maps a resource path such as {@code /cube/a.json} onto the metadata directory.
+     *
+     * <p>Leading '/' characters are stripped first: {@code new Path(parent, "/x")} resolves like a
+     * URI reference and yields an absolute {@code /x}, which would put resources outside the
+     * metadata directory. The root path "/" maps to the metadata directory itself.
+     */
+    private Path getRealHDFSPath(String resourcePath) {
+        String relativePath = resourcePath;
+        while (relativePath.startsWith("/")) {
+            relativePath = relativePath.substring(1);
+        }
+        if (relativePath.isEmpty()) {
+            return this.hdfsMetaPath;
+        }
+        return new Path(this.hdfsMetaPath, relativePath);
+    }
+
+}