This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch sync in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4823d8b0f5b344f4b94b82bd7c8bc1d5bc7bc07a Author: Jiatao Tao <245915...@qq.com> AuthorDate: Wed Feb 7 13:27:59 2018 +0800 KYLIN-3234, add HDFS resource store implementation. --- .../common/persistence/FileResourceStore.java | 1 - .../common/persistence/HDFSResourceStore.java | 37 ++++++++--- .../common/persistence/HDFSResourceStoreTest.java | 77 ++++++++++++++++++++++ 3 files changed, 105 insertions(+), 10 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java index 6ac4536..38ccbdd 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java @@ -66,7 +66,6 @@ public class FileResourceStore extends ResourceStore { String[] names = file.list(); // not a directory if (names == null) - // fixme should return empty set, like HBase implement. return null; String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/"; if (recursive) { diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java index 7a3a933..8ad2540 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java @@ -31,7 +31,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.util.HadoopUtil; @@ -89,20 +91,37 @@ public class HDFSResourceStore extends ResourceStore { @Override protected NavigableSet<String> listResourcesImpl(String folderPath, boolean recursive) throws IOException { - if (recursive) { - throw new IllegalArgumentException("Not support fullPath yet"); - } Path p = getRealHDFSPath(folderPath); + String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/"; if (!fs.exists(p) || !fs.isDirectory(p)) { return null; } - TreeSet<String> r = new TreeSet<>(); - FileStatus[] statuses = fs.listStatus(p); - String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/"; - for (FileStatus status : statuses) { - r.add(prefix + status.getPath().getName()); + TreeSet<String> r; + + if (recursive) { + r = getAllFilePath(p, prefix); + } else { + r = getFilePath(p, prefix); + } + return r.isEmpty() ? null : r; + } + + private TreeSet<String> getFilePath(Path p, String prefix) throws IOException { + TreeSet<String> fileList = new TreeSet<>(); + for (FileStatus fileStat : fs.listStatus(p)) { + fileList.add(prefix + fileStat.getPath().getName()); + } + return fileList; + } + + TreeSet<String> getAllFilePath(Path filePath, String prefix) throws IOException { + TreeSet<String> fileList = new TreeSet<>(); + RemoteIterator<LocatedFileStatus> it = fs.listFiles(filePath, true); + while (it.hasNext()) { + String[] path = it.next().getPath().toString().split(prefix, 2); + fileList.add(prefix + path[1]); } - return r; + return fileList; } @Override diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/HDFSResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/HDFSResourceStoreTest.java new file mode 100644 index 0000000..e5e26a7 --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/persistence/HDFSResourceStoreTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.persistence; + +import java.io.File; +import java.lang.reflect.Field; +import java.util.TreeSet; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class HDFSResourceStoreTest extends LocalFileMetadataTestCase { + + @Before + public void setup() throws Exception { + this.createTestMetadata(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testListResourcesImpl() throws Exception { + String path = "../examples/test_metadata/"; + String cp = new File(path).getCanonicalFile().getPath(); + FileSystem fs = HadoopUtil.getFileSystem(cp); + HDFSResourceStore store = new HDFSResourceStore(KylinConfig.getInstanceFromEnv(), + StorageURL.valueOf("hdfs@hdfs")); + Field field = store.getClass().getDeclaredField("fs"); + field.setAccessible(true); + field.set(store, fs); + + File f1 = new File(cp + "/resource/resource/e1.json"); + File f2 = new File(cp + "/resource/resource/e2.json"); + if (!f1.getParentFile().exists()) { + if (!f1.getParentFile().mkdirs()) { + throw new RuntimeException("Can not create dir."); + } + } + if (!(f1.createNewFile() && f2.createNewFile())) { + throw new RuntimeException("Can not create file."); + } + + Path p = new Path(cp); + TreeSet<String> resources = store.getAllFilePath(new Path(p, "resource"), "/resource/"); + TreeSet<String> expected = new TreeSet<>(); + expected.add("/resource/resource/e1.json"); + expected.add("/resource/resource/e2.json"); + Assert.assertEquals(expected, resources); + } +} -- To stop receiving notification emails like this one, please contact liy...@apache.org.