Hi community,

BeamSQL supports the SESSION window in GROUP BY. Example query:

    "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
        + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`, "
        + " SESSION_END(f_timestamp, INTERVAL '5' MINUTE) AS `window_end` "
        + " FROM TABLE_A"
        + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";


However, I observed that SESSION_END (window_end) always returns the same
timestamp as SESSION_START (window_start), so BeamSQL is effectively missing
an implementation of SESSION_END. Below are my investigation of the root
cause and a proposed fix:

*Why are we not missing tumble_end and hop_end?*
Because when generating the logical plan, Calcite replaces tumble_start and
hop_start with a reference to the GROUP BY's TUMBLE/HOP, which is supposed
to return a timestamp. Calcite then replaces tumble_end and hop_end with a
PLUS(timestamp reference, window_size as a constant). Because tumble and hop
have a fixed window size given as a constant in their function signatures,
Calcite can generate the PLUS in the logical plan. So for tumble and hop, we
only need a single timestamp (which represents window_start in our
implementation) to produce both window_start and window_end in the
Projection.
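
To illustrate the arithmetic behind that PLUS, here is a minimal plain-Java
sketch. It does not use Calcite or Beam; the class and method names are
hypothetical, and the epoch-aligned window start is an assumption made only
for the example. It shows that, for a fixed-size window, window_end is fully
determined by window_start and the constant size:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration only: not Calcite or Beam code.
class FixedWindowEnd {
    // Start of the fixed-size window containing ts, assuming windows are aligned to the epoch.
    static Instant tumbleStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // What PLUS(window_start, window_size) amounts to.
    static Instant tumbleEnd(Instant windowStart, Duration size) {
        return windowStart.plus(size);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(5);
        Instant start = tumbleStart(Instant.parse("2018-10-24T10:07:30Z"), size);
        System.out.println("window_start = " + start);
        System.out.println("window_end   = " + tumbleEnd(start, size));
    }
}
```

No extra state is needed for the end of the window, which is why carrying
only window_start through the aggregation is enough for tumble and hop.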

We emit the window_start timestamp as the result of the TUMBLE/HOP/SESSION
functions:
https://github.com/amaliujia/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java#L84



*Why are we missing session_end?*
Because Calcite does not know the window size of a session window, in the
logical plan it generates a reference to the GROUP BY's SESSION for
session_end, the same reference it generates for session_start. So in the
logical plan, session_start = session_end. Because BeamSQL does not
differentiate session from tumble and hop, we return the window start as the
result of the SESSION function, and so in the final result we see
session_start = session_end.
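
To make it concrete why a session's end cannot be derived from its start
plus a constant, here is a simplified plain-Java model of gap-based session
merging. This is a sketch under assumptions, not Beam's implementation: the
class and method names are hypothetical, and each timestamp is assumed to
open a window [ts, ts + gap) that is merged with any window it overlaps:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only: a simplified model of session windows.
class SessionBounds {
    // Bounds [start, end) of one merged session.
    static final class Session {
        final Instant start;
        final Instant end;

        Session(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }
    }

    // Groups the timestamps of a single key into sessions separated by at least `gap`.
    static List<Session> sessions(List<Instant> timestamps, Duration gap) {
        List<Instant> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Session> out = new ArrayList<>();
        Instant start = null;
        Instant end = null;
        for (Instant ts : sorted) {
            if (start == null) {
                start = ts;
                end = ts.plus(gap);
            } else if (ts.isBefore(end)) {
                // Falls inside the current session: only the end moves.
                end = ts.plus(gap);
            } else {
                out.add(new Session(start, end));
                start = ts;
                end = ts.plus(gap);
            }
        }
        if (start != null) {
            out.add(new Session(start, end));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Instant> events = new ArrayList<>();
        events.add(Instant.parse("2018-10-24T10:00:00Z"));
        events.add(Instant.parse("2018-10-24T10:03:00Z"));
        events.add(Instant.parse("2018-10-24T10:06:00Z"));
        events.add(Instant.parse("2018-10-24T10:20:00Z"));
        for (Session s : sessions(events, Duration.ofMinutes(5))) {
            System.out.println(s.start + " .. " + s.end);
        }
    }
}
```

With the same 5-minute gap, the two sessions in this example have different
lengths, so the end has to be carried as its own value rather than computed
from the start.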

*Is this a Calcite bug?*
Yes and No.

Clearly, Calcite shouldn't hide window_end by creating a wrong reference in
the logical plan. If Calcite does not know what session_end is, it should at
least keep it. Ideally, Calcite would keep window_end in the logical plan and
let us decide what it means: a reference, a PLUS, or something else.

However, Calcite leaves room for us to add window_end back in the physical
plan nodes. For example, we can add window_end back in BeamAggregationRel,
and then change the reference for session_end to point to our window_end in
BeamAggregationRel.

*What is the fix?*
In BeamAggregationRel, we should add a window_end field right after the
window function's field and emit the window_end timestamp for it. Then, in
the Projection, we should replace window_end, which is currently a PLUS (for
tumble and hop) or a wrong reference (for session), with a correct reference
to the newly added window_end in the Aggregation.
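
As a rough picture of the proposed row layout, here is a plain-Java sketch.
It is not the BeamAggregationRel code: the class, the field order, and the
index constants are all hypothetical and only model the idea that the
aggregation emits window_end as its own field and the projection refers to
it by position:

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: models the proposed field layout, not Beam's classes.
class WindowEndField {
    // Assumed positions in the aggregation output row.
    static final int KEY_INDEX = 0;
    static final int WINDOW_START_INDEX = 1;
    static final int WINDOW_END_INDEX = 2; // the newly added field
    static final int COUNT_INDEX = 3;

    // Aggregation output: key, window_start, window_end, count.
    static List<Object> aggregateRow(Object key, Instant windowStart, Instant windowEnd, long count) {
        return Arrays.<Object>asList(key, windowStart, windowEnd, count);
    }

    // A projection expression that is just a reference to an input field.
    static Object fieldRef(List<Object> row, int index) {
        return row.get(index);
    }

    public static void main(String[] args) {
        List<Object> row = aggregateRow(
            1,
            Instant.parse("2018-10-24T10:00:00Z"),
            Instant.parse("2018-10-24T10:11:00Z"),
            3L);
        // session_start and session_end now resolve to different fields.
        System.out.println("window_start = " + fieldRef(row, WINDOW_START_INDEX));
        System.out.println("window_end   = " + fieldRef(row, WINDOW_END_INDEX));
    }
}
```

The same reference would work for tumble and hop, so the projection would no
longer need the PLUS for those window types either.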

Jira: https://issues.apache.org/jira/browse/BEAM-5843


-Rui
