Repository: falcon Updated Branches: refs/heads/master 9e25ede1f -> fd384a7a3
FALCON-2188 Rest api to register extension $ bin/falcon extension -register -extensionName test -path hdfs://hostname:8020/user/dataqa/sampleextension -description "test ing" Extension :test registered succesfully. Also, we get a file-not-found exception if the jars or the README are not present; please look at the test cases. Author: Praveen Adlakha <[email protected]> Reviewers: @sandeepSamudrala, @pallavi-rao Closes #304 from PraveenAdlakha/2188 and squashes the following commits: e4c21a0 [Praveen Adlakha] test case disabled 90e6ec9 [Praveen Adlakha] minor comments addressed 52f6ebc [Praveen Adlakha] comments addressed 05c9c5e [Praveen Adlakha] comments addressed and test cases updated 9a74a2c [Praveen Adlakha] FALCON-2188 Rest api to register extension Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fd384a7a Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fd384a7a Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fd384a7a Branch: refs/heads/master Commit: fd384a7a3b1da39dbae603c032bd3c23eaf0b134 Parents: 9e25ede Author: Praveen Adlakha <[email protected]> Authored: Tue Nov 22 08:54:11 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Nov 22 08:54:11 2016 +0530 ---------------------------------------------------------------------- .../java/org/apache/falcon/cli/FalconCLI.java | 2 +- .../apache/falcon/cli/FalconExtensionCLI.java | 20 ++++ .../falcon/client/FalconCLIConstants.java | 3 + .../org/apache/falcon/client/FalconClient.java | 11 +- .../falcon/extensions/store/ExtensionStore.java | 56 +++++++++- .../extensions/store/ExtensionStoreTest.java | 106 +++++++++++++++++++ .../apache/falcon/hadoop/JailedFileSystem.java | 5 + .../resource/extensions/ExtensionManager.java | 16 +++ 8 files changed, 215 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd384a7a/cli/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconCLI.java index a071d7a..2e0978f 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -101,7 +101,7 @@ public class FalconCLI { parser.addCommand(FalconCLIConstants.EXTENSION_CMD, "", "Extension operations like enumerate, definition, describe, list, instances, " + "submit, submitAndSchedule, schedule, suspend, resume, delete, update, validate,unregister" - + ",detail", + + ",detail,register", extensionCLI.createExtensionOptions(), true); parser.addCommand(FalconCLIConstants.VERSION_OPT, "", "show client version", new Options(), false); http://git-wip-us.apache.org/repos/asf/falcon/blob/fd384a7a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 4d023a6..83b550f 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -51,10 +51,13 @@ public class FalconExtensionCLI { public static final String INSTANCES_OPT = "instances"; public static final String UNREGISTER_OPT = "unregister"; public static final String DETAIL_OPT = "detail"; + public static final String REGISTER_OPT = "register"; // Input parameters public static final String ENTENSION_NAME_OPT = "extensionName"; public static final String JOB_NAME_OPT = "jobName"; + public static final String DESCRIPTION = "description"; + public static final String PATH = "path"; public FalconExtensionCLI() { } @@ -70,6 +73,8 @@ 
public class FalconExtensionCLI { String jobName = commandLine.getOptionValue(JOB_NAME_OPT); String filePath = commandLine.getOptionValue(FalconCLIConstants.FILE_PATH_OPT); String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT); + String path = commandLine.getOptionValue(FalconCLIConstants.PATH); + String description = commandLine.getOptionValue(FalconCLIConstants.DESCRIPTION); if (optionsList.contains(ENUMERATE_OPT)) { result = client.enumerateExtensions(); @@ -91,6 +96,14 @@ public class FalconExtensionCLI { validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage(); + } else if (optionsList.contains(REGISTER_OPT)) { + validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(path, PATH); + result = client.registerExtension(extensionName, path, description); + }else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) { + validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); + result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) { validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); @@ -164,6 +177,8 @@ public class FalconExtensionCLI { Option unregister = new Option(FalconCLIConstants.UREGISTER, false, "Un-register an extension. This will make" + " the extension unavailable for instantiation"); Option detail = new Option(FalconCLIConstants.DETAIL, false, "Show details of a given extension"); + Option register = new Option(FalconCLIConstants.REGISTER, false, "Register an extension with Falcon. 
This will " + + "make the extension available for instantiation for all users."); OptionGroup group = new OptionGroup(); group.addOption(enumerate); @@ -181,6 +196,7 @@ public class FalconExtensionCLI { group.addOption(delete); group.addOption(unregister); group.addOption(detail); + group.addOption(register); extensionOptions.addOptionGroup(group); Option url = new Option(FalconCLIConstants.URL_OPTION, true, "Falcon URL"); @@ -200,6 +216,8 @@ public class FalconExtensionCLI { Option status = new Option(FalconCLIConstants.STATUS_OPT, true, "Filter returned instances by status"); Option orderBy = new Option(FalconCLIConstants.ORDER_BY_OPT, true, "Order returned instances by this field"); Option filePath = new Option(FalconCLIConstants.FILE_PATH_OPT, true, "File path of extension parameters"); + Option path = new Option(FalconCLIConstants.PATH, true, "Path of hdfs location for extension"); + Option description = new Option(FalconCLIConstants.DESCRIPTION, true, "Short Description for extension"); extensionOptions.addOption(url); extensionOptions.addOption(doAs); @@ -216,6 +234,8 @@ public class FalconExtensionCLI { extensionOptions.addOption(status); extensionOptions.addOption(orderBy); extensionOptions.addOption(filePath); + extensionOptions.addOption(path); + extensionOptions.addOption(description); return extensionOptions; } http://git-wip-us.apache.org/repos/asf/falcon/blob/fd384a7a/client/src/main/java/org/apache/falcon/client/FalconCLIConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconCLIConstants.java b/client/src/main/java/org/apache/falcon/client/FalconCLIConstants.java index 3775771..26e8937 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconCLIConstants.java +++ b/client/src/main/java/org/apache/falcon/client/FalconCLIConstants.java @@ -219,4 +219,7 @@ public final class FalconCLIConstants { public static final String DO_AS_DESCRIPTION = "doAs 
user"; public static final String UREGISTER = "unregister"; public static final String DETAIL = "detail"; + public static final String REGISTER = "register"; + public static final String PATH = "path"; + public static final String DESCRIPTION = "description"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/fd384a7a/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 3db249a..dabed3f 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -348,7 +348,8 @@ public class FalconClient extends AbstractFalconClient { RESUME("api/extension/resume", HttpMethod.POST, MediaType.TEXT_XML), DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML), UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_PLAIN), - DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON);; + DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON), + REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_PLAIN); private String path; private String method; @@ -1036,6 +1037,14 @@ public class FalconClient extends AbstractFalconClient { return getResponse(String.class, clientResponse); } + public String registerExtension(final String extensionName, final String path, final String description) { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.REGISTER.path, extensionName).addQueryParam(PATH, path) + .addQueryParam(FalconCLIConstants.DESCRIPTION, description) + .call(ExtensionOperations.REGISTER); + return getResponse(String.class, clientResponse); + } + public String getExtensionDefinition(final String extensionName) { ClientResponse clientResponse = new 
ResourceBuilder() .path(ExtensionOperations.DEFINITION.path, extensionName) http://git-wip-us.apache.org/repos/asf/falcon/blob/fd384a7a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index 02f7e62..e15919f 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -25,12 +25,14 @@ import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.entity.parser.ValidationException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.IOUtils; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -39,6 +41,8 @@ import java.io.InputStream; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.entity.store.StoreAccessException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -264,6 +268,54 @@ public final class ExtensionStore { } } + public String registerExtensionMetadata(final String extensionName, final String path, final String description) + throws URISyntaxException, FalconException { + Configuration conf = new Configuration(); + URI uri = new URI(path); + conf.set("fs.default.name", 
uri.getScheme() + "://" + uri.getAuthority()); + FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(uri); + try { + fileSystem.listStatus(new Path(uri.getPath() + "/README")); + } catch (IOException e){ + LOG.error("Exception in registerExtensionMetadata:", e); + throw new ValidationException("README file is not present in the " + path); + } + PathFilter filter=new PathFilter(){ + public boolean accept(Path file){ + return file.getName().endsWith(".jar"); + } + }; + FileStatus[] jarStatus; + try { + jarStatus = fileSystem.listStatus(new Path(uri.getPath() + "/libs/build"), filter); + if (jarStatus.length <=0) { + throw new ValidationException("Jars are not present in the " + uri.getPath() + "libs/build."); + } + } catch (IOException e){ + LOG.error("Exception in registerExtensionMetadata:", e); + throw new ValidationException("Jars are not present in the " + uri.getPath() + "libs/build."); + } + FileStatus[] propStatus; + try{ + propStatus = fileSystem.listStatus(new Path(uri.getPath() + "/META")); + if (propStatus.length <=0){ + throw new ValidationException("No properties file is not present in the " + uri.getPath() + "/META" + + " structure."); + } + } catch (IOException e){ + LOG.error("Exception in registerExtensionMetadata:", e); + throw new ValidationException("Directory is not present in the " + uri.getPath() + "/META" + + " structure."); + } + + if (!metaStore.checkIfExtensionExists(extensionName)){ + metaStore.storeExtensionMetadataBean(extensionName, path, ExtensionType.CUSTOM, description); + }else{ + throw new ValidationException(extensionName + " already exsists."); + } + return "Extension :" + extensionName + " registered succesfully."; + } + public String getResource(final String extensionName, final String resourceName) throws StoreAccessException { Map<String, String> resources = getExtensionArtifacts(extensionName); if (resources.isEmpty()) { 
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd384a7a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java index bc04d88..98a8cb5 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java @@ -19,12 +19,28 @@ package org.apache.falcon.extensions.store; import com.google.common.collect.ImmutableMap; +import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.StoreAccessException; +import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension; +import org.apache.falcon.hadoop.JailedFileSystem; +import org.apache.falcon.service.FalconJPAService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.io.OutputStreamWriter; +import java.io.BufferedWriter; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URISyntaxException; import java.util.Map; /** @@ -32,6 +48,9 @@ import java.util.Map; */ public class ExtensionStoreTest extends AbstractTestExtensionStore { private static Map<String, String> resourcesMap; + private static JailedFileSystem fs; + protected static final String EXTENSION_PATH = "/projects/falcon/extension"; + private static final String STORAGE_URL = "jail://global:00"; @BeforeClass 
public void init() throws Exception { @@ -46,6 +65,8 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore { "hdfs-snapshot-mirroring-workflow.xml", extensionStorePath + "/hdfs-mirroring/resources/runtime/hdfs-snapshot-mirroring-workflow.xml" ); + fs = new JailedFileSystem(); + initFileSystem(); } @Test @@ -66,5 +87,90 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore { Assert.assertEquals(store.getExtensionLibPath(extensionName), libPath); } + private static void initFileSystem() throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", STORAGE_URL); + fs.initialize(LocalFileSystem.getDefaultUri(conf), conf); + } + + @BeforeMethod + public void clean() { + clear(); + } + + + @Test(enabled = false) + public void testRegisterExtensionMetadata() throws IOException, URISyntaxException, FalconException{ + createlibs(); + createReadmeAndJar(); + createMETA(); + store = ExtensionStore.get(); + store.registerExtensionMetadata("test", STORAGE_URL + EXTENSION_PATH, "test desc"); + ExtensionMetaStore metaStore = new ExtensionMetaStore(); + Assert.assertEquals(metaStore.getAllExtensions().size(), 1); + } + + @Test(expectedExceptions=FileNotFoundException.class) + public void testFailureCaseRegisterExtensionMetadata() throws IOException, URISyntaxException, FalconException{ + store = ExtensionStore.get(); + createlibs(); + store.registerExtensionMetadata("test", STORAGE_URL + EXTENSION_PATH, "test desc"); + } + + private void createMETA() throws IOException{ + Path path = new Path(EXTENSION_PATH + "/META"); + if (fs.exists(path)){ + fs.delete(path, true); + } + fs.mkdirs(path); + path = new Path(EXTENSION_PATH + "/META/test.properties"); + OutputStream os = fs.create(path); + BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); + br.write("Hello World"); + if (fs.exists(path)){ + fs.delete(path, true); + } + br.write("test properties"); + fs.create(path); + br.close(); + } + + private 
void createlibs() throws IOException{ + Path path = new Path(EXTENSION_PATH); + if (fs.exists(path)){ + fs.delete(path, true); + } + fs.mkdirs(path); + path = new Path(EXTENSION_PATH + "/libs//libs/build"); + fs.mkdirs(path); + } + + private void createReadmeAndJar() throws IOException{ + Path path = new Path(EXTENSION_PATH + "/README"); + if (fs.exists(path)){ + fs.delete(path, true); + } + fs.create(path); + path = new Path(EXTENSION_PATH + "/libs/build/test.jar"); + OutputStream os = fs.create(path); + BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); + br.write("Hello World"); + br.write("test jar"); + fs.create(path); + br.close(); + } + + private void clear() { + EntityManager em = FalconJPAService.get().getEntityManager(); + em.getTransaction().begin(); + try { + Query query = em.createNativeQuery("delete from EXTENSION_METADATA"); + query.executeUpdate(); + } finally { + em.getTransaction().commit(); + em.close(); + } + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/fd384a7a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java index 27b5a9e..d9ac756 100644 --- a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java +++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java @@ -90,6 +90,11 @@ public class JailedFileSystem extends FileSystem { } @Override + public FSDataOutputStream create(Path f) throws IOException { + return localFS.create(toLocalPath(f)); + } + + @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { return localFS.append(toLocalPath(f), bufferSize, progress); } 
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd384a7a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index b70b3a7..266e631 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -450,6 +450,22 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { } } + @POST + @Path("register/{extension-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces(MediaType.TEXT_PLAIN) + public String registerExtensionMetadata( + @PathParam("extension-name") String extensionName, + @QueryParam("path") String path, @QueryParam("description") String description){ + checkIfExtensionServiceIsEnabled(); + validateExtensionName(extensionName); + try { + return ExtensionStore.get().registerExtensionMetadata(extensionName, path, description); + } catch (Throwable e) { + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + @GET @Path("definition/{extension-name}") @Produces({MediaType.APPLICATION_JSON})
