This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ab6391767e Spark: Aggregate push down followup (#6923)
ab6391767e is described below
commit ab6391767e75357a9a12691513d9f83a4180f7e4
Author: Huaxin Gao <[email protected]>
AuthorDate: Sun Feb 26 11:49:42 2023 -0800
Spark: Aggregate push down followup (#6923)
---
.../main/java/org/apache/iceberg/TableScan.java | 9 --
.../java/org/apache/iceberg/BaseTableScan.java | 5 -
.../java/org/apache/iceberg/TableScanContext.java | 17 ---
.../spark/extensions/TestMergeOnReadDelete.java | 8 +-
.../org/apache/iceberg/spark/SparkAggregates.java | 6 +-
.../apache/iceberg/spark/SparkSQLProperties.java | 2 +-
.../iceberg/spark/source/SparkAggregates.java | 69 ---------
.../iceberg/spark/source/SparkLocalScan.java | 23 ++-
.../iceberg/spark/source/SparkScanBuilder.java | 17 ++-
.../iceberg/spark/sql/TestAggregatePushDown.java | 166 +++++++++++----------
10 files changed, 119 insertions(+), 203 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 3c3c44b369..5d2a1269d6 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -101,13 +101,4 @@ public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTas
* @return the Snapshot this scan will use
*/
Snapshot snapshot();
-
- /**
- * Create a new {@link TableScan} from this scan's configuration that will have column stats
- *
- * @return a new scan based on this with column stats
- */
- default TableScan withColStats() {
- throw new UnsupportedOperationException("scan with colStats is not supported");
- }
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index f9399041a0..317e50e22e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -47,9 +47,4 @@ abstract class BaseTableScan extends SnapshotScan<TableScan, FileScanTask, Combi
return TableScanUtil.planTasks(
splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
}
-
- @Override
- public TableScan withColStats() {
- return newRefinedScan(table(), tableSchema(), context().withColStats(true));
- }
}
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index d938f16db1..6a3c7cc6e9 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -374,21 +374,4 @@ final class TableScanContext {
fromSnapshotInclusive,
reporter);
}
-
- TableScanContext withColStats(boolean stats) {
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- stats,
- projectedSchema,
- selectedColumns,
- options,
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
- }
}
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 7793009311..d7b6c0cda4 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -134,11 +134,11 @@ public class TestMergeOnReadDelete extends TestDelete {
String select = "SELECT max(data), min(data), count(data) FROM %s";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
- || explainString.contains("min(data)".toLowerCase(Locale.ROOT))
- || explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("max(data)")
+ || explainString.contains("min(data)")
+ || explainString.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkAggregates.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkAggregates.java
index dbc50fa989..153ef11a9e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkAggregates.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkAggregates.java
@@ -48,7 +48,7 @@ public class SparkAggregates {
case COUNT:
Count countAgg = (Count) aggregate;
if (countAgg.isDistinct()) {
- // manifest file doesn't have count distinct so this can't be converted to push down
+ // manifest file doesn't have count distinct so this can't be pushed down
return null;
}
@@ -57,8 +57,10 @@ public class SparkAggregates {
} else {
return null;
}
+
case COUNT_STAR:
return Expressions.countStar();
+
case MAX:
Max maxAgg = (Max) aggregate;
if (maxAgg.column() instanceof NamedReference) {
@@ -66,6 +68,7 @@ public class SparkAggregates {
} else {
return null;
}
+
case MIN:
Min minAgg = (Min) aggregate;
if (minAgg.column() instanceof NamedReference) {
@@ -75,6 +78,7 @@ public class SparkAggregates {
}
}
}
+
return null;
}
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index a5484d26c1..d36ce76f62 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -50,7 +50,7 @@ public class SparkSQLProperties {
// Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg
public static final String AGGREGATE_PUSH_DOWN_ENABLED =
- "spark.sql.iceberg.aggregate-push-down-enabled";
+ "spark.sql.iceberg.aggregate-push-down.enabled";
public static final boolean AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT = true;
// Controls write distribution mode
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java
deleted file mode 100644
index 15ea53495b..0000000000
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark.source;
-
-import java.util.Map;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expression.Operation;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.spark.SparkUtil;
-import org.apache.spark.sql.connector.expressions.NamedReference;
-import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc;
-import org.apache.spark.sql.connector.expressions.aggregate.Count;
-import org.apache.spark.sql.connector.expressions.aggregate.CountStar;
-import org.apache.spark.sql.connector.expressions.aggregate.Max;
-import org.apache.spark.sql.connector.expressions.aggregate.Min;
-
-public class SparkAggregates {
-
- private SparkAggregates() {}
-
- private static final Map<Class<? extends AggregateFunc>, Operation>
AGGREGATES =
- ImmutableMap.<Class<? extends AggregateFunc>, Operation>builder()
- .put(Count.class, Operation.COUNT)
- .put(CountStar.class, Operation.COUNT_STAR)
- .put(Max.class, Operation.MAX)
- .put(Min.class, Operation.MIN)
- .build();
-
- public static Expression convert(AggregateFunc aggregate) {
- Operation op = AGGREGATES.get(aggregate.getClass());
- if (op != null) {
- switch (op) {
- case COUNT:
- Count countAgg = (Count) aggregate;
- assert (countAgg.column() instanceof NamedReference);
- return Expressions.count(SparkUtil.toColumnName((NamedReference)
countAgg.column()));
- case COUNT_STAR:
- return Expressions.countStar();
- case MAX:
- Max maxAgg = (Max) aggregate;
- assert (maxAgg.column() instanceof NamedReference);
- return Expressions.max(SparkUtil.toColumnName((NamedReference)
maxAgg.column()));
- case MIN:
- Min minAgg = (Min) aggregate;
- assert (minAgg.column() instanceof NamedReference);
- return Expressions.min(SparkUtil.toColumnName((NamedReference)
minAgg.column()));
- }
- }
-
- throw new UnsupportedOperationException("Unsupported aggregate: " +
aggregate);
- }
-}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
index 8d3b6b7bde..c2f9707775 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
@@ -18,12 +18,13 @@
*/
package org.apache.iceberg.spark.source;
-import java.util.Arrays;
-import java.util.stream.Collectors;
+import java.util.List;
import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.LocalScan;
-import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
class SparkLocalScan implements LocalScan {
@@ -31,11 +32,14 @@ class SparkLocalScan implements LocalScan {
private final Table table;
private final StructType readSchema;
private final InternalRow[] rows;
+ private final List<Expression> filterExpressions;
- SparkLocalScan(Table table, StructType readSchema, InternalRow[] rows) {
+ SparkLocalScan(
+ Table table, StructType readSchema, InternalRow[] rows, List<Expression>
filterExpressions) {
this.table = table;
this.readSchema = readSchema;
this.rows = rows;
+ this.filterExpressions = filterExpressions;
}
@Override
@@ -50,8 +54,13 @@ class SparkLocalScan implements LocalScan {
@Override
public String description() {
- String fields =
- Arrays.stream(readSchema.fields()).map(StructField::name).collect(Collectors.joining(", "));
- return String.format("%s [%s]", table, fields);
+ return String.format("%s [filters=%s]", table,
Spark3Util.describe(filterExpressions));
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "IcebergLocalScan(table=%s, type=%s, filters=%s)",
+ table, SparkSchemaUtil.convert(readSchema).asStruct(),
filterExpressions);
}
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 1bc751e30b..ee1d86531f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkAggregates;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
@@ -188,13 +189,12 @@ public class SparkScanBuilder
if (expr != null) {
Expression bound = Binder.bind(schema.asStruct(), expr,
caseSensitive);
expressions.add((BoundAggregate<?, ?>) bound);
+ } else {
+ LOG.info(
+ "Skipping aggregate pushdown: AggregateFunc {} can't be converted to iceberg expression",
+ aggregateFunc);
+ return false;
}
- } catch (UnsupportedOperationException e) {
- LOG.info(
- "Skipping aggregate pushdown: AggregateFunc {} can't be converted to iceberg Expression",
- aggregateFunc,
- e);
- return false;
} catch (IllegalArgumentException e) {
LOG.info("Skipping aggregate pushdown: Bind failed for AggregateFunc {}", aggregateFunc, e);
return false;
@@ -207,7 +207,7 @@ public class SparkScanBuilder
return false;
}
- TableScan scan = table.newScan().withColStats();
+ TableScan scan = table.newScan().includeColumnStats();
Snapshot snapshot = readSnapshot();
if (snapshot == null) {
LOG.info("Skipping aggregate pushdown: table snapshot is null");
@@ -242,7 +242,8 @@ public class SparkScanBuilder
StructLike structLike = aggregateEvaluator.result();
pushedAggregateRows[0] =
new
StructInternalRow(aggregateEvaluator.resultType()).setStruct(structLike);
- localScan = new SparkLocalScan(table, pushedAggregateSchema,
pushedAggregateRows);
+ localScan =
+ new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows,
filterExpressions);
return true;
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
index 9cecf89bba..37ae96a248 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
@@ -121,30 +121,30 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
+ "max(binary_data), min(binary_data), count(binary_data) FROM %s";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("count(*)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(id)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(id)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(id)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(int_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(int_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(int_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(boolean_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(boolean_data)".toLowerCase(Locale.ROOT))
- &&
explainString.contains("count(boolean_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(float_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(float_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(float_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(double_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(double_data)".toLowerCase(Locale.ROOT))
- &&
explainString.contains("count(double_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(decimal_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(decimal_data)".toLowerCase(Locale.ROOT))
- &&
explainString.contains("count(decimal_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(binary_data)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(binary_data)".toLowerCase(Locale.ROOT))
- &&
explainString.contains("count(binary_data)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("count(*)")
+ && explainString.contains("max(id)")
+ && explainString.contains("min(id)")
+ && explainString.contains("count(id)")
+ && explainString.contains("max(int_data)")
+ && explainString.contains("min(int_data)")
+ && explainString.contains("count(int_data)")
+ && explainString.contains("max(boolean_data)")
+ && explainString.contains("min(boolean_data)")
+ && explainString.contains("count(boolean_data)")
+ && explainString.contains("max(float_data)")
+ && explainString.contains("min(float_data)")
+ && explainString.contains("count(float_data)")
+ && explainString.contains("max(double_data)")
+ && explainString.contains("min(double_data)")
+ && explainString.contains("count(double_data)")
+ && explainString.contains("max(decimal_data)")
+ && explainString.contains("min(decimal_data)")
+ && explainString.contains("count(decimal_data)")
+ && explainString.contains("max(binary_data)")
+ && explainString.contains("min(binary_data)")
+ && explainString.contains("count(binary_data)")) {
explainContainsPushDownAggregates = true;
}
@@ -197,14 +197,14 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select = "SELECT max(d), min(d), count(d), max(ts), min(ts),
count(ts) FROM %s";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("max(d)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(d)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(d)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(ts)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(ts)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(ts)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("max(d)")
+ && explainString.contains("min(d)")
+ && explainString.contains("count(d)")
+ && explainString.contains("max(ts)")
+ && explainString.contains("min(ts)")
+ && explainString.contains("count(ts)")) {
explainContainsPushDownAggregates = true;
}
@@ -234,9 +234,9 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select = "SELECT COUNT(data), SUM(data) FROM %s";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}
@@ -268,9 +268,9 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select1 = "SELECT COUNT(data) FROM %s";
List<Object[]> explain1 = sql("EXPLAIN " + select1, tableName);
- String explainString1 = explain1.get(0)[0].toString();
+ String explainString1 =
explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString1.contains("count(data)".toLowerCase(Locale.ROOT))) {
+ if (explainString1.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}
@@ -285,8 +285,8 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select2 = "SELECT COUNT(id) FROM %s";
List<Object[]> explain2 = sql("EXPLAIN " + select2, tableName);
- String explainString2 = explain2.get(0)[0].toString();
- if (explainString2.contains("count(id)".toLowerCase(Locale.ROOT))) {
+ String explainString2 =
explain2.get(0)[0].toString().toLowerCase(Locale.ROOT);
+ if (explainString2.contains("count(id)")) {
explainContainsPushDownAggregates = true;
}
@@ -302,8 +302,8 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select3 = "SELECT COUNT(id), MAX(id) FROM %s";
explainContainsPushDownAggregates = false;
List<Object[]> explain3 = sql("EXPLAIN " + select3, tableName);
- String explainString3 = explain3.get(0)[0].toString();
- if (explainString3.contains("count(id)".toLowerCase(Locale.ROOT))) {
+ String explainString3 =
explain3.get(0)[0].toString().toLowerCase(Locale.ROOT);
+ if (explainString3.contains("count(id)")) {
explainContainsPushDownAggregates = true;
}
@@ -324,13 +324,16 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
sql(
"INSERT INTO TABLE %s VALUES (1, '1111'), (1, '2222'), (2, '3333'),
(2, '4444'), (3, '5555'), (3, '6666') ",
tableName);
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+ tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(16)");
String select1 = "SELECT MAX(id), MAX(data) FROM %s";
List<Object[]> explain1 = sql("EXPLAIN " + select1, tableName);
- String explainString1 = explain1.get(0)[0].toString();
+ String explainString1 =
explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString1.contains("max(id)".toLowerCase(Locale.ROOT))) {
+ if (explainString1.contains("max(id)")) {
explainContainsPushDownAggregates = true;
}
@@ -344,8 +347,8 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select2 = "SELECT COUNT(data) FROM %s";
List<Object[]> explain2 = sql("EXPLAIN " + select2, tableName);
- String explainString2 = explain2.get(0)[0].toString();
- if (explainString2.contains("count(data)".toLowerCase(Locale.ROOT))) {
+ String explainString2 =
explain2.get(0)[0].toString().toLowerCase(Locale.ROOT);
+ if (explainString2.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}
@@ -355,7 +358,7 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
List<Object[]> actual2 = sql(select2, tableName);
List<Object[]> expected2 = Lists.newArrayList();
expected2.add(new Object[] {6L});
- assertEquals("min/max/count push down", expected2, actual2);
+ assertEquals("expected and actual should equal", expected2, actual2);
explainContainsPushDownAggregates = false;
sql(
@@ -363,9 +366,8 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "full");
String select3 = "SELECT count(data), max(data) FROM %s";
List<Object[]> explain3 = sql("EXPLAIN " + select3, tableName);
- String explainString3 = explain3.get(0)[0].toString();
- if (explainString3.contains("count(data)".toLowerCase(Locale.ROOT))
- && explainString3.contains("max(data)".toLowerCase(Locale.ROOT))) {
+ String explainString3 =
explain3.get(0)[0].toString().toLowerCase(Locale.ROOT);
+ if (explainString3.contains("count(data)") &&
explainString3.contains("max(data)")) {
explainContainsPushDownAggregates = true;
}
@@ -411,9 +413,9 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select = "SELECT MIN(data) FROM %s WHERE id > 1";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("min(data)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("min(data)")) {
explainContainsPushDownAggregates = true;
}
@@ -443,9 +445,9 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
tableName);
String select1 = "SELECT count(complex), count(id) FROM %s";
List<Object[]> explain = sql("EXPLAIN " + select1, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("count(complex)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("count(complex)")) {
explainContainsPushDownAggregates = true;
}
@@ -459,9 +461,9 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select2 = "SELECT max(complex) FROM %s";
explain = sql("EXPLAIN " + select2, tableName);
- explainString = explain.get(0)[0].toString();
+ explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
explainContainsPushDownAggregates = false;
- if (explainString.contains("max(complex)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("max(complex)")) {
explainContainsPushDownAggregates = true;
}
@@ -478,11 +480,11 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select = "SELECT max(data), min(data), count(data) FROM %s";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(data)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("max(data)")
+ && explainString.contains("min(data)")
+ && explainString.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}
@@ -509,9 +511,9 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
List<Object[]> explain1 =
sql("EXPLAIN SELECT count(id) FROM %s VERSION AS OF %s", tableName,
snapshotId);
- String explainString1 = explain1.get(0)[0].toString();
+ String explainString1 =
explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates1 = false;
- if (explainString1.contains("count(id)".toLowerCase(Locale.ROOT))) {
+ if (explainString1.contains("count(id)")) {
explainContainsPushDownAggregates1 = true;
}
Assert.assertTrue("count pushed down", explainContainsPushDownAggregates1);
@@ -521,9 +523,9 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
assertEquals("count push down", expected1, actual1);
List<Object[]> explain2 = sql("EXPLAIN SELECT count(id) FROM %s",
tableName);
- String explainString2 = explain2.get(0)[0].toString();
+ String explainString2 =
explain2.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates2 = false;
- if (explainString2.contains("count(id)".toLowerCase(Locale.ROOT))) {
+ if (explainString2.contains("count(id)")) {
explainContainsPushDownAggregates2 = true;
}
@@ -547,11 +549,11 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select = "SELECT count(*), max(data), min(data), count(data) FROM
%s";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(data)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("max(data)")
+ && explainString.contains("min(data)")
+ && explainString.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}
@@ -578,11 +580,11 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select = "SELECT count(*), max(data), min(data), count(data) FROM
%s";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
- || explainString.contains("min(data)".toLowerCase(Locale.ROOT))
- || explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("max(data)")
+ || explainString.contains("min(data)")
+ || explainString.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}
@@ -609,11 +611,11 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
String select = "SELECT count(*), max(data), min(data), count(data) FROM
%s";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
- || explainString.contains("min(data)".toLowerCase(Locale.ROOT))
- || explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("max(data)")
+ || explainString.contains("min(data)")
+ || explainString.contains("count(data)")) {
explainContainsPushDownAggregates = true;
}
@@ -643,17 +645,17 @@ public class TestAggregatePushDown extends
SparkCatalogTestBase {
"SELECT count(*), max(data1), min(data1), count(data1), max(data2),
min(data2), count(data2), max(data3), min(data3), count(data3) FROM %s";
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
- String explainString = explain.get(0)[0].toString();
+ String explainString =
explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
boolean explainContainsPushDownAggregates = false;
- if (explainString.contains("max(data1)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(data1)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(data1)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(data2)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(data2)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(data2)".toLowerCase(Locale.ROOT))
- && explainString.contains("max(data3)".toLowerCase(Locale.ROOT))
- && explainString.contains("min(data3)".toLowerCase(Locale.ROOT))
- && explainString.contains("count(data3)".toLowerCase(Locale.ROOT))) {
+ if (explainString.contains("max(data1)")
+ && explainString.contains("min(data1)")
+ && explainString.contains("count(data1)")
+ && explainString.contains("max(data2)")
+ && explainString.contains("min(data2)")
+ && explainString.contains("count(data2)")
+ && explainString.contains("max(data3)")
+ && explainString.contains("min(data3)")
+ && explainString.contains("count(data3)")) {
explainContainsPushDownAggregates = true;
}