[
https://issues.apache.org/jira/browse/BEAM-4663?focusedWorklogId=126533&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-126533
]
ASF GitHub Bot logged work on BEAM-4663:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Jul/18 09:12
Start Date: 24/Jul/18 09:12
Worklog Time Spent: 10m
Work Description: vectorijk closed pull request #5825: [WIP] [BEAM-4663]
CBO cost calculation
URL: https://github.com/apache/beam/pull/5825
This PR was opened from a forked repository. Because GitHub hides the
original diff of such a PR once it is merged or closed, the diff is
reproduced below for the sake of provenance (it would not otherwise be shown):
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 6d6e7395a14..2ce3acc20cc 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -44,10 +44,13 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
@@ -73,6 +76,15 @@ public BeamAggregationRel(
this.windowFieldIndex =
windowField.map(AggregateWindowField::fieldIndex).orElse(-1);
}
+  /**
+   * Estimates the cost of aggregating this node's input.
+   *
+   * <p>The row count is the number of input rows, the CPU cost scales with the number of named
+   * aggregate calls, and the I/O cost is the input row count times the estimated input row size.
+   */
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery metadata) {
+    RelNode child = getInput();
+    // RelMetadataQuery#getRowCount may return null when it has no reliable estimate; fall back to
+    // the input's own estimate rather than failing with an unboxing NullPointerException.
+    Double metadataRowCnt = metadata.getRowCount(child);
+    double rowCnt = metadataRowCnt != null ? metadataRowCnt : child.estimateRowCount(metadata);
+    double rowSize = estimateRowSize(child.getRowType());
+    int aggCnt = getNamedAggCalls().size();
+    return planner.getCostFactory().makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize);
+  }
+
@Override
public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
return new Transform();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index 350bccfa02b..695ba56878e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -27,12 +27,14 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql2rel.RelStructuredTypeFlattener;
@@ -99,6 +101,13 @@ public void register(RelOptPlanner planner) {
super.register(planner);
}
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery
mq) {
+ double rowSize = estimateRowSize(getRowType());
+ double rowCnt = mq.getRowCount(this);
+ return planner.getCostFactory().makeCost(rowCnt, rowCnt, rowCnt * rowSize);
+ }
+
@Override
public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
return new Transform();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 4345d04b765..4a3f667b462 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -51,11 +51,14 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
@@ -125,6 +128,21 @@ public Join copy(
return BeamRelNode.super.getPCollectionInputs();
}
+  /**
+   * Estimates the cost of joining the left and right inputs.
+   *
+   * <p>The model charges for reading both inputs in full: the row count and CPU cost are the total
+   * number of input rows, and the I/O cost is the total estimated size of those rows.
+   */
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    double leftRowCnt = rowCountOrEstimate(getLeft(), mq);
+    double leftRowSize = estimateRowSize(getLeft().getRowType());
+
+    double rightRowCnt = rowCountOrEstimate(getRight(), mq);
+    double rightRowSize = estimateRowSize(getRight().getRowType());
+
+    double ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize);
+    double rowCnt = leftRowCnt + rightRowCnt;
+    // CPU cost is one unit per input row, so it equals the row count.
+    return planner.getCostFactory().makeCost(rowCnt, rowCnt, ioCost);
+  }
+
+  /**
+   * Returns the metadata row count of {@code rel}, falling back to the node's own estimate when
+   * {@link RelMetadataQuery#getRowCount} has no reliable estimate and returns null.
+   */
+  private static double rowCountOrEstimate(RelNode rel, RelMetadataQuery mq) {
+    Double rowCnt = mq.getRowCount(rel);
+    return rowCnt != null ? rowCnt : rel.estimateRowCount(mq);
+  }
+
@Override
public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
return new Transform();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index 6ac89832f9f..f929829e180 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -24,6 +24,8 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.type.SqlTypeName;
/** A {@link RelNode} that can also give a {@link PTransform} that implements
the expression. */
public interface BeamRelNode extends RelNode {
@@ -45,4 +47,61 @@
}
return options;
}
+
+  /**
+   * Returns a heuristic size estimate for one row of {@code rowType}, computed as the sum of the
+   * estimated sizes of its fields' types.
+   */
+  default double estimateRowSize(RelDataType rowType) {
+    return rowType.getFieldList().stream()
+        .map(RelDataTypeField::getType)
+        .mapToDouble(this::estimateDataTypeSize)
+        .sum();
+  }
+
+  /**
+   * Returns a heuristic size estimate for a single value of the given SQL type.
+   *
+   * <p>Fixed-width numeric types use their byte width; variable-length, temporal and opaque types
+   * use arbitrary fixed guesses; collections assume 16 elements; rows sum their fields.
+   *
+   * @throws UnsupportedOperationException if no estimate is defined for the type
+   */
+  default double estimateDataTypeSize(RelDataType type) {
+    SqlTypeName typeName = type.getSqlTypeName();
+    if (SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName)) {
+      return 8;
+    }
+    if (SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName)) {
+      return 4;
+    }
+    switch (typeName) {
+      case TINYINT:
+        return 1;
+      case SMALLINT:
+        return 2;
+      case INTEGER:
+        return 4;
+      case BIGINT:
+        return 8;
+      case BOOLEAN:
+        return 1;
+      case FLOAT:
+        return 4;
+      case DOUBLE:
+        return 8;
+      case VARCHAR:
+      case VARBINARY:
+        // Arbitrary estimate for variable-length values.
+        return 12;
+      case CHAR:
+      case BINARY:
+        return 1;
+      case DECIMAL:
+        return 12;
+      case TIME:
+      case TIMESTAMP:
+      case DATE:
+        return 12;
+      case ROW:
+        return estimateRowSize(type);
+      case ARRAY:
+      case MULTISET:
+        // Single-element-type collections: a multiset has a component type but no key/value
+        // types. 16 is an arbitrary element-count estimate.
+        return estimateDataTypeSize(type.getComponentType()) * 16;
+      case MAP:
+        // 16 is an arbitrary entry-count estimate.
+        return (estimateDataTypeSize(type.getKeyType()) + estimateDataTypeSize(type.getValueType()))
+            * 16;
+      case ANY:
+        return 128; // 128 is an arbitrary estimate
+      default:
+        throw new UnsupportedOperationException("Unsupported data type: " + type);
+    }
+  }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index 4d0790a4512..9e0dae45846 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -25,10 +25,13 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
/**
* {@link BeamRelNode} to replace a {@link Union}.
@@ -73,6 +76,14 @@ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
return new BeamUnionRel(getCluster(), traitSet, inputs, all);
}
+  /**
+   * Estimates the cost of a union as the total number of rows across all inputs; no CPU or I/O
+   * cost is charged.
+   */
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    double rowCnt = getInputs().stream().mapToDouble(child -> rowCountOrEstimate(child, mq)).sum();
+
+    return planner.getCostFactory().makeCost(rowCnt, 0, 0);
+  }
+
+  /**
+   * Returns the metadata row count of {@code rel}, falling back to the node's own estimate when
+   * {@link RelMetadataQuery#getRowCount} has no reliable estimate and returns null.
+   */
+  private static double rowCountOrEstimate(RelNode rel, RelMetadataQuery mq) {
+    Double rowCnt = mq.getRowCount(rel);
+    return rowCnt != null ? rowCnt : rel.estimateRowCount(mq);
+  }
+
@Override
public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
return new BeamSetOperatorRelBase(this,
BeamSetOperatorRelBase.OpType.UNION, all);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index e3ab1dc4a3b..1fa9e7e6143 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -34,8 +34,11 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
@@ -59,6 +62,12 @@ public BeamValuesRel(
super(cluster, rowType, tuples, traits);
}
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery
mq) {
+ double rowCnt = mq.getRowCount(this);
+ return planner.getCostFactory().makeCost(rowCnt, 1, 0);
+ }
+
@Override
public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
return new Transform();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 126533)
Time Spent: 50m (was: 40m)
> Implement Cost calculations for Cost-Based Optimization (CBO)
> --------------------------------------------------------------
>
> Key: BEAM-4663
> URL: https://issues.apache.org/jira/browse/BEAM-4663
> Project: Beam
> Issue Type: Sub-task
> Components: dsl-sql
> Reporter: Kai Jiang
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> To support CBO, we should implement cost methods in each Beam*Rel.java,
> starting with computeSelfCost(...).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)