Github user markap14 commented on the pull request:
https://github.com/apache/nifi/pull/420#issuecomment-218531630
@ToivoAdams After looking at this a bit more, I've got a few very
high-level thoughts. First is that what you have built here is incredibly
awesome and powerful! I think FilterCSVColumns is the wrong name for this
Processor - it should be perhaps TransformCSV, as leveraging Calcite allows us
to do some pretty powerful queries, such as "SELECT * FROM CSV.A WHERE AMOUNT2
< 99" and I imagine that this type of use case will be very common.
I do have a concern, though, which is that if you attempt to perform a JOIN
operation, for example "SELECT X.AMOUNT2, X.AMOUNT3 FROM CSV.A as X JOIN CSV.A
AS Y ON X.AMOUNT2=Y.AMOUNT2" we end up with an IOException: Stream closed. This
is because Calcite will have to read the data multiple times in order to
perform the JOIN. I think we can get around this by changing the
CsvSchemaFactory2 to be something like CsvInputStreamFactory, and that class,
rather than receiving the InputStream directly would be passed the FlowFIle and
ProcessSession and could create the InputStream on-demand. This would allow the
data to be read multiple times by creating two InputStream's. The nice thing is
that if this runs on a system with sufficient RAM the Operating System's disk
cache will generally mean that we don't even have to read the data for the
second pass unless it's a really massive amount of CSV.
Additionally, I think we need to have some sort of validator for the SQL
Select Statement property, as right now if the query is invalid, the processor
is valid and just routes everything to failure.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---