Timo Walther created FLINK-37211:
------------------------------------
Summary: Fix field resolution for PARTITION BY/ORDER BY
Key: FLINK-37211
URL: https://issues.apache.org/jira/browse/FLINK-37211
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Timo Walther
Currently, there is an issue in the SqlNode to RelNode conversion around
resolving the PARTITION BY/ORDER BY.
Steps to reproduce:
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java#L119
just change it to
{code}
util.verifyRelPlan("SELECT * FROM f(r => TABLE t PARTITION BY score, i => 1)");
{code}
Some insights from [~dwysakowicz]:
I think what you see is actually not a problem with scoping.
Actually the scope of SqlRexContext cx is correct in private RexNode
convertTableArgs(SqlRexContext cx, final SqlCall call) {
You convert the entire table function and the cx context has that function's
scope.
I think the logic you have in FlinkConvertletTable better fits into
SqlToRelConverter which can resolve the operands in their own scope. Similarly
as sub queries.
I think what you see is actually not a problem with scoping.
Actually the scope of SqlRexContext cx is correct in private RexNode
convertTableArgs(SqlRexContext cx, final SqlCall call) {
You convert the entire table function and the cx context has that function's
scope.
I think the logic you have in FlinkConvertletTable better fits into
SqlToRelConverter:
https://github.com/apache/flink/blob/55de8d683cc56733c03c11f927f511b2ba712851/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java#L5394
which can resolve the operands in their own scope. Similarly as sub queries.
That's how I managed to compute correct partition keys, but it now fails while
evaluating i => 1: SqlToRelConverter:
{code}
/**
* Converts a non-standard expression.
*
* <p>This method is an extension-point that derived classes can override.
If this method
* returns a null result, the normal expression translation process will
proceed. The default
* implementation always returns null.
*
* @param node Expression
* @param bb Blackboard
* @return null to proceed with the usual expression translation process
*/
protected @Nullable RexNode convertExtendedExpression(SqlNode node,
Blackboard bb) {
if (!(node instanceof SqlCall)) {
return null;
}
final SqlCall call = (SqlCall) node;
final SqlOperator operator = call.getOperator();
if (operator instanceof SqlTableFunction) {
final RelDataType returnType = validator.getValidatedNodeType(node);
final List<RexNode> rewrittenOperands = new ArrayList<>();
int tableInputCount = 0;
for (int pos = 0; pos < call.getOperandList().size(); pos++) {
final SqlNode operand = call.operand(pos);
if (operand.getKind() == SqlKind.ARGUMENT_ASSIGNMENT
&& ((SqlCall) operand).operand(0).getKind()
== SqlKind.SET_SEMANTICS_TABLE) {
final SqlNode query = ((SqlCall) operand).operand(0);
rewrittenOperands.add(convertSetSemanticsArg(
(SqlCall) query,
tableInputCount++));
} else if (operand.getKind() == SqlKind.SET_SEMANTICS_TABLE) {
rewrittenOperands.add(convertSetSemanticsArg(
(SqlCall) operand,
tableInputCount++));
} else if (operand.isA(SqlKind.QUERY)) {
rewrittenOperands.add(
new RexTableArgCall(
validator.getValidatedNodeType(operand),
tableInputCount++,
new int[0],
new int[0]));
} else {
rewrittenOperands.add(this.convertExpression(operand));
}
}
return rexBuilder.makeCall(returnType, operator, rewrittenOperands);
}
return null;
}
private RexNode convertSetSemanticsArg(SqlCall ptf, int tableInputCount) {
final SqlSelect query = ptf.operand(0);
final SqlNodeList partitionKeys = ptf.operand(1);
final SqlNodeList orderKeys = ptf.operand(2);
Preconditions.checkArgument(
orderKeys.isEmpty(), "Table functions do not support order keys
yet.");
final SqlValidatorScope queryScope = validator.getSelectScope(query);
final Blackboard bb = createBlackboard(queryScope, null, false);
final int[] partitionKeyIndices = getPartitionKeyIndices(bb,
partitionKeys);
return new RexTableArgCall(
validator.getValidatedNodeType(ptf),
tableInputCount,
partitionKeyIndices,
new int[0]);
}
private static int[] getPartitionKeyIndices(SqlRexContext cx, SqlNodeList
partitions) {
final int[] result = new int[partitions.size()];
for (int i = 0; i < partitions.getList().size(); i++) {
final RexNode expr = cx.convertExpression(partitions.get(i));
result[i] = parseFieldIdx(expr);
}
return result;
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)