KYLIN-1152 ResourceStore add getResourceTimestamp() API
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e9219d7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e9219d7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e9219d7b Branch: refs/heads/1.x-HBase1.1.3 Commit: e9219d7ba42379d929c5c2ebf183b3919381cc05 Parents: 2f788e7 Author: Li, Yang <yang...@ebay.com> Authored: Tue Nov 17 13:54:19 2015 +0800 Committer: Li, Yang <yang...@ebay.com> Committed: Tue Nov 17 13:54:19 2015 +0800 ---------------------------------------------------------------------- .../kylin/common/persistence/FileResourceStore.java | 9 +++++++++ .../kylin/common/persistence/HBaseResourceStore.java | 7 ++++++- .../apache/kylin/common/persistence/ResourceStore.java | 9 ++++++++- .../apache/kylin/job/hadoop/cube/MetadataCleanupJob.java | 11 ++++------- 4 files changed, 27 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e9219d7b/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java index 98c6b18..646cd80 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java @@ -100,6 +100,15 @@ public class FileResourceStore extends ResourceStore { } @Override + protected long getResourceTimestampImpl(String resPath) throws IOException { + File f = file(resPath); + if (f.exists() && f.isFile()) + return f.lastModified(); + else + return 0; + } + + @Override protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { File f = file(resPath); f.getParentFile().mkdirs(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e9219d7b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java index 2b14345..35a62b5 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java @@ -206,6 +206,11 @@ public class HBaseResourceStore extends ResourceStore { } @Override + protected long getResourceTimestampImpl(String resPath) throws IOException { + return getTimestamp(getByScan(resPath, false, true)); + } + + @Override protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { ByteArrayOutputStream bout = new ByteArrayOutputStream(); IOUtils.copy(content, bout); @@ -233,7 +238,7 @@ public class HBaseResourceStore extends ResourceStore { boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); if (!ok) { - long real = getTimestamp(getByScan(resPath, false, true)); + long real = getResourceTimestampImpl(resPath); throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e9219d7b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index a23a4cd..db70997 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -146,6 +146,10 @@ abstract public class ResourceStore { return getResourceImpl(norm(resPath)); } + final public long getResourceTimestamp(String resPath) throws IOException { + return getResourceTimestampImpl(norm(resPath)); + } + final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, Class<T> clazz, Serializer<T> serializer) throws IOException { final List<RawResource> allResources = getAllResources(rangeStart, rangeEnd); if (allResources.isEmpty()) { @@ -170,7 +174,10 @@ abstract public class ResourceStore { /** returns null if not exists */ abstract protected RawResource getResourceImpl(String resPath) throws IOException; - + + /** returns 0 if not exists */ + abstract protected long getResourceTimestampImpl(String resPath) throws IOException; + /** * overwrite a resource without write conflict check */ http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e9219d7b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java index cb601c5..6d06dcc 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java @@ -28,7 +28,6 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.hadoop.AbstractHadoopJob; @@ -115,9 +114,8 @@ public class MetadataCleanupJob extends AbstractHadoopJob { if (snapshotNames != null) for (String snapshot : snapshotNames) { if (!activeResourceList.contains(snapshot)) { - RawResource res = getStore().getResource(snapshot); - res.inputStream.close(); - if (isOlderThanThreshold(res.timestamp)) + long ts = getStore().getResourceTimestamp(snapshot); + if (isOlderThanThreshold(ts)) toDeleteResource.add(snapshot); } } @@ -137,9 +135,8 @@ public class MetadataCleanupJob extends AbstractHadoopJob { if (dictionaries != null) for (String dict : dictionaries) if (!activeResourceList.contains(dict)) { - RawResource res = getStore().getResource(dict); - res.inputStream.close(); - if (isOlderThanThreshold(res.timestamp)) + long ts = getStore().getResourceTimestamp(dict); + if (isOlderThanThreshold(ts)) toDeleteResource.add(dict); } }