Add Zookeeper Lock Signed-off-by: shaofengshi <shaofeng...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d3276e2e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d3276e2e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d3276e2e Branch: refs/heads/KYLIN-2361 Commit: d3276e2e909d3001724ee8fda1304ae8b7f08c63 Parents: d23bf93 Author: xiefan46 <958034...@qq.com> Authored: Fri Jan 20 09:48:17 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Jan 23 16:23:56 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 22 +- .../test_case_data/sandbox/kylin.properties | 4 + .../storage/hdfs/ITHDFSResourceStoreTest.java | 66 +----- .../kylin/storage/hdfs/ITLockManagerTest.java | 205 +++++++++++++++++++ .../kylin/storage/hbase/HBaseResourceStore.java | 1 + .../org/apache/kylin/storage/hdfs/HDFSLock.java | 41 ---- .../kylin/storage/hdfs/HDFSLockManager.java | 45 ---- .../kylin/storage/hdfs/HDFSResourceStore.java | 95 +++++++-- .../apache/kylin/storage/hdfs/LockManager.java | 116 +++++++++++ .../apache/kylin/storage/hdfs/ResourceLock.java | 51 +++++ 10 files changed, 471 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 44d636d..75b38ff 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -193,6 +193,14 @@ abstract public class KylinConfigBase implements Serializable { return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString(); } + public String getRawHdfsWorkingDirectory() { + String root = getRequired("kylin.env.hdfs-working-dir"); + if (!root.endsWith("/")) { + root += "/"; + } + return root; + } + // ============================================================================ // METADATA // ============================================================================ @@ -201,11 +209,6 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.metadata.url"); } - //for hdfs resource store - public String getHDFSMetadataUrl() { - return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs"); - } - // for test only public void setMetadataUrl(String metadataUrl) { setProperty("kylin.metadata.url", metadataUrl); @@ -925,4 +928,13 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.web.cross-domain-enabled", "true")); } + //zoo keeper + public String getZooKeeperHost() { + return getOptional("kylin.storage-zookeeper.host", "localhost"); + } + + public String getZooKeeperPort() { + return getOptional("kylin.storage-zookeeper.port", "2181"); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index b01c377..2c2da91 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -177,3 +177,7 @@ kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history #kylin.engine.spark-conf.spark.yarn.queue=default #kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar #kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec + + +#zoo keeper +kylin.storage-zookeeper.host=sandbox \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java index ef04957..27d8a3c 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java @@ -17,23 +17,15 @@ */ package org.apache.kylin.storage.hdfs; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; + import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.ResourceStoreTest; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; /** * Created by xiefan on 17-1-10. @@ -53,65 +45,13 @@ public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase { this.cleanupTestMetadata(); } - @Ignore - @Test - public void testHDFSUrl() throws Exception { - assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl()); - System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory()); - } - - @Ignore @Test - public void testMultiThreadWriteHDFS() throws Exception{ - //System.out.println(kylinConfig.getHdfsWorkingDirectory()); - final Path testDir = new Path("hdfs:///test123"); - final FileSystem fs = HadoopUtil.getFileSystem(testDir); - final String fileName = "test.json"; - int threadNum = 3; - ExecutorService service = Executors.newFixedThreadPool(threadNum); - final CountDownLatch latch = new CountDownLatch(threadNum); - Path p = new Path(testDir,fileName); - fs.deleteOnExit(p); - fs.createNewFile(p); - for(int i=0;i<threadNum;i++) { - service.execute(new Runnable() { - @Override - public void run() { - try { - long id = Thread.currentThread().getId(); - Path p = new Path(testDir, fileName); - /*while(fs.exists(p)){ - System.out.println("Thread id : " + id + " can not get lock,sleep a while"); - Thread.currentThread().sleep(1000); - }*/ - while(!fs.createNewFile(p)){ - System.out.println("Thread id : " + id + " can not get lock,sleep a while"); - Thread.currentThread().sleep(1000); - } - System.out.println("Thread id : " + id + " get lock, sleep a while"); - Thread.currentThread().sleep(1000); - fs.delete(p,true); - System.out.println("Thread id : " + id + " release lock"); - latch.countDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - Thread.currentThread().sleep(1000); - fs.delete(p,true); - System.out.println("main thread release lock.Waiting threads down"); - System.out.println("file still exist : " + fs.exists(p)); - latch.await(); - } - - @Test - public void testHDFSStore() throws Exception { + public void testResourceStoreBasic() throws Exception { KylinConfig config = KylinConfig.getInstanceFromEnv(); ResourceStore store = new HDFSResourceStore(config); ResourceStoreTest.testAStore(store); } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java new file mode 100644 index 0000000..2b58d30 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.storage.hdfs; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; + +public class ITLockManagerTest extends HBaseMetadataTestCase { + + + private String zkConnection = "sandbox:2181"; + + private KylinConfig kylinConfig; + + private CuratorFramework zkClient; + + private static final String lockRootPath = "/test_lock"; + + private LockManager manager; + + private static final int QTY = 5; + + private static final int REPETITIONS = QTY * 10; + + @Before + public void setup() throws Exception { + this.createTestMetadata(); + kylinConfig = KylinConfig.getInstanceFromEnv(); + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + zkClient = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy); + zkClient.start(); + manager = new LockManager(kylinConfig, lockRootPath); + System.out.println("nodes in lock root : " + zkClient.getChildren().forPath(lockRootPath)); + + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + zkClient.delete().deletingChildrenIfNeeded().forPath(lockRootPath); + List<String> nodes = zkClient.getChildren().forPath("/"); + System.out.println("nodes in zk after delete : " + nodes); + manager.close(); + } + + @Test + public void testCreateLock() throws Exception { + + ResourceLock lock = manager.getLock("/dictionary/numberdict.json"); + lock.acquire(); + manager.releaseLock(lock); + System.out.println(zkClient.getChildren().forPath(lockRootPath + "/dictionary")); + List<String> nodes = zkClient.getChildren().forPath(lockRootPath + "/dictionary"); + assertEquals(1, nodes.size()); + assertEquals("numberdict.json", nodes.get(0)); + } + + @Test + public void testLockSafty() throws Exception { + // all of the useful sample code is in ExampleClientThatLocks.java + + // FakeLimitedResource simulates some external resource that can only be access by one process at a time + final FakeLimitedResource resource = new FakeLimitedResource(); + ExecutorService service = Executors.newFixedThreadPool(QTY); + final TestingServer server = new TestingServer(zkConnection); + final List<FutureTask<Void>> tasks = new ArrayList<>(); + try { + for (int i = 0; i < QTY; ++i) { + final int index = i; + FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() { + @Override + public Void call() throws Exception { + LockManager threadLocalLockManager = new LockManager(kylinConfig, lockRootPath); + try { + ExampleClientThatLocks example = new ExampleClientThatLocks(threadLocalLockManager, lockRootPath, resource, "Client " + index); + for (int j = 0; j < REPETITIONS; ++j) { + example.doWork(10, TimeUnit.SECONDS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + e.printStackTrace(); + // log or do something + } finally { + threadLocalLockManager.close(); + } + return null; + } + }); + tasks.add(task); + service.submit(task); + } + for (FutureTask<Void> task : tasks) { + task.get(); + } + } finally { + CloseableUtils.closeQuietly(server); + } + } + + class FakeLimitedResource { + private final AtomicBoolean inUse = new AtomicBoolean(false); + + public void use() throws InterruptedException { + // in a real application this would be accessing/manipulating a shared resource + + if (!inUse.compareAndSet(false, true)) { + throw new IllegalStateException("Needs to be used by one client at a time"); + } + + try { + Thread.sleep((long) (3 * Math.random())); + } finally { + inUse.set(false); + } + } + } + + class TestingServer implements Closeable { + + private String connectionString; + + public TestingServer(String connectionStr) { + this.connectionString = connectionStr; + } + + @Override + public void close() throws IOException { + + } + + public String getConnectString() { + return connectionString; + } + } + + class ExampleClientThatLocks { + + private final FakeLimitedResource resource; + + private final String clientName; + + private LockManager lockManager; + + private String lockPath; + + public ExampleClientThatLocks(LockManager lockManager, String lockPath, FakeLimitedResource resource, String clientName) { + this.resource = resource; + this.clientName = clientName; + this.lockManager = lockManager; + this.lockPath = lockPath; + } + + public void doWork(long time, TimeUnit unit) throws Exception { + ResourceLock lock = lockManager.getLock(lockPath); + if (!lock.acquire(time, unit)) { + throw new IllegalStateException(clientName + " could not acquire the lock"); + } + try { + System.out.println(clientName + " has the lock"); + resource.use(); + } finally { + System.out.println(clientName + " releasing the lock"); + lock.release(); // always release the lock in a finally block + } + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 1c45967..170e351 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -311,6 +311,7 @@ public class HBaseResourceStore extends ResourceStore { } finally { IOUtils.closeQuietly(table); } + } private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java deleted file mode 100644 index 8710edf..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.apache.kylin.storage.hdfs; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; - -/** - * Created by xiefan on 17-1-17. - */ -public class HDFSLock { - - private Path rawLock; - - private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class); - - protected HDFSLock(String resourceFullPath) { - this.rawLock = new Path(resourceFullPath); - } - - public boolean init(FileSystem fs) throws IOException, InterruptedException { - if (!fs.isFile(rawLock)) { - logger.info("Not support directory lock yet"); - return false; - } - while (!fs.createNewFile(rawLock)) { - Thread.currentThread().sleep(1000); - } - return true; - } - - public boolean release(FileSystem fs) throws IOException, InterruptedException { - while (!fs.delete(rawLock, false)) { - Thread.currentThread().sleep(1000); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java deleted file mode 100644 index 1cd0800..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.apache.kylin.storage.hdfs; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.engine.mr.HadoopUtil; - -import java.io.IOException; - -/** - * Created by xiefan on 17-1-17. - */ -public class HDFSLockManager { - - private static final String LOCK_HOME = "LOCK_HOME"; - - private Path lockPath; - - private FileSystem fs; - - public HDFSLockManager(String hdfsWorkingDir) throws IOException{ - this.lockPath = new Path(hdfsWorkingDir,LOCK_HOME); - this.fs = HadoopUtil.getFileSystem(lockPath); - if(!fs.exists(lockPath)){ - fs.create(lockPath); - } - } - - public HDFSLock getLock(String resourceFullPath) throws IOException,InterruptedException,IllegalStateException{ - HDFSLock lock = new HDFSLock(resourceFullPath); - boolean success = lock.init(fs); - if(success){ - return lock; - }else{ - throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath); - } - } - - public void releaseLock(HDFSLock lock) throws IOException,InterruptedException,IllegalStateException{ - boolean success = lock.release(fs); - if(!success) - throw new IllegalStateException("Release lock fail"); - } - - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java index c7f0f25..a746a97 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.common.util.HadoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,44 +39,62 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; -/** - * Created by xiefan on 17-1-10. - */ public class HDFSResourceStore extends ResourceStore { - private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs"; + private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class); + + private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 10; + + private static final String DEFAULT_FOLDER_NAME = "kylin_default_instance"; + + private static final String DEFAULT_METADATA_FOLDER_NAME = "hdfs_metadata"; private Path hdfsMetaPath; private FileSystem fs; - private HDFSLockManager lockManager; - - private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class); + private LockManager lockManager; //public for test. Normal should be protected - public HDFSResourceStore(KylinConfig kylinConfig) throws IOException { + public HDFSResourceStore(KylinConfig kylinConfig) throws Exception { super(kylinConfig); - String metadataUrl = kylinConfig.getHDFSMetadataUrl(); - // split TABLE@HBASE_URL + String metadataUrl = kylinConfig.getMetadataUrl(); int cut = metadataUrl.indexOf('@'); - String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut); + String metaDirName = cut < 0 ? DEFAULT_FOLDER_NAME : metadataUrl.substring(0, cut); + metaDirName += "/" + DEFAULT_METADATA_FOLDER_NAME; + logger.info("meta dir name :" + metaDirName); createMetaFolder(metaDirName, kylinConfig); } - private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException { + private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws Exception { String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory(); fs = HadoopUtil.getFileSystem(hdfsWorkingDir); + logger.info("hdfs working dir : " + hdfsWorkingDir); Path hdfsWorkingPath = new Path(hdfsWorkingDir); if (!fs.exists(hdfsWorkingPath)) { throw new IOException("HDFS working dir not exist"); } + //creat lock manager + this.lockManager = new LockManager(kylinConfig, kylinConfig.getRawHdfsWorkingDirectory() + metaDirName); + //create hdfs meta path hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName); if (!fs.exists(hdfsMetaPath)) { - fs.create(hdfsMetaPath, true); + ResourceLock lock = lockManager.getLock(lockManager.getLockPath("/")); + try { + if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES)) { + logger.info("get root lock successfully"); + if (!fs.exists(hdfsMetaPath)) { + fs.mkdirs(hdfsMetaPath); + logger.info("create hdfs meta path"); + } + } + } finally { + lockManager.releaseLock(lock); + } } - lockManager = new HDFSLockManager(hdfsWorkingDir); + logger.info("hdfs meta path : " + hdfsMetaPath.toString()); } @Override @@ -132,7 +150,8 @@ public class HDFSResourceStore extends ResourceStore { logger.warn("Zero length file: " + p.toString()); } FSDataInputStream in = fs.open(p); - return new RawResource(fs.open(p), getResourceTimestamp(resPath)); + long t = in.readLong(); + return new RawResource(in, t); } else { return null; } @@ -144,19 +163,42 @@ public class HDFSResourceStore extends ResourceStore { if (!fs.exists(p) || !fs.isFile(p)) { return 0; } - FileStatus status = fs.getFileStatus(p); - return status.getModificationTime(); + FSDataInputStream in = null; + ResourceLock lock = null; + try { + lock = lockManager.getLock(resPath); + lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES); + in = fs.open(p); + long t = in.readLong(); + return t; + } catch (Exception e) { + throw new IOException("Put resource fail", e); + } finally { + IOUtils.closeQuietly(in); + lockManager.releaseLock(lock); + } + } @Override protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { + logger.info("res path : " + resPath); Path p = getRealHDFSPath(resPath); + logger.info("put resource : " + p.toUri()); FSDataOutputStream out = null; + ResourceLock lock = null; try { + lock = lockManager.getLock(resPath); + lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES); out = fs.create(p, true); + out.writeLong(ts); IOUtils.copy(content, out); + + } catch (Exception e) { + throw new IOException("Put resource fail", e); } finally { IOUtils.closeQuietly(out); + lockManager.releaseLock(lock); } } @@ -180,9 +222,18 @@ public class HDFSResourceStore extends ResourceStore { @Override protected void deleteResourceImpl(String resPath) throws IOException { - Path p = getRealHDFSPath(resPath); - if (fs.exists(p)) { - fs.delete(p, true); + ResourceLock lock = null; + try { + lock = lockManager.getLock(resPath); + lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES); + Path p = getRealHDFSPath(resPath); + if (fs.exists(p)) { + fs.delete(p, true); + } + } catch (Exception e) { + throw new IOException("Delete resource fail", e); + } finally { + lockManager.releaseLock(lock); } } @@ -192,6 +243,8 @@ public class HDFSResourceStore extends ResourceStore { } private Path getRealHDFSPath(String resourcePath) { + if (resourcePath.startsWith("/") && resourcePath.length() > 1) + resourcePath = resourcePath.substring(1, resourcePath.length()); return new Path(this.hdfsMetaPath, resourcePath); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java new file mode 100644 index 0000000..4959718 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.storage.hdfs; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.Arrays; + +public class LockManager { + + private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class); + + final private KylinConfig config; + + final CuratorFramework zkClient; + + private String lockRootPath; + + public LockManager(String lockRootPath) throws Exception { + + this(KylinConfig.getInstanceFromEnv(), lockRootPath); + } + + public LockManager(KylinConfig config, String lockRootPath) throws Exception { + this.config = config; + this.lockRootPath = lockRootPath; + String zkConnectString = getZKConnectString(config); + logger.info("zk connection string:" + zkConnectString); + if (StringUtils.isEmpty(zkConnectString)) { + throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!"); + } + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy); + zkClient.start(); + if (zkClient.checkExists().forPath(lockRootPath) == null) + zkClient.create().creatingParentsIfNeeded().forPath(lockRootPath); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + close(); + } + })); + } + + public ResourceLock getLock(String name) throws Exception { + String lockPath = getLockPath(name); + InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath); + return new ResourceLock(lockPath, lock); + } + + public void releaseLock(ResourceLock lock) { + try { + if (lock != null) + lock.release(); + } catch (Exception e) { + logger.error("Fail to release lock"); + e.printStackTrace(); + } + } + + private static String getZKConnectString(KylinConfig kylinConfig) { + final String host = kylinConfig.getZooKeeperHost(); + final String port = kylinConfig.getZooKeeperPort(); + return StringUtils.join(Iterables.transform(Arrays.asList(host.split(",")), new Function<String, String>() { + @Nullable + @Override + public String apply(String input) { + return input + ":" + port; + } + }), ","); + } + + public String getLockPath(String resourceName) { + if (!resourceName.startsWith("/")) + resourceName = "/" + resourceName; + if (resourceName.endsWith("/")) + resourceName = resourceName.substring(0, resourceName.length() - 1); + return lockRootPath + resourceName; + } + + public void close() { + try { + zkClient.close(); + } catch (Exception e) { + logger.error("error occurred to close PathChildrenCache", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java new file mode 100644 index 0000000..9d51871 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.storage.hdfs; + +import org.apache.curator.framework.recipes.locks.InterProcessMutex; + +import java.util.concurrent.TimeUnit; + + +public class ResourceLock { + + private String resourcePath; + + private InterProcessMutex lock; + + protected ResourceLock(String resourcePath, InterProcessMutex lock) { + this.resourcePath = resourcePath; + this.lock = lock; + } + + public boolean acquire(long time, TimeUnit unit) throws Exception { + return lock.acquire(time, unit); + } + + public void acquire() throws Exception{ + lock.acquire(); + } + + protected void release() throws Exception { + lock.release(); + } + + public String getResourcePath() { + return resourcePath; + } +}