+1 on the refactoring.

I spent some time a while back trying to get a better understanding of the
several rules mentioned here.
Correct me if I'm wrong, but I was under the impression that the rules are
split because AccMode and UpdateMode are the ones that we care about, and
"NeedToRetract" was only an "intermediate" indicator. I guess that's the
part that confuses me the most.

Another thing that confuses me is whether we can mix the modes of operators
while traversing the plan and pick the "least restrictive" mode, as
@piotr mentioned, when an operator can support both upserts and retractions,
as in [2b] (the 2nd [2a]).
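
To make the question concrete, here is a rough sketch of how I read the
"upserts if the keys match, otherwise retractions" choice for a single input
of an operator that can consume both. Everything in it (UpdateModeSketch,
UpdateMode, chooseInputMode and its parameters) is a hypothetical name made
up for illustration, not an existing Flink class, and treating "matching" as
exact key-set equality is my assumption.

```java
import java.util.Set;

// Hypothetical sketch; none of these names are existing Flink classes.
class UpdateModeSketch {

    // The three modes mentioned in the proposal.
    enum UpdateMode { APPEND_ONLY, UPSERT, RETRACT }

    /**
     * Picks the mode for one input of an operator that can consume both
     * upserts and retractions.
     *
     * @param inputProducesUpdates whether the input emits updates at all
     * @param inputKeys            unique keys of the input (e.g. group-by columns)
     * @param consumerKeys         keys the consumer works on (e.g. join columns)
     */
    static UpdateMode chooseInputMode(
            boolean inputProducesUpdates,
            Set<String> inputKeys,
            Set<String> consumerKeys) {
        if (!inputProducesUpdates) {
            return UpdateMode.APPEND_ONLY;
        }
        // Assumption: "matching" means the key sets are equal.
        if (!inputKeys.isEmpty() && inputKeys.equals(consumerKeys)) {
            return UpdateMode.UPSERT;
        }
        // Fall back to the mode that works regardless of keys.
        return UpdateMode.RETRACT;
    }

    public static void main(String[] args) {
        // First example: g1 is grouped by a, g2 by y, join on a=y.
        System.out.println(chooseInputMode(true, Set.of("a"), Set.of("a")));
        System.out.println(chooseInputMode(true, Set.of("y"), Set.of("y")));
        // Second example: g1 is grouped by a, g2 by x, join on a=y.
        System.out.println(chooseInputMode(true, Set.of("a"), Set.of("a")));
        System.out.println(chooseInputMode(true, Set.of("x"), Set.of("y")));
    }
}
```

With this reading, both inputs of the first join would be upserts, while in
the second one only g1 would be an upsert and g2 would fall back to
retractions, i.e. the two inputs of one operator end up in different modes.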

--
Rong



On Tue, Jun 5, 2018 at 2:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> I think the proposed refactoring is a good idea.
> It should simplify the logic to determine which update mode to use.
> We could also try to make some of the method and field names more intuitive
> and extend the internal documentation a bit.
>
> @Hequn, It would be good to get your thoughts on this issue as well. Thank
> you!
>
> While thinking about this issue I noticed a severe bug in how filters
> handle upsert messages.
> I've opened FLINK-9528 [1] for that.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9528
>
> 2018-06-04 10:23 GMT+02:00 Timo Walther <twal...@apache.org>:
>
> > Hi Piotr,
> >
> > thanks for bringing up this discussion. I was not involved in the design
> > discussions at that time but I also find the logic about upserts and
> > retractions in multiple stages quite confusing. So in general +1 for
> > simplification. However, by using a RelShuttle instead of rules we might
> > lose the flexibility to perform further optimizations by introducing new
> > rules in the future. Users could not change the static logic in a
> > RelShuttle; right now they can influence the behaviour using
> > CalciteConfig and custom rules.
> >
> > Regards,
> > Timo
> >
> > On 01.06.18 at 13:26, Piotr Nowojski wrote:
> >
> >> Hi,
> >>
> >> Recently I was looking into upserts and upsert sources in Flink and,
> >> while doing so, I noticed some potential room for
> >> improvement/simplification.
> >>
> >> Currently there are 3 optimiser rules in DataStreamRetractionRules that
> >> work in three stages, followed by the UniqueKeyExtractor plan node visitor,
> >> to set the preferred update mode, with validation of correct keys for
> >> upserts. First, DataStreamRetractionRules sets up UpdateAsRetractionTrait;
> >> next, in another rule, we use it to set up AccModeTrait. AccModeTrait has
> >> only two values: Acc (upserts) or AccRetract (retractions). This has some
> >> severe limitations and requires an additional UniqueKeyExtractor stage
> >> (implemented as a visitor) to actually verify that keys are set correctly.
> >>
> >> I would propose to unify those into one visitor (probably a RelShuttle
> >> implementation) that would traverse the plan from root -> leaves. On the
> >> way down it would collect the preferences of the nodes regarding update
> >> mode (including keys for upserts). On the way up, it would pick
> >> upsert(keys)/retraction/append-only modes or fail if that was impossible
> >> [1].
> >>
> >> I think that would simplify the code by a noticeable margin. Instead of
> >> having this logic distributed among 4 classes in two files/independent
> >> steps, it would be in one simple class.
> >>
> >> It would also open up the possibility of further improvements. For
> >> operators that can process either upserts or retractions (with the
> >> aforementioned solution, which decides upsert vs. retract in the same step
> >> as validating keys) we could choose upserts if the keys match and fall
> >> back to retractions only if they don't. Currently this is not possible
> >> (examples [2a], [2b]).
> >>
> >> Thanks, Piotrek
> >>
> >> [1] Example impossible case:
> >>
> >> DataStream<Tuple3<Integer, Long, String>> ds1 =
> >> JavaStreamTestData.getSmall3TupleDataSet(env);
> >> Table t1 = tableEnv.fromDataStream(ds1, "a,b,c")
> >>     .select("a.cast(LONG) as a,b,c");
> >>
> >> DataStream<Tuple3<Integer, Long, String>> ds2 =
> >> JavaStreamTestData.getSmall3TupleDataSet(env);
> >> Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");
> >>
> >> Table g1 = t1.groupBy("a").select("a, b.count");
> >> Table g2 = t2.groupBy("b").select("a.count as a, b");
> >>
> >> g1.unionAll(g2).writeToSink(new TestUpsertSink(new String[]{("a")},
> >> false));
> >>
> >> [2a]
> >>
> >> val t1 = util.addTable[(Long, Long)]('a, 'b)
> >> val t2 = util.addTable[(Long, Long)]('x, 'y)
> >>
> >> val g1 = t1.groupBy("a").select("a, b.count")
> >> val g2 = t2.groupBy("y").select("x.count, y")
> >>
> >> val resultTable = g1.join(g2, "a=y")
> >>
> >> `g1.join(g2, "a=y")` could accept upserts from both sides. Currently both
> >> are retractions.
> >>
> >> [2a]
> >>
> >> val t1 = util.addTable[(Long, Long)]('a, 'b)
> >> val t2 = util.addTable[(Long, Long)]('x, 'y)
> >>
> >> val g1 = t1.groupBy("a").select("a, b.count")
> >> val g2 = t2.groupBy("x").select("x, y.count as y")
> >>
> >> val resultTable = g1.join(g2, "a=y")
> >>
> >> `g1.join(g2, "a=y")` could accept upserts from g1 (same key column) but
> >> only retractions from g2 (different key columns). Currently both are
> >> retractions.
> >>
> >>
> >
>
