This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3a0d9c4 [BEAM-7783] Adding BeamTableStatistics.
new ff7a803 Merge pull request #9104 from riazela/BeamTableStatistics
3a0d9c4 is described below
commit 3a0d9c4fe16ced30a223557a4ad531365d4977ec
Author: Alireza Samadian <[email protected]>
AuthorDate: Thu Jul 18 16:53:54 2019 -0700
[BEAM-7783] Adding BeamTableStatistics.
---
.../apache/beam/sdk/io/TextRowCountEstimator.java | 6 +-
.../beam/sdk/io/TextRowCountEstimatorTest.java | 12 +--
.../beam/sdk/extensions/sql/BeamSqlTable.java | 6 +-
.../sdk/extensions/sql/impl/BeamCalciteTable.java | 10 +--
.../sql/impl/BeamRowCountStatistics.java | 44 ----------
.../extensions/sql/impl/BeamTableStatistics.java | 93 ++++++++++++++++++++++
.../extensions/sql/impl/rel/BeamIOSourceRel.java | 24 ++++--
.../sql/meta/provider/bigquery/BigQueryTable.java | 14 ++--
.../sql/meta/provider/test/TestTableProvider.java | 9 +--
.../sql/meta/provider/text/TextTable.java | 15 ++--
.../sql/impl/rule/JoinReorderingTest.java | 16 ++--
.../meta/provider/bigquery/BigQueryRowCountIT.java | 13 ++-
.../meta/provider/bigquery/BigQueryTestTable.java | 4 +-
13 files changed, 160 insertions(+), 106 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
index d220505..ad26fb1 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
@@ -72,7 +72,7 @@ public abstract class TextRowCountEstimator {
* @throws
org.apache.beam.sdk.io.TextRowCountEstimator.NoEstimationException if all the
sampled
* lines are empty and we have not read all the lines in the matched
files.
*/
- public Long estimateRowCount(PipelineOptions pipelineOptions)
+ public Double estimateRowCount(PipelineOptions pipelineOptions)
throws IOException, NoEstimationException {
long linesSize = 0;
int numberOfReadLines = 0;
@@ -129,7 +129,7 @@ public abstract class TextRowCountEstimator {
}
if (numberOfReadLines == 0 && sampledEverything) {
- return 0L;
+ return 0d;
}
if (numberOfReadLines == 0) {
@@ -138,7 +138,7 @@ public abstract class TextRowCountEstimator {
}
// This is total file sizes divided by average line size.
- return totalFileSizes * numberOfReadLines / linesSize;
+ return (double) totalFileSizes * numberOfReadLines / linesSize;
}
/** Builder for {@link org.apache.beam.sdk.io.TextRowCountEstimator}. */
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
index b7e3f8e..6f53d1e 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextRowCountEstimatorTest.java
@@ -59,16 +59,16 @@ public class TextRowCountEstimatorTest {
writer.close();
TextRowCountEstimator textRowCountEstimator =
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() +
"/**").build();
- Long rows =
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+ Double rows =
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
Assert.assertNotNull(rows);
- Assert.assertEquals(150L, rows.longValue());
+ Assert.assertEquals(150d, rows, 0.01);
}
@Test(expected = FileNotFoundException.class)
public void testEmptyFolder() throws Exception {
TextRowCountEstimator textRowCountEstimator =
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() +
"/**").build();
- Long rows =
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+ Double rows =
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
}
@Test
@@ -82,8 +82,8 @@ public class TextRowCountEstimatorTest {
writer.close();
TextRowCountEstimator textRowCountEstimator =
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() +
"/**").build();
- Long rows =
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
- Assert.assertEquals(0L, rows.longValue());
+ Double rows =
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+ Assert.assertEquals(0d, rows, 0.01);
}
@Test(expected = TextRowCountEstimator.NoEstimationException.class)
@@ -110,7 +110,7 @@ public class TextRowCountEstimatorTest {
TextRowCountEstimator.builder()
.setFilePattern(temporaryFolder.getRoot() + "/something/**")
.build();
- Long rows =
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
+ Double rows =
textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
Assert.assertNull(rows);
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 63f7158..b759761 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.PBegin;
@@ -40,7 +40,7 @@ public interface BeamSqlTable {
Schema getSchema();
/** Estimates the number of rows or returns null if there is no estimation.
*/
- default BeamRowCountStatistics getRowCount(PipelineOptions options) {
- return BeamRowCountStatistics.UNKNOWN;
+ default BeamTableStatistics getRowCount(PipelineOptions options) {
+ return BeamTableStatistics.UNKNOWN;
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index 5aa0f27..293a60b 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -26,7 +26,6 @@ import
org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.options.PipelineOptions;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.calcite.adapter.java.AbstractQueryableTable;
import org.apache.calcite.linq4j.QueryProvider;
@@ -42,7 +41,6 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.TranslatableTable;
/** Adapter from {@link BeamSqlTable} to a calcite Table. */
@@ -91,10 +89,7 @@ public class BeamCalciteTable extends AbstractQueryableTable
final ClassLoader originalClassLoader =
Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
- BeamRowCountStatistics beamStatistics =
beamTable.getRowCount(getPipelineOptions());
- return beamStatistics.isUnknown()
- ? Statistics.UNKNOWN
- : Statistics.of(beamStatistics.getRowCount().doubleValue(),
ImmutableList.of());
+ return beamTable.getRowCount(getPipelineOptions());
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
@@ -102,7 +97,8 @@ public class BeamCalciteTable extends AbstractQueryableTable
@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable
relOptTable) {
- return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable,
pipelineOptionsMap);
+ return new BeamIOSourceRel(
+ context.getCluster(), relOptTable, beamTable, pipelineOptionsMap,
this);
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
deleted file mode 100644
index ac0431d..0000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-
-/** This class stores row count statistics. */
-public class BeamRowCountStatistics implements Serializable {
- public static final BeamRowCountStatistics UNKNOWN = new
BeamRowCountStatistics(null);
- private final BigInteger rowCount;
-
- private BeamRowCountStatistics(BigInteger rowCount) {
- this.rowCount = rowCount;
- }
-
- public static BeamRowCountStatistics createBoundedTableStatistics(BigInteger
rowCount) {
- return new BeamRowCountStatistics(rowCount);
- }
-
- /** Is true if the row count cannot be estimated. */
- public boolean isUnknown() {
- return rowCount == null;
- }
-
- public BigInteger getRowCount() {
- return rowCount;
- }
-}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java
new file mode 100644
index 0000000..c010604
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/**
+ * Table statistics for Beam SQL tables, implementing Calcite's {@link Statistic} interface.
+ *
+ * <p>Statistics for a bounded table carry an estimated row count and a rate of 0 (see {@link
+ * #createBoundedTableStatistics}). Statistics for an unbounded table carry an estimated rate and
+ * a row count of 0 (see {@link #createUnboundedTableStatistics}). {@link #UNKNOWN} and {@link
+ * #UNBOUNDED_UNKNOWN} are placeholders for when no estimate is available.
+ *
+ * <p>NOTE(review): the unit of {@code rate} is not defined in this class (presumably rows per
+ * unit of time). Confirm it and document it here.
+ */
+@Experimental
+@Internal
+public class BeamTableStatistics implements Serializable, Statistic {
+ /**
+ * Placeholder for tables whose size cannot be estimated. {@link #isUnknown()} returns true, the
+ * reported row count is 100 and the rate is 0. NOTE(review): the 100-row value is a fallback
+ * guess whose rationale is not recorded here.
+ */
+ public static final BeamTableStatistics UNKNOWN = new
BeamTableStatistics(100d, 0d, true);
+ /**
+ * Placeholder for unbounded tables with no rate estimate. {@link #isUnknown()} returns true, the
+ * row count is 0 and the rate is 0.1.
+ */
+ public static final BeamTableStatistics UNBOUNDED_UNKNOWN =
+ new BeamTableStatistics(0d, 0.1, true);
+ /** True only for placeholder statistics; false for anything built by the public factories. */
+ private final boolean unknown;
+ /** Estimated number of rows; 0 for unbounded-table statistics. Not null-checked. */
+ private final Double rowCount;
+ /** Estimated rate; 0 for bounded-table statistics. Not null-checked. */
+ private final Double rate;
+
+ /** Primary constructor; {@code isUnknown} marks placeholder (non-estimated) statistics. */
+ private BeamTableStatistics(Double rowCount, Double rate, boolean isUnknown)
{
+ this.rowCount = rowCount;
+ this.rate = rate;
+ this.unknown = isUnknown;
+ }
+
+ /** Creates statistics backed by a real estimate ({@link #isUnknown()} returns false). */
+ private BeamTableStatistics(Double rowCount, Double rate) {
+ this(rowCount, rate, false);
+ }
+
+ /**
+ * Creates statistics for a bounded table.
+ *
+ * @param rowCount estimated number of rows; not validated or null-checked
+ * @return known statistics with the given row count and a rate of 0
+ */
+ public static BeamTableStatistics createBoundedTableStatistics(Double
rowCount) {
+ return new BeamTableStatistics(rowCount, 0d);
+ }
+
+ /**
+ * Creates statistics for an unbounded table.
+ *
+ * @param rate estimated rate (unit not defined in this class); not validated or null-checked
+ * @return known statistics with the given rate and a row count of 0
+ */
+ public static BeamTableStatistics createUnboundedTableStatistics(Double
rate) {
+ return new BeamTableStatistics(0d, rate);
+ }
+
+ /** Returns the estimated rate; 0 for statistics built by {@link #createBoundedTableStatistics}. */
+ public Double getRate() {
+ return rate;
+ }
+
+ /** Returns true if these statistics are a placeholder ({@link #UNKNOWN} or {@link #UNBOUNDED_UNKNOWN}). */
+ public boolean isUnknown() {
+ return unknown;
+ }
+
+ /**
+ * Returns the estimated row count. This is 0 for statistics built by {@link
+ * #createUnboundedTableStatistics} and 100 for {@link #UNKNOWN}.
+ */
+ @Override
+ public Double getRowCount() {
+ return rowCount;
+ }
+
+ /** No uniqueness information is tracked, so no column set is ever reported as a key. */
+ @Override
+ public boolean isKey(ImmutableBitSet columns) {
+ return false;
+ }
+
+ /** No referential constraints are tracked; always returns an empty list. */
+ @Override
+ public List<RelReferentialConstraint> getReferentialConstraints() {
+ return ImmutableList.of();
+ }
+
+ /** No sort order is tracked; always returns an empty list. */
+ @Override
+ public List<RelCollation> getCollations() {
+ return ImmutableList.of();
+ }
+
+ /** Returns the default distribution of {@link RelDistributionTraitDef}; no distribution is tracked. */
+ @Override
+ public RelDistribution getDistribution() {
+ return RelDistributionTraitDef.INSTANCE.getDefault();
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index e14b35b..82fcd3d 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -21,6 +21,7 @@ import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -28,26 +29,35 @@ import org.apache.beam.sdk.values.Row;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
/** BeamRelNode to replace a {@code TableScan} node. */
public class BeamIOSourceRel extends TableScan implements BeamRelNode {
- private final BeamSqlTable sqlTable;
+ private final BeamSqlTable beamTable;
+ private final BeamCalciteTable calciteTable;
private final Map<String, String> pipelineOptions;
public BeamIOSourceRel(
RelOptCluster cluster,
RelOptTable table,
- BeamSqlTable sqlTable,
- Map<String, String> pipelineOptions) {
+ BeamSqlTable beamTable,
+ Map<String, String> pipelineOptions,
+ BeamCalciteTable calciteTable) {
super(cluster, cluster.traitSetOf(BeamLogicalConvention.INSTANCE), table);
- this.sqlTable = sqlTable;
+ this.beamTable = beamTable;
+ this.calciteTable = calciteTable;
this.pipelineOptions = pipelineOptions;
}
@Override
+ public double estimateRowCount(RelMetadataQuery mq) {
+ return super.estimateRowCount(mq);
+ }
+
+ @Override
public PCollection.IsBounded isBounded() {
- return sqlTable.isBounded();
+ return beamTable.isBounded();
}
@Override
@@ -64,12 +74,12 @@ public class BeamIOSourceRel extends TableScan implements
BeamRelNode {
"Should not have received input for %s: %s",
BeamIOSourceRel.class.getSimpleName(),
input);
- return sqlTable.buildIOReader(input.getPipeline().begin());
+ return beamTable.buildIOReader(input.getPipeline().begin());
}
}
protected BeamSqlTable getBeamSqlTable() {
- return sqlTable;
+ return beamTable;
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index 984b1bd..6d0f773 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
class BigQueryTable extends BaseBeamTable implements Serializable {
@VisibleForTesting final String bqLocation;
private final ConversionOptions conversionOptions;
- private BeamRowCountStatistics rowCountStatistics = null;
+ private BeamTableStatistics rowCountStatistics = null;
private static final Logger LOGGER =
LoggerFactory.getLogger(BigQueryTable.class);
BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) {
@@ -57,7 +57,7 @@ class BigQueryTable extends BaseBeamTable implements
Serializable {
}
@Override
- public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+ public BeamTableStatistics getRowCount(PipelineOptions options) {
if (rowCountStatistics == null) {
rowCountStatistics = getRowCountFromBQ(options, bqLocation);
@@ -93,22 +93,22 @@ class BigQueryTable extends BaseBeamTable implements
Serializable {
.to(bqLocation));
}
- private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o,
String bqLocation) {
+ private static BeamTableStatistics getRowCountFromBQ(PipelineOptions o,
String bqLocation) {
try {
BigInteger rowCount =
BigQueryHelpers.getNumRows(
o.as(BigQueryOptions.class),
BigQueryHelpers.parseTableSpec(bqLocation));
if (rowCount == null) {
- return BeamRowCountStatistics.UNKNOWN;
+ return BeamTableStatistics.UNKNOWN;
}
- return BeamRowCountStatistics.createBoundedTableStatistics(rowCount);
+ return
BeamTableStatistics.createBoundedTableStatistics(rowCount.doubleValue());
} catch (IOException | InterruptedException e) {
LOGGER.warn("Could not get the row count for the table " + bqLocation,
e);
}
- return BeamRowCountStatistics.UNKNOWN;
+ return BeamTableStatistics.UNKNOWN;
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index 0a6df10..d093eb6 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -21,7 +21,6 @@ import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.google.auto.service.AutoService;
import java.io.Serializable;
-import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -31,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -140,9 +139,9 @@ public class TestTableProvider extends
InMemoryMetaTableProvider {
}
@Override
- public BeamRowCountStatistics getRowCount(PipelineOptions options) {
- return BeamRowCountStatistics.createBoundedTableStatistics(
- BigInteger.valueOf(tableWithRows.getRows().size()));
+ public BeamTableStatistics getRowCount(PipelineOptions options) {
+ return BeamTableStatistics.createBoundedTableStatistics(
+ (double) tableWithRows.getRows().size());
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
index 60232ab..a30269f 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
@@ -18,9 +18,8 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.text;
import java.io.IOException;
-import java.math.BigInteger;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.TextRowCountEstimator;
@@ -51,7 +50,7 @@ public class TextTable extends BaseBeamTable {
private static final TextRowCountEstimator.SamplingStrategy
DEFAULT_SAMPLING_STRATEGY =
new TextRowCountEstimator.LimitNumberOfTotalBytes(1024 * 1024L);
private final String filePattern;
- private BeamRowCountStatistics rowCountStatistics = null;
+ private BeamTableStatistics rowCountStatistics = null;
private static final Logger LOGGER =
LoggerFactory.getLogger(TextTable.class);
/** Text table with the specified read and write transforms. */
@@ -71,7 +70,7 @@ public class TextTable extends BaseBeamTable {
}
@Override
- public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+ public BeamTableStatistics getRowCount(PipelineOptions options) {
if (rowCountStatistics == null) {
rowCountStatistics = getTextRowEstimate(options, getFilePattern());
}
@@ -79,7 +78,7 @@ public class TextTable extends BaseBeamTable {
return rowCountStatistics;
}
- private static BeamRowCountStatistics getTextRowEstimate(
+ private static BeamTableStatistics getTextRowEstimate(
PipelineOptions options, String filePattern) {
TextRowCountEstimator textRowCountEstimator =
TextRowCountEstimator.builder()
@@ -87,12 +86,12 @@ public class TextTable extends BaseBeamTable {
.setSamplingStrategy(DEFAULT_SAMPLING_STRATEGY)
.build();
try {
- Long rows = textRowCountEstimator.estimateRowCount(options);
- return
BeamRowCountStatistics.createBoundedTableStatistics(BigInteger.valueOf(rows));
+ Double rows = textRowCountEstimator.estimateRowCount(options);
+ return BeamTableStatistics.createBoundedTableStatistics(rows);
} catch (IOException | TextRowCountEstimator.NoEstimationException e) {
LOGGER.warn("Could not get the row count for the text table " +
filePattern, e);
}
- return BeamRowCountStatistics.UNKNOWN;
+ return BeamTableStatistics.UNKNOWN;
}
@Override
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
index 9671a44..c8dd998d 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rule;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -85,25 +84,28 @@ public class JoinReorderingTest {
createThreeTables(tableProvider);
Assert.assertEquals(
- BigInteger.ONE,
+ 1d,
tableProvider
.buildBeamSqlTable(tableProvider.getTable("small_table"))
.getRowCount(null)
- .getRowCount());
+ .getRowCount(),
+ 0.01);
Assert.assertEquals(
- BigInteger.valueOf(3),
+ 3d,
tableProvider
.buildBeamSqlTable(tableProvider.getTable("medium_table"))
.getRowCount(null)
- .getRowCount());
+ .getRowCount(),
+ 0.01);
Assert.assertEquals(
- BigInteger.valueOf(100),
+ 100d,
tableProvider
.buildBeamSqlTable(tableProvider.getTable("large_table"))
.getRowCount(null)
- .getRowCount());
+ .getRowCount(),
+ 0.01);
}
@Test
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
index dc03923..b70092e 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
@@ -27,11 +27,10 @@ import static org.junit.Assert.assertTrue;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import java.math.BigInteger;
import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
@@ -61,9 +60,9 @@ public class BigQueryRowCountIT {
BigQueryTableProvider provider = new BigQueryTableProvider();
Table table = getTable("testTable", bigQuery.tableSpec());
BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
- BeamRowCountStatistics size =
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+ BeamTableStatistics size =
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
assertNotNull(size);
- assertEquals(BigInteger.ZERO, size.getRowCount());
+ assertEquals(0d, size.getRowCount(), 0.1);
}
@Test
@@ -91,10 +90,10 @@ public class BigQueryRowCountIT {
pipeline.run().waitUntilFinish();
BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
- BeamRowCountStatistics size1 =
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+ BeamTableStatistics size1 =
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
assertNotNull(size1);
- assertEquals(BigInteger.valueOf(3), size1.getRowCount());
+ assertEquals(3d, size1.getRowCount(), 0.1);
}
/** This tests if the pipeline options are injected in the path of SQL
Transform. */
@@ -143,7 +142,7 @@ public class BigQueryRowCountIT {
Table table = getTable("fakeTable", "project:dataset.table");
BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
- BeamRowCountStatistics size =
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+ BeamTableStatistics size =
sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
assertTrue(size.isUnknown());
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
index db954ae..a801814 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -34,7 +34,7 @@ public class BigQueryTestTable extends BigQueryTable {
}
@Override
- public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+ public BeamTableStatistics getRowCount(PipelineOptions options) {
jobName = options.getJobName();
return super.getRowCount(options);
}