Repository: falcon Updated Branches: refs/heads/master 9fdfe713f -> 8054727ce
FALCON-2197 enable and disable functionalities for extensions Author: Pracheer Agarwal <[email protected]> Author: Pracheer Agarwal <[email protected]> Reviewers: @sandeepSamudrala,@pallavi-rao Closes #328 from PracheerAgarwal/enableDisable and squashes the following commits: f141efc [Pracheer Agarwal] back merge dd6f9e9 [Pracheer Agarwal] back merge 7f17079 [Pracheer Agarwal] back merge 26b182f [Pracheer Agarwal] EC_UNRELATED_TYPES error fixes 5e60c83 [Pracheer Agarwal] checkstyle errors 3100641 [Pracheer Agarwal] bug fixes e9fabdf [Pracheer Agarwal] squashing the commits f4a08c8 [Pracheer Agarwal] # This is a combination of 3 commits. # This is the 1st commit message: FALCON-2197, status flag added for extensions for enable/disable functionality 0793f2e [Pracheer Agarwal] FALCON-2221 back merge 62271b3 [Pracheer Agarwal] FALCON-2221 bug fixes for extension job submit ab830e4 [Pracheer Agarwal] FALCON-2197 adding disabled check for an extension while submitting jobs c47613a [Pracheer Agarwal] FALCON-2197 adding disabled check for an extension while submitting jobs 2d9fc53 [Pracheer Agarwal] FALCON-2197, extension flag value changed b39aea2 [Pracheer Agarwal] FALCON-2197, status flag added for extensions for enable/disable functionality f885602 [Pracheer Agarwal] FALCON-2197, setting the default status flag for extensions 8a0c245 [Pracheer Agarwal] FALCON-2197, enable and disable functionalities for extensions 46042fd [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon daa3ffc [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions 622cae4 [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8054727c Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8054727c Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8054727c Branch: refs/heads/master Commit: 
8054727ce160bf7e9707cee86b6ea2b77897a26e Parents: 9fdfe71 Author: Pracheer Agarwal <[email protected]> Authored: Mon Jan 2 16:54:48 2017 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Jan 2 16:54:48 2017 +0530 ---------------------------------------------------------------------- .../apache/falcon/cli/FalconExtensionCLI.java | 10 ++- .../falcon/client/AbstractFalconClient.java | 14 ++++ .../org/apache/falcon/client/FalconClient.java | 16 +++- .../falcon/extensions/ExtensionStatus.java | 37 +++++++++ .../falcon/persistence/ExtensionBean.java | 18 ++++- .../persistence/PersistenceConstants.java | 1 + .../extensions/jdbc/ExtensionMetaStore.java | 15 ++++ .../falcon/extensions/store/ExtensionStore.java | 27 +++++++ .../extensions/store/ExtensionStoreTest.java | 37 +++++++++ .../resource/AbstractExtensionManager.java | 20 ++++- .../resource/proxy/ExtensionManagerProxy.java | 82 ++++++++++++++++---- .../apache/falcon/unit/FalconUnitClient.java | 10 +++ .../falcon/unit/LocalExtensionManager.java | 9 ++- 13 files changed, 278 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index aa436da..0343aa8 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -52,6 +52,8 @@ public class FalconExtensionCLI { public static final String UNREGISTER_OPT = "unregister"; public static final String DETAIL_OPT = "detail"; public static final String REGISTER_OPT = "register"; + public static final String ENABLE_OPT = "enable"; + public static final String DISABLE_OPT = "disable"; // Input 
parameters public static final String EXTENSION_NAME_OPT = "extensionName"; @@ -153,10 +155,16 @@ public class FalconExtensionCLI { commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT), commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT)); result = instances != null ? instances.toString() : "No instance (" + jobName + ") found."; + } else if (optionsList.contains(ENABLE_OPT)) { + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); + result = client.enableExtension(extensionName).getMessage(); + } else if (optionsList.contains(DISABLE_OPT)) { + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); + result = client.disableExtension(extensionName).getMessage(); } else { throw new FalconCLIException("Invalid/missing extension command. Supported commands include " + "enumerate, definition, describe, list, instances, submit, submitAndSchedule, " - + "schedule, suspend, resume, delete, update, validate. " + + "schedule, suspend, resume, delete, update, validate, enable, disable. 
" + "Please refer to Falcon CLI twiki for more details."); } OUT.get().println(result); http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index e4ce993..3181b64 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -197,6 +197,20 @@ public abstract class AbstractFalconClient { public abstract APIResult unregisterExtension(String extensionName); /** + * + * @param extensionName extensionName that needs to be enabled + * @return Result of the enableExtension operation + */ + public abstract APIResult enableExtension(String extensionName); + + /** + * + * @param extensionName extensionName that needs to be disabled + * @return Result of the disableExtension operation + */ + public abstract APIResult disableExtension(String extensionName); + + /** * Prepares set of entities the extension has implemented and stage them to a local directory and submit them too. * @param extensionName extension which is available in the store. 
* @param jobName name to be used in all the extension entities' tagging that are built as part of http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 9adb142..8401c9c 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -356,7 +356,9 @@ public class FalconClient extends AbstractFalconClient { UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_XML), DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON), JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON), - REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML); + REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML), + ENABLE("api/extension/enable", HttpMethod.POST, MediaType.TEXT_XML), + DISABLE("api/extension/disable", HttpMethod.POST, MediaType.TEXT_XML); private String path; private String method; @@ -1049,6 +1051,18 @@ public class FalconClient extends AbstractFalconClient { return getResponse(APIResult.class, clientResponse); } + public APIResult enableExtension(final String extensionName) { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.ENABLE.path, extensionName).call(ExtensionOperations.ENABLE); + return getResponse(APIResult.class, clientResponse); + } + + public APIResult disableExtension(final String extensionName) { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.DISABLE.path, extensionName).call(ExtensionOperations.DISABLE); + return getResponse(APIResult.class, clientResponse); + } + public APIResult 
getExtensionDefinition(final String extensionName) { ClientResponse clientResponse = new ResourceBuilder() .path(ExtensionOperations.DEFINITION.path, extensionName) http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java ---------------------------------------------------------------------- diff --git a/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java b/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java new file mode 100644 index 0000000..8720096 --- /dev/null +++ b/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.extensions; + +/** + * Enum to store ExtensionStatus. 
+ */ +public enum ExtensionStatus { + ENABLED("enabled state"), + DISABLED("disabled state"); + + private final String text; + + ExtensionStatus(final String text) { + this.text = text; + } + + @Override + public String toString(){ + return text; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java index 79cfe16..122ae5e 100644 --- a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java @@ -18,6 +18,7 @@ package org.apache.falcon.persistence; +import org.apache.falcon.extensions.ExtensionStatus; import org.apache.falcon.extensions.ExtensionType; import javax.persistence.Basic; @@ -44,7 +45,8 @@ import java.util.Date; @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSIONS, query = "select OBJECT(a) from ExtensionBean a "), @NamedQuery(name = PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE, query = "delete from ExtensionBean a where a.extensionType = :extensionType "), @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION, query = "delete from ExtensionBean a where a.extensionName = :extensionName "), - @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName") + @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName"), + @NamedQuery(name = PersistenceConstants.CHANGE_EXTENSION_STATUS, query = "update ExtensionBean a set a.status = :extensionStatus where a.extensionName = :extensionName") }) //RESUME CHECKSTYLE CHECK LineLengthCheck public class ExtensionBean { @@ -79,6 +81,12 @@ public class 
ExtensionBean { @Column(name = "extension_owner") private String extensionOwner; + @Basic + @NotNull + @Column(name = "status") + @Enumerated(EnumType.STRING) + private ExtensionStatus status; + public ExtensionType getExtensionType() { return extensionType; } @@ -127,4 +135,12 @@ public class ExtensionBean { this.extensionOwner = extensionOwner; } + public ExtensionStatus getStatus() { + return status; + } + + public void setStatus(ExtensionStatus status) { + this.status = status; + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index 26a5cd4..e80f7b7 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -75,6 +75,7 @@ public final class PersistenceConstants { public static final String DELETE_EXTENSIONS_OF_TYPE = "DELETE_EXTENSIONS_OF_TYPE"; public static final String DELETE_EXTENSION = "DELETE_EXTENSION"; public static final String GET_EXTENSION = "GET_EXTENSION"; + public static final String CHANGE_EXTENSION_STATUS = "CHANGE_EXTENSION_STATUS"; public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS"; public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB"; http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 03f98f6..e53069a 100644 
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.extensions.jdbc; +import org.apache.falcon.extensions.ExtensionStatus; import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.persistence.ExtensionBean; import org.apache.falcon.persistence.ExtensionJobsBean; @@ -36,6 +37,7 @@ public class ExtensionMetaStore { private static final String EXTENSION_NAME = "extensionName"; private static final String JOB_NAME = "jobName"; private static final String EXTENSION_TYPE = "extensionType"; + private static final String EXTENSION_STATUS = "extensionStatus"; private EntityManager getEntityManager() { return FalconJPAService.get().getEntityManager(); @@ -50,6 +52,7 @@ public class ExtensionMetaStore { extensionBean.setCreationTime(new Date(System.currentTimeMillis())); extensionBean.setDescription(description); extensionBean.setExtensionOwner(extensionOwner); + extensionBean.setStatus(ExtensionStatus.ENABLED); EntityManager entityManager = getEntityManager(); try { beginTransaction(entityManager); @@ -229,4 +232,16 @@ public class ExtensionMetaStore { entityManager.close(); } } + + public void updateExtensionStatus(String extensionName, ExtensionStatus status) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery(PersistenceConstants.CHANGE_EXTENSION_STATUS); + q.setParameter(EXTENSION_NAME, extensionName).setParameter(EXTENSION_STATUS, status); + try { + q.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java ---------------------------------------------------------------------- diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index 13ff2d1..32b0cfd 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -23,6 +23,7 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.store.StoreAccessException; import org.apache.falcon.extensions.AbstractExtension; +import org.apache.falcon.extensions.ExtensionStatus; import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.hadoop.HadoopClientFactory; @@ -378,4 +379,30 @@ public final class ExtensionStore { return (storePath != null); } + public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws + FalconException { + validateStatusChange(extensionName, currentUser); + if (metaStore.getDetail(extensionName).getStatus().equals(status)) { + throw new ValidationException(extensionName + " is already in " + status.toString() + " state."); + } else { + metaStore.updateExtensionStatus(extensionName, status); + return "Status of extension: " + extensionName + "changed to " + status.toString() + " state."; + } + } + + private void validateStatusChange(final String extensionName, String currentUser) throws FalconException { + + ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extensionName) ? ExtensionType.TRUSTED + : ExtensionType.CUSTOM; + if (extensionType.equals(ExtensionType.TRUSTED)) { + throw new ValidationException(extensionName + " is trusted. 
Status can't be changed for trusted " + + "extensions."); + } else if (!metaStore.checkIfExtensionExists(extensionName)) { + throw new FalconException("Extension:" + extensionName + " is not registered with Falcon."); + } else if (!metaStore.getDetail(extensionName).getExtensionOwner().equals(currentUser)) { + throw new FalconException("User: " + currentUser + " is not allowed to change status of extension: " + + extensionName); + } + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java index 1b33e1b..b2fac5f 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.falcon.FalconException; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.store.StoreAccessException; +import org.apache.falcon.extensions.ExtensionStatus; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension; import org.apache.falcon.hadoop.JailedFileSystem; @@ -157,6 +158,42 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore { store.deleteExtension("ACLFailure", "falconUser"); } + @Test(expectedExceptions = FalconException.class) + public void testStatusChangeExtensionACLFailure() throws IOException, URISyntaxException, FalconException { + String extensionPath = EXTENSION_PATH + "testStatusChangeACLFailure"; + createLibs(extensionPath); + createReadmeAndJar(extensionPath); + createMETA(extensionPath); + store 
= ExtensionStore.get(); + store.registerExtension("testStatusChangeACLFailure", STORAGE_URL + extensionPath, "test desc", "falconUser"); + store.updateExtensionStatus("testStatusChangeACLFailure", "oozieUser", ExtensionStatus.DISABLED); + } + + @Test(expectedExceptions = ValidationException.class) + public void testStatusChangeExtensionValidationFailure() throws IOException, URISyntaxException, FalconException { + String extensionPath = EXTENSION_PATH + "testStatusChangeValidationFailure"; + createLibs(extensionPath); + createReadmeAndJar(extensionPath); + createMETA(extensionPath); + store = ExtensionStore.get(); + store.registerExtension("testStatusChangeValidationFailure", STORAGE_URL + extensionPath, "test desc", + "falconUser"); + store.updateExtensionStatus("testStatusChangeValidationFailure", "falconUser", ExtensionStatus.ENABLED); + } + + @Test() + public void testStatusChangeExtension() throws IOException, URISyntaxException, FalconException { + String extensionPath = EXTENSION_PATH + "testStatusChange"; + createLibs(extensionPath); + createReadmeAndJar(extensionPath); + createMETA(extensionPath); + store = ExtensionStore.get(); + store.registerExtension("testStatusChange", STORAGE_URL + extensionPath, "test desc", "falconUser"); + store.updateExtensionStatus("testStatusChange", "falconUser", ExtensionStatus.DISABLED); + ExtensionMetaStore metaStore = new ExtensionMetaStore(); + Assert.assertEquals(metaStore.getDetail("testStatusChange").getStatus(), ExtensionStatus.DISABLED); + } + private void createMETA(String extensionPath) throws IOException { Path path = new Path(extensionPath + "/META"); if (fs.exists(path)) { http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java 
b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index f1ed6f5..63bf1b6 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -23,6 +23,7 @@ import org.apache.falcon.FalconWebException; import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.extensions.ExtensionStatus; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; @@ -35,7 +36,6 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import javax.ws.rs.core.Response; import java.io.IOException; import java.util.ArrayList; @@ -174,6 +174,24 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { return tags.substring(nameStart, nameEnd); } + public String disableExtension(String extensionName, String currentUser) { + validateExtensionName(extensionName); + try { + return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED); + } catch (Throwable e) { + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + public String enableExtension(String extensionName, String currentUser) { + validateExtensionName(extensionName); + try { + return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED); + } catch (Throwable e) { + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + private JSONObject buildExtensionDetailResult(final String extensionName) throws FalconException { ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); 
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 551dbbf..2b5cbe7 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -30,9 +30,10 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.extensions.Extension; -import org.apache.falcon.extensions.ExtensionProperties; +import org.apache.falcon.extensions.ExtensionStatus; import org.apache.falcon.extensions.ExtensionService; import org.apache.falcon.extensions.ExtensionType; +import org.apache.falcon.extensions.ExtensionProperties; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.persistence.ExtensionBean; @@ -192,6 +193,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { public APIResult schedule(@PathParam("job-name") String jobName, @DefaultValue("") @QueryParam("doAs") String doAsUser) { checkIfExtensionServiceIsEnabled(); + checkIfExtensionIsEnabled(ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName()); try { List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); if (entities.isEmpty()) { @@ -307,8 +309,9 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @FormDataParam("feeds") List<FormDataBodyPart> feedForms, @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); + 
checkIfExtensionIsEnabled(extensionName); + checkIfExtensionJobExists(jobName, extensionName); SortedMap<EntityType, List<Entity>> entityMap; - try { entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); submitEntities(extensionName, jobName, entityMap, config, request); @@ -376,6 +379,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @FormDataParam("feeds") List<FormDataBodyPart> feedForms, @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); + checkIfExtensionIsEnabled(extensionName); + checkIfExtensionJobExists(jobName, extensionName); SortedMap<EntityType, List<Entity>> entityMap; try { entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); @@ -445,14 +450,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { List<String> feedNames = new ArrayList<>(); List<String> processNames = new ArrayList<>(); + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + byte[] configBytes = null; + if (configStream != null) { + configBytes = IOUtils.toByteArray(configStream); + } for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { for (final Entity entity : entry.getValue()) { - final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request); - final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity); - entityProxyUtil.proxySubmit(entity.getEntityType().toString(), bufferedRequest, entity, colos); - if (!embeddedMode) { - super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo); - } if (entity.getEntityType().equals(EntityType.FEED)) { feedNames.add(entity.getName()); } else { @@ -460,13 +464,18 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { } } } + metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); - ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); 
- byte[] configBytes = null; - if (configStream != null) { - configBytes = IOUtils.toByteArray(configStream); + for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ + for(final Entity entity : entry.getValue()){ + final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request); + final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity); + entityProxyUtil.proxySubmit(entity.getEntityType().toString(), bufferedRequest, entity, colos); + if (!embeddedMode) { + super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo); + } + } } - metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); } private void updateEntities(String extensionName, String jobName, @@ -700,6 +709,36 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { } } + @POST + @Path("disable/{extension-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces(MediaType.TEXT_PLAIN) + public APIResult disableExtension( + @PathParam("extension-name") String extensionName) { + checkIfExtensionServiceIsEnabled(); + try { + return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, + CurrentUser.getUser())); + } catch (Throwable e) { + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + @POST + @Path("enable/{extension-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces(MediaType.TEXT_PLAIN) + public APIResult enableExtension( + @PathParam("extension-name") String extensionName) { + checkIfExtensionServiceIsEnabled(); + try { + return new APIResult(APIResult.Status.SUCCEEDED, super.enableExtension(extensionName, + CurrentUser.getUser())); + } catch (Throwable e) { + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + private List<Entity> generateEntities(String extensionName, InputStream configStream) 
throws FalconException, IOException { // get entities for extension job @@ -731,4 +770,21 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { ExtensionService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND); } } + + private static void checkIfExtensionIsEnabled(String extensionName) { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + if (metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) { + throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.", + Response.Status.INTERNAL_SERVER_ERROR); + } + } + + private static void checkIfExtensionJobExists(String jobName, String extensionName) { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName); + if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) { + throw FalconWebException.newAPIException("Extension job with name: " + extensionName + " already exists.", + Response.Status.INTERNAL_SERVER_ERROR); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 9ed2a0d..2a40611 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -282,6 +282,16 @@ public class FalconUnitClient extends AbstractFalconClient { } @Override + public APIResult enableExtension(String extensionName) { + return localExtensionManager.enableExtension(extensionName); + } + + @Override + public APIResult disableExtension(String extensionName) { + return localExtensionManager.disableExtension(extensionName); + } + + 
@Override public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { InputStream configStream = getServletInputStream(configPath); http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 0412ef2..1e9b15a 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -140,6 +140,14 @@ public class LocalExtensionManager extends AbstractExtensionManager { return super.getExtensionJobDetail(jobName); } + public APIResult disableExtension(String extensionName) { + return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser())); + } + + public APIResult enableExtension(String extensionName) { + return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser())); + } + public APIResult getExtensionDetails(String extensionName){ return super.getExtensionDetail(extensionName); } @@ -147,5 +155,4 @@ public class LocalExtensionManager extends AbstractExtensionManager { public APIResult getExtensions(){ return super.getExtensions(); } - }
