Repository: falcon Updated Branches: refs/heads/master b48f2df48 -> 9946758ea
FALCON-2226 Submit, Schedule and submitAndSchedule API for extension Author: Praveen Adlakha <[email protected]> Reviewers: @pallavi-rao Closes #327 from PraveenAdlakha/2226 and squashes the following commits: 59a43ef [Praveen Adlakha] minor changes in falcon client and localextensionmanager 473f04a [Praveen Adlakha] comments addressed 6a346aa [Praveen Adlakha] comments addressed 8733f53 [Praveen Adlakha] checkstyle issue resolved 9ba005e [Praveen Adlakha] FALCON-2226 Submit, Schedule and submitAndSchedule API for extension in distributed mode 699b06f [Praveen Adlakha] WIP 29da911 [Praveen Adlakha] WIP 0b7d02a [Praveen Adlakha] merge conflicts resolved 6b77cc1 [Praveen Adlakha] FALCON-2223 Distributed mode support for User Extension Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9946758e Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9946758e Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9946758e Branch: refs/heads/master Commit: 9946758eac38d985a0c50fddee4267e8e320f7a2 Parents: b48f2df Author: Praveen Adlakha <[email protected]> Authored: Mon Dec 26 16:57:43 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Dec 26 16:57:43 2016 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/client/FalconClient.java | 1 + .../resource/AbstractExtensionManager.java | 63 ------- .../falcon/resource/proxy/EntityProxy.java | 90 ++++++++++ .../resource/proxy/ExtensionManagerProxy.java | 176 ++++++++++++++----- .../HttpServletRequestInputStreamWrapper.java | 42 +++++ .../proxy/SchedulableEntityManagerProxy.java | 56 ------ .../apache/falcon/unit/FalconUnitClient.java | 17 +- .../falcon/unit/LocalExtensionManager.java | 48 ++++- 8 files changed, 315 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 1614b24..d5b8342 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -1137,6 +1137,7 @@ public class FalconClient extends AbstractFalconClient { FormDataMultiPart entitiesForm = getEntitiesForm(extensionName, jobName, configPath); ClientResponse clientResponse = new ResourceBuilder() .path(ExtensionOperations.SUBMIT_AND_SCHEDULE.path, extensionName) + .addQueryParam(JOB_NAME_OPT, jobName) .addQueryParam(DO_AS_OPT, doAsUser) .call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entitiesForm); return getResponse(APIResult.class, clientResponse); http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index f0f21a9..a50b535 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -17,32 +17,20 @@ */ package org.apache.falcon.resource; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; -import org.apache.falcon.entity.parser.ProcessEntityParser; import org.apache.falcon.entity.parser.ValidationException; -import org.apache.falcon.entity.v0.Entity; -import 
org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.security.CurrentUser; -import org.apache.hadoop.security.authorize.AuthorizationException; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.Response; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; /** * A base class for managing Extension Operations. @@ -113,57 +101,6 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { return detailsObject; } - - protected void submitEntities(String extensionName, String doAsUser, String jobName, - Map<EntityType, List<Entity>> entityMap, InputStream configStream) - throws FalconException, IOException { - List<Entity> feeds = entityMap.get(EntityType.FEED); - List<Entity> processes = entityMap.get(EntityType.PROCESS); - validateFeeds(feeds); - validateProcesses(processes); - List<String> feedNames = new ArrayList<>(); - List<String> processNames = new ArrayList<>(); - for (Entity feed : feeds) { - submitInternal(feed, doAsUser); - feedNames.add(feed.getName()); - } - for (Entity process: processes) { - submitInternal(process, doAsUser); - processNames.add(process.getName()); - } - - ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); - byte[] configBytes = null; - if (configStream != null) { - configBytes = IOUtils.toByteArray(configStream); - } - metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); - } - - - private void validateFeeds(List<Entity> feeds) throws FalconException { - for (Entity feed : feeds) { 
- super.validate(feed); - } - } - - private void validateProcesses(List<Entity> processes) throws FalconException { - ProcessEntityParser processEntityParser = new ProcessEntityParser(); - for (Entity process : processes) { - processEntityParser.validate((Process)process, false); - } - } - - protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap) throws FalconException, - AuthorizationException { - for (Object feed: entityMap.get(EntityType.FEED)) { - scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), null, null); - } - for (Object process: entityMap.get(EntityType.PROCESS)) { - scheduleInternal(EntityType.PROCESS.name(), ((Process)process).getName(), null, null); - } - } - public static String getJobNameFromTag(String tags) { int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB); if (nameStart == -1) { http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxy.java new file mode 100644 index 0000000..e1240e3 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxy.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource.proxy; + +import org.apache.falcon.FalconException; +import org.apache.falcon.FalconRuntimException; +import org.apache.falcon.FalconWebException; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.AbstractEntityManager; + +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Entity Proxy class to talk to channels. + */ +public abstract class EntityProxy<T extends APIResult> extends AbstractEntityManager{ + private final Class<T> clazz; + private String type; + private String name; + + public EntityProxy(String type, String name, Class<T> resultClazz) { + this.clazz = resultClazz; + this.type = type; + this.name = name; + } + + + private T getResultInstance(APIResult.Status status, String message) { + try { + Constructor<T> constructor = clazz.getConstructor(APIResult.Status.class, String.class); + return constructor.newInstance(status, message); + } catch (Exception e) { + throw new FalconRuntimException("Unable to consolidate result.", e); + } + } + + public EntityProxy(String type, String name) { + this(type, name, (Class<T>) APIResult.class); + } + + public T execute() { + Set<String> colos = getColosToApply(); + + Map<String, T> results = new HashMap(); + + for (String colo : colos) { + try { + results.put(colo, doExecute(colo)); + } catch (FalconWebException e) { + String message = ((APIResult) e.getResponse().getEntity()).getMessage(); + results.put(colo, getResultInstance(APIResult.Status.FAILED, 
message)); + } catch (Throwable throwable) { + results.put(colo, getResultInstance(APIResult.Status.FAILED, throwable.getClass().getName() + "::" + + throwable.getMessage())); + } + } + + T finalResult = consolidateResult(results, clazz); + if (finalResult.getStatus() == APIResult.Status.FAILED) { + throw FalconWebException.newAPIException(finalResult.getMessage()); + } else { + return finalResult; + } + } + + protected Set<String> getColosToApply() { + return getApplicableColos(type, name); + } + + protected abstract T doExecute(String colo) throws FalconException; +} http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java index 5e556a1..9ddebe8 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java @@ -44,16 +44,18 @@ import org.apache.falcon.resource.AbstractExtensionManager; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.ExtensionInstanceList; import org.apache.falcon.resource.ExtensionJobList; +import org.apache.falcon.resource.channel.Channel; +import org.apache.falcon.resource.channel.ChannelFactory; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.Services; import org.apache.falcon.util.DeploymentUtil; -import org.apache.hadoop.security.authorize.AuthorizationException; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServletRequest; 
import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -66,16 +68,22 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; +import javax.xml.bind.JAXBException; import java.util.Collections; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Arrays; +import java.util.Set; +import java.util.Properties; import java.util.List; import java.util.Map; -import java.util.Properties; +import java.util.TreeMap; +import java.util.SortedMap; +import java.io.IOException; +import java.io.InputStream; +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; /** * Jersey Resource for extension job operations. @@ -97,6 +105,11 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { private static final String EXTENSION_TYPE = "type"; private static final String EXTENSION_DESC = "description"; private static final String EXTENSION_LOCATION = "location"; + private boolean embeddedMode = DeploymentUtil.isEmbeddedMode(); + private String currentColo = DeploymentUtil.getCurrentColo(); + private final Map<String, Channel> configSyncChannels = new HashMap<String, Channel>(); + private static final String PRISM_TAG = "prism"; + private final Map<String, Channel> entityManagerChannels = new HashMap<String, Channel>(); @@ -313,19 +326,19 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @FormDataParam("feeds") List<FormDataBodyPart> feedForms, @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); - Map<EntityType, List<Entity>> entityMap; + SortedMap<EntityType, List<Entity>> entityMap; try { entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); - submitEntities(extensionName, doAsUser, jobName, entityMap, config); - } catch 
(FalconException | IOException e) { + submitEntities(extensionName, jobName, entityMap, config, request); + } catch (FalconException | IOException | JAXBException e) { LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully:" + jobName); } - private Map<EntityType, List<Entity>> getEntityList(String extensionName, String jobName, + private SortedMap<EntityType, List<Entity>> getEntityList(String extensionName, String jobName, List<FormDataBodyPart> feedForms, List<FormDataBodyPart> processForms, InputStream config) throws FalconException, IOException{ @@ -333,7 +346,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { List<Entity> feeds = getFeeds(feedForms); ExtensionType extensionType = getExtensionType(extensionName); List<Entity> entities; - Map<EntityType, List<Entity>> entityMap = new HashMap<>(); + TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>(); if (ExtensionType.TRUSTED.equals(extensionType)) { entities = generateEntities(extensionName, config); List<Entity> trustedFeeds = new ArrayList<>(); @@ -364,6 +377,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { return extensionDetails.getExtensionType(); } + private Channel getEntityManager(String colo) throws FalconException { + if (!entityManagerChannels.containsKey(colo)) { + initializeFor(colo); + } + return entityManagerChannels.get(colo); + } + @POST @Path("submitAndSchedule/{extension-name}") @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA}) @@ -377,54 +397,85 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { @FormDataParam("feeds") List<FormDataBodyPart> feedForms, @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); - Map<EntityType, List<Entity>> entityMap; + SortedMap<EntityType, 
List<Entity>> entityMap; try { entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config); - submitEntities(extensionName, doAsUser, jobName, entityMap, config); - scheduleEntities(entityMap); - } catch (FalconException | IOException e) { + submitEntities(extensionName, jobName, entityMap, config, request); + scheduleEntities(entityMap, request); + } catch (FalconException | IOException | JAXBException e) { LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"); } - private List<Entity> getFeeds(List<FormDataBodyPart> feedForms) { - List<Entity> feeds = new ArrayList<>(); - if (feedForms != null && !feedForms.isEmpty()) { - for (FormDataBodyPart formDataBodyPart : feedForms) { - feeds.add(formDataBodyPart.getValueAs(Feed.class)); + protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request) + throws FalconException, JAXBException, IOException { + + for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ + for(final Entity entity : entry.getValue()){ + final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request); + final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest); + final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity); + + new EntityProxy(entity.getEntityType().toString(), entity.getName()) { + @Override + protected Set<String> getColosToApply() { + return colos; + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("schedule", bufferedRequest, entity.getEntityType().toString(), + entity.getName(), colo, Boolean.FALSE, ""); + } + }.execute(); } } - return feeds; } - private List<Entity> getProcesses(List<FormDataBodyPart> 
processForms) { - List<Entity> processes = new ArrayList<>(); - if (processForms != null && !processForms.isEmpty()) { - for (FormDataBodyPart formDataBodyPart : processForms) { - processes.add(formDataBodyPart.getValueAs(Process.class)); - } + private BufferedRequest getBufferedRequest(HttpServletRequest request) { + if (request instanceof BufferedRequest) { + return (BufferedRequest) request; } - return processes; + return new BufferedRequest(request); } - protected void submitEntities(String extensionName, String doAsUser, String jobName, - Map<EntityType, List<Entity>> entityMap, InputStream configStream) - throws FalconException, IOException { + protected void submitEntities(String extensionName, String jobName, + SortedMap<EntityType, List<Entity>> entityMap, InputStream configStream, + HttpServletRequest request) throws FalconException, IOException, JAXBException { List<Entity> feeds = entityMap.get(EntityType.FEED); List<Entity> processes = entityMap.get(EntityType.PROCESS); validateFeeds(feeds); validateProcesses(processes); List<String> feedNames = new ArrayList<>(); List<String> processNames = new ArrayList<>(); - for (Entity feed : feeds) { - submitInternal(feed, doAsUser); - feedNames.add(feed.getName()); - } - for (Entity process: processes) { - submitInternal(process, doAsUser); - processNames.add(process.getName()); + + for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ + for(final Entity entity : entry.getValue()){ + final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request); + final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity); + new EntityProxy(entity.getEntityType().toString(), entity.getName()) { + @Override + protected Set<String> getColosToApply() { + return colos; + } + + @Override + protected APIResult doExecute(String colo) throws FalconException { + return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, + 
entity.getEntityType().toString(), colo); + } + }.execute(); + if (!embeddedMode) { + super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo); + } + if (entity.getEntityType().equals(EntityType.FEED)){ + feedNames.add(entity.getName()); + }else{ + processNames.add(entity.getName()); + } + } } ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); @@ -435,14 +486,29 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); } - protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap) throws FalconException, - AuthorizationException { - for (Object feed: entityMap.get(EntityType.FEED)) { - scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), null, null); - } - for (Object process: entityMap.get(EntityType.PROCESS)) { - scheduleInternal(EntityType.PROCESS.name(), ((Process)process).getName(), null, null); + private void initializeFor(String colo) throws FalconException { + entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo)); + configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo)); + } + + private Channel getConfigSyncChannel(String colo) throws FalconException { + if (!configSyncChannels.containsKey(colo)) { + initializeFor(colo); } + return configSyncChannels.get(colo); + } + + private HttpServletRequest getEntityStream(Entity entity, EntityType type, HttpServletRequest request) throws IOException, JAXBException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + type.getMarshaller().marshal(entity, baos); + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(baos.toByteArray()); + ServletInputStream servletInputStream=new ServletInputStream(){ + public int read() throws IOException { + return byteArrayInputStream.read(); + } + }; + return getBufferedRequest(new HttpServletRequestInputStreamWrapper(request, 
servletInputStream)); } @@ -459,6 +525,26 @@ public class ExtensionManagerProxy extends AbstractExtensionManager { } } + private List<Entity> getFeeds(List<FormDataBodyPart> feedForms) { + List<Entity> feeds = new ArrayList<>(); + if (feedForms != null && !feedForms.isEmpty()) { + for (FormDataBodyPart formDataBodyPart : feedForms) { + feeds.add(formDataBodyPart.getValueAs(Feed.class)); + } + } + return feeds; + } + + private List<Entity> getProcesses(List<FormDataBodyPart> processForms) { + List<Entity> processes = new ArrayList<>(); + if (processForms != null && !processForms.isEmpty()) { + for (FormDataBodyPart formDataBodyPart : processForms) { + processes.add(formDataBodyPart.getValueAs(Process.class)); + } + } + return processes; + } + @POST @Path("update/{extension-name}") @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/proxy/HttpServletRequestInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/HttpServletRequestInputStreamWrapper.java b/prism/src/main/java/org/apache/falcon/resource/proxy/HttpServletRequestInputStreamWrapper.java new file mode 100644 index 0000000..5492c0b --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/HttpServletRequestInputStreamWrapper.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.resource.proxy; + +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; + +/** + * Wrapper class to wrap new ServletInputStream. + */ + +public class HttpServletRequestInputStreamWrapper extends HttpServletRequestWrapper { + + private ServletInputStream inputStream; + + + public HttpServletRequestInputStreamWrapper(HttpServletRequest request , ServletInputStream stream){ + super(request); + this.inputStream = stream; + } + + @Override + public ServletInputStream getInputStream(){ + return inputStream; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 26de20e..ed1054c 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -54,7 +54,6 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -852,60 +851,5 @@ public class 
SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana } //RESUME CHECKSTYLE CHECK ParameterNumberCheck - private abstract class EntityProxy<T extends APIResult> { - private final Class<T> clazz; - private String type; - private String name; - public EntityProxy(String type, String name, Class<T> resultClazz) { - this.clazz = resultClazz; - this.type = type; - this.name = name; - } - - - private T getResultInstance(APIResult.Status status, String message) { - try { - Constructor<T> constructor = clazz.getConstructor(APIResult.Status.class, String.class); - return constructor.newInstance(status, message); - } catch (Exception e) { - throw new FalconRuntimException("Unable to consolidate result.", e); - } - } - - public EntityProxy(String type, String name) { - this(type, name, (Class<T>) APIResult.class); - } - - public T execute() { - Set<String> colos = getColosToApply(); - - Map<String, T> results = new HashMap(); - - for (String colo : colos) { - try { - results.put(colo, doExecute(colo)); - } catch (FalconWebException e) { - String message = ((APIResult) e.getResponse().getEntity()).getMessage(); - results.put(colo, getResultInstance(APIResult.Status.FAILED, message)); - } catch (Throwable throwable) { - results.put(colo, getResultInstance(APIResult.Status.FAILED, throwable.getClass().getName() + "::" - + throwable.getMessage())); - } - } - - T finalResult = consolidateResult(results, clazz); - if (finalResult.getStatus() == APIResult.Status.FAILED) { - throw FalconWebException.newAPIException(finalResult.getMessage()); - } else { - return finalResult; - } - } - - protected Set<String> getColosToApply() { - return getApplicableColos(type, name); - } - - protected abstract T doExecute(String colo) throws FalconException; - } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git 
a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index c9e1d4c..7036dc7 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -56,14 +56,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; +import java.util.TreeMap; import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedHashMap; +import java.util.Properties; import java.util.List; import java.util.Map; -import java.util.Properties; +import java.util.Date; +import java.util.LinkedHashMap; import java.util.TimeZone; +import java.util.SortedMap; /** * Client for Falcon Unit. @@ -285,14 +286,14 @@ public class FalconUnitClient extends AbstractFalconClient { InputStream configStream = getServletInputStream(configPath); try { - Map<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); + SortedMap<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); return localExtensionManager.submitExtensionJob(extensionName, jobName, configStream, entityMap); } catch (FalconException | IOException e) { throw new FalconCLIException("Failed in submitting extension job " + jobName); } } - private Map<EntityType, List<Entity>> getEntityTypeListMap(String extensionName, String jobName, InputStream configStream) { + private SortedMap<EntityType, List<Entity>> getEntityTypeListMap(String extensionName, String jobName, InputStream configStream) { List<Entity> entities = getEntities(extensionName, jobName, configStream); List<Entity> feeds = new ArrayList<>(); List<Entity> processes = new ArrayList<>(); @@ -303,7 +304,7 @@ public class FalconUnitClient extends AbstractFalconClient { processes.add(entity); } } - Map<EntityType, List<Entity>> 
entityMap = new HashMap<>(); + SortedMap<EntityType, List<Entity>> entityMap = new TreeMap<>(); entityMap.put(EntityType.PROCESS, processes); entityMap.put(EntityType.FEED, feeds); return entityMap; @@ -326,7 +327,7 @@ public class FalconUnitClient extends AbstractFalconClient { String doAsUser) { InputStream configStream = getServletInputStream(configPath); try { - Map<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); + SortedMap<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName, jobName, configStream); return localExtensionManager.submitAndSchedulableExtensionJob(extensionName, jobName, configStream, entityMap); } catch (FalconException | IOException e) { http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 7f55f9a..57c339e 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -18,17 +18,21 @@ package org.apache.falcon.unit; +import org.apache.commons.io.IOUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractExtensionManager; import org.apache.falcon.security.CurrentUser; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.SortedMap; /** * A proxy implementation of the extension operations in local mode. 
@@ -37,16 +41,48 @@ public class LocalExtensionManager extends AbstractExtensionManager { public LocalExtensionManager() {} public APIResult submitExtensionJob(String extensionName, String jobName, InputStream config, - Map<EntityType, List<Entity>> entityMap) throws FalconException, IOException { - submitEntities(extensionName, null, jobName, entityMap, config); + SortedMap<EntityType, List<Entity>> entityMap) + throws FalconException, IOException { + + for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ + for(Entity entity : entry.getValue()){ + submitInternal(entity, "falconUser"); + } + } return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); } - public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream config, - Map<EntityType, List<Entity>> entityMap) + public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream configStream, + SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException { - submitEntities(extensionName, null, jobName, entityMap, config); - scheduleEntities(entityMap); + List<String> feedNames = new ArrayList<>(); + List<String> processNames = new ArrayList<>(); + for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ + for(Entity entity : entry.getValue()){ + submitInternal(entity, "falconUser"); + } + } + + for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ + for(Entity entity : entry.getValue()){ + scheduleInternal(entry.getKey().name(), entity.getName(), null, null); + } + } + byte[] configBytes = null; + if (configStream != null) { + configBytes = IOUtils.toByteArray(configStream); + } + for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){ + for(final Entity entity : entry.getValue()){ + if (entity.getEntityType().equals(EntityType.FEED)){ + feedNames.add(entity.getName()); + }else{ + processNames.add(entity.getName()); 
+ } + } + } + ExtensionStore.getMetaStore().storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes); + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); }
