http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d29a2d68/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 451ba57..6f228b2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -94,26 +94,26 @@ public class UnpackContent extends AbstractProcessor { public static final String OCTET_STREAM = "application/octet-stream"; - public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder(). - name("Packaging Format"). - description("The Packaging Format used to create the file"). - required(true). - allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT). - defaultValue(AUTO_DETECT_FORMAT). - build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder(). - name("success"). - description("Unpacked FlowFiles are sent to this relationship"). - build(); - public static final Relationship REL_ORIGINAL = new Relationship.Builder(). - name("original"). - description("The original FlowFile is sent to this relationship after it has been successfully unpacked"). - build(); - public static final Relationship REL_FAILURE = new Relationship.Builder(). - name("failure"). - description("The original FlowFile is sent to this relationship when it cannot be unpacked for some reason"). - build(); + public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder() + .name("Packaging Format") + .description("The Packaging Format used to create the file") + .required(true) + .allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT) + .defaultValue(AUTO_DETECT_FORMAT) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Unpacked FlowFiles are sent to this relationship") + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original FlowFile is sent to this relationship after it has been successfully unpacked") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("The original FlowFile is sent to this relationship when it cannot be unpacked for some reason") + .build(); private Set<Relationship> relationships; private List<PropertyDescriptor> properties; @@ -149,15 +149,11 @@ public class UnpackContent extends AbstractProcessor { } final ProcessorLog logger = getLogger(); - String packagingFormat = context.getProperty(PACKAGING_FORMAT). - getValue(). - toLowerCase(); + String packagingFormat = context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase(); if (AUTO_DETECT_FORMAT.equals(packagingFormat)) { - final String mimeType = flowFile. - getAttribute(CoreAttributes.MIME_TYPE.key()); + final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); if (mimeType == null) { - logger. - error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile}); + logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile}); session.transfer(flowFile, REL_FAILURE); return; } @@ -179,8 +175,7 @@ public class UnpackContent extends AbstractProcessor { packagingFormat = FLOWFILE_TAR_FORMAT; break; default: { - logger. - info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); + logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); session.transfer(flowFile, REL_SUCCESS); return; } @@ -211,17 +206,14 @@ public class UnpackContent extends AbstractProcessor { addFragmentAttrs = false; break; default: - throw new AssertionError("Packaging Format was " + context. - getProperty(PACKAGING_FORMAT). - getValue()); + throw new AssertionError("Packaging Format was " + context.getProperty(PACKAGING_FORMAT).getValue()); } final List<FlowFile> unpacked = new ArrayList<>(); try { unpacker.unpack(session, flowFile, unpacked); if (unpacked.isEmpty()) { - logger. - error("Unable to unpack {} because it does not appear to have any entries; routing to failure", new Object[]{flowFile}); + logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", new Object[]{flowFile}); session.transfer(flowFile, REL_FAILURE); return; } @@ -231,13 +223,10 @@ public class UnpackContent extends AbstractProcessor { } session.transfer(unpacked, REL_SUCCESS); session.transfer(flowFile, REL_ORIGINAL); - session.getProvenanceReporter(). - fork(flowFile, unpacked); - logger. - info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked}); + session.getProvenanceReporter().fork(flowFile, unpacked); + logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked}); } catch (final ProcessException e) { - logger. - error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e}); + logger.error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); session.remove(unpacked); } @@ -252,8 +241,7 @@ public class UnpackContent extends AbstractProcessor { @Override public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { - final String fragmentId = UUID.randomUUID(). - toString(); + final String fragmentId = UUID.randomUUID().toString(); session.read(source, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { @@ -268,38 +256,28 @@ public class UnpackContent extends AbstractProcessor { final Path filePath = file.toPath(); final String filePathString = filePath.getParent() + "/"; final Path absPath = filePath.toAbsolutePath(); - final String absPathString = absPath.getParent(). - toString() + "/"; + final String absPathString = absPath.getParent().toString() + "/"; FlowFile unpackedFile = session.create(source); try { final Map<String, String> attributes = new HashMap<>(); - attributes. - put(CoreAttributes.FILENAME.key(), file. - getName()); - attributes. - put(CoreAttributes.PATH.key(), filePathString); - attributes.put(CoreAttributes.ABSOLUTE_PATH. - key(), absPathString); - attributes. - put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + attributes.put(CoreAttributes.FILENAME.key(), file.getName()); + attributes.put(CoreAttributes.PATH.key(), filePathString); + attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString); + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); attributes.put(FRAGMENT_ID, fragmentId); - attributes.put(FRAGMENT_INDEX, String. - valueOf(++fragmentCount)); + attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount)); - unpackedFile = session. - putAllAttributes(unpackedFile, attributes); + unpackedFile = session.putAllAttributes(unpackedFile, attributes); final long fileSize = tarEntry.getSize(); - unpackedFile = session. - write(unpackedFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - StreamUtils. - copy(tarIn, out, fileSize); - } - }); + unpackedFile = session.write(unpackedFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(tarIn, out, fileSize); + } + }); } finally { unpacked.add(unpackedFile); } @@ -314,8 +292,7 @@ public class UnpackContent extends AbstractProcessor { @Override public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { - final String fragmentId = UUID.randomUUID(). - toString(); + final String fragmentId = UUID.randomUUID().toString(); session.read(source, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { @@ -327,39 +304,28 @@ public class UnpackContent extends AbstractProcessor { continue; } final File file = new File(zipEntry.getName()); - final String parentDirectory = (file.getParent() == null) ? "/" : file. - getParent(); - final Path absPath = file.toPath(). - toAbsolutePath(); - final String absPathString = absPath.getParent(). - toString() + "/"; + final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent(); + final Path absPath = file.toPath().toAbsolutePath(); + final String absPathString = absPath.getParent().toString() + "/"; FlowFile unpackedFile = session.create(source); try { final Map<String, String> attributes = new HashMap<>(); - attributes. - put(CoreAttributes.FILENAME.key(), file. - getName()); - attributes. - put(CoreAttributes.PATH.key(), parentDirectory); - attributes.put(CoreAttributes.ABSOLUTE_PATH. - key(), absPathString); - attributes. - put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + attributes.put(CoreAttributes.FILENAME.key(), file.getName()); + attributes.put(CoreAttributes.PATH.key(), parentDirectory); + attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString); + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); attributes.put(FRAGMENT_ID, fragmentId); - attributes.put(FRAGMENT_INDEX, String. - valueOf(++fragmentCount)); - - unpackedFile = session. - putAllAttributes(unpackedFile, attributes); - unpackedFile = session. - write(unpackedFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - StreamUtils.copy(zipIn, out); - } - }); + attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount)); + + unpackedFile = session.putAllAttributes(unpackedFile, attributes); + unpackedFile = session.write(unpackedFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(zipIn, out); + } + }); } finally { unpacked.add(unpackedFile); } @@ -388,24 +354,20 @@ public class UnpackContent extends AbstractProcessor { final ObjectHolder<Map<String, String>> attributesRef = new ObjectHolder<>(null); FlowFile unpackedFile = session.create(source); try { - unpackedFile = session. - write(unpackedFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final OutputStream out = new BufferedOutputStream(rawOut)) { - final Map<String, String> attributes = unpackager. - unpackageFlowFile(in, out); - if (attributes == null) { - throw new IOException("Failed to unpack " + source + ": stream had no Attributes"); - } - attributesRef. - set(attributes); - } + unpackedFile = session.write(unpackedFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut)) { + final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out); + if (attributes == null) { + throw new IOException("Failed to unpack " + source + ": stream had no Attributes"); } - }); + attributesRef.set(attributes); + } + } + }); - final Map<String, String> attributes = attributesRef. - get(); + final Map<String, String> attributes = attributesRef.get(); // Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile. // If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package @@ -413,24 +375,17 @@ public class UnpackContent extends AbstractProcessor { attributes.remove(CoreAttributes.UUID.key()); // maintain backward compatibility with legacy NiFi attribute names - mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME. - key()); - mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH. - key()); - mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE. - key()); - mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE. - key()); + mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key()); + mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key()); + mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key()); + mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key()); if (!attributes. - containsKey(CoreAttributes.MIME_TYPE. - key())) { - attributes.put(CoreAttributes.MIME_TYPE. - key(), OCTET_STREAM); + containsKey(CoreAttributes.MIME_TYPE.key())) { + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); } - unpackedFile = session. - putAllAttributes(unpackedFile, attributes); + unpackedFile = session.putAllAttributes(unpackedFile, attributes); } finally { unpacked.add(unpackedFile); } @@ -455,8 +410,7 @@ public class UnpackContent extends AbstractProcessor { } /** - * If the unpacked flowfiles contain fragment index attributes, then we need - * to apply fragment count and other attributes for completeness. + * If the unpacked flowfiles contain fragment index attributes, then we need to apply fragment count and other attributes for completeness. * * @param session * @param source @@ -474,12 +428,9 @@ public class UnpackContent extends AbstractProcessor { } } - String originalFilename = source.getAttribute(CoreAttributes.FILENAME. - key()); - if (originalFilename.endsWith(".tar") || originalFilename. - endsWith(".zip") || originalFilename.endsWith(".pkg")) { - originalFilename = originalFilename.substring(0, originalFilename. - length() - 4); + String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key()); + if (originalFilename.endsWith(".tar") || originalFilename.endsWith(".zip") || originalFilename.endsWith(".pkg")) { + originalFilename = originalFilename.substring(0, originalFilename.length() - 4); } // second pass adds fragment attributes
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d29a2d68/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java index 40c7e65..ab12be2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java @@ -59,33 +59,24 @@ public class ContentAcknowledgmentServlet extends HttpServlet { @Override public void init(final ServletConfig config) throws ServletException { final ServletContext context = config.getServletContext(); - this.processor = (Processor) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESSOR); - this.logger = (ProcessorLog) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); - this.authorizedPattern = (Pattern) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); - this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); + this.processor = (Processor) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESSOR); + this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); + this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); + this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); } @Override protected void doDelete(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { - final X509Certificate[] certs = (X509Certificate[]) request. - getAttribute("javax.servlet.request.X509Certificate"); + final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); String foundSubject = DEFAULT_FOUND_SUBJECT; if (certs != null && certs.length > 0) { for (final X509Certificate cert : certs) { - foundSubject = cert.getSubjectDN(). - getName(); - if (authorizedPattern.matcher(foundSubject). - matches()) { + foundSubject = cert.getSubjectDN().getName(); + if (authorizedPattern.matcher(foundSubject).matches()) { break; } else { - logger. - warn(processor + " rejecting transfer attempt from " + foundSubject + " because the DN is not authorized"); - response. - sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn"); + logger.warn(processor + " rejecting transfer attempt from " + foundSubject + " because the DN is not authorized"); + response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn"); return; } } @@ -101,10 +92,8 @@ public class ContentAcknowledgmentServlet extends HttpServlet { final String uuid = uri.substring(slashIndex + 1, questionIndex); final FlowFileEntryTimeWrapper timeWrapper = flowFileMap.remove(uuid); if (timeWrapper == null) { - logger. - warn("received DELETE for HOLD with ID " + uuid + " from Remote Host: [" + request. - getRemoteHost() + "] Port [" + request. - getRemotePort() + "] SubjectDN [" + foundSubject + "], but no HOLD exists with that ID; sending response with Status Code 404"); + logger.warn("received DELETE for HOLD with ID " + uuid + " from Remote Host: [" + request.getRemoteHost() + + "] Port [" + request.getRemotePort() + "] SubjectDN [" + foundSubject + "], but no HOLD exists with that ID; sending response with Status Code 404"); response.sendError(HttpServletResponse.SC_NOT_FOUND); return; } @@ -112,8 +101,7 @@ public class ContentAcknowledgmentServlet extends HttpServlet { try { final Set<FlowFile> flowFiles = timeWrapper.getFlowFiles(); - final long transferTime = System.currentTimeMillis() - timeWrapper. - getEntryTime(); + final long transferTime = System.currentTimeMillis() - timeWrapper.getEntryTime(); long totalFlowFileSize = 0; for (final FlowFile flowFile : flowFiles) { totalFlowFileSize += flowFile.getSize(); @@ -124,13 +112,11 @@ public class ContentAcknowledgmentServlet extends HttpServlet { seconds = .00000001D; } final double bytesPerSecond = ((double) totalFlowFileSize / seconds); - final String transferRate = FormatUtils. - formatDataSize(bytesPerSecond) + "/sec"; + final String transferRate = FormatUtils.formatDataSize(bytesPerSecond) + "/sec"; - logger. - info("received {} files/{} bytes from Remote Host: [{}] Port [{}] SubjectDN [{}] in {} milliseconds at a rate of {}; transferring to 'success': {}", - new Object[]{flowFiles.size(), totalFlowFileSize, request. - getRemoteHost(), request.getRemotePort(), foundSubject, transferTime, transferRate, flowFiles}); + logger.info("received {} files/{} bytes from Remote Host: [{}] Port [{}] SubjectDN [{}] in {} milliseconds at a rate of {}; " + + "transferring to 'success': {}", + new Object[]{flowFiles.size(), totalFlowFileSize, request.getRemoteHost(), request.getRemotePort(), foundSubject, transferTime, transferRate, flowFiles}); final ProcessSession session = timeWrapper.getSession(); session.transfer(flowFiles, ListenHTTP.RELATIONSHIP_SUCCESS); @@ -139,12 +125,9 @@ public class ContentAcknowledgmentServlet extends HttpServlet { response.setStatus(HttpServletResponse.SC_OK); response.flushBuffer(); } catch (final Throwable t) { - timeWrapper.getSession(). - rollback(); - logger. - error("received DELETE for HOLD with ID {} from Remote Host: [{}] Port [{}] SubjectDN [{}], but failed to process the request due to {}", - new Object[]{uuid, request.getRemoteHost(), request. - getRemotePort(), foundSubject, t.toString()}); + timeWrapper.getSession().rollback(); + logger.error("received DELETE for HOLD with ID {} from Remote Host: [{}] Port [{}] SubjectDN [{}], but failed to process the request due to {}", + new Object[]{uuid, request.getRemoteHost(), request.getRemotePort(), foundSubject, t.toString()}); if (logger.isDebugEnabled()) { logger.error("", t); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d29a2d68/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 7e2338a..81986ba 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -103,20 +103,13 @@ public class ListenHTTPServlet extends HttpServlet { @Override public void init(final ServletConfig config) throws ServletException { final ServletContext context = config.getServletContext(); - this.logger = (ProcessorLog) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); - this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); - this.processContext = (ProcessContext) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER); - this.authorizedPattern = (Pattern) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); - this.headerPattern = (Pattern) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); - this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); - this.streamThrottler = (StreamThrottler) context. - getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); + this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); + this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); + this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER); + this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); + this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); + this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); + this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); } @Override @@ -148,15 +141,12 @@ public class ListenHTTPServlet extends HttpServlet { try { final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE; if (n == 0 || !spaceAvailable.get()) { - if (context.getAvailableRelationships(). - isEmpty()) { + if (context.getAvailableRelationships().isEmpty()) { spaceAvailable.set(false); if (logger.isDebugEnabled()) { - logger.debug("Received request from " + request. - getRemoteHost() + " but no space available; Indicating Service Unavailable"); + logger.debug("Received request from " + request.getRemoteHost() + " but no space available; Indicating Service Unavailable"); } - response. - sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); return; } else { spaceAvailable.set(true); @@ -164,32 +154,24 @@ public class ListenHTTPServlet extends HttpServlet { } response.setHeader("Content-Type", MediaType.TEXT_PLAIN); - final boolean contentGzipped = Boolean.parseBoolean(request. - getHeader(GZIPPED_HEADER)); + final boolean contentGzipped = Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER)); - final X509Certificate[] certs = (X509Certificate[]) request. - getAttribute("javax.servlet.request.X509Certificate"); + final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); foundSubject = DEFAULT_FOUND_SUBJECT; if (certs != null && certs.length > 0) { for (final X509Certificate cert : certs) { - foundSubject = cert.getSubjectDN(). - getName(); - if (authorizedPattern.matcher(foundSubject). - matches()) { + foundSubject = cert.getSubjectDN().getName(); + if (authorizedPattern.matcher(foundSubject).matches()) { break; } else { - logger. - warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + request. - getRemoteHost()); - response. - sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn"); + logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + request.getRemoteHost()); + response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn"); return; } } } - final String destinationVersion = request. - getHeader(PROTOCOL_VERSION_HEADER); + final String destinationVersion = request.getHeader(PROTOCOL_VERSION_HEADER); Integer protocolVersion = null; if (destinationVersion != null) { try { @@ -200,19 +182,15 @@ public class ListenHTTPServlet extends HttpServlet { } final boolean destinationIsLegacyNiFi = (protocolVersion == null); - final boolean createHold = Boolean.parseBoolean(request. - getHeader(FLOWFILE_CONFIRMATION_HEADER)); + final boolean createHold = Boolean.parseBoolean(request.getHeader(FLOWFILE_CONFIRMATION_HEADER)); final String contentType = request.getContentType(); - final InputStream unthrottled = contentGzipped ? new GZIPInputStream(request. - getInputStream()) : request.getInputStream(); + final InputStream unthrottled = contentGzipped ? new GZIPInputStream(request.getInputStream()) : request.getInputStream(); - final InputStream in = (streamThrottler == null) ? unthrottled : streamThrottler. - newThrottledInputStream(unthrottled); + final InputStream in = (streamThrottler == null) ? unthrottled : streamThrottler.newThrottledInputStream(unthrottled); if (logger.isDebugEnabled()) { - logger. - debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped); + logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped); } final AtomicBoolean hasMoreData = new AtomicBoolean(false); @@ -241,21 +219,16 @@ public class ListenHTTPServlet extends HttpServlet { IOUtils.copy(in, bos); hasMoreData.set(false); } else { - attributes.putAll(unpackager. - unpackageFlowFile(in, bos)); + attributes.putAll(unpackager.unpackageFlowFile(in, bos)); if (destinationIsLegacyNiFi) { if (attributes.containsKey("nf.file.name")) { // for backward compatibility with old nifi... - attributes.put(CoreAttributes.FILENAME. - key(), attributes. - remove("nf.file.name")); + attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name")); } if (attributes.containsKey("nf.file.path")) { - attributes. - put(CoreAttributes.PATH.key(), attributes. - remove("nf.file.path")); + attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path")); } } @@ -269,12 +242,10 @@ public class ListenHTTPServlet extends HttpServlet { }); final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS. - convert(transferNanos, TimeUnit.NANOSECONDS); + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); // put metadata on flowfile - final String nameVal = request. - getHeader(CoreAttributes.FILENAME.key()); + final String nameVal = request.getHeader(CoreAttributes.FILENAME.key()); if (StringUtils.isNotBlank(nameVal)) { attributes.put(CoreAttributes.FILENAME.key(), nameVal); } @@ -283,31 +254,24 @@ public class ListenHTTPServlet extends HttpServlet { for (Enumeration<String> headerEnum = request.getHeaderNames(); headerEnum.hasMoreElements();) { String headerName = headerEnum.nextElement(); - if (headerPattern != null && headerPattern. - matcher(headerName). - matches()) { + if (headerPattern != null && headerPattern.matcher(headerName).matches()) { String headerValue = request.getHeader(headerName); attributes.put(headerName, headerValue); } } - String sourceSystemFlowFileIdentifier = attributes. - get(CoreAttributes.UUID.key()); + String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); if (sourceSystemFlowFileIdentifier != null) { sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier; // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event - attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID(). - toString()); + attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); } flowFile = session.putAllAttributes(flowFile, attributes); - session.getProvenanceReporter(). - receive(flowFile, request.getRequestURL(). - toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis); - flowFile = session. - putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); + session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis); + flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); flowFileSet.add(flowFile); if (holdUuid == null) { @@ -316,45 +280,34 @@ public class ListenHTTPServlet extends HttpServlet { } while (hasMoreData.get()); if (createHold) { - String uuid = (holdUuid == null) ? UUID.randomUUID(). - toString() : holdUuid; + String uuid = (holdUuid == null) ? UUID.randomUUID().toString() : holdUuid; if (flowFileMap.containsKey(uuid)) { - uuid = UUID.randomUUID(). - toString(); + uuid = UUID.randomUUID().toString(); } - final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System. - currentTimeMillis()); + final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis()); FlowFileEntryTimeWrapper previousWrapper; do { previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); if (previousWrapper != null) { - uuid = UUID.randomUUID(). - toString(); + uuid = UUID.randomUUID().toString(); } } while (previousWrapper != null); response.setStatus(HttpServletResponse.SC_SEE_OTHER); final String ackUri = ListenHTTP.URI + "/holds/" + uuid; response.addHeader(LOCATION_HEADER_NAME, ackUri); - response. - addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); - response.getOutputStream(). - write(ackUri.getBytes("UTF-8")); + response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); + response.getOutputStream().write(ackUri.getBytes("UTF-8")); if (logger.isDebugEnabled()) { - logger. - debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}", - new Object[]{flowFileSet, request. - getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet. - size(), uuid}); + logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}", + new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid}); } } else { response.setStatus(HttpServletResponse.SC_OK); - logger. - info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}", - new Object[]{request.getRemoteHost(), request. - getRemotePort(), foundSubject, flowFile}); + logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}", + new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile}); session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS); session.commit(); @@ -362,16 +315,13 @@ public class ListenHTTPServlet extends HttpServlet { } catch (final Throwable t) { session.rollback(); if (flowFile == null) { - logger. - error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request. - getRemoteHost(), foundSubject, t}); + logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", + new Object[]{request.getRemoteHost(), foundSubject, t}); } else { - logger. - error("Unable to receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{flowFile, request. - getRemoteHost(), foundSubject, t}); + logger.error("Unable to receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}", + new Object[]{flowFile, request.getRemoteHost(), foundSubject, t}); } - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t. - toString()); + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString()); } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d29a2d68/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java index aa5cdc3..c9d906d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java @@ -46,8 +46,7 @@ public class Bin { * @param minEntries * @param maxEntries * @param fileCountAttribute - * @throws IllegalArgumentException if the min is not less than or equal to - * the max. + * @throws IllegalArgumentException if the min is not less than or equal to the max. */ public Bin(final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) { this.minimumSizeBytes = minSizeBytes; @@ -63,11 +62,8 @@ public class Bin { } /** - * Indicates whether the bin has enough items to be considered full. This is - * based on whether the current size of the bin is greater than the minimum - * size in bytes and based on having a number of successive unsuccessful - * attempts to add a new item (because it is so close to the max or the size - * of the objects being attempted do not favor tight packing) + * Indicates whether the bin has enough items to be considered full. This is based on whether the current size of the bin is greater than the minimum size in bytes and based on having a number of + * successive unsuccessful attempts to add a new item (because it is so close to the max or the size of the objects being attempted do not favor tight packing) * * @return true if considered full; false otherwise */ @@ -90,8 +86,7 @@ public class Bin { * * @param duration * @param unit - * @return true if this bin is older than the length of time given; false - * otherwise + * @return true if this bin is older than the length of time given; false otherwise */ public boolean isOlderThan(final int duration, final TimeUnit unit) { final long ageInNanos = System.nanoTime() - creationMomentEpochNs; @@ -109,16 +104,14 @@ public class Bin { } /** - * If this bin has enough room for the size of the given flow file then it - * is added otherwise it is not + * If this bin has enough room for the size of the given flow file then it is added otherwise it is not * * @param flowFile * @param session the ProcessSession to which the FlowFile belongs * @return true if added; false otherwise */ public boolean offer(final FlowFile flowFile, final ProcessSession session) { - if (((size + flowFile.getSize()) > maximumSizeBytes) || (binContents. - size() >= maximumEntries)) { + if (((size + flowFile.getSize()) > maximumSizeBytes) || (binContents.size() >= maximumEntries)) { successiveFailedOfferings++; return false; } @@ -144,8 +137,7 @@ public class Bin { if (value == null) { return null; } - if (!intPattern.matcher(value). - matches()) { + if (!intPattern.matcher(value).matches()) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d29a2d68/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java index eeadfa6..9d0e857 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java @@ -60,10 +60,8 @@ public class BinManager { try { for (final List<Bin> binList : groupBinMap.values()) { for (final Bin bin : binList) { - for (final FlowFileSessionWrapper wrapper : bin. - getContents()) { - wrapper.getSession(). - rollback(); + for (final FlowFileSessionWrapper wrapper : bin.getContents()) { + wrapper.getSession().rollback(); } } } @@ -108,15 +106,12 @@ public class BinManager { } /** - * Adds the given flowFile to the first available bin in which it fits for - * the given group or creates a new bin in the specified group if necessary. + * Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary. * <p/> - * @param groupIdentifier the group to which the flow file belongs; can be - * null + * @param groupIdentifier the group to which the flow file belongs; can be null * @param flowFile the flow file to bin * @param session the ProcessSession to which the FlowFile belongs - * @return true if added; false if no bin exists which can fit this item and - * no bin can be created based on current min/max criteria + * @return true if added; false if no bin exists which can fit this item and no bin can be created based on current min/max criteria */ public boolean offer(final String groupIdentifier, final FlowFile flowFile, final ProcessSession session) { final long currentMaxSizeBytes = maxSizeBytes.get(); @@ -128,8 +123,7 @@ public class BinManager { final List<Bin> currentBins = groupBinMap.get(groupIdentifier); if (currentBins == null) { // this is a new group we need to register final List<Bin> bins = new ArrayList<>(); - final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries. - get(), maxEntries.get(), fileCountAttribute.get()); + final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get()); bins.add(bin); groupBinMap.put(groupIdentifier, bins); binCount++; @@ -143,8 +137,7 @@ public class BinManager { } //if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one - final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries. - get(), maxEntries.get(), fileCountAttribute.get()); + final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get()); currentBins.add(bin); binCount++; return bin.offer(flowFile, session); @@ -155,12 +148,10 @@ public class BinManager { } /** - * Finds all bins that are considered full and removes them from the - * manager. + * Finds all bins that are considered full and removes them from the manager. * <p/> - * @param relaxFullnessConstraint if false will require bins to be full - * before considered ready; if true bins only have to meet their minimum - * size criteria or be 'old' and then they'll be considered ready + * @param relaxFullnessConstraint if false will require bins to be full before considered ready; if true bins only have to meet their minimum size criteria or be 'old' and then they'll be + * considered ready * @return */ public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) { @@ -169,12 +160,10 @@ public class BinManager { wLock.lock(); try { - for (final Map.Entry<String, List<Bin>> group : groupBinMap. - entrySet()) { + for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) { final List<Bin> remainingBins = new ArrayList<>(); for (final Bin bin : group.getValue()) { - if (relaxFullnessConstraint && (bin.isFullEnough() || bin. - isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check + if (relaxFullnessConstraint && (bin.isFullEnough() || bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check readyBins.add(bin); } else if (!relaxFullnessConstraint && bin.isFull()) { //strict check readyBins.add(bin); @@ -201,8 +190,7 @@ public class BinManager { Bin oldestBin = null; String oldestBinGroup = null; - for (final Map.Entry<String, List<Bin>> group : groupBinMap. - entrySet()) { + for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) { for (final Bin bin : group.getValue()) { if (oldestBin == null || bin.isOlderThan(oldestBin)) { oldestBin = bin; @@ -235,8 +223,7 @@ public class BinManager { try { for (final List<Bin> bins : groupBinMap.values()) { for (final Bin bin : bins) { - if (bin. - isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) { + if (bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) { return true; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d29a2d68/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java index 3131f40..8520813 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java @@ -36,8 +36,7 @@ public class DocumentReaderCallback implements InputStreamCallback { /** * Creates a new DocumentReaderCallback . * - * @param isNamespaceAware Whether or not the parse should consider - * namespaces + * @param isNamespaceAware Whether or not the parse should consider namespaces */ public DocumentReaderCallback(boolean isNamespaceAware) { this.isNamespaceAware = isNamespaceAware; @@ -52,8 +51,7 @@ public class DocumentReaderCallback implements InputStreamCallback { @Override public void process(final InputStream stream) throws IOException { try { - DocumentBuilderFactory factory = DocumentBuilderFactory. - newInstance(); + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); factory.setNamespaceAware(isNamespaceAware); DocumentBuilder builder = factory.newDocumentBuilder(); document = builder.parse(stream);
