Repository: falcon Updated Branches: refs/heads/master 42d379463 -> ccc343fa8
FALCON-2205 describe for non trusted recipe Author: Praveen Adlakha <[email protected]> Reviewers: @pallavi Closes #312 from PraveenAdlakha/2205 and squashes the following commits: 7ab62c0 [Praveen Adlakha] checkstyle issue fixed 229506c [Praveen Adlakha] comments addressed and test cases added daf3b48 [Praveen Adlakha] WIP 5ccf8ed [Praveen Adlakha] FALCON-2205 describe for non trusted recipe Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ccc343fa Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ccc343fa Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ccc343fa Branch: refs/heads/master Commit: ccc343fa84bcb257f69eb64b942e58eae2efd504 Parents: 42d3794 Author: Praveen Adlakha <[email protected]> Authored: Mon Jan 2 14:22:19 2017 +0530 Committer: Praveen Adlakha <[email protected]> Committed: Mon Jan 2 14:22:19 2017 +0530 ---------------------------------------------------------------------- .../entity/store/StoreAccessException.java | 4 ++ .../falcon/extensions/store/ExtensionStore.java | 68 ++++++++++++++------ .../extensions/store/ExtensionStoreTest.java | 14 +++- 3 files changed, 65 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/ccc343fa/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java index 318dc2e..96f60ea 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java +++ b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java @@ -35,4 +35,8 @@ public class StoreAccessException extends FalconException { public StoreAccessException(Exception e) { super(e); } + + public StoreAccessException(String message){ + super(message); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/ccc343fa/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index 8e4acbe..13ff2d1 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -26,6 +26,7 @@ import org.apache.falcon.extensions.AbstractExtension; import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.persistence.ExtensionBean; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -96,7 +97,7 @@ public final class ExtensionStore { private void initializeDbTable() { try { metaStore.deleteExtensionsOfType(ExtensionType.TRUSTED); - List<String> extensions = getExtensions(); + List<String> extensions = getTrustedExtensions(); for (String extension : extensions) { ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extension) ? ExtensionType.TRUSTED : ExtensionType.CUSTOM; @@ -141,19 +142,29 @@ public final class ExtensionStore { } } - public Map<String, String> getExtensionArtifacts(final String extensionName) throws StoreAccessException { + public Map<String, String> getExtensionArtifacts(final String extensionName) throws + FalconException { Map<String, String> extensionFileMap = new HashMap<>(); + Path extensionPath; try { - Path extensionPath = new Path(storePath, extensionName.toLowerCase()); - RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(extensionPath, true); + RemoteIterator<LocatedFileStatus> fileStatusListIterator; + if (AbstractExtension.isExtensionTrusted(extensionName)){ + extensionPath = new Path(storePath, extensionName.toLowerCase()); + fileStatusListIterator = fs.listFiles(extensionPath, true); + }else{ + ExtensionBean extensionBean = metaStore.getDetail(extensionName); + extensionPath = new Path(extensionBean.getLocation()); + FileSystem fileSystem = getHdfsFileSystem(extensionBean.getLocation()); + fileStatusListIterator = fileSystem.listFiles(extensionPath, true); + } if (!fileStatusListIterator.hasNext()) { - throw new StoreAccessException(new Exception(" For extension " + extensionName - + " there are no artifacts at the extension store path " + storePath)); + throw new StoreAccessException(" For extension " + extensionName + + " there are no artifacts at the extension store path " + storePath); } while (fileStatusListIterator.hasNext()) { LocatedFileStatus fileStatus = fileStatusListIterator.next(); - Path filePath = Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()); + Path filePath = fileStatus.getPath(); extensionFileMap.put(filePath.getName(), filePath.toString()); } } catch (IOException e) { @@ -162,6 +173,8 @@ public final class ExtensionStore { return extensionFileMap; } + + public Map<String, String> getExtensionResources(final String extensionName) throws StoreAccessException { Map<String, String> extensionFileMap = new HashMap<>(); try { @@ -178,8 +191,8 @@ public final class ExtensionStore { } if (resourcesPath == null) { - throw new StoreAccessException(new Exception(" For extension " + extensionName - + " there is no " + RESOURCES_DIR + "at the extension store path " + storePath)); + throw new StoreAccessException(" For extension " + extensionName + + " there is no " + RESOURCES_DIR + "at the extension store path " + storePath); } RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(resourcesPath, true); while (fileStatusListIterator.hasNext()) { @@ -219,24 +232,31 @@ public final class ExtensionStore { } } - public String getExtensionResource(final String resourcePath) throws StoreAccessException { + public String getExtensionResource(final String resourcePath) throws FalconException { if (StringUtils.isBlank(resourcePath)) { - throw new StoreAccessException(new Exception("Resource path cannot be null or empty")); + throw new StoreAccessException("Resource path cannot be null or empty"); } try { Path resourceFile = new Path(resourcePath); + InputStream data; ByteArrayOutputStream writer = new ByteArrayOutputStream(); - InputStream data = fs.open(resourceFile); - IOUtils.copyBytes(data, writer, fs.getConf(), true); + if (resourcePath.startsWith("file")){ + data = fs.open(resourceFile); + IOUtils.copyBytes(data, writer, fs.getConf(), true); + }else{ + FileSystem fileSystem = getHdfsFileSystem(resourcePath); + data = fileSystem.open(resourceFile); + IOUtils.copyBytes(data, writer, fileSystem.getConf(), true); + } return writer.toString(); } catch (IOException e) { throw new StoreAccessException(e); } } - public List<String> getExtensions() throws StoreAccessException { + public List<String> getTrustedExtensions() throws StoreAccessException { List<String> extensionList = new ArrayList<>(); try { FileStatus[] fileStatuses = fs.listStatus(storePath); @@ -276,6 +296,19 @@ public final class ExtensionStore { throw new ValidationException(msg); } } + private FileSystem getHdfsFileSystem(String path) throws FalconException { + Configuration conf = new Configuration(); + URI uri; + try { + uri = new URI(path); + } catch (URISyntaxException e){ + LOG.error("Exception : ", e); + throw new FalconException(e); + } + conf.set("fs.default.name", uri.getScheme() + "://" + uri.getAuthority()); + return HadoopClientFactory.get().createFalconFileSystem(uri); + } + public String registerExtension(final String extensionName, final String path, final String description, String extensionOwner) throws URISyntaxException, FalconException { @@ -285,7 +318,7 @@ public final class ExtensionStore { assertURI("Authority", uri.getAuthority()); assertURI("Path", uri.getPath()); conf.set("fs.defaultFS", uri.getScheme() + "://" + uri.getAuthority()); - FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(uri); + FileSystem fileSystem = getHdfsFileSystem(path); try { fileSystem.listStatus(new Path(uri.getPath() + "/README")); } catch (IOException e) { @@ -328,11 +361,10 @@ public final class ExtensionStore { LOG.info("Extension :" + extensionName + " registered successfully."); return "Extension :" + extensionName + " registered successfully."; } - - public String getResource(final String extensionName, final String resourceName) throws StoreAccessException { + public String getResource(final String extensionName, final String resourceName) throws FalconException { Map<String, String> resources = getExtensionArtifacts(extensionName); if (resources.isEmpty()) { - throw new StoreAccessException(new Exception("No extension resources found for " + extensionName)); + throw new StoreAccessException("No extension resources found for " + extensionName); } return getExtensionResource(resources.get(resourceName)); http://git-wip-us.apache.org/repos/asf/falcon/blob/ccc343fa/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java index 0835f38..1b33e1b 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java @@ -36,10 +36,10 @@ import org.testng.annotations.Test; import javax.persistence.EntityManager; import javax.persistence.Query; +import java.io.OutputStreamWriter; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; -import java.io.OutputStreamWriter; import java.net.URISyntaxException; import java.util.Map; @@ -139,6 +139,8 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore { createMETA(extensionPath); store = ExtensionStore.get(); store.registerExtension("toBeDeleted", STORAGE_URL + extensionPath, "test desc", "falconUser"); + Assert.assertTrue(store.getResource("toBeDeleted", "README").equals("README")); + store.getResource("toBeDeleted", "README"); store.deleteExtension("toBeDeleted", "falconUser"); ExtensionMetaStore metaStore = new ExtensionMetaStore(); Assert.assertEquals(metaStore.getAllExtensions().size(), 0); @@ -188,14 +190,20 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore { if (fs.exists(path)) { fs.delete(path, true); } - fs.create(path); - path = new Path(extensionPath + "/libs/build/test.jar"); OutputStream os = fs.create(path); BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); + br.write("README"); + fs.create(path); + br.close(); + os.close(); + path = new Path(extensionPath + "/libs/build/test.jar"); + os = fs.create(path); + br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); br.write("Hello World"); br.write("test jar"); fs.create(path); br.close(); + os.close(); } private void clearDB() {
