Hi,

This is a work in progress.
Far from ready and done.

I will try create first NiFi processor using current modified Calcite CSV
adapter.
Hopefully NiFi community will accept idea to use SQL to specify how CSV (or
any other data format) data should be transformed.

Next step would be to implement right Calcite adapter.
As you said current modified CSV adapter faked out Calcite to believe that
InputStream is the Orders *table*

It seems current modified CSV adapter is not very useful and not something
we can consider Calcite official adapter, its only prototype.

I will inform You when I have done NiFi processor.
Hopefully first version is ready within few days.
I am very grateful to receive any feedback.

I don't have any plans to use any other platforms (Samza, Flink, Storm etc.)
NiFi itself should cover everything needed.

And to support advanced features, of course much better Calcite adapter
implementation is needed.
And after CSV, next is probably Avro

I am not sure I am able to implement correct Calcite streaming adapter,
because of limited knowledge.
But if no one else has time to do it, I may try.

And finally I used 'stream' and 'streaming' very loosely and this was
confusing.
Sorry.

Thanks
toivo

2016-05-05 22:47 GMT+03:00 Julian Hyde <[email protected]>:

> It sounds to me as if you will be able to get something half-baked on
> InputStream working easily, but it won't support parallelism,
> distribution, reliability, fail-over, and replay.
>
> I don't have a problem with that. You can write your application now
> and re-deploy on more a robust streaming SQL implementation (Samza,
> Flink, Storm etc.) later.
>
> But be sure that you are using the "stream" keyword in your queries, e.g.
>
>   select stream * from Orders where units > 10
>
> I have a feeling that you have faked out Calcite to believe that your
> InputStream is the Orders *table*, whereas you should be making
> Calcite believe that your InputStrema is the Orders *stream*, to that
> if you ever were to write
>
>   select count(*) from Orders
>
> Calcite would be able to go somewhere else for the historical table.
>
> Julian
>
>
> On Wed, May 4, 2016 at 10:44 AM, Toivo Adams <[email protected]>
> wrote:
> > Its
> > java.io.InputStream
> >
> > Actually it was not very difficult to modify CSV adapter to accept stream
> > instead of file.
> > But of course, current POC may have some problems which are not shown up
> > yet.
> >
> > CSVReader accept stream reader also.
> >
> >
> >
> > 2016-05-04 20:28 GMT+03:00 Julian Hyde <[email protected]>:
> >
> >> Can you clarify what you mean by Stream? Do you mean
> >> java.util.Stream<String>?
> >>
> >> The Csv adapter is first and foremost a file adapter. It might be easier
> >> to create a stream adapter and make it parse csv than start with a file
> >> adapter and make it handle streams.
> >>
> >> > On May 4, 2016, at 10:07 AM, Toivo Adams <[email protected]>
> wrote:
> >> >
> >> > Julian,
> >> >
> >> > Thank You for Your suggestions.
> >> >
> >> > I don't want to monitor file and read appended records.
> >> > Initially I want to read from in-memory stream.
> >> > Such a stream can be very, very large and doesn't fit in memory.
> >> >
> >> > My idea is to create NiFi processor which uses SQL for data
> manipulation.
> >> > https://nifi.apache.org/
> >> >
> >> > NiFi already contains large set of processors which filter, split,
> route,
> >> > etc. different data.
> >> > Data can be CSV, JSON, Avro, whatever.
> >> >
> >> > Different processors use different parameters how data should be
> >> filtered,
> >> > split, routed, etc.
> >> > I think it would be nice to able to use SQL statement to specify how
> data
> >> > should be filtered, split, etc.
> >> > Because NiFi is able to use very big data sets (called FlowFile in
> Nifi),
> >> > streaming is as must.
> >> >
> >> > I created very simple of POC how to use Stream instead of File.
> >> > I just created new modified versions of
> >> > src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
> >> > src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
> >> > src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
> >> > src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
> >> >
> src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
> >> >
> >> > CALCITE-1227
> >> > describes little bit different use case.
> >> >
> >> > I am ready to contribute back, but my Calcite knowledge is very
> limited.
> >> > So current POC is more like a hack and not good code.
> >> > Should I upload my current POC files to CALCITE-1227
> >> > or it is better to create another issue?
> >> >
> >> > Thanks
> >> > toivo
> >> >
> >> >
> >> > 2016-05-04 19:02 GMT+03:00 Julian Hyde <[email protected]>:
> >> >
> >> >> I’ve logged https://issues.apache.org/jira/browse/CALCITE-1227 <
> >> >> https://issues.apache.org/jira/browse/CALCITE-1227>. Feel free to
> start
> >> >> implementing it!
> >> >>
> >> >>> On May 4, 2016, at 8:56 AM, Julian Hyde <[email protected]> wrote:
> >> >>>
> >> >>> It’s not straightforward to re-use a table adapter as a stream
> adapter.
> >> >> The reason is that one query might want to see the past (the current
> >> >> contents of the table) and another query might want to see the future
> >> (the
> >> >> stream of records added from this point on).
> >> >>>
> >> >>> I’m guessing that you want something like the CSV adapter that
> watches
> >> a
> >> >> file and reports records added to the end of the file (like the tail
> >> >> command[1]).
> >> >>>
> >> >>> You’d have to change CsvTable to implement StreamableTable, and
> >> >> implement the ‘Table stream()’ method to return a variant of the
> table
> >> that
> >> >> is in “follow” mode.
> >> >>>
> >> >>> It would probably be implemented by a variant of CsvEnumerator, but
> it
> >> >> is getting its input in bursts, as the file is appended to.
> >> >>>
> >> >>> Hope that helps.
> >> >>>
> >> >>> Julian
> >> >>>
> >> >>> [1] https://en.wikipedia.org/wiki/Tail_(Unix) <
> >> >> https://en.wikipedia.org/wiki/Tail_(Unix)>
> >> >>>
> >> >>>> On May 2, 2016, at 3:15 AM, Toivo Adams <[email protected]
> >> <mailto:
> >> >> [email protected]>> wrote:
> >> >>>>
> >> >>>> Hi,
> >> >>>>
> >> >>>> One possibility is to modify CsvEnumerator
> >> >>>> Opinions?
> >> >>>>
> >> >>>> Thanks
> >> >>>> Toivo
> >> >>>>
> >> >>>> 2016-05-01 18:35 GMT+03:00 Toivo Adams <[email protected]
> >> <mailto:
> >> >> [email protected]>>:
> >> >>>>
> >> >>>>> Hi,
> >> >>>>>
> >> >>>>> Please help newbie.
> >> >>>>> CSV works well reading files, but I want to read data from stream.
> >> >>>>> Data is not fixed length, may be endless stream.
> >> >>>>>
> >> >>>>> Any ideas how to accomplish this?
> >> >>>>> Should I try to modify CsvTranslatableTable?
> >> >>>>> Or should I take Cassandra adapter as example?
> >> >>>>>
> >> >>>>> Initially data will be CSV but later Avro is also good candidate.
> >> >>>>>
> >> >>>>> Thanks
> >> >>>>> Toivo
> >> >>>>>
> >> >>>
> >> >>
> >> >>
> >>
> >>
>

Reply via email to