tpalfy commented on a change in pull request #5446:
URL: https://github.com/apache/nifi/pull/5446#discussion_r728227041



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
##########
@@ -273,13 +310,40 @@ public AllowableValue getAllowableValue() {
     private final AtomicReference<ProcessSessionFactory> 
sessionFactoryReference = new AtomicReference<>();
     private final AtomicReference<StreamThrottler> throttlerRef = new 
AtomicReference<>();
 
+    private volatile boolean isRecordReaderSet;
+    private volatile boolean isRecordWriterSet;
+
     @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
-        List<ValidationResult> results = new ArrayList<>(1);
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (RECORD_READER.equals(descriptor)) {
+            isRecordReaderSet = StringUtils.isNotEmpty(newValue);
+        } else if (RECORD_WRITER.equals(descriptor)) {
+            isRecordWriterSet = StringUtils.isNotEmpty(newValue);
+        }
+    }
 
-        validatePortsAreNotEqual(context, results);
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {

Review comment:
       This validation could be achieved easier by changing the `RECORD_WRITER` 
to depend on the `RECORD_READER` and be `required(true)`

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
##########
@@ -249,6 +263,9 @@ private void handleException(final HttpServletRequest 
request, final HttpServlet
 
     private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, 
ProcessSession session, String foundSubject, String foundIssuer)
             throws IOException, IllegalStateException, ServletException {
+        if (isRecordProcessing()) {
+            logger.debug("Record processing will not be utilized while 
processing multipart request. Request URI: {}", request.getRequestURI());

Review comment:
       I think it would be better to provide a notice on this limitation in the 
`ListenHTTP` processor's `CapabilityDescription`. 

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
##########
@@ -433,6 +423,64 @@ protected void proceedFlow(final HttpServletRequest 
request, final HttpServletRe
         }
     }
 
+    protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest 
request, final ProcessSession session,
+                                                      final String 
foundSubject, final String foundIssuer, FlowFile flowFile) {
+        Map<String, String> attributes = new HashMap<>();
+        addMatchingRequestHeaders(request, attributes);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.source.host", request.getRemoteHost());
+        flowFile = session.putAttribute(flowFile, "restlistener.request.uri", 
request.getRequestURI());
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.user.dn", foundSubject);
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.issuer.dn", foundIssuer);
+        return flowFile;
+    }
+
+    private void processRecord(InputStream in, FlowFile flowFile, OutputStream 
out) {
+        try (final RecordReader reader = 
readerFactory.createRecordReader(flowFile, new BufferedInputStream(in), 
logger)) {
+            final RecordSet recordSet = reader.createRecordSet();
+            final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+
+            try (final RecordSetWriter writer = 
writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) {
+                // Reading in records and evaluate script
+                while (pushBackSet.isAnotherRecord()) {
+                    final Record record = pushBackSet.next();
+                    writer.write(record);
+                }
+            }
+        } catch (IOException | MalformedRecordException | 
SchemaNotFoundException e) {

Review comment:
       This exception handling seems too broad. `MalformedRecordException` is 
probably the only one that should warrant a 400.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
##########
@@ -433,6 +423,64 @@ protected void proceedFlow(final HttpServletRequest 
request, final HttpServletRe
         }
     }
 
+    protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest 
request, final ProcessSession session,
+                                                      final String 
foundSubject, final String foundIssuer, FlowFile flowFile) {
+        Map<String, String> attributes = new HashMap<>();
+        addMatchingRequestHeaders(request, attributes);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.source.host", request.getRemoteHost());
+        flowFile = session.putAttribute(flowFile, "restlistener.request.uri", 
request.getRequestURI());
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.user.dn", foundSubject);
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.issuer.dn", foundIssuer);
+        return flowFile;
+    }
+
+    private void processRecord(InputStream in, FlowFile flowFile, OutputStream 
out) {
+        try (final RecordReader reader = 
readerFactory.createRecordReader(flowFile, new BufferedInputStream(in), 
logger)) {
+            final RecordSet recordSet = reader.createRecordSet();
+            final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+
+            try (final RecordSetWriter writer = 
writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) {
+                // Reading in records and evaluate script
+                while (pushBackSet.isAnotherRecord()) {
+                    final Record record = pushBackSet.next();
+                    writer.write(record);
+                }
+            }
+        } catch (IOException | MalformedRecordException | 
SchemaNotFoundException e) {
+            logger.error("Could not process record.", e);
+            returnCode = HttpServletResponse.SC_BAD_REQUEST;

Review comment:
       This method may be called from multiple threads when multiple clients 
send requests at the same time. We can't modify this while the servlet is 
running. Instead we may want to have a return value or throw an exception.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
##########
@@ -433,6 +423,64 @@ protected void proceedFlow(final HttpServletRequest 
request, final HttpServletRe
         }
     }
 
+    protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest 
request, final ProcessSession session,
+                                                      final String 
foundSubject, final String foundIssuer, FlowFile flowFile) {
+        Map<String, String> attributes = new HashMap<>();
+        addMatchingRequestHeaders(request, attributes);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.source.host", request.getRemoteHost());
+        flowFile = session.putAttribute(flowFile, "restlistener.request.uri", 
request.getRequestURI());
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.user.dn", foundSubject);
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.issuer.dn", foundIssuer);
+        return flowFile;
+    }
+
+    private void processRecord(InputStream in, FlowFile flowFile, OutputStream 
out) {
+        try (final RecordReader reader = 
readerFactory.createRecordReader(flowFile, new BufferedInputStream(in), 
logger)) {
+            final RecordSet recordSet = reader.createRecordSet();
+            final PushBackRecordSet pushBackSet = new 
PushBackRecordSet(recordSet);
+
+            try (final RecordSetWriter writer = 
writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) {
+                // Reading in records and evaluate script
+                while (pushBackSet.isAnotherRecord()) {
+                    final Record record = pushBackSet.next();
+                    writer.write(record);
+                }

Review comment:
       ```suggestion
                   writer.write(recordSet);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to