Repository: nifi Updated Branches: refs/heads/NIFI-747 645d3cc2f -> 22adb9a0f
NIFI-747 changes suggested in code review Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/22adb9a0 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/22adb9a0 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/22adb9a0 Branch: refs/heads/NIFI-747 Commit: 22adb9a0f54bc26aecc7218a056aac547670e0b2 Parents: 645d3cc Author: Tony Kurc <[email protected]> Authored: Sun Oct 18 19:46:54 2015 -0400 Committer: Tony Kurc <[email protected]> Committed: Sun Oct 18 19:46:54 2015 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenHTTP.java | 18 +++++++++++------- .../standard/servlets/ListenHTTPServlet.java | 4 +++- 2 files changed, 14 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/22adb9a0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index f76756b..a446eb6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -74,9 +74,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .description("Relationship for successfully received FlowFiles") .build(); - public static final PropertyDescriptor URI = new PropertyDescriptor.Builder() - .name("URI") - .description("URI for incoming connections") + public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder() + .name("Base Path") + .description("Base path for incoming connections") .required(true) .defaultValue("contentListener") .addValidator(StandardValidators.URI_VALIDATOR) @@ -129,6 +129,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern"; public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap"; public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler"; + public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath"; private volatile Server server = null; private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>(); @@ -141,7 +142,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { this.relationships = Collections.unmodifiableSet(relationships); final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(URI); + descriptors.add(BASE_PATH); descriptors.add(PORT); descriptors.add(MAX_DATA_RATE); descriptors.add(SSL_CONTEXT_SERVICE); @@ -178,7 +179,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { } private void createHttpServerFromService(final ProcessContext context) throws Exception { - final String uri = context.getProperty(URI).getValue(); + final String basePath = context.getProperty(BASE_PATH).getValue(); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); @@ -240,12 +241,13 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { for (final Class<? extends Servlet> cls : getServerClasses()) { final Path path = cls.getAnnotation(Path.class); // Note: servlets must have a path annotation - this will NPE otherwise - if(uri.isEmpty() && !path.value().isEmpty()){ + // also, servlets other than ListenHttpServlet must have a path starting with / + if(basePath.isEmpty() && !path.value().isEmpty()){ // Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with // contextHandler.addServlet(cls, path.value()); } else{ - contextHandler.addServlet(cls, "/" + uri + path.value()); + contextHandler.addServlet(cls, "/" + basePath + path.value()); } } @@ -256,6 +258,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath); if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue())); @@ -273,6 +276,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { protected Set<Class<? extends Servlet>> getServerClasses() { final Set<Class<? extends Servlet>> s = new HashSet<>(); // NOTE: Servlets added below MUST have a Path annotation + // any servlets other than ListenHTTPServlet must have a Path annotation start with / s.add(ListenHTTPServlet.class); s.add(ContentAcknowledgmentServlet.class); return s; http://git-wip-us.apache.org/repos/asf/nifi/blob/22adb9a0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 6da32d5..79d3887 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -94,6 +94,7 @@ public class ListenHTTPServlet extends HttpServlet { private Pattern headerPattern; private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap; private StreamThrottler streamThrottler; + private String basePath; @SuppressWarnings("unchecked") @Override @@ -106,6 +107,7 @@ public class ListenHTTPServlet extends HttpServlet { this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); + this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH); } @Override @@ -292,7 +294,7 @@ public class ListenHTTPServlet extends HttpServlet { } while (previousWrapper != null); response.setStatus(HttpServletResponse.SC_SEE_OTHER); - final String ackUri = ListenHTTP.URI + "/holds/" + uuid; + final String ackUri = "/" + basePath + "/holds/" + uuid; response.addHeader(LOCATION_HEADER_NAME, ackUri); response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); response.getOutputStream().write(ackUri.getBytes("UTF-8"));
