okumin commented on code in PR #5409:
URL: https://github.com/apache/hive/pull/5409#discussion_r1739764504


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -752,6 +753,39 @@ private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.
     ).collect(Collectors.toList()));
   }
 
+  @Override
+  public boolean supportsPartitionAwareOptimization(org.apache.hadoop.hive.ql.metadata.Table table) {
+    if (hasUndergonePartitionEvolution(table)) {
+      // Don't support complex cases yet
+      return false;
+    }
+    final List<TransformSpec> specs = getPartitionTransformSpec(table);
+    // Currently, we support the only bucket transform
+    return specs.stream().anyMatch(HiveIcebergStorageHandler::isBucket);
+  }
+
+  @Override
+  public PartitionAwareOptimizationCtx createPartitionAwareOptimizationContext(
+      org.apache.hadoop.hive.ql.metadata.Table table) {
+    // Currently, we support the only bucket transform
+    final List<String> bucketColumnNames = Lists.newArrayList();
+    final List<Integer> numBuckets = Lists.newArrayList();
+    getPartitionTransformSpec(table).stream().filter(HiveIcebergStorageHandler::isBucket).forEach(spec -> {
+      bucketColumnNames.add(spec.getColumnName());
+      numBuckets.add(spec.getTransformParam().get());
+    });
+
+    if (bucketColumnNames.isEmpty()) {
+      return null;
+    }
+    final IcebergBucketFunction bucketFunction = new IcebergBucketFunction(bucketColumnNames, numBuckets);
+    return new PartitionAwareOptimizationCtx(bucketFunction);
+  }
+
+  private static boolean isBucket(TransformSpec spec) {

Review Comment:
   I think this can stay here or move to another class in the iceberg-handler
module. That's because it is an Iceberg-specific invariant that [a bucket
transform always comes with the number of
buckets](https://iceberg.apache.org/spec/#partition-transforms). Another
storage format might use a default or fixed number. I might be overthinking this.
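
   A minimal sketch of that invariant, assuming a simplified stand-in for Hive's `TransformSpec`. The `Spec` class, the `TransformType` enum, and the decision to throw on a missing bucket count are all hypothetical and only illustrate why the check is Iceberg-specific:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class BucketSpecSketch {
  // Hypothetical stand-in for the transform kinds; not Hive's actual enum.
  enum TransformType { IDENTITY, BUCKET, TRUNCATE }

  // Hypothetical stand-in for TransformSpec, holding only what the sketch needs.
  static final class Spec {
    final String columnName;
    final TransformType type;
    final Optional<Integer> param;

    Spec(String columnName, TransformType type, Optional<Integer> param) {
      this.columnName = columnName;
      this.type = type;
      this.param = param;
    }
  }

  // Assumption: for Iceberg, a bucket transform without a bucket count is invalid,
  // so the sketch fails fast instead of falling back to a default number.
  static boolean isBucket(Spec spec) {
    if (spec.type != TransformType.BUCKET) {
      return false;
    }
    if (!spec.param.isPresent()) {
      throw new IllegalStateException("Bucket transform without a bucket count: " + spec.columnName);
    }
    return true;
  }

  public static void main(String[] args) {
    List<Spec> specs = Arrays.asList(
        new Spec("id", TransformType.BUCKET, Optional.of(16)),
        new Spec("ts", TransformType.IDENTITY, Optional.empty()));
    // Only the bucket-partitioned column is printed.
    specs.stream()
        .filter(BucketSpecSketch::isBucket)
        .forEach(s -> System.out.println(s.columnName + " -> " + s.param.get()));
  }
}
```

   A storage format that allows a default or fixed bucket count would need a different check, which is the reason to keep this one inside the iceberg-handler module.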



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java:
##########
@@ -75,7 +81,7 @@ public Path getPath() {
 
   @Override
   public byte[] getBytesForHash() {
-    Collection<FileScanTask> fileScanTasks = innerSplit.task().files();
+    Collection<FileScanTask> fileScanTasks = innerSplit.task().tasks();

Review Comment:
   Because only `CombinedScanTask` has `tasks`. Renamed `IcebergSplit#task`
   
https://github.com/apache/hive/pull/5409/commits/3c57a67327340197c2f1798503ab1927a5104cdd



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergBucketFunction.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CustomBucketFunction;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergBucket;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class IcebergBucketFunction implements CustomBucketFunction {
+  private static final class SettableDeferredObject implements DeferredObject {
+    private Object value;
+
+    @Override
+    public void prepare(int i) {
+      throw new UnsupportedOperationException("Not supported");
+    }
+
+    @Override
+    public Object get() {
+      return value;
+    }
+
+    void set(Object object) {
+      this.value = object;
+    }
+  }
+
+  private static final long serialVersionUID = 1L;
+
+  private final List<String> sourceColumnNames;
+  private final List<Integer> numBuckets;
+
+  private transient List<GenericUDFIcebergBucket> bucketUdfs;
+  private transient List<IntObjectInspector> bucketIdInspectors;
+  private transient SettableDeferredObject[] deferredObjects;
+
+  public IcebergBucketFunction(List<String> sourceColumnNames, List<Integer> numBuckets) {
+    Objects.requireNonNull(sourceColumnNames);
+    Objects.requireNonNull(numBuckets);
+    Preconditions.checkArgument(sourceColumnNames.size() == numBuckets.size());
+    Preconditions.checkArgument(!sourceColumnNames.isEmpty());
+    this.sourceColumnNames = sourceColumnNames;
+    this.numBuckets = numBuckets;
+  }
+
+  @Override
+  public Optional<CustomBucketFunction> select(boolean[] retainedColumns) {
+    Preconditions.checkArgument(retainedColumns.length == numBuckets.size());
+    Preconditions.checkState(bucketUdfs == null);
+    Preconditions.checkState(bucketIdInspectors == null);
+    Preconditions.checkState(deferredObjects == null);
+
+    final List<String> newSourceColumnNames = Lists.newArrayList();
+    final List<Integer> newNumBuckets = Lists.newArrayList();
+    for (int i = 0; i < retainedColumns.length; i++) {
+      if (retainedColumns[i]) {
+        newSourceColumnNames.add(sourceColumnNames.get(i));
+        newNumBuckets.add(numBuckets.get(i));
+      }
+    }
+
+    if (newSourceColumnNames.isEmpty()) {
+      return Optional.empty();
+    }
+
+    final IcebergBucketFunction newBucketFunction = new IcebergBucketFunction(newSourceColumnNames, newNumBuckets);
+    return Optional.of(newBucketFunction);
+  }
+
+  @Override
+  public void initialize(ObjectInspector[] objectInspectors) {
+    Preconditions.checkArgument(objectInspectors.length == numBuckets.size());
+    bucketIdInspectors = Lists.newArrayList();
+    bucketUdfs = Lists.newArrayList();
+    for (int i = 0; i < objectInspectors.length; i++) {
+      final GenericUDFIcebergBucket udf = new GenericUDFIcebergBucket();
+      final ObjectInspector numBucketInspector = PrimitiveObjectInspectorFactory
+          .getPrimitiveWritableConstantObjectInspector(
+              TypeInfoFactory.intTypeInfo,
+              new IntWritable(numBuckets.get(i)));
+      final ObjectInspector[] args = { objectInspectors[i], numBucketInspector };
+      try {
+        final ObjectInspector inspector = udf.initialize(args);
+        Preconditions.checkState(inspector.getCategory() == Category.PRIMITIVE);
+        final PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector) inspector;
+        Preconditions.checkState(primitiveInspector.getPrimitiveCategory() == PrimitiveCategory.INT);
+        bucketIdInspectors.add((IntObjectInspector) primitiveInspector);
+      } catch (UDFArgumentException e) {
+        throw new IllegalArgumentException("The given object inspector is illegal", e);
+      }
+      bucketUdfs.add(udf);
+    }
+    deferredObjects = new SettableDeferredObject[1];
+    deferredObjects[0] = new SettableDeferredObject();
+
+    Preconditions.checkState(bucketIdInspectors.size() == numBuckets.size());
+    Preconditions.checkState(bucketUdfs.size() == numBuckets.size());
+  }
+
+  @Override
+  public List<String> getSourceColumnNames() {
+    return sourceColumnNames;
+  }
+
+  @Override
+  public int getNumBuckets() {
+    return numBuckets.stream().reduce(1, (a, b) -> a * b);

Review Comment:
   Good to know the method. Done
   
https://github.com/apache/hive/pull/5409/commits/41ea3ab3b78b2fc88e30a4fcd641540188819e60



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java:
##########
@@ -304,9 +294,9 @@ public void process(Object row, int tag) throws HiveException {
             conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
         valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval,
             conf.getOutputValueColumnNames(), rowInspector);
-        partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
+        partitionHashFunc = conf.getPartitionFunction(initEvaluators(partitionEval, rowInspector));

Review Comment:
   Done
   
https://github.com/apache/hive/pull/5409/commits/072ff8e6094d08013e935e9b7ad12764af0c52f4



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergBucketFunction.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CustomBucketFunction;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergBucket;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class IcebergBucketFunction implements CustomBucketFunction {
+  private static final class SettableDeferredObject implements DeferredObject {

Review Comment:
   Done
   
https://github.com/apache/hive/pull/5409/commits/ab37fd32de9ba6c94b04229735c043b816e0896a



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergBucketFunction.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;

Review Comment:
   Done
   
https://github.com/apache/hive/pull/5409/commits/18802061e1c6a5da3be3a962fcdc0b3c43a04d09



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java:
##########
@@ -88,6 +94,25 @@ public byte[] getBytesForHash() {
     }
   }
 
+  @Override
+  public OptionalInt getBucketHashCode() {
+    final StructLike key = innerSplit.task().groupingKey();
+    if (key.size() == 0) {
+      return OptionalInt.empty();
+    }
+    final int numBucketKeys = key.size();

Review Comment:
   Thanks. Done
   
https://github.com/apache/hive/pull/5409/commits/41ea3ab3b78b2fc88e30a4fcd641540188819e60



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java:
##########
@@ -45,12 +53,12 @@ public class IcebergSplit extends InputSplit implements IcebergSplitContainer {
   public IcebergSplit() {
   }
 
-  IcebergSplit(Configuration conf, CombinedScanTask task) {
+  IcebergSplit(Configuration conf, ScanTaskGroup<FileScanTask> task) {
     this.task = task;
     this.conf = conf;
   }
 
-  public CombinedScanTask task() {
+  public ScanTaskGroup<FileScanTask> task() {

Review Comment:
   Renamed all related methods
   
https://github.com/apache/hive/pull/5409/commits/3c57a67327340197c2f1798503ab1927a5104cdd



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergBucketFunction.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CustomBucketFunction;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergBucket;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class IcebergBucketFunction implements CustomBucketFunction {
+  private static final class SettableDeferredObject implements DeferredObject {
+    private Object value;
+
+    @Override
+    public void prepare(int i) {
+      throw new UnsupportedOperationException("Not supported");
+    }
+
+    @Override
+    public Object get() {
+      return value;
+    }
+
+    void set(Object object) {
+      this.value = object;
+    }
+  }
+
+  private static final long serialVersionUID = 1L;
+
+  private final List<String> sourceColumnNames;
+  private final List<Integer> numBuckets;
+
+  private transient List<GenericUDFIcebergBucket> bucketUdfs;
+  private transient List<IntObjectInspector> bucketIdInspectors;
+  private transient SettableDeferredObject[] deferredObjects;
+
+  public IcebergBucketFunction(List<String> sourceColumnNames, List<Integer> numBuckets) {
+    Objects.requireNonNull(sourceColumnNames);
+    Objects.requireNonNull(numBuckets);
+    Preconditions.checkArgument(sourceColumnNames.size() == numBuckets.size());
+    Preconditions.checkArgument(!sourceColumnNames.isEmpty());
+    this.sourceColumnNames = sourceColumnNames;
+    this.numBuckets = numBuckets;
+  }
+
+  @Override
+  public Optional<CustomBucketFunction> select(boolean[] retainedColumns) {
+    Preconditions.checkArgument(retainedColumns.length == numBuckets.size());
+    Preconditions.checkState(bucketUdfs == null);
+    Preconditions.checkState(bucketIdInspectors == null);
+    Preconditions.checkState(deferredObjects == null);
+
+    final List<String> newSourceColumnNames = Lists.newArrayList();
+    final List<Integer> newNumBuckets = Lists.newArrayList();
+    for (int i = 0; i < retainedColumns.length; i++) {
+      if (retainedColumns[i]) {
+        newSourceColumnNames.add(sourceColumnNames.get(i));
+        newNumBuckets.add(numBuckets.get(i));
+      }
+    }
+
+    if (newSourceColumnNames.isEmpty()) {
+      return Optional.empty();
+    }
+
+    final IcebergBucketFunction newBucketFunction = new IcebergBucketFunction(newSourceColumnNames, newNumBuckets);
+    return Optional.of(newBucketFunction);
+  }
+
+  @Override
+  public void initialize(ObjectInspector[] objectInspectors) {
+    Preconditions.checkArgument(objectInspectors.length == numBuckets.size());
+    bucketIdInspectors = Lists.newArrayList();
+    bucketUdfs = Lists.newArrayList();
+    for (int i = 0; i < objectInspectors.length; i++) {
+      final GenericUDFIcebergBucket udf = new GenericUDFIcebergBucket();
+      final ObjectInspector numBucketInspector = PrimitiveObjectInspectorFactory
+          .getPrimitiveWritableConstantObjectInspector(
+              TypeInfoFactory.intTypeInfo,
+              new IntWritable(numBuckets.get(i)));
+      final ObjectInspector[] args = { objectInspectors[i], numBucketInspector };
+      try {
+        final ObjectInspector inspector = udf.initialize(args);
+        Preconditions.checkState(inspector.getCategory() == Category.PRIMITIVE);
+        final PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector) inspector;
+        Preconditions.checkState(primitiveInspector.getPrimitiveCategory() == PrimitiveCategory.INT);
+        bucketIdInspectors.add((IntObjectInspector) primitiveInspector);
+      } catch (UDFArgumentException e) {
+        throw new IllegalArgumentException("The given object inspector is illegal", e);
+      }
+      bucketUdfs.add(udf);
+    }
+    deferredObjects = new SettableDeferredObject[1];
+    deferredObjects[0] = new SettableDeferredObject();
+
+    Preconditions.checkState(bucketIdInspectors.size() == numBuckets.size());
+    Preconditions.checkState(bucketUdfs.size() == numBuckets.size());
+  }
+
+  @Override
+  public List<String> getSourceColumnNames() {
+    return sourceColumnNames;
+  }
+
+  @Override
+  public int getNumBuckets() {
+    return numBuckets.stream().reduce(1, (a, b) -> a * b);
+  }
+
+  @Override
+  public int getBucketHashCode(Object[] bucketFields) {
+    final int[] bucketIds = new int[numBuckets.size()];
+    for (int i = 0; i < bucketUdfs.size(); i++) {
+      deferredObjects[0].set(bucketFields[i]);
+      try {
+        final Object bucketId = bucketUdfs.get(i).evaluate(deferredObjects);
+        bucketIds[i] = bucketIdInspectors.get(i).get(bucketId);
+      } catch (HiveException e) {
+        throw new IllegalArgumentException("Failed to evaluate the given objects", e);
+      }
+    }
+    return getHashCode(bucketIds);
+  }
+
+  static int getHashCode(int[] bucketIds) {

Review Comment:
   I am currently biased toward explicitly having it under our control. That's 
because `Arrays.hashCode` might not guarantee consistency among JDK versions or 
implementations.
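
   A minimal sketch of what "under our control" could look like. The body of `getHashCode` is not visible in the quoted hunk, so the polynomial formula below (multiplier 31, initial value 0) is an assumption chosen for illustration, not the PR's actual implementation:

```java
public class BucketHashSketch {
  // Combines per-column bucket ids with an explicitly written polynomial hash.
  // Because the arithmetic is spelled out here, the result depends only on this
  // code and on Java's defined int overflow semantics, not on a library method.
  static int getHashCode(int[] bucketIds) {
    int hash = 0;
    for (int bucketId : bucketIds) {
      hash = 31 * hash + bucketId;
    }
    return hash;
  }

  public static void main(String[] args) {
    // Two bucket columns with bucket ids 3 and 7: (31 * 3) + 7 = 100
    System.out.println(getHashCode(new int[] {3, 7}));
  }
}
```

   Whatever formula is chosen, the row side (`IcebergBucketFunction#getBucketHashCode`) and the split side (`HiveIcebergSplit#getBucketHashCode`) presumably have to agree on it, which is easier to guarantee when both call one shared helper.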



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java:
##########
@@ -70,14 +78,31 @@ public String[] getLocations() {
     // getLocations() won't be accurate when called on worker nodes and will always return "*"
     if (locations == null && conf != null) {
       boolean localityPreferred = conf.getBoolean(InputFormatConfig.LOCALITY, false);
-      locations = localityPreferred ? Util.blockLocations(task, conf) : ANYWHERE;
+      locations = localityPreferred ? blockLocations(task, conf) : ANYWHERE;
     } else {
       locations = ANYWHERE;
     }
 
     return locations;
   }
 
+  private static String[] blockLocations(ScanTaskGroup<FileScanTask> task, Configuration conf) {

Review Comment:
   I created a PR, https://github.com/apache/iceberg/pull/11053, and added a
comment
   
https://github.com/apache/hive/pull/5409/commits/4206c61a80c06db7a8f899bb848ae65091eb1857



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

