KYLIN-1506 Refactor resource interface for time-series data, such as jobs, for better performance (with Hao Chen <[email protected]>)
This closes #31 Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/30cccbda Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/30cccbda Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/30cccbda Branch: refs/heads/master Commit: 30cccbda886f850c18a675e33528e825f827bd14 Parents: 6d368cd Author: Yang Li <[email protected]> Authored: Sun Mar 20 18:41:48 2016 +0800 Committer: Yang Li <[email protected]> Committed: Sun Mar 20 18:41:48 2016 +0800 ---------------------------------------------------------------------- .../common/persistence/FileResourceStore.java | 44 ++--- .../kylin/common/persistence/ResourceStore.java | 49 +++-- .../kylin/common/persistence/ResourceTool.java | 3 +- .../persistence/LocalFileResourceStoreTest.java | 131 +------------ .../common/persistence/ResourceStoreTest.java | 189 +++++++++++++++++++ .../org/apache/kylin/cube/CubeManagerTest.java | 18 +- .../apache/kylin/dict/DictionaryManager.java | 18 +- .../org/apache/kylin/job/dao/ExecutableDao.java | 36 +--- .../storage/hbase/ITHBaseResourceStoreTest.java | 132 +------------ .../kylin/storage/hbase/HBaseResourceStore.java | 121 ++++++------ 10 files changed, 323 insertions(+), 418 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java index 1ab659f..c1c62fb 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java @@ -24,13 +24,12 @@ import java.io.FileInputStream; 
import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,13 +50,13 @@ public class FileResourceStore extends ResourceStore { } @Override - protected NavigableSet<String> listResourcesImpl(String resPath) throws IOException { - String[] names = file(resPath).list(); + protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException { + String[] names = file(folderPath).list(); if (names == null) // not a directory return null; TreeSet<String> r = new TreeSet<>(); - String prefix = resPath.endsWith("/") ? resPath : resPath + "/"; + String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/"; for (String n : names) { r.add(prefix + n); } @@ -67,39 +66,32 @@ public class FileResourceStore extends ResourceStore { @Override protected boolean existsImpl(String resPath) throws IOException { File f = file(resPath); - return f.exists() && f.isFile(); // directory is not considered a - // resource + return f.exists() && f.isFile(); // directory is not considered a resource } @Override - protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException { - List<RawResource> result = Lists.newArrayList(); + protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException { + NavigableSet<String> resources = listResources(folderPath); + if (resources == null) + return Collections.emptyList(); + + List<RawResource> result = Lists.newArrayListWithCapacity(resources.size()); try { - String commonPrefix = StringUtils.getCommonPrefix(rangeEnd, rangeStart); - commonPrefix = commonPrefix.substring(0, 
commonPrefix.lastIndexOf("/") + 1); - final NavigableSet<String> resources = listResourcesImpl(commonPrefix); - for (String resource : resources) { - if (resource.compareTo(rangeStart) >= 0 && resource.compareTo(rangeEnd) <= 0) { - if (existsImpl(resource)) { - result.add(getResourceImpl(resource)); - } + for (String res : resources) { + long ts = getResourceTimestampImpl(res); + if (timeStart <= ts && ts < timeEndExclusive) { + RawResource resource = getResourceImpl(res); + if (resource != null) // can be null if is a sub-folder + result.add(resource); } } - return result; } catch (IOException ex) { for (RawResource rawResource : result) { IOUtils.closeQuietly(rawResource.inputStream); } throw ex; - } catch (Exception ex) { - throw new UnsupportedOperationException(ex); } - } - - @Override - protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException { - //just ignore time filter - return getAllResources(rangeStart, rangeEnd); + return result; } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index 4e9e904..746527d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -114,19 +114,17 @@ abstract public class ResourceStore { } /** - * return a list of child resources & folders under given path, return null - * if given path is not a folder + * List resources and sub-folders under a given folder, return null if given path is not a folder */ - final public NavigableSet<String> listResources(String resPath) throws 
IOException { - resPath = norm(resPath); - return listResourcesImpl(resPath); + final public NavigableSet<String> listResources(String folderPath) throws IOException { + String path = norm(folderPath); + return listResourcesImpl(path); } - abstract protected NavigableSet<String> listResourcesImpl(String resPath) throws IOException; + abstract protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException; /** - * return true if a resource exists, return false in case of folder or - * non-exist + * Return true if a resource exists, return false in case of folder or non-exist */ final public boolean exists(String resPath) throws IOException { return existsImpl(norm(resPath)); @@ -135,7 +133,7 @@ abstract public class ResourceStore { abstract protected boolean existsImpl(String resPath) throws IOException; /** - * read a resource, return null in case of not found + * Read a resource, return null in case of not found or is a folder */ final public <T extends RootPersistentEntity> T getResource(String resPath, Class<T> clz, Serializer<T> serializer) throws IOException { resPath = norm(resPath); @@ -162,16 +160,22 @@ abstract public class ResourceStore { return getResourceTimestampImpl(norm(resPath)); } - final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, Class<T> clazz, Serializer<T> serializer) throws IOException { - return getAllResources(rangeStart, rangeEnd, -1L, -1L, clazz, serializer); + /** + * Read all resources under a folder. Return empty list if folder not exist. 
+ */ + final public <T extends RootPersistentEntity> List<T> getAllResources(String folderPath, Class<T> clazz, Serializer<T> serializer) throws IOException { + return getAllResources(folderPath, Long.MIN_VALUE, Long.MAX_VALUE, clazz, serializer); } - final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis, Class<T> clazz, Serializer<T> serializer) throws IOException { - final List<RawResource> allResources = getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis); - if (allResources.isEmpty()) { + /** + * Read all resources under a folder having last modified time between given range. Return empty list if folder not exist. + */ + final public <T extends RootPersistentEntity> List<T> getAllResources(String folderPath, long timeStart, long timeEndExclusive, Class<T> clazz, Serializer<T> serializer) throws IOException { + final List<RawResource> allResources = getAllResourcesImpl(folderPath, timeStart, timeEndExclusive); + if (allResources == null || allResources.isEmpty()) { return Collections.emptyList(); } - List<T> result = Lists.newArrayList(); + List<T> result = Lists.newArrayListWithCapacity(allResources.size()); try { for (RawResource rawResource : allResources) { final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream)); @@ -181,14 +185,13 @@ abstract public class ResourceStore { return result; } finally { for (RawResource rawResource : allResources) { - IOUtils.closeQuietly(rawResource.inputStream); + if (rawResource != null) + IOUtils.closeQuietly(rawResource.inputStream); } } } - abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException; - - abstract protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException; + abstract protected List<RawResource> getAllResourcesImpl(String 
folderPath, long timeStart, long timeEndExclusive) throws IOException; /** returns null if not exists */ abstract protected RawResource getResourceImpl(String resPath) throws IOException; @@ -211,11 +214,17 @@ abstract public class ResourceStore { * check & set, overwrite a resource */ final public <T extends RootPersistentEntity> long putResource(String resPath, T obj, Serializer<T> serializer) throws IOException { + return putResource(resPath, obj, System.currentTimeMillis(), serializer); + } + + /** + * check & set, overwrite a resource + */ + final public <T extends RootPersistentEntity> long putResource(String resPath, T obj, long newTS, Serializer<T> serializer) throws IOException { resPath = norm(resPath); //logger.debug("Saving resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); long oldTS = obj.getLastModified(); - long newTS = System.currentTimeMillis(); obj.setLastModified(newTS); try { http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java index 56f855c..c0f3fd9 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java @@ -22,7 +22,6 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; @@ -169,7 +168,7 @@ public class ResourceTool { resetR(store, "/"); } - private static void resetR(ResourceStore store, String path) throws IOException { + public static void resetR(ResourceStore store, String path) throws IOException { NavigableSet<String> 
children = store.listResources(path); if (children == null) { // path is a resource (not a folder) if (matchFilter(path)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java index 7ba5329..cc6143d 100644 --- a/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/persistence/LocalFileResourceStoreTest.java @@ -18,18 +18,6 @@ package org.apache.kylin.common.persistence; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.NavigableSet; - -import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.junit.After; @@ -50,123 +38,8 @@ public class LocalFileResourceStoreTest extends LocalFileMetadataTestCase { @Test public void testFileStore() throws Exception { - testAStore(ResourceStore.getStore(KylinConfig.getInstanceFromEnv())); - } - - void testAStore(ResourceStore store) throws IOException { - String dir1 = "/cube"; - String path1 = "/cube/_test.json"; - StringEntity content1 = new StringEntity("anything"); - String dir2 = "/table"; - String path2 = "/table/_test.json"; - StringEntity content2 = new StringEntity("something"); - - // cleanup legacy if any - store.deleteResource(path1); - store.deleteResource(path2); - - StringEntity t; - - // 
put/get - store.putResource(path1, content1, StringEntity.serializer); - assertTrue(store.exists(path1)); - t = store.getResource(path1, StringEntity.class, StringEntity.serializer); - assertEquals(content1, t); - - store.putResource(path2, content2, StringEntity.serializer); - assertTrue(store.exists(path2)); - t = store.getResource(path2, StringEntity.class, StringEntity.serializer); - assertEquals(content2, t); - - // overwrite - t.str = "new string"; - store.putResource(path2, t, StringEntity.serializer); - - // write conflict - try { - t.setLastModified(t.lastModified - 1); - store.putResource(path2, t, StringEntity.serializer); - fail("write conflict should trigger IllegalStateException"); - } catch (IllegalStateException e) { - // expected - } - - // list - NavigableSet<String> list; - - list = store.listResources(dir1); - assertTrue(list.contains(path1)); - assertTrue(list.contains(path2) == false); - - list = store.listResources(dir2); - assertTrue(list.contains(path2)); - assertTrue(list.contains(path1) == false); - - list = store.listResources("/"); - assertTrue(list.contains(dir1)); - assertTrue(list.contains(dir2)); - assertTrue(list.contains(path1) == false); - assertTrue(list.contains(path2) == false); - - list = store.listResources(path1); - assertNull(list); - list = store.listResources(path2); - assertNull(list); - - // delete/exist - store.deleteResource(path1); - assertTrue(store.exists(path1) == false); - list = store.listResources(dir1); - assertTrue(list == null || list.contains(path1) == false); - - store.deleteResource(path2); - assertTrue(store.exists(path2) == false); - list = store.listResources(dir2); - assertTrue(list == null || list.contains(path2) == false); - } - - public static class StringEntity extends RootPersistentEntity { - - static final Serializer<StringEntity> serializer = new Serializer<StringEntity>() { - @Override - public void serialize(StringEntity obj, DataOutputStream out) throws IOException { - 
out.writeUTF(obj.str); - } - - @Override - public StringEntity deserialize(DataInputStream in) throws IOException { - String str = in.readUTF(); - return new StringEntity(str); - } - }; - - String str; - - public StringEntity(String str) { - this.str = str; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + ((str == null) ? 0 : str.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) - return true; - if (!(obj instanceof StringEntity)) - return false; - return StringUtils.equals(this.str, ((StringEntity) obj).str); - } - - @Override - public String toString() { - return str; - } + ResourceStore store = ResourceStore.getStore(KylinConfig.getInstanceFromEnv()); + ResourceStoreTest.testAStore(store); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java new file mode 100644 index 0000000..69fc6d8 --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.persistence; + +import static org.junit.Assert.*; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.commons.lang.StringUtils; + +/** + * Be called by LocalFileResourceStoreTest and ITHBaseResourceStoreTest. + */ +public class ResourceStoreTest { + + public static void testAStore(ResourceStore store) throws IOException { + testBasics(store); + testGetAllResources(store); + } + + private static void testGetAllResources(ResourceStore store) throws IOException { + final String folder = "/testFolder"; + List<StringEntity> result; + + // reset any leftover garbage + ResourceTool.resetR(store, folder); + + store.putResource(folder + "/res1", new StringEntity("data1"), 1000, StringEntity.serializer); + store.putResource(folder + "/res2", new StringEntity("data2"), 2000, StringEntity.serializer); + store.putResource(folder + "/sub/res3", new StringEntity("data3"), 3000, StringEntity.serializer); + store.putResource(folder + "/res4", new StringEntity("data4"), 4000, StringEntity.serializer); + + result = store.getAllResources(folder, StringEntity.class, StringEntity.serializer); + assertEntity(result.get(0), "data1", 1000); + assertEntity(result.get(1), "data2", 2000); + assertEntity(result.get(2), "data4", 4000); + assertEquals(3, result.size()); + + result = store.getAllResources(folder, 2000, 4000, StringEntity.class, StringEntity.serializer); + assertEntity(result.get(0), "data2", 2000); + 
assertEquals(1, result.size()); + + ResourceTool.resetR(store, folder); + } + + private static void assertEntity(StringEntity entity, String data, int ts) { + assertEquals(data, entity.str); + assertEquals(ts, entity.lastModified); + } + + private static void testBasics(ResourceStore store) throws IOException { + String dir1 = "/cube"; + String path1 = "/cube/_test.json"; + StringEntity content1 = new StringEntity("anything"); + String dir2 = "/table"; + String path2 = "/table/_test.json"; + StringEntity content2 = new StringEntity("something"); + + // cleanup legacy if any + store.deleteResource(path1); + store.deleteResource(path2); + + StringEntity t; + + // put/get + store.putResource(path1, content1, StringEntity.serializer); + assertTrue(store.exists(path1)); + t = store.getResource(path1, StringEntity.class, StringEntity.serializer); + assertEquals(content1, t); + + store.putResource(path2, content2, StringEntity.serializer); + assertTrue(store.exists(path2)); + t = store.getResource(path2, StringEntity.class, StringEntity.serializer); + assertEquals(content2, t); + + // overwrite + t.str = "new string"; + store.putResource(path2, t, StringEntity.serializer); + + // write conflict + try { + t.setLastModified(t.getLastModified() - 1); + store.putResource(path2, t, StringEntity.serializer); + fail("write conflict should trigger IllegalStateException"); + } catch (IllegalStateException e) { + // expected + } + + // list + NavigableSet<String> list; + + list = store.listResources(dir1); + assertTrue(list.contains(path1)); + assertTrue(list.contains(path2) == false); + + list = store.listResources(dir2); + assertTrue(list.contains(path2)); + assertTrue(list.contains(path1) == false); + + list = store.listResources("/"); + assertTrue(list.contains(dir1)); + assertTrue(list.contains(dir2)); + assertTrue(list.contains(path1) == false); + assertTrue(list.contains(path2) == false); + + list = store.listResources(path1); + assertNull(list); + list = 
store.listResources(path2); + assertNull(list); + + // delete/exist + store.deleteResource(path1); + assertTrue(store.exists(path1) == false); + list = store.listResources(dir1); + assertTrue(list == null || list.contains(path1) == false); + + store.deleteResource(path2); + assertTrue(store.exists(path2) == false); + list = store.listResources(dir2); + assertTrue(list == null || list.contains(path2) == false); + } + + @SuppressWarnings("serial") + public static class StringEntity extends RootPersistentEntity { + + public static final Serializer<StringEntity> serializer = new Serializer<StringEntity>() { + @Override + public void serialize(StringEntity obj, DataOutputStream out) throws IOException { + out.writeUTF(obj.str); + } + + @Override + public StringEntity deserialize(DataInputStream in) throws IOException { + String str = in.readUTF(); + return new StringEntity(str); + } + }; + + String str; + + public StringEntity(String str) { + this.str = str; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((str == null) ? 
0 : str.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (!(obj instanceof StringEntity)) + return false; + return StringUtils.equals(this.str, ((StringEntity) obj).str); + } + + @Override + public String toString() { + return str; + } + } + + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java index 18a8150..409e01d 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java @@ -18,6 +18,11 @@ package org.apache.kylin.cube; +import static org.junit.Assert.*; + +import java.util.List; +import java.util.NavigableSet; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.JsonUtil; @@ -30,12 +35,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; - -import static org.junit.Assert.*; - /** * @author yangli9 */ @@ -126,15 +125,10 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { public void testGetAllCubes() throws Exception { final ResourceStore store = ResourceStore.getStore(getTestConfig()); final NavigableSet<String> cubePath = store.listResources(ResourceStore.CUBE_RESOURCE_ROOT); - final Iterator<String> iterator = cubePath.iterator(); - final String firstPath = iterator.next(); - final String secondPath = iterator.next(); - final String lastPath = cubePath.last(); assertTrue(cubePath.size() > 1); - final List<CubeInstance> cubes = store.getAllResources(firstPath, lastPath, CubeInstance.class, 
CubeManager.CUBE_SERIALIZER); + final List<CubeInstance> cubes = store.getAllResources(ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class, CubeManager.CUBE_SERIALIZER); assertEquals(cubePath.size(), cubes.size()); - assertEquals(cubePath.size() - 1, store.getAllResources(secondPath, lastPath, CubeInstance.class, CubeManager.CUBE_SERIALIZER).size()); } @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index b3ed2ea..90ec35e 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -311,13 +311,8 @@ public class DictionaryManager { } private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException { - ResourceStore store = MetadataManager.getInstance(config).getStore(); - NavigableSet<String> existings = store.listResources(dictInfo.getResourceDir()); - if (existings == null || existings.isEmpty()) { - return null; - } - - final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(existings.first(), existings.last(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); + final ResourceStore store = MetadataManager.getInstance(config).getStore(); + final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); TableSignature input = dictInfo.getInput(); @@ -330,13 +325,8 @@ public class DictionaryManager { } private DictionaryInfo findLargestDictInfo(DictionaryInfo dictInfo) throws IOException { - ResourceStore store = MetadataManager.getInstance(config).getStore(); - 
NavigableSet<String> dictInfos = store.listResources(dictInfo.getResourceDir()); - if (dictInfos == null || dictInfos.isEmpty()) { - return null; - } - - final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(dictInfos.first(), dictInfos.last(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); + final ResourceStore store = MetadataManager.getInstance(config).getStore(); + final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); DictionaryInfo largestDict = null; for (DictionaryInfo dictionaryInfo : allResources) { http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java index 493555d..58f845d 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java @@ -96,28 +96,16 @@ public class ExecutableDao { public List<ExecutableOutputPO> getJobOutputs() throws PersistentException { try { - NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT); - if (resources == null || resources.isEmpty()) { - return Collections.emptyList(); - } - String rangeStart = resources.first(); - String rangeEnd = resources.last(); - return store.getAllResources(rangeStart, rangeEnd, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER); + return store.getAllResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER); } catch (IOException e) { logger.error("error get all Jobs:", e); throw new PersistentException(e); } } - public List<ExecutableOutputPO> getJobOutputs(long 
timeStartInMillis, long timeEndInMillis) throws PersistentException { + public List<ExecutableOutputPO> getJobOutputs(long timeStart, long timeEndExclusive) throws PersistentException { try { - NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT); - if (resources == null || resources.isEmpty()) { - return Collections.emptyList(); - } - String rangeStart = resources.first(); - String rangeEnd = resources.last(); - return store.getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER); + return store.getAllResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT, timeStart, timeEndExclusive, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER); } catch (IOException e) { logger.error("error get all Jobs:", e); throw new PersistentException(e); @@ -126,28 +114,16 @@ public class ExecutableDao { public List<ExecutablePO> getJobs() throws PersistentException { try { - final NavigableSet<String> jobIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT); - if (jobIds == null || jobIds.isEmpty()) { - return Collections.emptyList(); - } - String rangeStart = jobIds.first(); - String rangeEnd = jobIds.last(); - return store.getAllResources(rangeStart, rangeEnd, ExecutablePO.class, JOB_SERIALIZER); + return store.getAllResources(ResourceStore.EXECUTE_RESOURCE_ROOT, ExecutablePO.class, JOB_SERIALIZER); } catch (IOException e) { logger.error("error get all Jobs:", e); throw new PersistentException(e); } } - public List<ExecutablePO> getJobs(long timeStartInMillis, long timeEndInMillis) throws PersistentException { + public List<ExecutablePO> getJobs(long timeStart, long timeEndExclusive) throws PersistentException { try { - final NavigableSet<String> jobIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT); - if (jobIds == null || jobIds.isEmpty()) { - return Collections.emptyList(); - } - String rangeStart = jobIds.first(); - String rangeEnd = jobIds.last(); - 
return store.getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis, ExecutablePO.class, JOB_SERIALIZER); + return store.getAllResources(ResourceStore.EXECUTE_RESOURCE_ROOT, timeStart, timeEndExclusive, ExecutablePO.class, JOB_SERIALIZER); } catch (IOException e) { logger.error("error get all Jobs:", e); throw new PersistentException(e); http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java index dccdca9..bc5cdf1 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java @@ -19,25 +19,16 @@ package org.apache.kylin.storage.hbase; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.NavigableSet; - -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.persistence.ResourceStoreTest; +import org.apache.kylin.common.persistence.ResourceStoreTest.StringEntity; import org.apache.kylin.common.util.HBaseMetadataTestCase; import 
org.junit.After; import org.junit.Before; @@ -57,7 +48,8 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase { @Test public void testHBaseStore() throws Exception { - testAStore(ResourceStore.getStore(KylinConfig.getInstanceFromEnv())); + ResourceStore store = ResourceStore.getStore(KylinConfig.getInstanceFromEnv()); + ResourceStoreTest.testAStore(store); } @Test @@ -95,120 +87,4 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase { } } - void testAStore(ResourceStore store) throws IOException { - String dir1 = "/cube"; - String path1 = "/cube/_test.json"; - StringEntity content1 = new StringEntity("anything"); - String dir2 = "/table"; - String path2 = "/table/_test.json"; - StringEntity content2 = new StringEntity("something"); - - // cleanup legacy if any - store.deleteResource(path1); - store.deleteResource(path2); - - StringEntity t; - - // put/get - store.putResource(path1, content1, StringEntity.serializer); - assertTrue(store.exists(path1)); - t = store.getResource(path1, StringEntity.class, StringEntity.serializer); - assertEquals(content1, t); - - store.putResource(path2, content2, StringEntity.serializer); - assertTrue(store.exists(path2)); - t = store.getResource(path2, StringEntity.class, StringEntity.serializer); - assertEquals(content2, t); - - // overwrite - t.str = "new string"; - store.putResource(path2, t, StringEntity.serializer); - - // write conflict - try { - t.setLastModified(t.getLastModified() - 1); - store.putResource(path2, t, StringEntity.serializer); - fail("write conflict should trigger IllegalStateException"); - } catch (IllegalStateException e) { - // expected - } - - // list - NavigableSet<String> list; - - list = store.listResources(dir1); - assertTrue(list.contains(path1)); - assertTrue(list.contains(path2) == false); - - list = store.listResources(dir2); - assertTrue(list.contains(path2)); - assertTrue(list.contains(path1) == false); - - list = store.listResources("/"); - 
assertTrue(list.contains(dir1)); - assertTrue(list.contains(dir2)); - assertTrue(list.contains(path1) == false); - assertTrue(list.contains(path2) == false); - - list = store.listResources(path1); - assertNull(list); - list = store.listResources(path2); - assertNull(list); - - // delete/exist - store.deleteResource(path1); - assertTrue(store.exists(path1) == false); - list = store.listResources(dir1); - assertTrue(list == null || list.contains(path1) == false); - - store.deleteResource(path2); - assertTrue(store.exists(path2) == false); - list = store.listResources(dir2); - assertTrue(list == null || list.contains(path2) == false); - } - - public static class StringEntity extends RootPersistentEntity { - - static final Serializer<StringEntity> serializer = new Serializer<StringEntity>() { - @Override - public void serialize(StringEntity obj, DataOutputStream out) throws IOException { - out.writeUTF(obj.str); - } - - @Override - public StringEntity deserialize(DataInputStream in) throws IOException { - String str = in.readUTF(); - return new StringEntity(str); - } - }; - - String str; - - public StringEntity(String str) { - this.str = str; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + ((str == null) ? 
0 : str.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) - return true; - if (!(obj instanceof StringEntity)) - return false; - return StringUtils.equals(this.str, ((StringEntity) obj).str); - } - - @Override - public String toString() { - return str; - } - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/30cccbda/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index bbb5e21..bb5382f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -22,7 +22,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.util.*; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -37,7 +39,11 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.*; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; @@ -88,18 +94,44 @@ public class HBaseResourceStore extends 
ResourceStore { } @Override - protected NavigableSet<String> listResourcesImpl(String resPath) throws IOException { - assert resPath.startsWith("/"); - String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/"; + protected boolean existsImpl(String resPath) throws IOException { + Result r = getFromHTable(resPath, false, false); + return r != null; + } + + @Override + protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException { + final TreeSet<String> result = new TreeSet<>(); + + visitFolder(folderPath, new KeyOnlyFilter(), new FolderVisitor() { + @Override + public void visit(String childPath, String fullPath, Result hbaseResult) { + result.add(childPath); + } + }); + // return null to indicate not a folder + return result.isEmpty() ? null : result; + } + + private void visitFolder(String folderPath, Filter filter, FolderVisitor visitor) throws IOException { + assert folderPath.startsWith("/"); + String lookForPrefix = folderPath.endsWith("/") ? folderPath : folderPath + "/"; byte[] startRow = Bytes.toBytes(lookForPrefix); byte[] endRow = Bytes.toBytes(lookForPrefix); endRow[endRow.length - 1]++; - TreeSet<String> result = new TreeSet<>(); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); Scan scan = new Scan(startRow, endRow); - scan.setFilter(new KeyOnlyFilter()); + if ((filter != null && filter instanceof KeyOnlyFilter) == false) { + scan.addColumn(B_FAMILY, B_COLUMN_TS); + scan.addColumn(B_FAMILY, B_COLUMN); + } + if (filter != null) { + scan.setFilter(filter); + } + + tuneScanParameters(scan); + try { ResultScanner scanner = table.getScanner(scan); for (Result r : scanner) { @@ -107,73 +139,54 @@ public class HBaseResourceStore extends ResourceStore { assert path.startsWith(lookForPrefix); int cut = path.indexOf('/', lookForPrefix.length()); String child = cut < 0 ? 
path : path.substring(0, cut); - result.add(child); + visitor.visit(child, path, r); } } finally { IOUtils.closeQuietly(table); } - // return null to indicate not a folder - return result.isEmpty() ? null : result; } - @Override - protected boolean existsImpl(String resPath) throws IOException { - Result r = getFromHTable(resPath, false, false); - return r != null; + private void tuneScanParameters(Scan scan) { + // divide by 10 as some resource like dictionary or snapshot can be very large + scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10); + scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize()); + scan.setCacheBlocks(true); } - @Override - protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException { - return getAllResources(rangeStart, rangeEnd, -1L, -1L); + interface FolderVisitor { + void visit(String childPath, String fullPath, Result hbaseResult) throws IOException; } @Override - protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException { - byte[] startRow = Bytes.toBytes(rangeStart); - byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); - - Scan scan = new Scan(startRow, endRow); - scan.addColumn(B_FAMILY, B_COLUMN_TS); - scan.addColumn(B_FAMILY, B_COLUMN); - FilterList filterList = generateTimeFilterList(timeStartInMillis, timeEndInMillis); - if (filterList != null) { - scan.setFilter(filterList); - } - tuneScanParameters(scan); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - List<RawResource> result = Lists.newArrayList(); + protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException { + FilterList filter = generateTimeFilterList(timeStart, timeEndExclusive); + final List<RawResource> result = Lists.newArrayList(); try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - result.add(new 
RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); - } + visitFolder(folderPath, filter, new FolderVisitor() { + @Override + public void visit(String childPath, String fullPath, Result hbaseResult) throws IOException { + // is a direct child (not grand child)? + if (childPath.equals(fullPath)) + result.add(new RawResource(getInputStream(childPath, hbaseResult), getTimestamp(hbaseResult))); + } + }); } catch (IOException e) { for (RawResource rawResource : result) { IOUtils.closeQuietly(rawResource.inputStream); } throw e; - } finally { - IOUtils.closeQuietly(table); } return result; } - private void tuneScanParameters(Scan scan) { - // divide by 10 as some resource like dictionary or snapshot can be very large - scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10); - scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize()); - scan.setCacheBlocks(true); - } - - private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) { + private FilterList generateTimeFilterList(long timeStart, long timeEndExclusive) { FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); - if (timeStartInMillis != -1L) { - SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis)); + if (timeStart != Long.MIN_VALUE) { + SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(timeStart)); filterList.addFilter(timeStartFilter); } - if (timeEndInMillis != -1L) { - SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis)); + if (timeEndExclusive != Long.MAX_VALUE) { + SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS, Bytes.toBytes(timeEndExclusive)); 
filterList.addFilter(timeEndFilter); } return filterList.getFilters().size() == 0 ? null : filterList; @@ -299,12 +312,6 @@ public class HBaseResourceStore extends ResourceStore { } } - private byte[] plusZero(byte[] startRow) { - byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1); - endRow[endRow.length - 1] = 0; - return endRow; - } - private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { Path redirectPath = bigCellHDFSPath(resPath); Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
