aokolnychyi commented on code in PR #6622:
URL: https://github.com/apache/iceberg/pull/6622#discussion_r1117655788


##########
api/src/main/java/org/apache/iceberg/TableScan.java:
##########
@@ -101,4 +101,13 @@ default TableScan appendsAfter(long fromSnapshotId) {
    * @return the Snapshot this scan will use
    */
   Snapshot snapshot();
+
+  /**
+   * Create a new {@link TableScan} from this scan's configuration that will have column stats
+   *
+   * @return a new scan based on this with column stats
+   */
+  default TableScan withColStats() {

Review Comment:
   Why do we have to add this if we already have `includeColumnStats` defined 
in `Scan`?
   I think we should be able to use that.
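
To make the suggestion concrete, here is a minimal sketch of reusing an inherited `includeColumnStats` instead of adding a parallel `withColStats` method. The `Scan`, `TableScan`, and `SimpleTableScan` types below are simplified, hypothetical stand-ins written for this example, not Iceberg's actual interfaces, and `returnsColumnStats` is an accessor assumed only so the behavior can be observed.

```java
// Demo entry point; placed first so the file can be launched directly.
final class ColumnStatsDemo {
  public static void main(String[] args) {
    TableScan scan = new SimpleTableScan(false);
    // The method inherited from Scan is enough; no extra withColStats() is needed.
    TableScan withStats = scan.includeColumnStats();
    System.out.println(scan.returnsColumnStats() + " -> " + withStats.returnsColumnStats());
  }
}

// Hypothetical, simplified stand-in for Iceberg's Scan interface.
interface Scan<ThisT> {
  // Returns a new scan, based on this one, that loads column stats.
  ThisT includeColumnStats();

  // Hypothetical accessor, present only so the sketch can be checked.
  boolean returnsColumnStats();
}

// Hypothetical stand-in for TableScan: it inherits includeColumnStats from Scan.
interface TableScan extends Scan<TableScan> {}

final class SimpleTableScan implements TableScan {
  private final boolean columnStats;

  SimpleTableScan(boolean columnStats) {
    this.columnStats = columnStats;
  }

  @Override
  public TableScan includeColumnStats() {
    // Scans are treated as immutable here: refinement returns a new instance.
    return new SimpleTableScan(true);
  }

  @Override
  public boolean returnsColumnStats() {
    return columnStats;
  }
}
```

Because the refinement already lives on the parent interface, callers holding a `TableScan` get it without any new API surface.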



##########
core/src/main/java/org/apache/iceberg/TableScanContext.java:
##########
@@ -374,4 +374,21 @@ TableScanContext reportWith(MetricsReporter reporter) {
         fromSnapshotInclusive,
         reporter);
   }
+
+  TableScanContext withColStats(boolean stats) {

Review Comment:
   I think this is no different from `shouldReturnColumnStats` above,
   which is already used by scans.
   If we switch to `Scan#includeColumnStats`, this won't be needed.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########
@@ -48,6 +48,11 @@ private SparkSQLProperties() {}
       "spark.sql.iceberg.planning.preserve-data-grouping";
   public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;
 
+  // Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg
+  public static final String AGGREGATE_PUSH_DOWN_ENABLED =

Review Comment:
   In my view, this name violates our naming policy for SQL configs. We usually
   use `.enabled` in table props and in SQL props, and `-enabled` in catalog and
   read/write options. I think this should be changed to the following:
   
   ```
   spark.sql.iceberg.aggregate-push-down.enabled
   ```
   
   That would match our other SQL properties and what Spark does itself.
   
   ```
   spark.sql.iceberg.vectorization.enabled
   ```
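
As a rough illustration of the proposed name, the constant could be declared along these lines. `SqlPropertiesSketch` is a hypothetical class used only for this example, and the default value shown is an assumption, since the diff excerpt above does not include it.

```java
// Hypothetical holder class; the real constant would live in SparkSQLProperties.
final class SqlPropertiesSketch {
  private SqlPropertiesSketch() {}

  // Controls whether to push down aggregates (MAX/MIN/COUNT) to Iceberg
  static final String AGGREGATE_PUSH_DOWN_ENABLED =
      "spark.sql.iceberg.aggregate-push-down.enabled";

  // Assumed default for illustration only; not taken from the PR.
  static final boolean AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT = true;

  public static void main(String[] args) {
    System.out.println(AGGREGATE_PUSH_DOWN_ENABLED);
  }
}
```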



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc;
+import org.apache.spark.sql.connector.expressions.aggregate.Count;
+import org.apache.spark.sql.connector.expressions.aggregate.CountStar;
+import org.apache.spark.sql.connector.expressions.aggregate.Max;
+import org.apache.spark.sql.connector.expressions.aggregate.Min;
+
+public class SparkAggregates {
+
+  private SparkAggregates() {}
+
+  private static final Map<Class<? extends AggregateFunc>, Operation> AGGREGATES =
+      ImmutableMap.<Class<? extends AggregateFunc>, Operation>builder()
+          .put(Count.class, Operation.COUNT)
+          .put(CountStar.class, Operation.COUNT_STAR)
+          .put(Max.class, Operation.MAX)
+          .put(Min.class, Operation.MIN)
+          .build();
+
+  public static Expression convert(AggregateFunc aggregate) {
+    Operation op = AGGREGATES.get(aggregate.getClass());
+    if (op != null) {
+      switch (op) {
+        case COUNT:
+          Count countAgg = (Count) aggregate;
+          assert (countAgg.column() instanceof NamedReference);
+          return Expressions.count(SparkUtil.toColumnName((NamedReference) countAgg.column()));
+        case COUNT_STAR:
+          return Expressions.countStar();
+        case MAX:
+          Max maxAgg = (Max) aggregate;
+          assert (maxAgg.column() instanceof NamedReference);
+          return Expressions.max(SparkUtil.toColumnName((NamedReference) maxAgg.column()));
+        case MIN:
+          Min minAgg = (Min) aggregate;
+          assert (minAgg.column() instanceof NamedReference);
+          return Expressions.min(SparkUtil.toColumnName((NamedReference) minAgg.column()));
+      }
+    }
+
+    throw new UnsupportedOperationException("Unsupported aggregate: " + aggregate);

Review Comment:
   Do we throw an exception deliberately? I think we return `null` when a
   filter cannot be converted.
   Why not follow that here?
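
A minimal sketch of the `null`-returning alternative might look like the following. All types here (`AggregateFunc`, `Count`, `CountStar`, `Sum`) are hypothetical stand-ins defined inside the example rather than Spark's classes, and the converted form is a plain string instead of an Iceberg `Expression`. The `canPushDown` helper is also an assumption about how a caller could react to `null`.

```java
import java.util.List;

final class AggregateConversionSketch {
  // Hypothetical stand-ins for Spark's aggregate function classes.
  interface AggregateFunc {}

  static final class Count implements AggregateFunc {
    final String column;

    Count(String column) {
      this.column = column;
    }
  }

  static final class CountStar implements AggregateFunc {}

  static final class Sum implements AggregateFunc {
    final String column;

    Sum(String column) {
      this.column = column;
    }
  }

  // Returns a textual form of the converted aggregate, or null if unsupported.
  static String convert(AggregateFunc aggregate) {
    if (aggregate instanceof Count) {
      return "count(" + ((Count) aggregate).column + ")";
    } else if (aggregate instanceof CountStar) {
      return "count(*)";
    }
    // Unsupported aggregate: signal with null instead of throwing.
    return null;
  }

  // Push-down is attempted only when every aggregate converts.
  static boolean canPushDown(List<AggregateFunc> aggregates) {
    for (AggregateFunc aggregate : aggregates) {
      if (convert(aggregate) == null) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(canPushDown(List.of(new Count("id"), new CountStar())));
    System.out.println(canPushDown(List.of(new Count("id"), new Sum("price"))));
  }
}
```

With this shape, an unsupported aggregate simply disables push-down for the query, and the caller does not need a try/catch around the conversion.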



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.iceberg.Table;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.LocalScan;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+class SparkLocalScan implements LocalScan {

Review Comment:
   Shall we also override `toString`? I think we do that in other scans but I 
am not sure how helpful it is.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAggregates.java:
##########
@@ -0,0 +1,69 @@
+public class SparkAggregates {

Review Comment:
   How come we have two `SparkAggregates` classes now? Do we need both?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkLocalScan.java:
##########
@@ -0,0 +1,57 @@
+class SparkLocalScan implements LocalScan {
+
+  private final Table table;
+  private final StructType readSchema;
+  private final InternalRow[] rows;
+
+  SparkLocalScan(Table table, StructType readSchema, InternalRow[] rows) {
+    this.table = table;
+    this.readSchema = readSchema;
+    this.rows = rows;
+  }
+
+  @Override
+  public InternalRow[] rows() {
+    return rows;
+  }
+
+  @Override
+  public StructType readSchema() {
+    return readSchema;
+  }
+
+  @Override
+  public String description() {

Review Comment:
   The Javadoc of this method explicitly states that we should not include the
   read schema, as Spark already does that.
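
One possible shape for this, sketched with a hypothetical `LocalScanSketch` class that uses plain strings in place of Iceberg's `Table` and Spark's `StructType`: `description()` leaves the schema out, while a `toString` override (raised in an earlier comment) can still carry it for debugging. The exact output formats are assumptions made for illustration.

```java
import java.util.List;

// Hypothetical stand-in for SparkLocalScan; table and schema are plain values here.
final class LocalScanSketch {
  private final String tableName;
  private final List<String> fieldNames;

  LocalScanSketch(String tableName, List<String> fieldNames) {
    this.tableName = tableName;
    this.fieldNames = fieldNames;
  }

  // Omits the read schema, which the review says Spark already reports.
  String description() {
    return String.format("%s [local scan]", tableName);
  }

  // toString can carry the fuller detail for logs and debugging.
  @Override
  public String toString() {
    return String.format("LocalScanSketch(table=%s, fields=%s)", tableName, fieldNames);
  }

  public static void main(String[] args) {
    LocalScanSketch scan = new LocalScanSketch("db.tbl", List.of("id", "data"));
    System.out.println(scan.description());
    System.out.println(scan);
  }
}
```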



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

