This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ab6391767e Spark: Aggregate push down followup (#6923)
ab6391767e is described below

commit ab6391767e75357a9a12691513d9f83a4180f7e4
Author: Huaxin Gao <[email protected]>
AuthorDate: Sun Feb 26 11:49:42 2023 -0800

    Spark: Aggregate push down followup (#6923)
---
 .../main/java/org/apache/iceberg/TableScan.java    |   9 --
 .../java/org/apache/iceberg/BaseTableScan.java     |   5 -
 .../java/org/apache/iceberg/TableScanContext.java  |  17 ---
 .../spark/extensions/TestMergeOnReadDelete.java    |   8 +-
 .../org/apache/iceberg/spark/SparkAggregates.java  |   6 +-
 .../apache/iceberg/spark/SparkSQLProperties.java   |   2 +-
 .../iceberg/spark/source/SparkAggregates.java      |  69 ---------
 .../iceberg/spark/source/SparkLocalScan.java       |  23 ++-
 .../iceberg/spark/source/SparkScanBuilder.java     |  17 ++-
 .../iceberg/spark/sql/TestAggregatePushDown.java   | 166 +++++++++++----------
 10 files changed, 119 insertions(+), 203 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 3c3c44b369..5d2a1269d6 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -101,13 +101,4 @@ public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTas
    * @return the Snapshot this scan will use
    */
   Snapshot snapshot();
-
-  /**
-   * Create a new {@link TableScan} from this scan's configuration that will have column stats
-   *
-   * @return a new scan based on this with column stats
-   */
-  default TableScan withColStats() {
-    throw new UnsupportedOperationException("scan with colStats is not supported");
-  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index f9399041a0..317e50e22e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -47,9 +47,4 @@ abstract class BaseTableScan extends SnapshotScan<TableScan, FileScanTask, Combi
     return TableScanUtil.planTasks(
         splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
   }
-
-  @Override
-  public TableScan withColStats() {
-    return newRefinedScan(table(), tableSchema(), context().withColStats(true));
-  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index d938f16db1..6a3c7cc6e9 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -374,21 +374,4 @@ final class TableScanContext {
         fromSnapshotInclusive,
         reporter);
   }
-
-  TableScanContext withColStats(boolean stats) {
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        stats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
-  }
 }
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 7793009311..d7b6c0cda4 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -134,11 +134,11 @@ public class TestMergeOnReadDelete extends TestDelete {
     String select = "SELECT max(data), min(data), count(data) FROM %s";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
-        || explainString.contains("min(data)".toLowerCase(Locale.ROOT))
-        || explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("max(data)")
+        || explainString.contains("min(data)")
+        || explainString.contains("count(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkAggregates.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkAggregates.java
index dbc50fa989..153ef11a9e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkAggregates.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkAggregates.java
@@ -48,7 +48,7 @@ public class SparkAggregates {
         case COUNT:
           Count countAgg = (Count) aggregate;
           if (countAgg.isDistinct()) {
-            // manifest file doesn't have count distinct so this can't be converted to push down
+            // manifest file doesn't have count distinct so this can't be pushed down
             return null;
           }
 
@@ -57,8 +57,10 @@ public class SparkAggregates {
           } else {
             return null;
           }
+
         case COUNT_STAR:
           return Expressions.countStar();
+
         case MAX:
           Max maxAgg = (Max) aggregate;
           if (maxAgg.column() instanceof NamedReference) {
@@ -66,6 +68,7 @@ public class SparkAggregates {
           } else {
             return null;
           }
+
         case MIN:
           Min minAgg = (Min) aggregate;
           if (minAgg.column() instanceof NamedReference) {
@@ -75,6 +78,7 @@ public class SparkAggregates {
           }
       }
     }
+
     return null;
   }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index a5484d26c1..d36ce76f62 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -50,7 +50,7 @@ public class SparkSQLProperties {
 
   // Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg
   public static final String AGGREGATE_PUSH_DOWN_ENABLED =
-      "spark.sql.iceberg.aggregate-push-down-enabled";
+      "spark.sql.iceberg.aggregate-push-down.enabled";
   public static final boolean AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT = true;
 
   // Controls write distribution mode
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java
deleted file mode 100644
index 15ea53495b..0000000000
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark.source;
-
-import java.util.Map;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expression.Operation;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.spark.SparkUtil;
-import org.apache.spark.sql.connector.expressions.NamedReference;
-import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc;
-import org.apache.spark.sql.connector.expressions.aggregate.Count;
-import org.apache.spark.sql.connector.expressions.aggregate.CountStar;
-import org.apache.spark.sql.connector.expressions.aggregate.Max;
-import org.apache.spark.sql.connector.expressions.aggregate.Min;
-
-public class SparkAggregates {
-
-  private SparkAggregates() {}
-
-  private static final Map<Class<? extends AggregateFunc>, Operation> AGGREGATES =
-      ImmutableMap.<Class<? extends AggregateFunc>, Operation>builder()
-          .put(Count.class, Operation.COUNT)
-          .put(CountStar.class, Operation.COUNT_STAR)
-          .put(Max.class, Operation.MAX)
-          .put(Min.class, Operation.MIN)
-          .build();
-
-  public static Expression convert(AggregateFunc aggregate) {
-    Operation op = AGGREGATES.get(aggregate.getClass());
-    if (op != null) {
-      switch (op) {
-        case COUNT:
-          Count countAgg = (Count) aggregate;
-          assert (countAgg.column() instanceof NamedReference);
-          return Expressions.count(SparkUtil.toColumnName((NamedReference) countAgg.column()));
-        case COUNT_STAR:
-          return Expressions.countStar();
-        case MAX:
-          Max maxAgg = (Max) aggregate;
-          assert (maxAgg.column() instanceof NamedReference);
-          return Expressions.max(SparkUtil.toColumnName((NamedReference) maxAgg.column()));
-        case MIN:
-          Min minAgg = (Min) aggregate;
-          assert (minAgg.column() instanceof NamedReference);
-          return Expressions.min(SparkUtil.toColumnName((NamedReference) minAgg.column()));
-      }
-    }
-
-    throw new UnsupportedOperationException("Unsupported aggregate: " + aggregate);
-  }
-}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
index 8d3b6b7bde..c2f9707775 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java
@@ -18,12 +18,13 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.Arrays;
-import java.util.stream.Collectors;
+import java.util.List;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.LocalScan;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
 class SparkLocalScan implements LocalScan {
@@ -31,11 +32,14 @@ class SparkLocalScan implements LocalScan {
   private final Table table;
   private final StructType readSchema;
   private final InternalRow[] rows;
+  private final List<Expression> filterExpressions;
 
-  SparkLocalScan(Table table, StructType readSchema, InternalRow[] rows) {
+  SparkLocalScan(
+      Table table, StructType readSchema, InternalRow[] rows, List<Expression> filterExpressions) {
     this.table = table;
     this.readSchema = readSchema;
     this.rows = rows;
+    this.filterExpressions = filterExpressions;
   }
 
   @Override
@@ -50,8 +54,13 @@ class SparkLocalScan implements LocalScan {
 
   @Override
   public String description() {
-    String fields =
-        Arrays.stream(readSchema.fields()).map(StructField::name).collect(Collectors.joining(", "));
-    return String.format("%s [%s]", table, fields);
+    return String.format("%s [filters=%s]", table, Spark3Util.describe(filterExpressions));
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergLocalScan(table=%s, type=%s, filters=%s)",
+        table, SparkSchemaUtil.convert(readSchema).asStruct(), filterExpressions);
   }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 1bc751e30b..ee1d86531f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkAggregates;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -188,13 +189,12 @@ public class SparkScanBuilder
         if (expr != null) {
           Expression bound = Binder.bind(schema.asStruct(), expr, caseSensitive);
           expressions.add((BoundAggregate<?, ?>) bound);
+        } else {
+          LOG.info(
+              "Skipping aggregate pushdown: AggregateFunc {} can't be converted to iceberg expression",
+              aggregateFunc);
+          return false;
         }
-      } catch (UnsupportedOperationException e) {
-        LOG.info(
-            "Skipping aggregate pushdown: AggregateFunc {} can't be converted to iceberg Expression",
-            aggregateFunc,
-            e);
-        return false;
       } catch (IllegalArgumentException e) {
         LOG.info("Skipping aggregate pushdown: Bind failed for AggregateFunc {}", aggregateFunc, e);
         return false;
@@ -207,7 +207,7 @@ public class SparkScanBuilder
       return false;
     }
 
-    TableScan scan = table.newScan().withColStats();
+    TableScan scan = table.newScan().includeColumnStats();
     Snapshot snapshot = readSnapshot();
     if (snapshot == null) {
       LOG.info("Skipping aggregate pushdown: table snapshot is null");
@@ -242,7 +242,8 @@ public class SparkScanBuilder
     StructLike structLike = aggregateEvaluator.result();
     pushedAggregateRows[0] =
         new StructInternalRow(aggregateEvaluator.resultType()).setStruct(structLike);
-    localScan = new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows);
+    localScan =
+        new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows, filterExpressions);
 
     return true;
   }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
index 9cecf89bba..37ae96a248 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
@@ -121,30 +121,30 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
             + "max(binary_data), min(binary_data), count(binary_data) FROM %s";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("count(*)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(id)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(id)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(id)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(int_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(int_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(int_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(boolean_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(boolean_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(boolean_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(float_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(float_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(float_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(double_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(double_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(double_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(decimal_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(decimal_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(decimal_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(binary_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(binary_data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(binary_data)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("count(*)")
+        && explainString.contains("max(id)")
+        && explainString.contains("min(id)")
+        && explainString.contains("count(id)")
+        && explainString.contains("max(int_data)")
+        && explainString.contains("min(int_data)")
+        && explainString.contains("count(int_data)")
+        && explainString.contains("max(boolean_data)")
+        && explainString.contains("min(boolean_data)")
+        && explainString.contains("count(boolean_data)")
+        && explainString.contains("max(float_data)")
+        && explainString.contains("min(float_data)")
+        && explainString.contains("count(float_data)")
+        && explainString.contains("max(double_data)")
+        && explainString.contains("min(double_data)")
+        && explainString.contains("count(double_data)")
+        && explainString.contains("max(decimal_data)")
+        && explainString.contains("min(decimal_data)")
+        && explainString.contains("count(decimal_data)")
+        && explainString.contains("max(binary_data)")
+        && explainString.contains("min(binary_data)")
+        && explainString.contains("count(binary_data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -197,14 +197,14 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     String select = "SELECT max(d), min(d), count(d), max(ts), min(ts), count(ts) FROM %s";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("max(d)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(d)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(d)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(ts)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(ts)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(ts)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("max(d)")
+        && explainString.contains("min(d)")
+        && explainString.contains("count(d)")
+        && explainString.contains("max(ts)")
+        && explainString.contains("min(ts)")
+        && explainString.contains("count(ts)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -234,9 +234,9 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     String select = "SELECT COUNT(data), SUM(data) FROM %s";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("count(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -268,9 +268,9 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     String select1 = "SELECT COUNT(data) FROM %s";
 
     List<Object[]> explain1 = sql("EXPLAIN " + select1, tableName);
-    String explainString1 = explain1.get(0)[0].toString();
+    String explainString1 = explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString1.contains("count(data)".toLowerCase(Locale.ROOT))) {
+    if (explainString1.contains("count(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -285,8 +285,8 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
 
     String select2 = "SELECT COUNT(id) FROM %s";
     List<Object[]> explain2 = sql("EXPLAIN " + select2, tableName);
-    String explainString2 = explain2.get(0)[0].toString();
-    if (explainString2.contains("count(id)".toLowerCase(Locale.ROOT))) {
+    String explainString2 = explain2.get(0)[0].toString().toLowerCase(Locale.ROOT);
+    if (explainString2.contains("count(id)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -302,8 +302,8 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     String select3 = "SELECT COUNT(id), MAX(id) FROM %s";
     explainContainsPushDownAggregates = false;
     List<Object[]> explain3 = sql("EXPLAIN " + select3, tableName);
-    String explainString3 = explain3.get(0)[0].toString();
-    if (explainString3.contains("count(id)".toLowerCase(Locale.ROOT))) {
+    String explainString3 = explain3.get(0)[0].toString().toLowerCase(Locale.ROOT);
+    if (explainString3.contains("count(id)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -324,13 +324,16 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     sql(
         "INSERT INTO TABLE %s VALUES (1, '1111'), (1, '2222'), (2, '3333'), (2, '4444'), (3, '5555'), (3, '6666') ",
         tableName);
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
+        tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "truncate(16)");
 
     String select1 = "SELECT MAX(id), MAX(data) FROM %s";
 
     List<Object[]> explain1 = sql("EXPLAIN " + select1, tableName);
-    String explainString1 = explain1.get(0)[0].toString();
+    String explainString1 = explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString1.contains("max(id)".toLowerCase(Locale.ROOT))) {
+    if (explainString1.contains("max(id)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -344,8 +347,8 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
 
     String select2 = "SELECT COUNT(data) FROM %s";
     List<Object[]> explain2 = sql("EXPLAIN " + select2, tableName);
-    String explainString2 = explain2.get(0)[0].toString();
-    if (explainString2.contains("count(data)".toLowerCase(Locale.ROOT))) {
+    String explainString2 = explain2.get(0)[0].toString().toLowerCase(Locale.ROOT);
+    if (explainString2.contains("count(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -355,7 +358,7 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     List<Object[]> actual2 = sql(select2, tableName);
     List<Object[]> expected2 = Lists.newArrayList();
     expected2.add(new Object[] {6L});
-    assertEquals("min/max/count push down", expected2, actual2);
+    assertEquals("expected and actual should equal", expected2, actual2);
 
     explainContainsPushDownAggregates = false;
     sql(
@@ -363,9 +366,8 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
         tableName, TableProperties.DEFAULT_WRITE_METRICS_MODE, "full");
     String select3 = "SELECT count(data), max(data) FROM %s";
     List<Object[]> explain3 = sql("EXPLAIN " + select3, tableName);
-    String explainString3 = explain3.get(0)[0].toString();
-    if (explainString3.contains("count(data)".toLowerCase(Locale.ROOT))
-        && explainString3.contains("max(data)".toLowerCase(Locale.ROOT))) {
+    String explainString3 = explain3.get(0)[0].toString().toLowerCase(Locale.ROOT);
+    if (explainString3.contains("count(data)") && explainString3.contains("max(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -411,9 +413,9 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     String select = "SELECT MIN(data) FROM %s WHERE id > 1";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("min(data)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("min(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -443,9 +445,9 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
         tableName);
     String select1 = "SELECT count(complex), count(id) FROM %s";
     List<Object[]> explain = sql("EXPLAIN " + select1, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("count(complex)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("count(complex)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -459,9 +461,9 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
 
     String select2 = "SELECT max(complex) FROM %s";
     explain = sql("EXPLAIN " + select2, tableName);
-    explainString = explain.get(0)[0].toString();
+    explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     explainContainsPushDownAggregates = false;
-    if (explainString.contains("max(complex)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("max(complex)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -478,11 +480,11 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     String select = "SELECT max(data), min(data), count(data) FROM %s";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("max(data)")
+        && explainString.contains("min(data)")
+        && explainString.contains("count(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -509,9 +511,9 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
 
     List<Object[]> explain1 =
         sql("EXPLAIN SELECT count(id) FROM %s VERSION AS OF %s", tableName, snapshotId);
-    String explainString1 = explain1.get(0)[0].toString();
+    String explainString1 = explain1.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates1 = false;
-    if (explainString1.contains("count(id)".toLowerCase(Locale.ROOT))) {
+    if (explainString1.contains("count(id)")) {
       explainContainsPushDownAggregates1 = true;
     }
     Assert.assertTrue("count pushed down", explainContainsPushDownAggregates1);
@@ -521,9 +523,9 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     assertEquals("count push down", expected1, actual1);
 
     List<Object[]> explain2 = sql("EXPLAIN SELECT count(id) FROM %s", tableName);
-    String explainString2 = explain2.get(0)[0].toString();
+    String explainString2 = explain2.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates2 = false;
-    if (explainString2.contains("count(id)".toLowerCase(Locale.ROOT))) {
+    if (explainString2.contains("count(id)")) {
       explainContainsPushDownAggregates2 = true;
     }
 
@@ -547,11 +549,11 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     String select = "SELECT count(*), max(data), min(data), count(data) FROM %s";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(data)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("max(data)")
+        && explainString.contains("min(data)")
+        && explainString.contains("count(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -578,11 +580,11 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     String select = "SELECT count(*), max(data), min(data), count(data) FROM %s";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
-        || explainString.contains("min(data)".toLowerCase(Locale.ROOT))
-        || explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("max(data)")
+        || explainString.contains("min(data)")
+        || explainString.contains("count(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -609,11 +611,11 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
     String select = "SELECT count(*), max(data), min(data), count(data) FROM %s";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("max(data)".toLowerCase(Locale.ROOT))
-        || explainString.contains("min(data)".toLowerCase(Locale.ROOT))
-        || explainString.contains("count(data)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("max(data)")
+        || explainString.contains("min(data)")
+        || explainString.contains("count(data)")) {
       explainContainsPushDownAggregates = true;
     }
 
@@ -643,17 +645,17 @@ public class TestAggregatePushDown extends SparkCatalogTestBase {
         "SELECT count(*), max(data1), min(data1), count(data1), max(data2), min(data2), count(data2), max(data3), min(data3), count(data3) FROM %s";
 
     List<Object[]> explain = sql("EXPLAIN " + select, tableName);
-    String explainString = explain.get(0)[0].toString();
+    String explainString = explain.get(0)[0].toString().toLowerCase(Locale.ROOT);
     boolean explainContainsPushDownAggregates = false;
-    if (explainString.contains("max(data1)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(data1)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(data1)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(data2)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(data2)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(data2)".toLowerCase(Locale.ROOT))
-        && explainString.contains("max(data3)".toLowerCase(Locale.ROOT))
-        && explainString.contains("min(data3)".toLowerCase(Locale.ROOT))
-        && explainString.contains("count(data3)".toLowerCase(Locale.ROOT))) {
+    if (explainString.contains("max(data1)")
+        && explainString.contains("min(data1)")
+        && explainString.contains("count(data1)")
+        && explainString.contains("max(data2)")
+        && explainString.contains("min(data2)")
+        && explainString.contains("count(data2)")
+        && explainString.contains("max(data3)")
+        && explainString.contains("min(data3)")
+        && explainString.contains("count(data3)")) {
       explainContainsPushDownAggregates = true;
     }
 

Reply via email to