tpalfy commented on a change in pull request #5446:
URL: https://github.com/apache/nifi/pull/5446#discussion_r728227041
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
##########
@@ -273,13 +310,40 @@ public AllowableValue getAllowableValue() {
private final AtomicReference<ProcessSessionFactory>
sessionFactoryReference = new AtomicReference<>();
private final AtomicReference<StreamThrottler> throttlerRef = new
AtomicReference<>();
+ private volatile boolean isRecordReaderSet;
+ private volatile boolean isRecordWriterSet;
+
@Override
- protected Collection<ValidationResult> customValidate(ValidationContext
context) {
- List<ValidationResult> results = new ArrayList<>(1);
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ if (RECORD_READER.equals(descriptor)) {
+ isRecordReaderSet = StringUtils.isNotEmpty(newValue);
+ } else if (RECORD_WRITER.equals(descriptor)) {
+ isRecordWriterSet = StringUtils.isNotEmpty(newValue);
+ }
+ }
- validatePortsAreNotEqual(context, results);
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
Review comment:
This validation could be achieved more easily by changing `RECORD_WRITER`
to depend on `RECORD_READER` and be `required(true)`.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
##########
@@ -249,6 +263,9 @@ private void handleException(final HttpServletRequest
request, final HttpServlet
private Set<FlowFile> handleMultipartRequest(HttpServletRequest request,
ProcessSession session, String foundSubject, String foundIssuer)
throws IOException, IllegalStateException, ServletException {
+ if (isRecordProcessing()) {
+ logger.debug("Record processing will not be utilized while
processing multipart request. Request URI: {}", request.getRequestURI());
Review comment:
I think it would be better to provide a notice on this limitation in the
`ListenHTTP` processor's `CapabilityDescription`.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
##########
@@ -433,6 +423,64 @@ protected void proceedFlow(final HttpServletRequest
request, final HttpServletRe
}
}
+ protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest
request, final ProcessSession session,
+ final String
foundSubject, final String foundIssuer, FlowFile flowFile) {
+ Map<String, String> attributes = new HashMap<>();
+ addMatchingRequestHeaders(request, attributes);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.putAttribute(flowFile,
"restlistener.remote.source.host", request.getRemoteHost());
+ flowFile = session.putAttribute(flowFile, "restlistener.request.uri",
request.getRequestURI());
+ flowFile = session.putAttribute(flowFile,
"restlistener.remote.user.dn", foundSubject);
+ flowFile = session.putAttribute(flowFile,
"restlistener.remote.issuer.dn", foundIssuer);
+ return flowFile;
+ }
+
+ private void processRecord(InputStream in, FlowFile flowFile, OutputStream
out) {
+ try (final RecordReader reader =
readerFactory.createRecordReader(flowFile, new BufferedInputStream(in),
logger)) {
+ final RecordSet recordSet = reader.createRecordSet();
+ final PushBackRecordSet pushBackSet = new
PushBackRecordSet(recordSet);
+
+ try (final RecordSetWriter writer =
writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) {
+ // Reading in records and evaluate script
+ while (pushBackSet.isAnotherRecord()) {
+ final Record record = pushBackSet.next();
+ writer.write(record);
+ }
+ }
+ } catch (IOException | MalformedRecordException |
SchemaNotFoundException e) {
Review comment:
This exception handling seems too broad. `MalformedRecordException` is
probably the only one that should warrant a 400.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
##########
@@ -433,6 +423,64 @@ protected void proceedFlow(final HttpServletRequest
request, final HttpServletRe
}
}
+ protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest
request, final ProcessSession session,
+ final String
foundSubject, final String foundIssuer, FlowFile flowFile) {
+ Map<String, String> attributes = new HashMap<>();
+ addMatchingRequestHeaders(request, attributes);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.putAttribute(flowFile,
"restlistener.remote.source.host", request.getRemoteHost());
+ flowFile = session.putAttribute(flowFile, "restlistener.request.uri",
request.getRequestURI());
+ flowFile = session.putAttribute(flowFile,
"restlistener.remote.user.dn", foundSubject);
+ flowFile = session.putAttribute(flowFile,
"restlistener.remote.issuer.dn", foundIssuer);
+ return flowFile;
+ }
+
+ private void processRecord(InputStream in, FlowFile flowFile, OutputStream
out) {
+ try (final RecordReader reader =
readerFactory.createRecordReader(flowFile, new BufferedInputStream(in),
logger)) {
+ final RecordSet recordSet = reader.createRecordSet();
+ final PushBackRecordSet pushBackSet = new
PushBackRecordSet(recordSet);
+
+ try (final RecordSetWriter writer =
writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) {
+ // Reading in records and evaluate script
+ while (pushBackSet.isAnotherRecord()) {
+ final Record record = pushBackSet.next();
+ writer.write(record);
+ }
+ }
+ } catch (IOException | MalformedRecordException |
SchemaNotFoundException e) {
+ logger.error("Could not process record.", e);
+ returnCode = HttpServletResponse.SC_BAD_REQUEST;
Review comment:
This method may be called from multiple threads when multiple clients
send requests at the same time, so we can't modify `returnCode` while the
servlet is running. Instead, we may want to have a return value or throw an
exception.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
##########
@@ -433,6 +423,64 @@ protected void proceedFlow(final HttpServletRequest
request, final HttpServletRe
}
}
+ protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest
request, final ProcessSession session,
+ final String
foundSubject, final String foundIssuer, FlowFile flowFile) {
+ Map<String, String> attributes = new HashMap<>();
+ addMatchingRequestHeaders(request, attributes);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.putAttribute(flowFile,
"restlistener.remote.source.host", request.getRemoteHost());
+ flowFile = session.putAttribute(flowFile, "restlistener.request.uri",
request.getRequestURI());
+ flowFile = session.putAttribute(flowFile,
"restlistener.remote.user.dn", foundSubject);
+ flowFile = session.putAttribute(flowFile,
"restlistener.remote.issuer.dn", foundIssuer);
+ return flowFile;
+ }
+
+ private void processRecord(InputStream in, FlowFile flowFile, OutputStream
out) {
+ try (final RecordReader reader =
readerFactory.createRecordReader(flowFile, new BufferedInputStream(in),
logger)) {
+ final RecordSet recordSet = reader.createRecordSet();
+ final PushBackRecordSet pushBackSet = new
PushBackRecordSet(recordSet);
+
+ try (final RecordSetWriter writer =
writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) {
+ // Reading in records and evaluate script
+ while (pushBackSet.isAnotherRecord()) {
+ final Record record = pushBackSet.next();
+ writer.write(record);
+ }
Review comment:
```suggestion
writer.write(recordSet);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]