This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new abe5a7e [BEAM-7783] BeamIOSourceRel using custom BeamStatistics
new e9e8731 Merge pull request #9161 from riazela/BeamTableStatistics
abe5a7e is described below
commit abe5a7e486f400d5c204e734482ba731a89f9921
Author: Alireza Samadian <[email protected]>
AuthorDate: Thu Jul 25 15:27:01 2019 -0700
[BEAM-7783] BeamIOSourceRel using custom BeamStatistics
---
.../beam/sdk/extensions/sql/BeamSqlTable.java | 2 +-
.../sdk/extensions/sql/impl/BeamCalciteTable.java | 3 +-
.../extensions/sql/impl/rel/BeamIOSourceRel.java | 18 ++-
.../sql/meta/provider/test/TestBoundedTable.java | 7 ++
.../sql/meta/provider/test/TestUnboundedTable.java | 14 +++
.../sdk/extensions/sql/impl/rel/BaseRelTest.java | 6 +-
.../sql/impl/rel/BeamIOSourceRelTest.java | 128 +++++++++++++++++++++
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 2 +-
8 files changed, 174 insertions(+), 6 deletions(-)
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index b759761..6ddf8bd 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -39,7 +39,7 @@ public interface BeamSqlTable {
/** Get the schema info of the table. */
Schema getSchema();
- /** Estimates the number of rows or returns null if there is no estimation.
*/
+ /** Estimates the number of rows for bounded tables, or the rate for unbounded tables. */
default BeamTableStatistics getRowCount(PipelineOptions options) {
return BeamTableStatistics.UNKNOWN;
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index 293a60b..b6dbf53 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -40,7 +40,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.TranslatableTable;
/** Adapter from {@link BeamSqlTable} to a calcite Table. */
@@ -81,7 +80,7 @@ public class BeamCalciteTable extends AbstractQueryableTable
}
@Override
- public Statistic getStatistic() {
+ public BeamTableStatistics getStatistic() {
/*
Changing the class loader is required for the JDBC path. It is similar to
what is done in
{@link BeamEnumerableConverter#toRowList} and {@link
BeamEnumerableConverter#toEnumerable }.
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 82fcd3d..e22b64b 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -22,11 +22,14 @@ import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -52,7 +55,12 @@ public class BeamIOSourceRel extends TableScan implements
BeamRelNode {
@Override
public double estimateRowCount(RelMetadataQuery mq) {
- return super.estimateRowCount(mq);
+ BeamTableStatistics rowCountStatistics = calciteTable.getStatistic();
+ if (beamTable.isBounded() == PCollection.IsBounded.BOUNDED) {
+ return rowCountStatistics.getRowCount();
+ } else {
+ return rowCountStatistics.getRate();
+ }
}
@Override
@@ -78,6 +86,14 @@ public class BeamIOSourceRel extends TableScan implements
BeamRelNode {
}
}
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery
mq) {
+ // Ideally this function should not be used. It is called if we are in
the JDBC path or the
+ // costFactory is not set correctly.
+ double rowCount = this.estimateRowCount(mq);
+ return planner.getCostFactory().makeCost(rowCount, rowCount, rowCount);
+ }
+
protected BeamSqlTable getBeamSqlTable() {
return beamTable;
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
index 5c92c47..73edca9 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
@@ -22,6 +22,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -45,6 +47,11 @@ public class TestBoundedTable extends TestTable {
}
@Override
+ public BeamTableStatistics getRowCount(PipelineOptions options) {
+ return BeamTableStatistics.createBoundedTableStatistics((double)
rows.size());
+ }
+
+ @Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.BOUNDED;
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
index 5d41fef..1fc4da5 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -40,6 +42,8 @@ public class TestUnboundedTable extends TestTable {
/** Specifies the index of the column in the row that stands for the event time
field. */
private int timestampField;
+ private BeamTableStatistics statistics =
BeamTableStatistics.UNBOUNDED_UNKNOWN;
+
private TestUnboundedTable(Schema beamSchema) {
super(beamSchema);
}
@@ -61,6 +65,16 @@ public class TestUnboundedTable extends TestTable {
return new TestUnboundedTable(TestTableUtils.buildBeamSqlSchema(args));
}
+ public TestUnboundedTable setStatistics(BeamTableStatistics statistics) {
+ this.statistics = statistics;
+ return this;
+ }
+
+ @Override
+ public BeamTableStatistics getRowCount(PipelineOptions options) {
+ return this.statistics;
+ }
+
public TestUnboundedTable timestampColumnIndex(int idx) {
this.timestampField = idx;
return this;
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
index fafe10c..1e5f708 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.values.Row;
/** Base class for rel test. */
public abstract class BaseRelTest {
private static Map<String, BeamSqlTable> tables = new HashMap<>();
- private static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
+ protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
protected static PCollection<Row> compilePipeline(String sql, Pipeline
pipeline) {
return BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery(sql));
@@ -37,4 +37,8 @@ public abstract class BaseRelTest {
protected static void registerTable(String tableName, BeamSqlTable table) {
tables.put(tableName, table);
}
+
+ protected static BeamSqlTable getTable(String tableName) {
+ return tables.get(tableName);
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
new file mode 100644
index 0000000..3da71e0
--- /dev/null
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.math.BigDecimal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test for {@code BeamIOSourceRel}. */
+public class BeamIOSourceRelTest extends BaseRelTest {
+ @Rule public final TestPipeline pipeline = TestPipeline.create();
+
+ public static final DateTime FIRST_DATE = new DateTime(1);
+ public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+
+ private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+ @BeforeClass
+ public static void prepare() {
+ registerTable(
+ "ORDER_DETAILS_BOUNDED",
+ TestBoundedTable.of(
+ Schema.FieldType.INT64, "order_id",
+ Schema.FieldType.INT32, "site_id",
+ Schema.FieldType.DECIMAL, "price")
+ .addRows(
+ 1L,
+ 1,
+ new BigDecimal(1.0),
+ 1L,
+ 1,
+ new BigDecimal(1.0),
+ 2L,
+ 2,
+ new BigDecimal(2.0),
+ 4L,
+ 4,
+ new BigDecimal(4.0),
+ 4L,
+ 4,
+ new BigDecimal(4.0)));
+
+ registerTable(
+ "ORDER_DETAILS_UNBOUNDED",
+ TestUnboundedTable.of(
+ Schema.FieldType.INT32, "order_id",
+ Schema.FieldType.INT32, "site_id",
+ Schema.FieldType.INT32, "price",
+ Schema.FieldType.DATETIME, "order_time")
+ .timestampColumnIndex(3)
+ .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)
+ .addRows(
+ WINDOW_SIZE.plus(Duration.standardMinutes(1)),
+ 2,
+ 2,
+ 7,
+ SECOND_DATE,
+ 2,
+ 3,
+ 8,
+ SECOND_DATE,
+ // this late record is omitted (first window)
+ 1,
+ 3,
+ 3,
+ FIRST_DATE)
+ .addRows(
+ // this late record is omitted (second window)
+
WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
+ 2,
+ 3,
+ 3,
+ SECOND_DATE)
+
.setStatistics(BeamTableStatistics.createUnboundedTableStatistics(2d)));
+ }
+
+ @Test
+ public void boundedRowCount() {
+ String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED";
+
+ RelNode root = env.parseQuery(sql);
+
+ while (!(root instanceof BeamIOSourceRel)) {
+ root = env.parseQuery(sql).getInput(0);
+ }
+
+ Assert.assertEquals(5d,
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+ }
+
+ @Test
+ public void unboundedRowCount() {
+ String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED";
+
+ RelNode root = env.parseQuery(sql);
+
+ while (!(root instanceof BeamIOSourceRel)) {
+ root = env.parseQuery(sql).getInput(0);
+ }
+
+ Assert.assertEquals(2d,
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 067af8a..190b091 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -112,7 +112,7 @@ public class BeamJoinRelUnboundedVsBoundedTest extends
BaseRelTest {
@Override
public PCollection.IsBounded isBounded() {
- throw new UnsupportedOperationException();
+ return PCollection.IsBounded.BOUNDED;
}
@Override