amaliujia commented on a change in pull request #11975: URL: https://github.com/apache/beam/pull/11975#discussion_r444548154
########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); Review comment: Either add a test, or add a `checkArgument` that rejects NULLS FIRST / NULLS LAST in ORDER BY (either one works for me). General NULL handling is up to you; you can leave it for a future PR.
########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { Review comment: Based on your implementation below, it would be really great if you documented what is supported and what the constraints are. Then you can gradually update this Javadoc as you add new features. ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + 
public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); + anAnalyticGroup.orderKeys.getFieldCollations().stream() + .forEach( + fc -> { + orderByKeys.add(fc.getFieldIndex()); + orderByDirections.add( + fc.direction == RelFieldCollation.Direction.ASCENDING); + orderByNullDirections.add( + fc.nullDirection == RelFieldCollation.NullDirection.FIRST); + }); + int lowerB = Integer.MAX_VALUE; // Unbounded by default + int upperB = Integer.MAX_VALUE; // Unbounded by default + if (anAnalyticGroup.lowerBound.isCurrentRow()) { + lowerB = 0; + } else if (anAnalyticGroup.lowerBound.isPreceding()) { + // pending Review comment: Better to use `checkArgument` to stop execution when you are seeing an unsupported case. See: https://www.baeldung.com/guava-preconditions ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java ########## @@ -37,7 +40,20 @@ private BeamCalcRule() { @Override public boolean matches(RelOptRuleCall x) { - return true; + boolean hasRexOver = false; Review comment: Add a comment to explain what this piece of code is doing. Basically it stops converting in the OVER clause case. 
########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { Review comment: Add Javadoc for classes (Javadoc means comments that start with `/**` and end with `*/`). ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> 
constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); + anAnalyticGroup.orderKeys.getFieldCollations().stream() + .forEach( + fc -> { + orderByKeys.add(fc.getFieldIndex()); + orderByDirections.add( + fc.direction == RelFieldCollation.Direction.ASCENDING); + orderByNullDirections.add( + fc.nullDirection == RelFieldCollation.NullDirection.FIRST); + }); + int lowerB = Integer.MAX_VALUE; // Unbounded by default + int upperB = Integer.MAX_VALUE; // Unbounded by default + if (anAnalyticGroup.lowerBound.isCurrentRow()) { + lowerB = 0; + } else if (anAnalyticGroup.lowerBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.lowerBound.isFollowing()) { + // pending Review comment: Same ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); + anAnalyticGroup.orderKeys.getFieldCollations().stream() + .forEach( + fc -> { + orderByKeys.add(fc.getFieldIndex()); + orderByDirections.add( + fc.direction == RelFieldCollation.Direction.ASCENDING); + orderByNullDirections.add( + fc.nullDirection == RelFieldCollation.NullDirection.FIRST); + }); + int lowerB = Integer.MAX_VALUE; // Unbounded by default + int upperB = Integer.MAX_VALUE; // Unbounded by default + if (anAnalyticGroup.lowerBound.isCurrentRow()) { + lowerB = 0; + } else if (anAnalyticGroup.lowerBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.lowerBound.isFollowing()) { + // pending + } + if (anAnalyticGroup.upperBound.isCurrentRow()) { + upperB = 0; + } else if (anAnalyticGroup.upperBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.upperBound.isFollowing()) { + // pending + } + final int lowerBFinal = lowerB; + final int upperBFinal = upperB; + List<AggregateCall> 
aggregateCalls = anAnalyticGroup.getAggregateCalls(this); + aggregateCalls.stream() + .forEach( + anAggCall -> { + List<Integer> argList = anAggCall.getArgList(); + Schema.Field field = + CalciteUtils.toField(anAggCall.getName(), anAggCall.getType()); + Combine.CombineFn combineFn = + AggregationCombineFnAdapter.createCombineFn( + anAggCall, field, anAggCall.getAggregation().getName()); + FieldAggregation fieldAggregation = + new FieldAggregation( Review comment: Even though you will stop execution on some unsupported cases, I think it is still OK to keep this class definition as is (i.e. no need to remove those unused fields). ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> 
constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); Review comment: Add a test that includes DESC for ORDER BY? ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> 
constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); + anAnalyticGroup.orderKeys.getFieldCollations().stream() + .forEach( + fc -> { + orderByKeys.add(fc.getFieldIndex()); + orderByDirections.add( + fc.direction == RelFieldCollation.Direction.ASCENDING); + orderByNullDirections.add( + fc.nullDirection == RelFieldCollation.NullDirection.FIRST); + }); + int lowerB = Integer.MAX_VALUE; // Unbounded by default + int upperB = Integer.MAX_VALUE; // Unbounded by default + if (anAnalyticGroup.lowerBound.isCurrentRow()) { + lowerB = 0; + } else if (anAnalyticGroup.lowerBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.lowerBound.isFollowing()) { + // pending + } + if (anAnalyticGroup.upperBound.isCurrentRow()) { + upperB = 0; + } else if (anAnalyticGroup.upperBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.upperBound.isFollowing()) { + // pending + } + final int lowerBFinal = lowerB; + final int upperBFinal = upperB; + List<AggregateCall> aggregateCalls = anAnalyticGroup.getAggregateCalls(this); + aggregateCalls.stream() + .forEach( + anAggCall -> { + List<Integer> argList = anAggCall.getArgList(); + Schema.Field field = + CalciteUtils.toField(anAggCall.getName(), anAggCall.getType()); + Combine.CombineFn combineFn = + AggregationCombineFnAdapter.createCombineFn( + anAggCall, field, anAggCall.getAggregation().getName()); + FieldAggregation fieldAggregation = + 
new FieldAggregation( + partitionKeysDef, + orderByKeys, + orderByDirections, + orderByNullDirections, + lowerBFinal, + upperBFinal, + anAnalyticGroup.isRows, + argList, + combineFn, + field); + analyticFields.add(fieldAggregation); + }); + }); + + return new Transform(outputSchema, analyticFields); + } + + private static class FieldAggregation implements Serializable { + + private List<Integer> partitionKeys; + private List<Integer> orderKeys; + private List<Boolean> orderOrientations; + private List<Boolean> orderNulls; + private int lowerLimit = Integer.MAX_VALUE; + private int upperLimit = Integer.MAX_VALUE; + private boolean rows = true; + private List<Integer> inputFields; + private Combine.CombineFn combineFn; + private Schema.Field outputField; + + public FieldAggregation( + List<Integer> partitionKeys, + List<Integer> orderKeys, + List<Boolean> orderOrientations, + List<Boolean> orderNulls, + int lowerLimit, + int upperLimit, + boolean rows, + List<Integer> inputFields, + Combine.CombineFn combineFn, + Schema.Field outputField) { + this.partitionKeys = partitionKeys; + this.orderKeys = orderKeys; + this.orderOrientations = orderOrientations; + this.orderNulls = orderNulls; + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + this.rows = rows; + this.inputFields = inputFields; + this.combineFn = combineFn; + this.outputField = outputField; + } + } + + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq); + return inputStat; + } + + @Override + public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq); + float multiplier = 1f + 0.125f; + return BeamCostModel.FACTORY.makeCost( + inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier); + } + + private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> { + + private 
Schema outputSchema; + private List<FieldAggregation> aggFields; + + public Transform(Schema schema, List<FieldAggregation> fieldAgg) { + this.outputSchema = schema; + this.aggFields = fieldAgg; + } + + @Override + public PCollection<Row> expand(PCollectionList<Row> input) { + PCollection<Row> inputData = input.get(0); + Schema inputSchema = inputData.getSchema(); + for (FieldAggregation af : aggFields) { + if (af.partitionKeys.isEmpty()) { + // This sections simulate a KV Row + // Similar to the output of Group.byFieldIds + // When no partitions are specified + Schema inputSch = inputData.getSchema(); Review comment: The mock key generation here could be simplified with something like ``` windowedStream .apply(WithKeys.of("dummy")) .apply(GroupByKey.create()) ``` Note that this approach may avoid much of the schema manipulation. ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> 
constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); + anAnalyticGroup.orderKeys.getFieldCollations().stream() + .forEach( + fc -> { + orderByKeys.add(fc.getFieldIndex()); + orderByDirections.add( + fc.direction == RelFieldCollation.Direction.ASCENDING); + orderByNullDirections.add( + fc.nullDirection == RelFieldCollation.NullDirection.FIRST); + }); + int lowerB = Integer.MAX_VALUE; // Unbounded by default + int upperB = Integer.MAX_VALUE; // Unbounded by default + if (anAnalyticGroup.lowerBound.isCurrentRow()) { + lowerB = 0; + } else if (anAnalyticGroup.lowerBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.lowerBound.isFollowing()) { + // pending + } + if (anAnalyticGroup.upperBound.isCurrentRow()) { + upperB = 0; + } else if (anAnalyticGroup.upperBound.isPreceding()) { + // pending Review comment: Same ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); + anAnalyticGroup.orderKeys.getFieldCollations().stream() + .forEach( + fc -> { + orderByKeys.add(fc.getFieldIndex()); + orderByDirections.add( + fc.direction == RelFieldCollation.Direction.ASCENDING); + orderByNullDirections.add( + fc.nullDirection == RelFieldCollation.NullDirection.FIRST); + }); + int lowerB = Integer.MAX_VALUE; // Unbounded by default + int upperB = Integer.MAX_VALUE; // Unbounded by default + if (anAnalyticGroup.lowerBound.isCurrentRow()) { + lowerB = 0; + } else if (anAnalyticGroup.lowerBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.lowerBound.isFollowing()) { + // pending + } + if (anAnalyticGroup.upperBound.isCurrentRow()) { + upperB = 0; + } else if (anAnalyticGroup.upperBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.upperBound.isFollowing()) { + // pending Review comment: Same ########## File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); + anAnalyticGroup.orderKeys.getFieldCollations().stream() + .forEach( + fc -> { + orderByKeys.add(fc.getFieldIndex()); + orderByDirections.add( + fc.direction == RelFieldCollation.Direction.ASCENDING); + orderByNullDirections.add( + fc.nullDirection == RelFieldCollation.NullDirection.FIRST); + }); + int lowerB = Integer.MAX_VALUE; // Unbounded by default + int upperB = Integer.MAX_VALUE; // Unbounded by 
default + if (anAnalyticGroup.lowerBound.isCurrentRow()) { + lowerB = 0; + } else if (anAnalyticGroup.lowerBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.lowerBound.isFollowing()) { + // pending + } + if (anAnalyticGroup.upperBound.isCurrentRow()) { + upperB = 0; + } else if (anAnalyticGroup.upperBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.upperBound.isFollowing()) { + // pending + } + final int lowerBFinal = lowerB; + final int upperBFinal = upperB; + List<AggregateCall> aggregateCalls = anAnalyticGroup.getAggregateCalls(this); + aggregateCalls.stream() + .forEach( + anAggCall -> { + List<Integer> argList = anAggCall.getArgList(); + Schema.Field field = + CalciteUtils.toField(anAggCall.getName(), anAggCall.getType()); + Combine.CombineFn combineFn = + AggregationCombineFnAdapter.createCombineFn( + anAggCall, field, anAggCall.getAggregation().getName()); + FieldAggregation fieldAggregation = + new FieldAggregation( + partitionKeysDef, + orderByKeys, + orderByDirections, + orderByNullDirections, + lowerBFinal, + upperBFinal, + anAnalyticGroup.isRows, + argList, + combineFn, + field); + analyticFields.add(fieldAggregation); + }); + }); + + return new Transform(outputSchema, analyticFields); + } + + private static class FieldAggregation implements Serializable { + + private List<Integer> partitionKeys; + private List<Integer> orderKeys; + private List<Boolean> orderOrientations; + private List<Boolean> orderNulls; + private int lowerLimit = Integer.MAX_VALUE; + private int upperLimit = Integer.MAX_VALUE; + private boolean rows = true; + private List<Integer> inputFields; + private Combine.CombineFn combineFn; + private Schema.Field outputField; + + public FieldAggregation( + List<Integer> partitionKeys, + List<Integer> orderKeys, + List<Boolean> orderOrientations, + List<Boolean> orderNulls, + int lowerLimit, + int upperLimit, + boolean rows, + List<Integer> inputFields, + Combine.CombineFn combineFn, + Schema.Field outputField) 
{ + this.partitionKeys = partitionKeys; + this.orderKeys = orderKeys; + this.orderOrientations = orderOrientations; + this.orderNulls = orderNulls; + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + this.rows = rows; + this.inputFields = inputFields; + this.combineFn = combineFn; + this.outputField = outputField; + } + } + + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq); + return inputStat; + } + + @Override + public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { Review comment: Please add a method-level Javadoc describing your choice of cost and the reasoning behind it. ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamWindowRule.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamWindowRel; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Convention; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.convert.ConverterRule; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalWindow; + +public class BeamWindowRule extends ConverterRule { Review comment: A class-level Javadoc would be very helpful here. ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> 
constants, + RelDataType rowType, + List<Group> groups) { + super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + final List<FieldAggregation> analyticFields = Lists.newArrayList(); + this.groups.stream() + .forEach( + anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); + anAnalyticGroup.orderKeys.getFieldCollations().stream() + .forEach( + fc -> { + orderByKeys.add(fc.getFieldIndex()); + orderByDirections.add( + fc.direction == RelFieldCollation.Direction.ASCENDING); + orderByNullDirections.add( + fc.nullDirection == RelFieldCollation.NullDirection.FIRST); + }); + int lowerB = Integer.MAX_VALUE; // Unbounded by default + int upperB = Integer.MAX_VALUE; // Unbounded by default + if (anAnalyticGroup.lowerBound.isCurrentRow()) { + lowerB = 0; + } else if (anAnalyticGroup.lowerBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.lowerBound.isFollowing()) { + // pending + } + if (anAnalyticGroup.upperBound.isCurrentRow()) { + upperB = 0; + } else if (anAnalyticGroup.upperBound.isPreceding()) { + // pending + } else if (anAnalyticGroup.upperBound.isFollowing()) { + // pending + } + final int lowerBFinal = lowerB; + final int upperBFinal = upperB; + List<AggregateCall> aggregateCalls = anAnalyticGroup.getAggregateCalls(this); + aggregateCalls.stream() + .forEach( + anAggCall -> { + List<Integer> argList = anAggCall.getArgList(); + Schema.Field field = + CalciteUtils.toField(anAggCall.getName(), anAggCall.getType()); + Combine.CombineFn combineFn = + AggregationCombineFnAdapter.createCombineFn( + anAggCall, field, anAggCall.getAggregation().getName()); + FieldAggregation fieldAggregation = + 
new FieldAggregation( + partitionKeysDef, + orderByKeys, + orderByDirections, + orderByNullDirections, + lowerBFinal, + upperBFinal, + anAnalyticGroup.isRows, + argList, + combineFn, + field); + analyticFields.add(fieldAggregation); + }); + }); + + return new Transform(outputSchema, analyticFields); + } + + private static class FieldAggregation implements Serializable { + + private List<Integer> partitionKeys; + private List<Integer> orderKeys; + private List<Boolean> orderOrientations; + private List<Boolean> orderNulls; + private int lowerLimit = Integer.MAX_VALUE; + private int upperLimit = Integer.MAX_VALUE; + private boolean rows = true; + private List<Integer> inputFields; + private Combine.CombineFn combineFn; + private Schema.Field outputField; + + public FieldAggregation( + List<Integer> partitionKeys, + List<Integer> orderKeys, + List<Boolean> orderOrientations, + List<Boolean> orderNulls, + int lowerLimit, + int upperLimit, + boolean rows, + List<Integer> inputFields, + Combine.CombineFn combineFn, + Schema.Field outputField) { + this.partitionKeys = partitionKeys; + this.orderKeys = orderKeys; + this.orderOrientations = orderOrientations; + this.orderNulls = orderNulls; + this.lowerLimit = lowerLimit; + this.upperLimit = upperLimit; + this.rows = rows; + this.inputFields = inputFields; + this.combineFn = combineFn; + this.outputField = outputField; + } + } + + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq); + return inputStat; + } + + @Override + public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq); + float multiplier = 1f + 0.125f; + return BeamCostModel.FACTORY.makeCost( + inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier); + } + + private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> { + + private 
Schema outputSchema; + private List<FieldAggregation> aggFields; + + public Transform(Schema schema, List<FieldAggregation> fieldAgg) { + this.outputSchema = schema; + this.aggFields = fieldAgg; + } + + @Override + public PCollection<Row> expand(PCollectionList<Row> input) { + PCollection<Row> inputData = input.get(0); + Schema inputSchema = inputData.getSchema(); + for (FieldAggregation af : aggFields) { + if (af.partitionKeys.isEmpty()) { + // This sections simulate a KV Row + // Similar to the output of Group.byFieldIds + // When no partitions are specified + Schema inputSch = inputData.getSchema(); + Schema mockKeySchema = + Schema.of(Schema.Field.of("mock", Schema.FieldType.STRING.withNullable(true))); + Schema simulatedKeyValueSchema = + Schema.of( + Schema.Field.of("key", Schema.FieldType.row(mockKeySchema)), + Schema.Field.of( + "value", Schema.FieldType.iterable(Schema.FieldType.row(inputSch)))); + PCollection<Iterable<Row>> apply = + inputData.apply(org.apache.beam.sdk.schemas.transforms.Group.globally()); + inputData = + apply + .apply(ParDo.of(uniquePartition(mockKeySchema, simulatedKeyValueSchema))) + .setRowSchema(simulatedKeyValueSchema); + } else { + org.apache.beam.sdk.schemas.transforms.Group.ByFields<Row> myg = + org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(af.partitionKeys); + inputData = inputData.apply("partitionBy", myg); + } + inputData = Review comment: Can you skip the sort transform when there is no ORDER BY? Also, please leave a comment noting that this should migrate to the SortValues transform in the future. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
