Jim Hughes created FLINK-39420:
----------------------------------

             Summary: Temporal joins on non-lookup tables in batch mode produce 
a confusing internal error
                 Key: FLINK-39420
                 URL: https://issues.apache.org/jira/browse/FLINK-39420
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Jim Hughes


When a user writes a temporal join (FOR SYSTEM_TIME AS OF) against a regular 
(non-lookup) table in batch mode, the query fails with an opaque internal error 
from FlinkDecorrelateProgram:

{code}
org.apache.flink.table.api.TableException: unexpected correlate variable $cor0 
in the plan
{code}

*Root cause:* The batch planner's TEMPORAL_JOIN_REWRITE phase only registers 
lookup join rules (LOOKUP_JOIN_WITH_FILTER / LOOKUP_JOIN_WITHOUT_FILTER). 
Non-lookup temporal joins (event-time joins on versioned tables and 
processing-time joins on non-lookup sources) match neither rule, so they leave 
an unresolved LogicalCorrelate with a LogicalSnapshot in the plan. This 
correlate survives into the decorrelation phase, which cannot remove it and 
fails with the generic error shown above.

In streaming mode, these non-lookup temporal joins are handled by the general 
WITH_FILTER / WITHOUT_FILTER rewrite rules, which are not registered in the 
batch rule set.

*Example query that triggers this:*
{code:sql}
SELECT o.amount * r.rate
FROM Orders AS o
  JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r
  ON o.currency = r.currency
{code}

*Proposed fix:* Add a rejection rule to the batch EXPAND_PLAN_RULES that 
matches any LogicalCorrelate + LogicalSnapshot remaining after the lookup join 
rules have run, and throws a clear error:

{quote}
Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not supported in 
batch mode. Use a lookup join or switch to streaming mode.
{quote}

The rule can unconditionally reject all remaining Correlate + Snapshot 
patterns. No lookup detection logic is needed, because the lookup join rules 
fire first in the same HepProgram and rewrite valid lookup joins before the 
rejection rule sees them.
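
The ordering argument above can be illustrated with a small, self-contained 
toy model. This is only a sketch and not planner code: the Node enum, the 
method names, and the use of IllegalStateException are hypothetical stand-ins 
(the real rule would match Calcite RelNodes and presumably throw a Flink 
exception type), and it assumes the rules run strictly in list order.

```java
import java.util.List;
import java.util.function.UnaryOperator;

/** Toy model of rule ordering; none of these types are Flink or Calcite classes. */
final class TemporalJoinRejectionSketch {

    /** Hypothetical plan shapes for a FOR SYSTEM_TIME AS OF join. */
    enum Node {
        CORRELATE_SNAPSHOT_ON_LOOKUP_TABLE,
        CORRELATE_SNAPSHOT_ON_REGULAR_TABLE,
        LOOKUP_JOIN
    }

    /** Error text proposed in this issue. */
    static final String MESSAGE =
            "Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not supported in "
                    + "batch mode. Use a lookup join or switch to streaming mode.";

    /** Stand-in for the lookup join rules: rewrites only lookup-backed correlates. */
    static Node lookupJoinRule(Node node) {
        return node == Node.CORRELATE_SNAPSHOT_ON_LOOKUP_TABLE ? Node.LOOKUP_JOIN : node;
    }

    /**
     * Stand-in for the proposed rejection rule. It rejects every remaining
     * Correlate + Snapshot shape and does not check whether the table is a
     * lookup table.
     */
    static Node rejectionRule(Node node) {
        boolean isCorrelateSnapshot =
                node == Node.CORRELATE_SNAPSHOT_ON_LOOKUP_TABLE
                        || node == Node.CORRELATE_SNAPSHOT_ON_REGULAR_TABLE;
        if (isCorrelateSnapshot) {
            throw new IllegalStateException(MESSAGE);
        }
        return node;
    }

    /** Applies the rules in order: lookup rewrite first, rejection second (assumed order). */
    static Node expandPlan(Node node) {
        List<UnaryOperator<Node>> rules = List.of(
                TemporalJoinRejectionSketch::lookupJoinRule,
                TemporalJoinRejectionSketch::rejectionRule);
        Node current = node;
        for (UnaryOperator<Node> rule : rules) {
            current = rule.apply(current);
        }
        return current;
    }
}
```

In this model, expandPlan on a lookup-backed correlate returns LOOKUP_JOIN, 
while a correlate on a regular table reaches the rejection rule and fails with 
the proposed message. If the two rules were listed in the opposite order, valid 
lookup joins would be rejected as well, which is why the proposal depends on 
the lookup join rules firing first.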

Generated-by: Claude Opus 4.6 (1M context)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
