Yes, but in my opinion, the monotonicity model gets us close enough, or at least gets us headed in the right direction.
Why? Because although we can never be absolutely sure that we have received all of the 09:00 - 10:00 records to emit an hourly total, someone makes a business rule that says that any record arriving more than 5 minutes late will be be ignored or will count towards the next hour. Or that the server creating the sub-total should wait until it has received a "punctuation" signal from all up-stream servers. Those business rules exist, because businesses need to make progress. (My company's finance department is already telling me that the deadline for Q2 expenses is EOB on July 1st.) The policy can be codified in terms of monotonicity, k-sortedness [1], and punctuation [2] (also known as "rowtime bounds" and "current time increment" events). All three of these give a way to say "OK, it's time to emit the 9:00-10:00 sub-total", which is how the system makes progress. The latter two are looser than monotonicity, but tend towards monotonicity in the limit as we reduce k to zero, or increase the frequency of punctuation events. Algebraic reasoning based on monotonicity can be extended to the other models. If we start with the more complex models we'd soon we up to our hubcaps in theoretical mud. Julian [1] http://ilpubs.stanford.edu:8090/560/1/2002-52.pdf [2] http://db.cs.berkeley.edu/cs286/papers/punctuations-tkde2003.pdf On Fri, Jun 26, 2015 at 5:54 PM, Ted Dunning <[email protected]> wrote: > I think that you are headed for trouble with this approach due to the fact > that in real life data, the "monotonic" field often isn't. This happens > because it is important to assign time stamps in a distributed fashion but > records are then subjected to variable transmission delays. > > > > On Fri, Jun 26, 2015 at 6:53 PM, Milinda Pathirage <[email protected]> > wrote: > >> Thanks Julian. I'll give it a try. >> >> Milinda >> >> On Fri, Jun 26, 2015 at 6:14 PM, Julian Hyde <[email protected]> wrote: >> >> > RelNodes know their sort order: call >> > rel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE). That said, >> > in non-streaming planning we start with unsorted relational >> > expressions and deduce sort order later if it is beneficial (e.g. if >> > it allows a sort-merge join) and it seems that we're currently doing >> > that for streaming planning. >> > >> > To see what I'm talking about, run StreamTest.testStreamGroupByHaving >> > and put a break point in LogicalAggregate's constructor. It's traitSet >> > contains only the empty collation. We need to fix that. >> > >> > Julian >> > >> > >> > On Fri, Jun 26, 2015 at 11:40 AM, Milinda Pathirage >> > <[email protected]> wrote: >> > > Hi Yi, >> > > >> > > In this specific case ordering is declared in the schema. Quoting from >> > > Calcite documentation >> > > >> > > ~~~~~~~~~~~~~~~~~~~~ >> > > Monotonic columns need to be declared in the schema. The monotonicity >> is >> > > enforced when records enter the stream and assumed by queries that read >> > > from that stream. We recommend that you give each stream a timestamp >> > column >> > > called rowtime, but you can declare others, orderId, for example. >> > > ~~~~~~~~~~~~~~~~~~~~ >> > > >> > > >> > > >> > > If we can propagate this ordering information to LogicalAggregate then >> we >> > > can easily handle this. As I understand required information is >> > accessible >> > > to Calcite query planner. But in our case we need this information >> after >> > we >> > > get the query plan from Calcite. AFAIK, current API doesn't provide a >> way >> > > to get this information in scenarios like above where ORDER BY is not >> > > specified in the query (I am not 100% sure about ORDER BY case too. I >> > need >> > > to have a look at a query plan generated for a query with ORDER BY). >> > > >> > > Thanks >> > > Milinda >> > > >> > > On Fri, Jun 26, 2015 at 2:30 PM, Yi Pan <[email protected]> wrote: >> > > >> > >> Hi, Milinda, >> > >> >> > >> I thought that in your example, the ordering field is given in GROUP >> BY. >> > >> Are we missing a way to pass the ordering field(s) to the >> > LogicalAggregate? >> > >> >> > >> -Yi >> > >> >> > >> On Fri, Jun 26, 2015 at 10:49 AM, Milinda Pathirage < >> > [email protected] >> > >> > >> > >> wrote: >> > >> >> > >> > Hi Julian, >> > >> > >> > >> > Even though this is a general question across all the streaming >> > >> aggregates >> > >> > which utilize GROUP BY clause and a monotonic timestamp field for >> > >> > specifying the window, but I am going to stick to most basic example >> > >> (which >> > >> > is from Calcite Streaming document). >> > >> > >> > >> > SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime, >> > >> > productId, >> > >> > COUNT(*) AS c, >> > >> > SUM(units) AS units >> > >> > FROM Orders >> > >> > GROUP BY FLOOR(rowtime TO HOUR), productId; >> > >> > >> > >> > I was trying to implement an aggregate operator which handles >> tumbling >> > >> > windows via the monotonic field in GROUP By clause in addition to >> the >> > >> > general aggregations. I went in this path because I thought >> > integrating >> > >> > windowing aspects (at least for tumbling and hopping) into aggregate >> > >> > operator will be easier than trying to extract the window spec from >> > the >> > >> > query plan for a query like above. But I hit a wall when trying to >> > figure >> > >> > out trigger condition for emitting aggregate results. I was >> initially >> > >> > planning to detect new values for FLOOR(rowtime TO HOUR) and emit >> > current >> > >> > aggregate results for previous groups (I was thinking to keep old >> > groups >> > >> > around until we clean them up after a timeout). But when trying to >> > >> > implement this I figured out that I don’t know how to check which >> > GROUP >> > >> BY >> > >> > field is monotonic so that I only detect new values for the >> monotonic >> > >> > field/fields, not for the all the other fields. I think this is not >> a >> > >> > problem for tables because we have the whole input before >> computation >> > and >> > >> > we wait till we are done with the input before emitting the results. >> > >> > >> > >> > With regards to above can you please clarify following things: >> > >> > >> > >> > - Is the method I described above for handling streaming aggregates >> > make >> > >> > sense at all? >> > >> > - Is there a way that I can figure out which fields/expressions in >> > >> > LogicalAggregate are monotonic? >> > >> > - Or can we write a rule to annotate or add extra metadata to >> > >> > LogicalAggregate so that we can get monotonic fields in the GROUP By >> > >> clause >> > >> > >> > >> > Thanks in advance >> > >> > Milinda >> > >> > >> > >> > >> > >> > -- >> > >> > Milinda Pathirage >> > >> > >> > >> > PhD Student | Research Assistant >> > >> > School of Informatics and Computing | Data to Insight Center >> > >> > Indiana University >> > >> > >> > >> > twitter: milindalakmal >> > >> > skype: milinda.pathirage >> > >> > blog: http://milinda.pathirage.org >> > >> > >> > >> >> > > >> > > >> > > >> > > -- >> > > Milinda Pathirage >> > > >> > > PhD Student | Research Assistant >> > > School of Informatics and Computing | Data to Insight Center >> > > Indiana University >> > > >> > > twitter: milindalakmal >> > > skype: milinda.pathirage >> > > blog: http://milinda.pathirage.org >> > >> >> >> >> -- >> Milinda Pathirage >> >> PhD Student | Research Assistant >> School of Informatics and Computing | Data to Insight Center >> Indiana University >> >> twitter: milindalakmal >> skype: milinda.pathirage >> blog: http://milinda.pathirage.org >>
