Hi,

The first NiFi processor that uses the modified CSV adapter is done.
https://issues.apache.org/jira/browse/NIFI-1280

The modified CSV adapter is included in the NiFi code; hopefully this is not a
problem.
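
As a rough illustration only (this is not the code attached to NIFI-1280, and
it uses no Calcite or NiFi APIs), reading CSV rows lazily from a
java.io.InputStream could be sketched in plain Java as below. The class name
CsvStreamRows is hypothetical, and the naive comma split is an assumption that
ignores quoted fields.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical helper: iterates CSV rows one at a time from an InputStream. */
final class CsvStreamRows implements Iterator<String[]> {
    private final BufferedReader reader;
    private String nextLine; // one line of look-ahead; null once the stream ends

    CsvStreamRows(InputStream in) {
        this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        advance();
    }

    /** Reads the next line; only one line is buffered at any time. */
    private void advance() {
        try {
            nextLine = reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return nextLine != null;
    }

    @Override
    public String[] next() {
        if (nextLine == null) {
            throw new NoSuchElementException();
        }
        // Naive split: does not handle quoted fields or embedded commas.
        String[] row = nextLine.split(",", -1);
        advance();
        return row;
    }

    public static void main(String[] args) {
        String csv = "id,product,units\n1,apple,5\n2,pear,20\n";
        InputStream in = new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8));
        CsvStreamRows rows = new CsvStreamRows(in);
        rows.next(); // skip the header row
        while (rows.hasNext()) {
            String[] row = rows.next();
            // Keep only rows where units > 10, as in the example query in this thread.
            if (Integer.parseInt(row[2]) > 10) {
                System.out.println(String.join(",", row));
            }
        }
    }
}
```

Because rows are produced one at a time, the whole input never has to be held
in memory, which is the property that matters for very large FlowFiles.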

Any feedback is highly welcome.

Thanks
Toivo


2016-05-05 23:26 GMT+03:00 Toivo Adams <[email protected]>:

> Hi,
>
> This is a work in progress.
> Far from ready and done.
>
> I will try to create a first NiFi processor using the current modified
> Calcite CSV adapter.
> Hopefully the NiFi community will accept the idea of using SQL to specify
> how CSV (or any other data format) data should be transformed.
>
> The next step would be to implement a proper Calcite adapter.
> As you said, the current modified CSV adapter fakes out Calcite into
> believing that the InputStream is the Orders *table*.
>
> It seems the current modified CSV adapter is not very useful and not
> something we can consider an official Calcite adapter; it is only a
> prototype.
>
> I will let you know when the NiFi processor is done.
> Hopefully the first version will be ready within a few days.
> I would be very grateful for any feedback.
>
> I don't have any plans to use any other platforms (Samza, Flink, Storm
> etc.)
> NiFi itself should cover everything needed.
>
> And to support advanced features, a much better Calcite adapter
> implementation is of course needed.
> And after CSV, the next format is probably Avro.
>
> I am not sure I am able to implement a correct Calcite streaming adapter,
> because of my limited knowledge.
> But if no one else has time to do it, I may try.
>
> And finally, I used 'stream' and 'streaming' very loosely, and this was
> confusing.
> Sorry.
>
> Thanks
> toivo
>
> 2016-05-05 22:47 GMT+03:00 Julian Hyde <[email protected]>:
>
>> It sounds to me as if you will be able to get something half-baked on
>> InputStream working easily, but it won't support parallelism,
>> distribution, reliability, fail-over, and replay.
>>
>> I don't have a problem with that. You can write your application now
>> and re-deploy on a more robust streaming SQL implementation (Samza,
>> Flink, Storm etc.) later.
>>
>> But be sure that you are using the "stream" keyword in your queries, e.g.
>>
>>   select stream * from Orders where units > 10
>>
>> I have a feeling that you have faked out Calcite to believe that your
>> InputStream is the Orders *table*, whereas you should be making
>> Calcite believe that your InputStream is the Orders *stream*, so that
>> if you ever were to write
>>
>>   select count(*) from Orders
>>
>> Calcite would be able to go somewhere else for the historical table.
>>
>> Julian
>>
>>
>> On Wed, May 4, 2016 at 10:44 AM, Toivo Adams <[email protected]>
>> wrote:
>> > It's
>> > java.io.InputStream
>> >
>> > Actually it was not very difficult to modify the CSV adapter to accept
>> > a stream instead of a file.
>> > But of course, the current POC may have some problems which have not
>> > shown up yet.
>> >
>> > CSVReader also accepts a stream reader.
>> >
>> >
>> >
>> > 2016-05-04 20:28 GMT+03:00 Julian Hyde <[email protected]>:
>> >
>> >> Can you clarify what you mean by Stream? Do you mean
>> >> java.util.stream.Stream<String>?
>> >>
>> >> The Csv adapter is first and foremost a file adapter. It might be
>> >> easier to create a stream adapter and make it parse csv than start
>> >> with a file adapter and make it handle streams.
>> >>
>> >> > On May 4, 2016, at 10:07 AM, Toivo Adams <[email protected]> wrote:
>> >> >
>> >> > Julian,
>> >> >
>> >> > Thank you for your suggestions.
>> >> >
>> >> > I don't want to monitor a file and read appended records.
>> >> > Initially I want to read from an in-memory stream.
>> >> > Such a stream can be very, very large and may not fit in memory.
>> >> >
>> >> > My idea is to create a NiFi processor which uses SQL for data
>> >> > manipulation.
>> >> > https://nifi.apache.org/
>> >> >
>> >> > NiFi already contains a large set of processors which filter, split,
>> >> > route, etc. different data.
>> >> > Data can be CSV, JSON, Avro, whatever.
>> >> >
>> >> > Different processors use different parameters to specify how data
>> >> > should be filtered, split, routed, etc.
>> >> > I think it would be nice to be able to use an SQL statement to specify
>> >> > how data should be filtered, split, etc.
>> >> > Because NiFi is able to handle very big data sets (called FlowFile in
>> >> > NiFi), streaming is a must.
>> >> >
>> >> > I created a very simple POC of how to use a stream instead of a File.
>> >> > I just created new modified versions of
>> >> > src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
>> >> > src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
>> >> > src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
>> >> > src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
>> >> > src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
>> >> >
>> >> > CALCITE-1227
>> >> > describes a slightly different use case.
>> >> >
>> >> > I am ready to contribute back, but my Calcite knowledge is very limited.
>> >> > So the current POC is more like a hack and not good code.
>> >> > Should I upload my current POC files to CALCITE-1227,
>> >> > or is it better to create another issue?
>> >> >
>> >> > Thanks
>> >> > toivo
>> >> >
>> >> >
>> >> > 2016-05-04 19:02 GMT+03:00 Julian Hyde <[email protected]>:
>> >> >
>> >> >> I’ve logged https://issues.apache.org/jira/browse/CALCITE-1227. Feel
>> >> >> free to start implementing it!
>> >> >>
>> >> >>> On May 4, 2016, at 8:56 AM, Julian Hyde <[email protected]> wrote:
>> >> >>>
>> >> >>> It’s not straightforward to re-use a table adapter as a stream adapter.
>> >> >>> The reason is that one query might want to see the past (the current
>> >> >>> contents of the table) and another query might want to see the future
>> >> >>> (the stream of records added from this point on).
>> >> >>>
>> >> >>> I’m guessing that you want something like the CSV adapter that watches
>> >> >>> a file and reports records added to the end of the file (like the tail
>> >> >>> command[1]).
>> >> >>>
>> >> >>> You’d have to change CsvTable to implement StreamableTable, and
>> >> >>> implement the ‘Table stream()’ method to return a variant of the table
>> >> >>> that is in “follow” mode.
>> >> >>>
>> >> >>> It would probably be implemented by a variant of CsvEnumerator, but
>> >> >>> one that gets its input in bursts, as the file is appended to.
>> >> >>>
>> >> >>> Hope that helps.
>> >> >>>
>> >> >>> Julian
>> >> >>>
>> >> >>> [1] https://en.wikipedia.org/wiki/Tail_(Unix)
>> >> >>>
>> >> >>>> On May 2, 2016, at 3:15 AM, Toivo Adams <[email protected]> wrote:
>> >> >>>>
>> >> >>>> Hi,
>> >> >>>>
>> >> >>>> One possibility is to modify CsvEnumerator.
>> >> >>>> Opinions?
>> >> >>>>
>> >> >>>> Thanks
>> >> >>>> Toivo
>> >> >>>>
>> >> >>>> 2016-05-01 18:35 GMT+03:00 Toivo Adams <[email protected]>:
>> >> >>>>
>> >> >>>>> Hi,
>> >> >>>>>
>> >> >>>>> Please help a newbie.
>> >> >>>>> The CSV adapter works well for reading files, but I want to read
>> >> >>>>> data from a stream.
>> >> >>>>> The data is not of fixed length and may be an endless stream.
>> >> >>>>>
>> >> >>>>> Any ideas how to accomplish this?
>> >> >>>>> Should I try to modify CsvTranslatableTable?
>> >> >>>>> Or should I take the Cassandra adapter as an example?
>> >> >>>>>
>> >> >>>>> Initially the data will be CSV, but later Avro is also a good candidate.
>> >> >>>>>
>> >> >>>>> Thanks
>> >> >>>>> Toivo
>> >> >>>>>
>> >> >>>
>> >> >>
>> >> >>
>> >>
>> >>
>>
>
>
