lincoln-lil commented on code in PR #22978:
URL: https://github.com/apache/flink/pull/22978#discussion_r1264690852
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java:
##########
@@ -80,45 +81,60 @@ public WrapJsonAggFunctionArgumentsRule(Config config) {
@Override
public void onMatch(RelOptRuleCall call) {
final LogicalAggregate aggregate = call.rel(0);
- final AggregateCall aggCall = aggregate.getAggCallList().get(0);
-
final RelNode aggInput = aggregate.getInput();
final RelBuilder relBuilder = call.builder().push(aggInput);
- final List<Integer> affectedArgs = getAffectedArgs(aggCall);
- addProjections(aggregate.getCluster(), relBuilder, affectedArgs);
-
- final TargetMapping argsMapping =
- getAggArgsMapping(aggInput.getRowType().getFieldCount(),
affectedArgs);
-
- final AggregateCall newAggregateCall = aggCall.transform(argsMapping);
- final LogicalAggregate newAggregate =
- aggregate.copy(
- aggregate.getTraitSet(),
- relBuilder.build(),
- aggregate.getGroupSet(),
- aggregate.getGroupSets(),
- Collections.singletonList(newAggregateCall));
-
call.transformTo(newAggregate.withHints(Collections.singletonList(MARKER_HINT)));
+ final LogicalAggregate wrappedAggregate = wrapJsonAggregate(aggregate,
relBuilder);
+
call.transformTo(wrappedAggregate.withHints(Collections.singletonList(MARKER_HINT)));
}
- /**
- * Returns the aggregation's arguments which need to be wrapped.
- *
- * <p>This list is a subset of {@link AggregateCall#getArgList()} as not
every argument may need
- * to be wrapped into a {@link BuiltInFunctionDefinitions#JSON_STRING}
call.
- *
- * <p>Duplicates (e.g. for {@code JSON_OBJECTAGG(f0 VALUE f0)}) are
removed as we only need to
- * wrap them once.
- */
- private List<Integer> getAffectedArgs(AggregateCall aggCall) {
- if (aggCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) {
- // For JSON_OBJECTAGG we only need to wrap its second (= value)
argument
- final int valueIndex = aggCall.getArgList().get(1);
- return Collections.singletonList(valueIndex);
+ private LogicalAggregate wrapJsonAggregate(LogicalAggregate aggregate,
RelBuilder relBuilder) {
+ int inputCount = aggregate.getInput().getRowType().getFieldCount();
+ List<AggregateCall> aggCallList = new
ArrayList<>(aggregate.getAggCallList());
+ // This map is a mapping relationship between jsonObjectAggCall and
the argument index
+ // need to be wrapped into a BuiltInFunctionDefinitions#JSON_STRING.
This map will be used
+ // to create newWrappedArgCallList after creating a new Project.
+ Map<Integer, Integer> wrapIndicesMap = new HashMap<>();
+ for (int i = 0; i < aggCallList.size(); i++) {
+ AggregateCall currentCall = aggCallList.get(i);
+ if (currentCall.getAggregation() instanceof
SqlJsonObjectAggAggFunction) {
+ // For JSON_OBJECTAGG we only need to wrap its second (=
value) argument
+ final int valueIndex = currentCall.getArgList().get(1);
+ wrapIndicesMap.put(i, valueIndex);
+ } else if (currentCall.getAggregation() instanceof
SqlJsonArrayAggAggFunction) {
+ final int valueIndex = currentCall.getArgList().get(0);
+ wrapIndicesMap.put(i, valueIndex);
+ }
+ }
+
+ if (wrapIndicesMap.isEmpty()) {
Review Comment:
IIUC, the `wrapIndicesMap` should never be empty here since the rule only
matches aggregate which contains json agg(s), so we should remove this check
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java:
##########
@@ -80,45 +81,60 @@ public WrapJsonAggFunctionArgumentsRule(Config config) {
@Override
public void onMatch(RelOptRuleCall call) {
final LogicalAggregate aggregate = call.rel(0);
- final AggregateCall aggCall = aggregate.getAggCallList().get(0);
-
final RelNode aggInput = aggregate.getInput();
final RelBuilder relBuilder = call.builder().push(aggInput);
- final List<Integer> affectedArgs = getAffectedArgs(aggCall);
- addProjections(aggregate.getCluster(), relBuilder, affectedArgs);
-
- final TargetMapping argsMapping =
- getAggArgsMapping(aggInput.getRowType().getFieldCount(),
affectedArgs);
-
- final AggregateCall newAggregateCall = aggCall.transform(argsMapping);
- final LogicalAggregate newAggregate =
- aggregate.copy(
- aggregate.getTraitSet(),
- relBuilder.build(),
- aggregate.getGroupSet(),
- aggregate.getGroupSets(),
- Collections.singletonList(newAggregateCall));
-
call.transformTo(newAggregate.withHints(Collections.singletonList(MARKER_HINT)));
+ final LogicalAggregate wrappedAggregate = wrapJsonAggregate(aggregate,
relBuilder);
+
call.transformTo(wrappedAggregate.withHints(Collections.singletonList(MARKER_HINT)));
}
- /**
- * Returns the aggregation's arguments which need to be wrapped.
- *
- * <p>This list is a subset of {@link AggregateCall#getArgList()} as not
every argument may need
- * to be wrapped into a {@link BuiltInFunctionDefinitions#JSON_STRING}
call.
- *
- * <p>Duplicates (e.g. for {@code JSON_OBJECTAGG(f0 VALUE f0)}) are
removed as we only need to
- * wrap them once.
- */
- private List<Integer> getAffectedArgs(AggregateCall aggCall) {
- if (aggCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) {
- // For JSON_OBJECTAGG we only need to wrap its second (= value)
argument
- final int valueIndex = aggCall.getArgList().get(1);
- return Collections.singletonList(valueIndex);
+ private LogicalAggregate wrapJsonAggregate(LogicalAggregate aggregate,
RelBuilder relBuilder) {
+ int inputCount = aggregate.getInput().getRowType().getFieldCount();
+ List<AggregateCall> aggCallList = new
ArrayList<>(aggregate.getAggCallList());
+ // This map is a mapping relationship between jsonObjectAggCall and
the argument index
+ // need to be wrapped into a BuiltInFunctionDefinitions#JSON_STRING.
This map will be used
+ // to create newWrappedArgCallList after creating a new Project.
+ Map<Integer, Integer> wrapIndicesMap = new HashMap<>();
+ for (int i = 0; i < aggCallList.size(); i++) {
+ AggregateCall currentCall = aggCallList.get(i);
+ if (currentCall.getAggregation() instanceof
SqlJsonObjectAggAggFunction) {
+ // For JSON_OBJECTAGG we only need to wrap its second (=
value) argument
+ final int valueIndex = currentCall.getArgList().get(1);
+ wrapIndicesMap.put(i, valueIndex);
+ } else if (currentCall.getAggregation() instanceof
SqlJsonArrayAggAggFunction) {
+ final int valueIndex = currentCall.getArgList().get(0);
+ wrapIndicesMap.put(i, valueIndex);
+ }
+ }
+
+ if (wrapIndicesMap.isEmpty()) {
+ return aggregate;
+ }
+
+ // Create a new Project.
+ ArrayList<Integer> affectedArgs = new
ArrayList<>(wrapIndicesMap.values());
+ addProjections(aggregate.getCluster(), relBuilder, affectedArgs);
+
+ List<AggregateCall> newWrappedArgCallList = new ArrayList<>();
+ for (int i = 0; i < aggCallList.size(); i++) {
+ AggregateCall currentCall = aggCallList.get(i);
+ if (currentCall.getAggregation() instanceof
SqlJsonObjectAggAggFunction
Review Comment:
use `isJsonAggregation` directly
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java:
##########
@@ -145,13 +161,13 @@ private void addProjections(
* Returns a {@link TargetMapping} that defines how the arguments of the
aggregation must be
* mapped such that the wrapped arguments are used instead.
*/
- private TargetMapping getAggArgsMapping(int inputCount, List<Integer>
affectedArgs) {
- final int newCount = inputCount + affectedArgs.size();
+ private TargetMapping getAggArgsMapping(int inputCount, int
affectedArgsSize, int affectedArg) {
+ final int newCount = inputCount + affectedArgsSize;
final TargetMapping argsMapping =
Mappings.create(MappingType.BIJECTION, newCount, newCount);
- for (int i = 0; i < affectedArgs.size(); i++) {
- argsMapping.set(affectedArgs.get(i), inputCount + i);
+ for (int i = 0; i < affectedArgsSize; i++) {
Review Comment:
do we still need the for loop after changing to the single value
`affectedArg`?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java:
##########
@@ -80,45 +81,60 @@ public WrapJsonAggFunctionArgumentsRule(Config config) {
@Override
public void onMatch(RelOptRuleCall call) {
final LogicalAggregate aggregate = call.rel(0);
- final AggregateCall aggCall = aggregate.getAggCallList().get(0);
-
final RelNode aggInput = aggregate.getInput();
final RelBuilder relBuilder = call.builder().push(aggInput);
- final List<Integer> affectedArgs = getAffectedArgs(aggCall);
- addProjections(aggregate.getCluster(), relBuilder, affectedArgs);
-
- final TargetMapping argsMapping =
- getAggArgsMapping(aggInput.getRowType().getFieldCount(),
affectedArgs);
-
- final AggregateCall newAggregateCall = aggCall.transform(argsMapping);
- final LogicalAggregate newAggregate =
- aggregate.copy(
- aggregate.getTraitSet(),
- relBuilder.build(),
- aggregate.getGroupSet(),
- aggregate.getGroupSets(),
- Collections.singletonList(newAggregateCall));
-
call.transformTo(newAggregate.withHints(Collections.singletonList(MARKER_HINT)));
+ final LogicalAggregate wrappedAggregate = wrapJsonAggregate(aggregate,
relBuilder);
+
call.transformTo(wrappedAggregate.withHints(Collections.singletonList(MARKER_HINT)));
}
- /**
- * Returns the aggregation's arguments which need to be wrapped.
- *
- * <p>This list is a subset of {@link AggregateCall#getArgList()} as not
every argument may need
- * to be wrapped into a {@link BuiltInFunctionDefinitions#JSON_STRING}
call.
- *
- * <p>Duplicates (e.g. for {@code JSON_OBJECTAGG(f0 VALUE f0)}) are
removed as we only need to
- * wrap them once.
- */
- private List<Integer> getAffectedArgs(AggregateCall aggCall) {
- if (aggCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) {
- // For JSON_OBJECTAGG we only need to wrap its second (= value)
argument
- final int valueIndex = aggCall.getArgList().get(1);
- return Collections.singletonList(valueIndex);
+ private LogicalAggregate wrapJsonAggregate(LogicalAggregate aggregate,
RelBuilder relBuilder) {
+ int inputCount = aggregate.getInput().getRowType().getFieldCount();
+ List<AggregateCall> aggCallList = new
ArrayList<>(aggregate.getAggCallList());
+ // This map is a mapping relationship between jsonObjectAggCall and
the argument index
+ // need to be wrapped into a BuiltInFunctionDefinitions#JSON_STRING.
This map will be used
+ // to create newWrappedArgCallList after creating a new Project.
+ Map<Integer, Integer> wrapIndicesMap = new HashMap<>();
+ for (int i = 0; i < aggCallList.size(); i++) {
+ AggregateCall currentCall = aggCallList.get(i);
+ if (currentCall.getAggregation() instanceof
SqlJsonObjectAggAggFunction) {
+ // For JSON_OBJECTAGG we only need to wrap its second (=
value) argument
+ final int valueIndex = currentCall.getArgList().get(1);
+ wrapIndicesMap.put(i, valueIndex);
+ } else if (currentCall.getAggregation() instanceof
SqlJsonArrayAggAggFunction) {
+ final int valueIndex = currentCall.getArgList().get(0);
+ wrapIndicesMap.put(i, valueIndex);
+ }
+ }
+
+ if (wrapIndicesMap.isEmpty()) {
+ return aggregate;
+ }
+
+ // Create a new Project.
+ ArrayList<Integer> affectedArgs = new
ArrayList<>(wrapIndicesMap.values());
Review Comment:
the `wrapIndicesMap` may contains duplicate values, we can avoid adding
redundant fields projection
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]