This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new abe5a7e  [BEAM-7783] BeamIOSourceRel using custom BeamStatistics
     new e9e8731  Merge pull request #9161 from riazela/BeamTableStatistics
abe5a7e is described below

commit abe5a7e486f400d5c204e734482ba731a89f9921
Author: Alireza Samadian <[email protected]>
AuthorDate: Thu Jul 25 15:27:01 2019 -0700

    [BEAM-7783] BeamIOSourceRel using custom BeamStatistics
---
 .../beam/sdk/extensions/sql/BeamSqlTable.java      |   2 +-
 .../sdk/extensions/sql/impl/BeamCalciteTable.java  |   3 +-
 .../extensions/sql/impl/rel/BeamIOSourceRel.java   |  18 ++-
 .../sql/meta/provider/test/TestBoundedTable.java   |   7 ++
 .../sql/meta/provider/test/TestUnboundedTable.java |  14 +++
 .../sdk/extensions/sql/impl/rel/BaseRelTest.java   |   6 +-
 .../sql/impl/rel/BeamIOSourceRelTest.java          | 128 +++++++++++++++++++++
 .../rel/BeamJoinRelUnboundedVsBoundedTest.java     |   2 +-
 8 files changed, 174 insertions(+), 6 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index b759761..6ddf8bd 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -39,7 +39,7 @@ public interface BeamSqlTable {
   /** Get the schema info of the table. */
   Schema getSchema();
 
-  /** Estimates the number of rows or returns null if there is no estimation. */
+  /** Estimates the number of rows or the rate for unbounded Tables. */
   default BeamTableStatistics getRowCount(PipelineOptions options) {
     return BeamTableStatistics.UNKNOWN;
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index 293a60b..b6dbf53 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -40,7 +40,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.ModifiableTable;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
 import org.apache.calcite.schema.TranslatableTable;
 
 /** Adapter from {@link BeamSqlTable} to a calcite Table. */
@@ -81,7 +80,7 @@ public class BeamCalciteTable extends AbstractQueryableTable
   }
 
   @Override
-  public Statistic getStatistic() {
+  public BeamTableStatistics getStatistic() {
     /*
      Changing class loader is required for the JDBC path. It is similar to what done in
      {@link BeamEnumerableConverter#toRowList} and {@link BeamEnumerableConverter#toEnumerable }.
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 82fcd3d..e22b64b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -22,11 +22,14 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -52,7 +55,12 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
 
   @Override
   public double estimateRowCount(RelMetadataQuery mq) {
-    return super.estimateRowCount(mq);
+    BeamTableStatistics rowCountStatistics = calciteTable.getStatistic();
+    if (beamTable.isBounded() == PCollection.IsBounded.BOUNDED) {
+      return rowCountStatistics.getRowCount();
+    } else {
+      return rowCountStatistics.getRate();
+    }
   }
 
   @Override
@@ -78,6 +86,14 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
     }
   }
 
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    // We should technically avoid this function. This happens if we are in JDBC path or the
+    // costFactory is not set correctly.
+    double rowCount = this.estimateRowCount(mq);
+    return planner.getCostFactory().makeCost(rowCount, rowCount, rowCount);
+  }
+
   protected BeamSqlTable getBeamSqlTable() {
     return beamTable;
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
index 5c92c47..73edca9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
@@ -22,6 +22,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -45,6 +47,11 @@ public class TestBoundedTable extends TestTable {
   }
 
   @Override
+  public BeamTableStatistics getRowCount(PipelineOptions options) {
+    return BeamTableStatistics.createBoundedTableStatistics((double) rows.size());
+  }
+
+  @Override
   public PCollection.IsBounded isBounded() {
     return PCollection.IsBounded.BOUNDED;
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
index 5d41fef..1fc4da5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -40,6 +42,8 @@ public class TestUnboundedTable extends TestTable {
   /** specify the index of column in the row which stands for the event time field. */
   private int timestampField;
 
+  private BeamTableStatistics statistics = BeamTableStatistics.UNBOUNDED_UNKNOWN;
+
   private TestUnboundedTable(Schema beamSchema) {
     super(beamSchema);
   }
@@ -61,6 +65,16 @@ public class TestUnboundedTable extends TestTable {
     return new TestUnboundedTable(TestTableUtils.buildBeamSqlSchema(args));
   }
 
+  public TestUnboundedTable setStatistics(BeamTableStatistics statistics) {
+    this.statistics = statistics;
+    return this;
+  }
+
+  @Override
+  public BeamTableStatistics getRowCount(PipelineOptions options) {
+    return this.statistics;
+  }
+
   public TestUnboundedTable timestampColumnIndex(int idx) {
     this.timestampField = idx;
     return this;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
index fafe10c..1e5f708 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.values.Row;
 /** Base class for rel test. */
 public abstract class BaseRelTest {
   private static Map<String, BeamSqlTable> tables = new HashMap<>();
-  private static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
+  protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
   protected static PCollection<Row> compilePipeline(String sql, Pipeline pipeline) {
     return BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery(sql));
@@ -37,4 +37,8 @@ public abstract class BaseRelTest {
   protected static void registerTable(String tableName, BeamSqlTable table) {
     tables.put(tableName, table);
   }
+
+  protected static BeamSqlTable getTable(String tableName) {
+    return tables.get(tableName);
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
new file mode 100644
index 0000000..3da71e0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.math.BigDecimal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test for {@code BeamIOSourceRel}. */
+public class BeamIOSourceRelTest extends BaseRelTest {
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+
+  public static final DateTime FIRST_DATE = new DateTime(1);
+  public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+
+  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+  @BeforeClass
+  public static void prepare() {
+    registerTable(
+        "ORDER_DETAILS_BOUNDED",
+        TestBoundedTable.of(
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DECIMAL, "price")
+            .addRows(
+                1L,
+                1,
+                new BigDecimal(1.0),
+                1L,
+                1,
+                new BigDecimal(1.0),
+                2L,
+                2,
+                new BigDecimal(2.0),
+                4L,
+                4,
+                new BigDecimal(4.0),
+                4L,
+                4,
+                new BigDecimal(4.0)));
+
+    registerTable(
+        "ORDER_DETAILS_UNBOUNDED",
+        TestUnboundedTable.of(
+                Schema.FieldType.INT32, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.INT32, "price",
+                Schema.FieldType.DATETIME, "order_time")
+            .timestampColumnIndex(3)
+            .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)
+            .addRows(
+                WINDOW_SIZE.plus(Duration.standardMinutes(1)),
+                2,
+                2,
+                7,
+                SECOND_DATE,
+                2,
+                3,
+                8,
+                SECOND_DATE,
+                // this late record is omitted(First window)
+                1,
+                3,
+                3,
+                FIRST_DATE)
+            .addRows(
+                // this late record is omitted(Second window)
+                WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
+                2,
+                3,
+                3,
+                SECOND_DATE)
+            .setStatistics(BeamTableStatistics.createUnboundedTableStatistics(2d)));
+  }
+
+  @Test
+  public void boundedRowCount() {
+    String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED";
+
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamIOSourceRel)) {
+      root = env.parseQuery(sql).getInput(0);
+    }
+
+    Assert.assertEquals(5d, root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+  }
+
+  @Test
+  public void unboundedRowCount() {
+    String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED";
+
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamIOSourceRel)) {
+      root = env.parseQuery(sql).getInput(0);
+    }
+
+    Assert.assertEquals(2d, root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 067af8a..190b091 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -112,7 +112,7 @@ public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest {
 
     @Override
     public PCollection.IsBounded isBounded() {
-      throw new UnsupportedOperationException();
+      return PCollection.IsBounded.BOUNDED;
     }
 
     @Override

Reply via email to