Hi,

The first NiFi processor which uses the modified CSV adapter is done:
https://issues.apache.org/jira/browse/NIFI-1280
The modified CSV adapter is included in the NiFi code; hopefully this is not
a problem. Any feedback is highly welcome.

Thanks
Toivo

2016-05-05 23:26 GMT+03:00 Toivo Adams <[email protected]>:

> Hi,
>
> This is a work in progress, far from ready and done.
>
> I will try to create a first NiFi processor using the current modified
> Calcite CSV adapter. Hopefully the NiFi community will accept the idea of
> using SQL to specify how CSV (or any other data format) data should be
> transformed.
>
> The next step would be to implement the right Calcite adapter. As you
> said, the current modified CSV adapter faked out Calcite into believing
> that the InputStream is the Orders *table*.
>
> It seems the current modified CSV adapter is not very useful, and not
> something we can consider an official Calcite adapter; it is only a
> prototype.
>
> I will inform you when I have finished the NiFi processor. Hopefully the
> first version will be ready within a few days. I am very grateful to
> receive any feedback.
>
> I don't have any plans to use any other platforms (Samza, Flink, Storm,
> etc.). NiFi itself should cover everything needed.
>
> And to support advanced features, of course, a much better Calcite
> adapter implementation is needed. After CSV, the next format is probably
> Avro.
>
> I am not sure I am able to implement a correct Calcite streaming adapter,
> because of my limited knowledge. But if no one else has time to do it, I
> may try.
>
> And finally, I used 'stream' and 'streaming' very loosely, and this was
> confusing. Sorry.
>
> Thanks
> toivo
>
> 2016-05-05 22:47 GMT+03:00 Julian Hyde <[email protected]>:
>
>> It sounds to me as if you will be able to get something half-baked on
>> InputStream working easily, but it won't support parallelism,
>> distribution, reliability, fail-over, and replay.
>>
>> I don't have a problem with that. You can write your application now
>> and re-deploy on a more robust streaming SQL implementation (Samza,
>> Flink, Storm, etc.) later.
>>
>> But be sure that you are using the "stream" keyword in your queries,
>> e.g.
>>
>>   select stream * from Orders where units > 10
>>
>> I have a feeling that you have faked out Calcite to believe that your
>> InputStream is the Orders *table*, whereas you should be making Calcite
>> believe that your InputStream is the Orders *stream*, so that if you
>> ever were to write
>>
>>   select count(*) from Orders
>>
>> Calcite would be able to go somewhere else for the historical table.
>>
>> Julian
>>
>> On Wed, May 4, 2016 at 10:44 AM, Toivo Adams <[email protected]> wrote:
>>
>>> It's
>>> java.io.InputStream
>>>
>>> Actually, it was not very difficult to modify the CSV adapter to
>>> accept a stream instead of a file. But of course, the current POC may
>>> have some problems which have not shown up yet.
>>>
>>> CSVReader accepts a stream reader also.
>>>
>>> 2016-05-04 20:28 GMT+03:00 Julian Hyde <[email protected]>:
>>>
>>>> Can you clarify what you mean by Stream? Do you mean
>>>> java.util.Stream<String>?
>>>>
>>>> The Csv adapter is first and foremost a file adapter. It might be
>>>> easier to create a stream adapter and make it parse CSV than to
>>>> start with a file adapter and make it handle streams.
>>>>
>>>>> On May 4, 2016, at 10:07 AM, Toivo Adams <[email protected]> wrote:
>>>>>
>>>>> Julian,
>>>>>
>>>>> Thank you for your suggestions.
>>>>>
>>>>> I don't want to monitor a file and read appended records. Initially
>>>>> I want to read from an in-memory stream. Such a stream can be very,
>>>>> very large and may not fit in memory.
>>>>>
>>>>> My idea is to create a NiFi processor which uses SQL for data
>>>>> manipulation.
>>>>> https://nifi.apache.org/
>>>>>
>>>>> NiFi already contains a large set of processors which filter,
>>>>> split, route, etc. different data. Data can be CSV, JSON, Avro,
>>>>> whatever.
>>>>>
>>>>> Different processors use different parameters for how data should
>>>>> be filtered, split, routed, etc.
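The stream-backed enumerator idea Toivo describes, reading rows lazily from an InputStream so that an arbitrarily large (even endless) stream never has to fit in memory, can be sketched in plain Java without Calcite. This is a hypothetical standalone illustration: the class name `StreamingCsvRows` and its methods are invented here and are not the actual CsvEnumerator2 code from the POC.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/**
 * Minimal sketch (hypothetical, not Calcite code): a CSV "enumerator"
 * that pulls rows lazily from an InputStream, one line at a time, so the
 * whole stream is never materialized in memory.
 */
public class StreamingCsvRows {
  private final BufferedReader reader;
  private String[] current;

  public StreamingCsvRows(InputStream in) {
    this.reader = new BufferedReader(
        new InputStreamReader(in, StandardCharsets.UTF_8));
  }

  /** Advances to the next row; returns false at end of stream. */
  public boolean moveNext() throws IOException {
    String line = reader.readLine();
    if (line == null) {
      current = null;
      return false;
    }
    // Naive split for illustration; real CSV parsing must handle quoting.
    current = line.split(",", -1);
    return true;
  }

  /** The row produced by the last successful moveNext(). */
  public String[] current() {
    return current;
  }

  public static void main(String[] args) throws IOException {
    InputStream in = new ByteArrayInputStream(
        "orderId,units\n1,15\n2,5\n".getBytes(StandardCharsets.UTF_8));
    StreamingCsvRows rows = new StreamingCsvRows(in);
    while (rows.moveNext()) {
      System.out.println(String.join("|", rows.current()));
    }
  }
}
```

The moveNext()/current() shape mirrors the pull-based style of Calcite's enumerators, which is what makes a stream-backed variant of the CSV adapter feasible in the first place.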
>>>>> I think it would be nice to be able to use a SQL statement to
>>>>> specify how data should be filtered, split, etc. Because NiFi is
>>>>> able to handle very big data sets (called FlowFiles in NiFi),
>>>>> streaming is a must.
>>>>>
>>>>> I created a very simple POC of how to use a Stream instead of a
>>>>> File. I just created new modified versions of
>>>>> src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
>>>>> src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
>>>>> src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
>>>>> src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
>>>>> src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
>>>>>
>>>>> CALCITE-1227 describes a slightly different use case.
>>>>>
>>>>> I am ready to contribute back, but my Calcite knowledge is very
>>>>> limited, so the current POC is more like a hack and not good code.
>>>>> Should I upload my current POC files to CALCITE-1227, or is it
>>>>> better to create another issue?
>>>>>
>>>>> Thanks
>>>>> toivo
>>>>>
>>>>> 2016-05-04 19:02 GMT+03:00 Julian Hyde <[email protected]>:
>>>>>
>>>>>> I've logged https://issues.apache.org/jira/browse/CALCITE-1227.
>>>>>> Feel free to start implementing it!
>>>>>>
>>>>>>> On May 4, 2016, at 8:56 AM, Julian Hyde <[email protected]> wrote:
>>>>>>>
>>>>>>> It's not straightforward to re-use a table adapter as a stream
>>>>>>> adapter. The reason is that one query might want to see the past
>>>>>>> (the current contents of the table) and another query might want
>>>>>>> to see the future (the stream of records added from this point
>>>>>>> on).
>>>>>>>
>>>>>>> I'm guessing that you want something like the CSV adapter that
>>>>>>> watches a file and reports records added to the end of the file
>>>>>>> (like the tail command [1]).
>>>>>>>
>>>>>>> You'd have to change CsvTable to implement StreamableTable, and
>>>>>>> implement the 'Table stream()' method to return a variant of the
>>>>>>> table that is in "follow" mode.
>>>>>>>
>>>>>>> It would probably be implemented by a variant of CsvEnumerator,
>>>>>>> but one that gets its input in bursts, as the file is appended to.
>>>>>>>
>>>>>>> Hope that helps.
>>>>>>>
>>>>>>> Julian
>>>>>>>
>>>>>>> [1] https://en.wikipedia.org/wiki/Tail_(Unix)
>>>>>>>
>>>>>>>> On May 2, 2016, at 3:15 AM, Toivo Adams <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> One possibility is to modify CsvEnumerator. Opinions?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Toivo
>>>>>>>>
>>>>>>>> 2016-05-01 18:35 GMT+03:00 Toivo Adams <[email protected]>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Please help a newbie. CSV works well reading files, but I want
>>>>>>>>> to read data from a stream. The data is not fixed length and
>>>>>>>>> may be an endless stream.
>>>>>>>>>
>>>>>>>>> Any ideas how to accomplish this? Should I try to modify
>>>>>>>>> CsvTranslatableTable? Or should I take the Cassandra adapter as
>>>>>>>>> an example?
>>>>>>>>>
>>>>>>>>> Initially the data will be CSV, but later Avro is also a good
>>>>>>>>> candidate.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Toivo
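Julian's "follow" mode suggestion, an enumerator that picks up records in bursts as the file is appended to, can be illustrated with a small standalone helper. The class below is hypothetical (CsvFollower is not a Calcite class, and its names are invented here); a real implementation would sit behind the 'Table stream()' method of StreamableTable and feed a CsvEnumerator variant.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/**
 * Rough sketch (hypothetical, not Calcite code) of tail-style "follow"
 * mode: each poll() returns only the lines appended to the file since the
 * previous poll, i.e. one burst of new records. A real follower would also
 * guard against reading a partial final line mid-write.
 */
public class CsvFollower {
  private final Path file;
  private long offset; // byte position of the first unread line

  public CsvFollower(Path file) {
    this.file = file;
  }

  /** Returns the lines appended since the last call (possibly empty). */
  public List<String> poll() throws IOException {
    List<String> burst = new ArrayList<>();
    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
      raf.seek(offset);
      String line;
      while ((line = raf.readLine()) != null) {
        burst.add(line);
      }
      offset = raf.getFilePointer(); // remember where this burst ended
    }
    return burst;
  }

  public static void main(String[] args) throws IOException {
    Path f = Files.createTempFile("orders", ".csv");
    Files.writeString(f, "1,15\n");
    CsvFollower follower = new CsvFollower(f);
    System.out.println(follower.poll()); // first burst: [1,15]
    Files.writeString(f, "2,5\n", StandardOpenOption.APPEND);
    System.out.println(follower.poll()); // only the new burst: [2,5]
  }
}
```

Note how this matches Julian's distinction: a plain table scan would re-read the whole file (the past), while repeated poll() calls see only records added from this point on (the future).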
