deniskuzZ commented on code in PR #6224:
URL: https://github.com/apache/hive/pull/6224#discussion_r2784708704


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/variant/VariantProjectionUtil.java:
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.variant;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * Utility class for handling Variant Projection Pushdown (Column Pruning).
+ */
+public final class VariantProjectionUtil {
+
+  private VariantProjectionUtil() {
+  }
+
+  public record VariantColumnDescriptor(
+      int rootColumnIndex,
+      int[] fieldPath,
+      String[] physicalPath,
+      Type prunedSchema) {
+  }
+
+  public record VariantProjection(
+      List<VariantColumnDescriptor> variantColumns,
+      List<ColumnDescriptor> requestedColumns) {
+
+    public static VariantProjection create(
+        MessageType fileSchema, Configuration conf, Schema icebergSchema) {
+      String columns = conf.get(IOConstants.COLUMNS);
+      if (columns == null || columns.isEmpty()) {
+        return null;
+      }
+
+      List<String> columnNames = 
DataWritableReadSupport.getColumnNames(columns);
+      if (columnNames == null || columnNames.isEmpty()) {
+        return null;
+      }
+
+      boolean readAll = ColumnProjectionUtils.isReadAllColumns(conf);
+      List<Integer> readColumnIds = 
ColumnProjectionUtils.getReadColumnIDs(conf);
+      Set<String> nestedPaths = 
ColumnProjectionUtils.getNestedColumnPaths(conf);
+
+      List<VariantColumnDescriptor> variantColumns =
+          discoverVariantColumns(fileSchema, icebergSchema, columnNames, 
readAll, readColumnIds, nestedPaths);
+      if (variantColumns.isEmpty()) {
+        return null;
+      }
+
+      List<ColumnDescriptor> requestedColumns = 
computeRequestedColumns(fileSchema, variantColumns);
+      if (requestedColumns.isEmpty()) {
+        return null;
+      }
+
+      return new VariantProjection(variantColumns, requestedColumns);
+    }
+  }
+
+  private static List<VariantColumnDescriptor> discoverVariantColumns(
+      MessageType fileSchema,
+      Schema icebergSchema,
+      List<String> columnNames,
+      boolean readAll,
+      List<Integer> readColumnIds,
+      Set<String> nestedPaths) {
+
+    boolean[] projected = projectedTopLevelColumns(readAll, readColumnIds, 
columnNames.size());
+
+    List<VariantColumnDescriptor> result = Lists.newArrayList();
+    for (int colIndex = 0; colIndex < columnNames.size(); colIndex++) {
+      if (!projected[colIndex]) {
+        continue;
+      }
+
+      String columnName = columnNames.get(colIndex);
+      // Resolve the logical Iceberg field (handling potential name mismatches 
from Hive)
+      Types.NestedField field = findIcebergField(icebergSchema, columnName, 
colIndex);
+      if (field == null || field.type() == null) {
+        continue;
+      }
+
+      // Resolve the physical Parquet type (handling schema evolution/IDs)
+      Type parquetType = findParquetType(fileSchema, field);
+      if (parquetType == null) {
+        continue;
+      }
+
+      collectVariantColumns(
+          colIndex,
+          field.type(),
+          parquetType,
+          new int[0],
+          new String[] { columnName },
+          new String[] { parquetType.getName() },
+          result,
+          nestedPaths);
+    }
+
+    return result;
+  }
+
+  /**
+   * Resolves the Iceberg field for the given Hive column.
+   * Prioritizes lookup by name, falling back to positional index if not found 
(e.g. rename).
+   */
+  private static Types.NestedField findIcebergField(Schema schema, String 
name, int index) {
+    Types.NestedField field = schema.findField(name);
+    if (field != null) {
+      return field;
+    }
+
+    // Fallback: If Hive's column configuration uses an old name (e.g. 
"payload") but the Iceberg schema
+    // uses the new name (e.g. "data"), try to match by position if the index 
is valid.
+    List<Types.NestedField> columns = schema.columns();
+    if (index >= 0 && index < columns.size()) {
+      return columns.get(index);
+    }
+
+    return null;
+  }
+
+  /**
+   * Resolves the Parquet Type for the given Iceberg field.
+   * Prioritizes lookup by Field ID (for schema evolution), falling back to 
name.
+   */
+  private static Type findParquetType(GroupType parent, Types.NestedField 
child) {
+    int id = child.fieldId();
+    for (Type candidate : parent.getFields()) {
+      if (candidate.getId() != null && candidate.getId().intValue() == id) {
+        return candidate;
+      }
+    }
+
+    String name = child.name();
+    if (name != null && !name.isEmpty() && parent.containsField(name)) {
+      return parent.getType(name);
+    }
+
+    return null;
+  }
+
+  private static boolean[] projectedTopLevelColumns(
+      boolean readAll, List<Integer> readColumnIds, int fieldCount) {
+    boolean[] projected = new boolean[fieldCount];
+    if (readAll) {
+      Arrays.fill(projected, true);
+      return projected;
+    }
+
+    if (readColumnIds == null || readColumnIds.isEmpty()) {
+      return projected;
+    }
+
+    for (Integer id : readColumnIds) {
+      if (id != null && id >= 0 && id < fieldCount) {
+        projected[id] = true;
+      }
+    }
+
+    return projected;
+  }
+
+  private static void collectVariantColumns(
+      int rootColumnIndex,
+      org.apache.iceberg.types.Type icebergType,
+      Type parquetType,
+      int[] fieldPath,
+      String[] logicalPath,
+      String[] physicalPath,
+      List<VariantColumnDescriptor> results,
+      Set<String> nestedPaths) {

Review Comment:
   spotless: please run the Spotless formatter on this file to fix the formatting of this hunk.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/variant/VariantPathUtil.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.variant;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.iceberg.expressions.UnboundExtract;
+import org.apache.iceberg.expressions.UnboundPredicate;
+
+public final class VariantPathUtil {
+
+  // Variant field names (matching ParquetVariantVisitor package-protected 
constants)
+  public static final String METADATA = "metadata";
+  public static final String VALUE = "value";
+  public static final String TYPED_VALUE = "typed_value";
+

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to