Repository: falcon Updated Branches: refs/heads/master bc4dcf9cf -> 4f42dc117
FALCON-2200 Update API support for extension job (user extension) Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #331 from sandeepSamudrala/FALCON-2200 and squashes the following commits: 737fad3 [sandeep] FALCON-2200 fixed checkstyle issues. removed unused imports 1780416 [sandeep] Incorporated review comments. Removed entitychannel and config channel from ExtensionManager Proxy as they are now used from proxyUtil 8a4d035 [sandeep] FALCON-2200 Incorporated review comments. Moved common code from proxies to proxyutil and making 2 api calls to get location in case of update extension c8d0ab7 [sandeep] FALCON-2200 Adding changes related to clusters being removed and clusters being added into entity definition cc7c9e9 [sandeep] FALCON-2200 Update API support for extension job (user extension) 456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon 194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4f42dc11 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4f42dc11 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4f42dc11 Branch: refs/heads/master Commit: 4f42dc117026a419d5f882c598ea2ff122f5a5c0 Parents: bc4dcf9 Author: sandeep <[email protected]> Authored: Fri Dec 30 19:16:53 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Dec 30 19:16:53 2016 +0530 ---------------------------------------------------------------------- .../apache/falcon/cli/FalconExtensionCLI.java | 5 +- .../org/apache/falcon/ExtensionHandler.java | 11 ++ .../falcon/client/AbstractFalconClient.java | 10 ++ .../org/apache/falcon/client/FalconClient.java | 31 +++- .../falcon/tools/FalconStateStoreDBCLI.java | 6 +- .../extensions/jdbc/ExtensionMetaStore.java | 17 +++ .../org/apache/falcon/ExtensionExample.java | 20 ++- .../src/test/resources/extension-example.xml | 1 - .../apache/falcon/jdbc/BacklogMetricStore.java | 2 +- .../falcon/resource/AbstractEntityManager.java | 6 +- .../resource/AbstractExtensionManager.java | 19 ++- .../falcon/resource/proxy/EntityProxyUtil.java | 150 +++++++++++++++++++ .../resource/proxy/ExtensionManagerProxy.java | 129 +++++++++------- .../proxy/SchedulableEntityManagerProxy.java | 149 ++++-------------- .../service/BacklogMetricEmitterService.java | 12 +- .../apache/falcon/unit/FalconUnitClient.java | 15 +- .../falcon/unit/LocalExtensionManager.java | 28 +++- .../apache/falcon/unit/FalconUnitTestBase.java | 5 + .../org/apache/falcon/unit/TestFalconUnit.java | 17 ++- unit/src/test/resources/extension.properties | 2 +- 20 files changed, 421 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 59538bc..aa436da 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -110,12 +110,13 @@ public class FalconExtensionCLI { result = client.registerExtension(extensionName, path, description).getMessage(); } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); + validateRequiredParameter(jobName, JOB_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) { - validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); + validateRequiredParameter(jobName, JOB_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); - result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage(); + result = client.updateExtensionJob(jobName, filePath, doAsUser).getMessage(); } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/client/src/main/java/org/apache/falcon/ExtensionHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java index 11122e2..5e2f26d 100644 --- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java +++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java @@ -54,6 +54,7 @@ public final class ExtensionHandler { private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir")); private static final String LOCATION = "location"; private static final String TYPE = "type"; + private static final String NAME = "name"; private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE = "META-INF/services/org.apache.falcon.extensions.ExtensionBuilder"; @@ -220,4 +221,14 @@ public final class ExtensionHandler { } return extensionType; } + + public static String getExtensionName(String jobName, JSONObject extensionJobDetailJson) { + String extensionType; + try { + extensionType = extensionJobDetailJson.get(NAME).toString(); + } catch (JSONException e) { + throw new FalconCLIException("Failed to get extension name for the given extension job:" + jobName, e); + } + return extensionType; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 879d794..8cdbf30 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -222,6 +222,16 @@ public abstract class AbstractFalconClient { String doAsUser); /** + * Prepares set of entities the extension has implemented and stage them to a local directory and updates them. + * @param jobName name to be used in all the extension entities' tagging that are built as part of + * loadAndPrepare. + * @param configPath path to extension parameters. + * @return + * @throws FalconCLIException + */ + public abstract APIResult updateExtensionJob(String jobName, String configPath, String doAsUser); + + /** * Prepares set of entities the extension has implemented to validate the extension job. * @param jobName job name of the extension job. * @return http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index e03e82d..9adb142 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -1028,9 +1028,12 @@ public class FalconClient extends AbstractFalconClient { } public APIResult getExtensionJobDetails(final String jobName) { - ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName) + return getResponse(APIResult.class, getExtensionJobDetailsResponse(jobName)); + } + + private ClientResponse getExtensionJobDetailsResponse(final String jobName) { + return new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName) .call(ExtensionOperations.JOB_DETAILS); - return getResponse(APIResult.class, clientResponse); } private ClientResponse getExtensionDetailResponse(final String extensionName) { @@ -1097,7 +1100,11 @@ public class FalconClient extends AbstractFalconClient { private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName, InputStream configStream) { - JSONObject extensionDetailJson = getExtensionDetailJson(extensionName); + JSONObject extensionDetailJson; + if (StringUtils.isBlank(extensionName)) { + extensionName = ExtensionHandler.getExtensionName(jobName, getExtensionJobDetailJson(jobName)); + } + extensionDetailJson = getExtensionDetailJson(extensionName); String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson); String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson); return getEntities(extensionName, jobName, configStream, extensionType, @@ -1115,6 +1122,16 @@ public class FalconClient extends AbstractFalconClient { } return extensionDetailJson; } + private JSONObject getExtensionJobDetailJson(String jobName) { + ClientResponse clientResponse = getExtensionJobDetailsResponse(jobName); + JSONObject extensionJobDetailJson; + try { + extensionJobDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage()); + } catch (JSONException e) { + throw new FalconCLIException("Failed to get details for the given extension", e); + } + return extensionJobDetailJson; + } private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream, String extensionType, String extensionBuildLocation) { @@ -1144,12 +1161,12 @@ public class FalconClient extends AbstractFalconClient { return getResponse(APIResult.class, clientResponse); } - public APIResult updateExtensionJob(final String extensionName, final String filePath, final String doAsUser) { - InputStream entityStream = getServletInputStream(filePath); + public APIResult updateExtensionJob(final String jobName, final String configPath, final String doAsUser) { + FormDataMultiPart entitiesForm = getEntitiesForm(null, jobName, configPath); ClientResponse clientResponse = new ResourceBuilder() - .path(ExtensionOperations.UPDATE.path, extensionName) + .path(ExtensionOperations.UPDATE.path, jobName) .addQueryParam(DO_AS_OPT, doAsUser) - .call(ExtensionOperations.UPDATE, entityStream); + .call(ExtensionOperations.UPDATE, entitiesForm); return getResponse(APIResult.class, clientResponse); } http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java index 6ad887e..cb8c816 100644 --- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java +++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java @@ -272,12 +272,12 @@ public class FalconStateStoreDBCLI { "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))"; private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception { - String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')"; + String insertDbVersion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')"; PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true)); writer.println(); writer.println(CREATE_FALCON_DB_PROPS); - writer.println(insertDbVerion); + writer.println(insertDbVersion); writer.close(); System.out.println("Create FALCON_DB_PROPS table"); if (run) { @@ -287,7 +287,7 @@ public class FalconStateStoreDBCLI { conn.setAutoCommit(true); st = conn.createStatement(); st.executeUpdate(CREATE_FALCON_DB_PROPS); - st.executeUpdate(insertDbVerion); + st.executeUpdate(insertDbVersion); st.close(); } catch (Exception ex) { closeStatement(st); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 277cb95..03f98f6 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -172,6 +172,23 @@ public class ExtensionMetaStore { } } + public void updateExtensionJob(String jobName, String extensionName, List<String> feedNames, + List<String> processNames, byte[] configBytes) { + EntityManager entityManager = getEntityManager(); + ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean(); + extensionJobsBean.setJobName(jobName); + extensionJobsBean.setExtensionName(extensionName); + extensionJobsBean.setFeeds(feedNames); + extensionJobsBean.setProcesses(processNames); + extensionJobsBean.setConfig(configBytes); + try { + beginTransaction(entityManager); + entityManager.merge(extensionJobsBean); + } finally { + commitAndCloseTransaction(entityManager); + } + } + public ExtensionJobsBean getExtensionJobDetails(String jobName) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/extensions/src/test/java/org/apache/falcon/ExtensionExample.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java index 432e37b..d3de2e6 100644 --- a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java +++ b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java @@ -21,29 +21,44 @@ package org.apache.falcon; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.Schema; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.extensions.ExtensionBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.xml.bind.JAXBException; +import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.Properties; /** * Extension Example for testing extension loading and preparing entities. */ public class ExtensionExample implements ExtensionBuilder{ + public static final Logger LOG = LoggerFactory.getLogger(ExtensionExample.class); public static final String PROCESS_XML = "/extension-example.xml"; @Override public List<Entity> getEntities(String extensionName, InputStream extensionConfigStream) throws FalconException { - Entity process; + Process process; try { - process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal( + process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal( getClass().getResourceAsStream(PROCESS_XML)); } catch (JAXBException e) { throw new FalconException("Failed in un-marshalling the entity"); } + if (extensionConfigStream != null) { + Properties properties = new Properties(); + try { + properties.load(extensionConfigStream); + } catch (IOException e) { + LOG.warn("Not able to load the configStream"); + } + process.setPipelines(properties.getProperty("pipelines.name")); + } List<Entity> entities = new ArrayList<>(); entities.add(process); return entities; @@ -52,7 +67,6 @@ public class ExtensionExample implements ExtensionBuilder{ @Override public void validateExtensionConfig(String extensionName, InputStream extensionConfigStream) throws FalconException { - } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/extensions/src/test/resources/extension-example.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/extension-example.xml b/extensions/src/test/resources/extension-example.xml index 4a2a982..bb391e4 100644 --- a/extensions/src/test/resources/extension-example.xml +++ b/extensions/src/test/resources/extension-example.xml @@ -27,7 +27,6 @@ <parallel>1</parallel> <order>LIFO</order> <frequency>hours(1)</frequency> - <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/> <!-- how --> <properties> <property name="name1" value="value1"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java index 621974d..8bb8bbb 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java @@ -82,7 +82,7 @@ public class BacklogMetricStore { beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES); q.setParameter("entityName", entityName); - try{ + try { q.executeUpdate(); } finally { commitAndCloseTransaction(entityManager); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index aefd699..81b0448 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -116,7 +116,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } } - protected Set<String> getAllColos() { + public static Set<String> getAllColos() { if (DeploymentUtil.isEmbeddedMode()) { return DeploymentUtil.getDefaultColos(); } @@ -141,7 +141,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { return colos; } - protected Set<String> getApplicableColos(String type, String name) { + public static Set<String> getApplicableColos(String type, String name) { try { if (DeploymentUtil.isEmbeddedMode()) { return DeploymentUtil.getDefaultColos(); @@ -157,7 +157,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } } - protected Set<String> getApplicableColos(String type, Entity entity) { + public static Set<String> getApplicableColos(String type, Entity entity) { try { if (DeploymentUtil.isEmbeddedMode()) { return DeploymentUtil.getDefaultColos(); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index d360370..9fb0dd4 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -42,7 +42,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { public static final Logger LOG = LoggerFactory.getLogger(AbstractExtensionManager.class); private static final String JOB_NAME = "jobName"; - public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job="; + protected static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job="; private static final String EXTENSION_NAME = "extensionName"; private static final String FEEDS = "feeds"; private static final String PROCESSES = "processes"; @@ -50,19 +50,19 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { private static final String CREATION_TIME = "creationTime"; private static final String LAST_UPDATE_TIME = "lastUpdatedTime"; - private static final String NAME = "name"; - private static final String EXTENSION_TYPE = "type"; - private static final String EXTENSION_DESC = "description"; - private static final String EXTENSION_LOCATION = "location"; + public static final String NAME = "name"; + protected static final String EXTENSION_TYPE = "type"; + protected static final String EXTENSION_DESC = "description"; + protected static final String EXTENSION_LOCATION = "location"; - public static void validateExtensionName(final String extensionName) { + protected static void validateExtensionName(final String extensionName) { if (StringUtils.isBlank(extensionName)) { throw FalconWebException.newAPIException("Extension name is mandatory and shouldn't be blank", Response.Status.BAD_REQUEST); } } - public APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) { + protected APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) { validateExtensionName(extensionName); try { return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().registerExtension(extensionName, path, @@ -80,7 +80,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { } } - public APIResult getExtensionDetail(String extensionName) { + protected APIResult getExtensionDetail(String extensionName) { try { return new APIResult(APIResult.Status.SUCCEEDED, buildExtensionDetailResult(extensionName).toString()); } catch (FalconException e) { @@ -112,6 +112,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { if (jobsBean == null) { throw new ValidationException("Job name not found:" + jobName); } + ExtensionBean extensionBean = metaStore.getDetail(jobsBean.getExtensionName()); JSONObject detailsObject = new JSONObject(); try { detailsObject.put(JOB_NAME, jobsBean.getJobName()); @@ -121,6 +122,8 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { detailsObject.put(CONFIG, jobsBean.getConfig()); detailsObject.put(CREATION_TIME, jobsBean.getCreationTime()); detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime()); + detailsObject.put(EXTENSION_LOCATION, extensionBean.getLocation()); + detailsObject.put(EXTENSION_TYPE, extensionBean.getExtensionType()); } catch (JSONException e) { LOG.error("Exception while building extension jon details for job {}", jobName, e); } http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java new file mode 100644 index 0000000..a07a6d4 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource.proxy; + +import org.apache.falcon.FalconException; +import org.apache.falcon.FalconRuntimException; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.channel.Channel; +import org.apache.falcon.resource.channel.ChannelFactory; +import org.apache.falcon.util.DeploymentUtil; + +import javax.servlet.http.HttpServletRequest; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.falcon.resource.AbstractEntityManager.getAllColos; +import static org.apache.falcon.resource.AbstractEntityManager.getApplicableColos; +import static org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy.FALCON_TAG; + +class EntityProxyUtil { + private final Map<String, Channel> entityManagerChannels = new HashMap<>(); + private final Map<String, Channel> configSyncChannels = new HashMap<>(); + + EntityProxyUtil() { + try { + Set<String> colos = getAllColos(); + + for (String colo : colos) { + initializeFor(colo); + } + + DeploymentUtil.setPrismMode(); + } catch (FalconException e) { + throw new FalconRuntimException("Unable to initialize channels", e); + } + } + private void initializeFor(String colo) throws FalconException { + entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo)); + configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo)); + } + + Channel getConfigSyncChannel(String colo) throws FalconException { + if (!configSyncChannels.containsKey(colo)) { + initializeFor(colo); + } + return configSyncChannels.get(colo); + } + + Channel getEntityManager(String colo) throws FalconException { + if (!entityManagerChannels.containsKey(colo)) { + initializeFor(colo); + } + return entityManagerChannels.get(colo); + } + + Map<String, APIResult> proxySubmit(final String type, final HttpServletRequest bufferedRequest, + final Entity entity, final Set<String> colos) { + Map<String, APIResult> results = new HashMap<>(); + results.put(FALCON_TAG, new EntityProxy(type, entity.getName()) { + @Override + protected Set<String> getColosToApply() { + return colos; + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo); + } + }.execute()); + return results; + } + + Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun, + final HttpServletRequest bufferedRequest, Entity newEntity) { + final Set<String> oldColos = getApplicableColos(type, entityName); + final Set<String> newColos = getApplicableColos(type, newEntity); + final Set<String> mergedColos = new HashSet<>(); + mergedColos.addAll(oldColos); + mergedColos.retainAll(newColos); //Common colos where update should be called + newColos.removeAll(oldColos); //New colos where submit should be called + oldColos.removeAll(mergedColos); //Old colos where delete should be called + + Map<String, APIResult> results = new HashMap<>(); + if (!oldColos.isEmpty()) { + results.put(FALCON_TAG + "/delete", new EntityProxy(type, entityName) { + @Override + protected Set<String> getColosToApply() { + return oldColos; + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, + type, entityName, colo); + } + }.execute()); + } + + if (!mergedColos.isEmpty()) { + results.put(FALCON_TAG + "/update", new EntityProxy(type, entityName) { + @Override + protected Set<String> getColosToApply() { + return mergedColos; + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getConfigSyncChannel(colo).invoke("update", bufferedRequest, + type, entityName, + colo, skipDryRun); + } + }.execute()); + } + + if (!newColos.isEmpty()) { + results.put(FALCON_TAG + "/submit", new EntityProxy(type, entityName) { + @Override + protected Set<String> getColosToApply() { + return newColos; + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, + colo); + } + }.execute()); + } + return results; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 343ef6c..0e79f12 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -36,14 +36,13 @@ import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.persistence.ExtensionBean; +import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractExtensionManager; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.ExtensionInstanceList; import org.apache.falcon.resource.ExtensionJobList; -import org.apache.falcon.resource.channel.Channel; -import org.apache.falcon.resource.channel.ChannelFactory; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.Services; import org.apache.falcon.util.DeploymentUtil; @@ -95,8 +94,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { private boolean embeddedMode = DeploymentUtil.isEmbeddedMode(); private String currentColo = DeploymentUtil.getCurrentColo(); - private final Map<String, Channel> configSyncChannels = new HashMap<String, Channel>(); - private final Map<String, Channel> entityManagerChannels = new HashMap<String, Channel>(); + private EntityProxyUtil entityProxyUtil = new EntityProxyUtil(); private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json"; @@ -126,7 +124,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { // sort by extension job name List<String> jobNames = new ArrayList<>(groupedEntities.keySet()); switch (sortOrder.toLowerCase()) { - case DESCENDING_SORT_ORDER : + case DESCENDING_SORT_ORDER: Collections.sort(jobNames, Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER)); break; default: @@ -325,9 +323,9 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { } private SortedMap<EntityType, List<Entity>> getEntityList(String extensionName, String jobName, - List<FormDataBodyPart> feedForms, - List<FormDataBodyPart> processForms, InputStream config) - throws FalconException, IOException{ + List<FormDataBodyPart> feedForms, + List<FormDataBodyPart> processForms, InputStream config) + throws FalconException, IOException { List<Entity> processes = getProcesses(processForms); List<Entity> feeds = getFeeds(feedForms); ExtensionType extensionType = getExtensionType(extensionName); @@ -362,11 +360,10 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { return extensionDetails.getExtensionType(); } - private Channel getEntityManager(String colo) throws FalconException { - if (!entityManagerChannels.containsKey(colo)) { - initializeFor(colo); - } - return entityManagerChannels.get(colo); + private String getExtensionName(String jobName) { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean extensionJobDetails = metaStore.getExtensionJobDetails(jobName); + return extensionJobDetails.getExtensionName(); } @POST @@ -396,9 +393,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request) throws FalconException, JAXBException, IOException { - - for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ - for(final Entity entity : entry.getValue()){ + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (final Entity entity : entry.getValue()) { final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request); final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest); final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity); @@ -411,8 +407,9 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("schedule", bufferedRequest, entity.getEntityType().toString(), - entity.getName(), colo, Boolean.FALSE, ""); + return new EntityProxyUtil().getEntityManager(colo).invoke("schedule", bufferedRequest, + entity.getEntityType().toString(), + entity.getName(), colo, Boolean.FALSE, ""); } }.execute(); } @@ -426,7 +423,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { return new BufferedRequest(request); } - protected void submitEntities(String extensionName, String jobName, + private void submitEntities(String extensionName, String jobName, SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream, HttpServletRequest request) throws FalconException, IOException, JAXBException { List<Entity> feeds = entityMap.get(EntityType.FEED); @@ -436,28 +433,17 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { List<String> feedNames = new ArrayList<>(); List<String> processNames = new ArrayList<>(); - for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ - for(final Entity entity : entry.getValue()){ + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (final Entity entity : entry.getValue()) { final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request); final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity); - new EntityProxy(entity.getEntityType().toString(), entity.getName()) { - @Override - protected Set<String> getColosToApply() { - return colos; - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, - entity.getEntityType().toString(), colo); - } - }.execute(); + entityProxyUtil.proxySubmit(entity.getEntityType().toString(), bufferedRequest, entity, colos); if (!embeddedMode) { super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo); } - if (entity.getEntityType().equals(EntityType.FEED)){ + if (entity.getEntityType().equals(EntityType.FEED)) { feedNames.add(entity.getName()); - }else{ + } else { processNames.add(entity.getName()); } } @@ -471,24 +457,49 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); } - private void initializeFor(String colo) throws FalconException { - entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo)); - configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo)); - } + private void updateEntities(String extensionName, String jobName, + SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream, + HttpServletRequest request) throws FalconException, IOException, JAXBException { + List<Entity> feeds = entityMap.get(EntityType.FEED); + List<Entity> processes = entityMap.get(EntityType.PROCESS); + validateFeeds(feeds); + validateProcesses(processes); + List<String> feedNames = new ArrayList<>(); + List<String> processNames = new ArrayList<>(); - private Channel getConfigSyncChannel(String colo) throws FalconException { - if (!configSyncChannels.containsKey(colo)) { - initializeFor(colo); + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (final Entity entity : entry.getValue()) { + final String entityType = entity.getEntityType().toString(); + final String entityName = entity.getName(); + final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request); + entityProxyUtil.proxyUpdate(entityType, entityName, Boolean.FALSE, bufferedRequest, entity); + if (!embeddedMode) { + super.update(bufferedRequest, entity.getEntityType().toString(), entity.getName(), currentColo, + Boolean.FALSE); + } + if (entity.getEntityType().equals(EntityType.FEED)) { + feedNames.add(entity.getName()); + } else { + processNames.add(entity.getName()); + } + } } - return configSyncChannels.get(colo); + + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + byte[] configBytes = null; + if (configStream != null) { + configBytes = IOUtils.toByteArray(configStream); + } + metaStore.updateExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); } - private HttpServletRequest getEntityStream(Entity entity, EntityType type, HttpServletRequest request) throws IOException, JAXBException { + private HttpServletRequest getEntityStream(Entity entity, EntityType type, HttpServletRequest request) + throws IOException, JAXBException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); type.getMarshaller().marshal(entity, baos); final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(baos.toByteArray()); - ServletInputStream servletInputStream=new ServletInputStream(){ + ServletInputStream servletInputStream = new ServletInputStream() { public int read() throws IOException { return byteArrayInputStream.read(); } @@ -506,7 +517,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { private void validateProcesses(List<Entity> processes) throws FalconException { ProcessEntityParser processEntityParser = new ProcessEntityParser(); for (Entity process : processes) { - processEntityParser.validate((Process)process, false); + processEntityParser.validate((Process) process, false); } } @@ -531,21 +542,25 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { } @POST - @Path("update/{extension-name}") - @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Path("update/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult update( - @PathParam("extension-name") String extensionName, + @PathParam("job-name") String jobName, @Context HttpServletRequest request, - @DefaultValue("") @QueryParam("doAs") String doAsUser) { + @DefaultValue("") @QueryParam("doAs") String doAsUser, + @FormDataParam("processes") List<FormDataBodyPart> processForms, + @FormDataParam("feeds") List<FormDataBodyPart> feedForms, + @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); + + SortedMap<EntityType, List<Entity>> entityMap; + String extensionName = getExtensionName(jobName); try { - List<Entity> entities = generateEntities(extensionName, request.getInputStream()); - for (Entity entity : entities) { - super.update(entity, entity.getEntityType().name(), entity.getName(), null); - } - } catch (FalconException | IOException e) { - LOG.error("Error when updating extension job: ", e); + entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); + updateEntities(extensionName, jobName, entityMap, config, request); + } catch (FalconException | IOException | JAXBException e) { + LOG.error("Error while updating extension job: " + jobName, e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully"); @@ -634,7 +649,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @Produces({MediaType.TEXT_PLAIN, MediaType.TEXT_XML}) public APIResult deleteExtensionMetadata( - @PathParam("extension-name") String extensionName){ + @PathParam("extension-name") String extensionName) { checkIfExtensionServiceIsEnabled(); try { return super.deleteExtensionMetadata(extensionName); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index ed1054c..74a1acc 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -20,7 +20,6 @@ package org.apache.falcon.resource.proxy; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; -import org.apache.falcon.FalconRuntimException; import org.apache.falcon.FalconWebException; import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; @@ -32,14 +31,12 @@ import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.monitors.Dimension; import org.apache.falcon.monitors.Monitored; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.AbstractExtensionManager; import org.apache.falcon.resource.AbstractSchedulableEntityManager; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.SchedulableEntityInstanceResult; -import org.apache.falcon.resource.channel.Channel; -import org.apache.falcon.resource.channel.ChannelFactory; -import org.apache.falcon.resource.AbstractExtensionManager; import org.apache.falcon.util.DeploymentUtil; import javax.servlet.http.HttpServletRequest; @@ -64,47 +61,13 @@ import java.util.Set; */ @Path("entities") public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityManager { - private static final String PRISM_TAG = "prism"; - public static final String FALCON_TAG = "falcon"; + static final String PRISM_TAG = "prism"; + static final String FALCON_TAG = "falcon"; - private final Map<String, Channel> entityManagerChannels = new HashMap<String, Channel>(); - private final Map<String, Channel> configSyncChannels = new HashMap<String, Channel>(); + private EntityProxyUtil entityProxyUtil = new EntityProxyUtil(); private boolean embeddedMode = DeploymentUtil.isEmbeddedMode(); private String currentColo = DeploymentUtil.getCurrentColo(); - public SchedulableEntityManagerProxy() { - try { - Set<String> colos = getAllColos(); - - for (String colo : colos) { - initializeFor(colo); - } - - DeploymentUtil.setPrismMode(); - } catch (FalconException e) { - throw new FalconRuntimException("Unable to initialize channels", e); - } - } - - private void initializeFor(String colo) throws FalconException { - entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo)); - configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo)); - } - - private Channel getConfigSyncChannel(String colo) throws FalconException { - if (!configSyncChannels.containsKey(colo)) { - initializeFor(colo); - } - return configSyncChannels.get(colo); - } - - private Channel getEntityManager(String colo) throws FalconException { - if (!entityManagerChannels.containsKey(colo)) { - initializeFor(colo); - } - return entityManagerChannels.get(colo); - } - private BufferedRequest getBufferedRequest(HttpServletRequest request) { if (request instanceof BufferedRequest) { return (BufferedRequest) request; @@ -136,8 +99,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected SchedulableEntityInstanceResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType, entityName, - start, end, colo); + return entityProxyUtil.getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType, + entityName, start, end, colo); } }.execute(); } @@ -162,24 +125,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana final HttpServletRequest bufferedRequest = getBufferedRequest(request); final Entity entity = getEntity(bufferedRequest, type); - Map<String, APIResult> results = new HashMap<String, APIResult>(); + Map<String, APIResult> results = new HashMap<>(); final Set<String> colos = getApplicableColos(type, entity); entityHasExtensionJobTag(entity); validateEntity(entity, colos); - results.put(FALCON_TAG, new EntityProxy(type, entity.getName()) { - @Override - protected Set<String> getColosToApply() { - return colos; - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo); - } - }.execute()); - + results.putAll(entityProxyUtil.proxySubmit(type, bufferedRequest, entity, colos)); if (!embeddedMode) { results.put(PRISM_TAG, super.submit(bufferedRequest, type, currentColo)); } @@ -240,7 +192,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("validate", bufferedRequest, type, skipDryRun); + return entityProxyUtil.getEntityManager(colo).invoke("validate", bufferedRequest, type, + skipDryRun); } }.execute(); } @@ -287,7 +240,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo); + return entityProxyUtil.getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, + colo); } }.execute()); @@ -326,58 +280,10 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana final HttpServletRequest bufferedRequest = new BufferedRequest(request); Entity newEntity = getEntity(bufferedRequest, type); entityHasExtensionJobTag(newEntity); - final Set<String> oldColos = getApplicableColos(type, entityName); - final Set<String> newColos = getApplicableColos(type, newEntity); - final Set<String> mergedColos = new HashSet<String>(); - mergedColos.addAll(oldColos); - mergedColos.retainAll(newColos); //Common colos where update should be called - newColos.removeAll(oldColos); //New colos where submit should be called - oldColos.removeAll(mergedColos); //Old colos where delete should be called - Map<String, APIResult> results = new HashMap<String, APIResult>(); + Map<String, APIResult> results = new HashMap<>(); boolean result = true; - if (!oldColos.isEmpty()) { - results.put(FALCON_TAG + "/delete", new EntityProxy(type, entityName) { - @Override - protected Set<String> getColosToApply() { - return oldColos; - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo); - } - }.execute()); - } - - if (!mergedColos.isEmpty()) { - results.put(FALCON_TAG + "/update", new EntityProxy(type, entityName) { - @Override - protected Set<String> getColosToApply() { - return mergedColos; - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, - colo, skipDryRun); - } - }.execute()); - } - - if (!newColos.isEmpty()) { - results.put(FALCON_TAG + "/submit", new EntityProxy(type, entityName) { - @Override - protected Set<String> getColosToApply() { - return newColos; - } - - @Override - protected APIResult doExecute(String colo) throws FalconException { - return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo); - } - }.execute()); - } + results.putAll(entityProxyUtil.proxyUpdate(type, entityName, skipDryRun, bufferedRequest, newEntity)); for (APIResult apiResult : results.values()) { if (apiResult.getStatus() != APIResult.Status.SUCCEEDED) { @@ -451,8 +357,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getConfigSyncChannel(colo).invoke("updateClusterDependents", clusterName, - colo, skipDryRun); + return entityProxyUtil.getConfigSyncChannel(colo).invoke("updateClusterDependents", + clusterName, colo, skipDryRun); } }.execute()); } @@ -497,7 +403,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("touch", type, entityName, colo, skipDryRun); + return entityProxyUtil.getEntityManager(colo).invoke("touch", type, entityName, colo, skipDryRun); } }.execute(); } @@ -527,7 +433,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("getStatus", type, entity, colo, showScheduler); + return entityProxyUtil.getEntityManager(colo).invoke("getStatus", type, entity, colo, + showScheduler); } }.execute(); } @@ -592,8 +499,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo, skipDryRun, - properties); + return entityProxyUtil.getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, + colo, skipDryRun, properties); } }.execute(); } @@ -654,7 +561,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity, colo); + return entityProxyUtil.getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity, + colo); } }.execute(); } @@ -686,7 +594,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity, colo); + return entityProxyUtil.getEntityManager(colo).invoke("resume", bufferedRequest, type, entity, + colo); } }.execute(); } @@ -811,9 +720,9 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected EntitySummaryResult doExecute(String colo) throws FalconException { - EntitySummaryResult es = getEntityManager(colo).invoke("getEntitySummary", type, clusterName, startStr, - endStr, entityFields, entityFilter, entityTags, entityOrderBy, entitySortOrder, entityOffset, - numEntities, numInstanceResults, doAsUser); + EntitySummaryResult es = entityProxyUtil.getEntityManager(colo).invoke("getEntitySummary", type, + clusterName, startStr, endStr, entityFields, entityFilter, entityTags, entityOrderBy, + entitySortOrder, entityOffset, numEntities, numInstanceResults, doAsUser); return es; } }.execute(); @@ -844,7 +753,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected FeedLookupResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("reverseLookup", type, path); + return entityProxyUtil.getEntityManager(colo).invoke("reverseLookup", type, path); } }.execute(); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java index 7688619..23f4cf1 100644 --- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java +++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java @@ -146,12 +146,14 @@ public final class BacklogMetricEmitterService implements FalconService, return; } Process newProcess = (Process) newEntity; + Process oldProcess = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName()); if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){ - backlogMetricStore.deleteEntityInstance(newProcess.getName()); - entityBacklogs.remove(newProcess); - Process process = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName()); - for(Cluster cluster : process.getClusters().getClusters()){ - dropMetric(cluster.getName(), process); + if (oldProcess.getSla() != null) { + backlogMetricStore.deleteEntityInstance(newProcess.getName()); + entityBacklogs.remove(newProcess); + for (Cluster cluster : oldProcess.getClusters().getClusters()) { + dropMetric(cluster.getName(), oldProcess); + } } } else { addToBacklog(newEntity); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 3a3c5b2..66b8e9b 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -293,7 +293,8 @@ public class FalconUnitClient extends AbstractFalconClient { } } - private SortedMap<EntityType, List<Entity>> getEntityTypeListMap(String extensionName, String jobName, InputStream configStream) { + private SortedMap<EntityType, List<Entity>> getEntityTypeListMap(String extensionName, String jobName, + InputStream configStream) { List<Entity> entities = getEntities(extensionName, jobName, configStream); List<Entity> feeds = new ArrayList<>(); List<Entity> processes = new ArrayList<>(); @@ -335,6 +336,18 @@ public class FalconUnitClient extends AbstractFalconClient { } } + public APIResult updateExtensionJob(String jobName, String configPath, String doAsUser) { + InputStream configStream = getServletInputStream(configPath); + try { + String extensionName = ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName(); + SortedMap<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); + return localExtensionManager.updateExtensionJob(extensionName, jobName, configStream, + entityMap); + } catch (FalconException | IOException e) { + throw new FalconCLIException("Failed in updating the extension job " + jobName); + } + } + @Override public APIResult getExtensionJobDetails(final String jobName) { return localExtensionManager.getExtensionJobDetails(jobName); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 4cf3ae4..7002dc8 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -86,6 +86,32 @@ public class LocalExtensionManager extends AbstractExtensionManager { return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } + public APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream, + SortedMap<EntityType, List<Entity>> entityMap) + throws FalconException, IOException { + List<String> feedNames = new ArrayList<>(); + List<String> processNames = new ArrayList<>(); + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (Entity entity : entry.getValue()) { + update(entity, entity.getEntityType().toString(), entity.getName(), true); + } + } + byte[] configBytes = null; + if (configStream != null) { + configBytes = IOUtils.toByteArray(configStream); + } + for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) { + for (final Entity entity : entry.getValue()) { + if (entity.getEntityType().equals(EntityType.FEED)) { + feedNames.add(entity.getName()); + } else { + processNames.add(entity.getName()); + } + } + } + ExtensionStore.getMetaStore().updateExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); + return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully"); + } public APIResult registerExtensionMetadata(String extensionName, String packagePath , String description) { return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser()); @@ -100,7 +126,7 @@ public class LocalExtensionManager extends AbstractExtensionManager { } public APIResult getExtensionDetails(String extensionName){ - return super.getExtensionJobDetail(extensionName); + return super.getExtensionDetail(extensionName); } public APIResult getExtensions(){ http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 0771b9d..690fdd5 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -241,6 +241,11 @@ public class FalconUnitTestBase { return falconUnitClient.submitAndScheduleExtensionJob(extensionName, jobName, configPath, doAsUser); } + APIResult updateExtensionJob(String jobName, String configPath, String doAsUser) { + return falconUnitClient.updateExtensionJob(jobName, configPath, doAsUser); + } + + public static String overlayParametersOverTemplate(String template, Map<String, String> overlay) throws IOException { File tmpFile = getTempFile(); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 293bb23..07d8195 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -70,7 +70,8 @@ public class TestFalconUnit extends FalconUnitTestBase { private static final String WORKFLOW = "workflow.xml"; private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml"; private static final String EXTENSION_PATH = "/projects/falcon/extension/testExtension"; - public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources"; + private static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources"; + private static final String EXTENSION_PROPERTIES = "extension.properties"; private FileSystem fileSystem; private static final String STORAGE_URL = "jail://global:00"; @@ -445,6 +446,20 @@ public class TestFalconUnit extends FalconUnitTestBase { result = getExtensionJobDetails("testJob"); JSONObject resultJson = new JSONObject(result); Assert.assertEquals(resultJson.get("extensionName"), "testExtension"); + Process process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); + Assert.assertEquals(process.getPipelines(), "testPipeline"); + + apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); + assertStatus(apiResult); + Assert.assertEquals(apiResult.getMessage(), "RUNNING"); + + apiResult = updateExtensionJob("testJob", getAbsolutePath(EXTENSION_PROPERTIES), null); + assertStatus(apiResult); + + String processes = new JSONObject(getExtensionJobDetails("testJob")).get("processes").toString(); + Assert.assertEquals(processes, "sample"); + process = (Process)getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); + Assert.assertEquals(process.getPipelines(), "testSample"); apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); assertStatus(apiResult); http://git-wip-us.apache.org/repos/asf/falcon/blob/4f42dc11/unit/src/test/resources/extension.properties ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/extension.properties b/unit/src/test/resources/extension.properties index d52de1e..0f2d7e8 100644 --- a/unit/src/test/resources/extension.properties +++ b/unit/src/test/resources/extension.properties @@ -20,4 +20,4 @@ #### This is used for falcon packaging only. #### #################################################### -pipelines.name=test +pipelines.name=testSample
