marton-bod commented on a change in pull request #2750:
URL: https://github.com/apache/hive/pull/2750#discussion_r742770672



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -68,6 +73,23 @@ public Path getPath() {
     return new Path(tableLocation);
   }
 
+  @Override
+  public byte[] getBytesForHash() {
+    Collection<FileScanTask> fileScanTasks = innerSplit.task().files();
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      for (FileScanTask task : fileScanTasks) {
+        baos.write(task.file().path().toString().getBytes());
+        byte[] startBytes = new byte[Long.BYTES];
+        SerDeUtils.writeLong(startBytes, 0, task.start());
+        baos.write(startBytes);
+      }
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Couldn't produce hash input bytes for HiveIcebergSplit: " + this, ioe);

Review comment:
       Would the entire query fail if the hash computation doesn't work here? If so, do we want to consider just ERROR-logging it and proceeding with the execution, even if it's slower due to suboptimal cache affinity? What do you think?
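The reviewer's alternative could be sketched like this (a minimal sketch; the names `bytesForHash` and the table-location fallback are hypothetical, not the PR's actual code): on failure, ERROR-log and fall back to a coarser but stable hash input, so the query proceeds with degraded cache affinity instead of failing outright.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class HashBytesFallback {
  // Concatenate the file paths into the hash input; on IOException, log and
  // fall back to the table location so hashing still yields a stable value,
  // just without per-file cache affinity.
  static byte[] bytesForHash(String tableLocation, Iterable<String> filePaths) {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      for (String path : filePaths) {
        baos.write(path.getBytes(StandardCharsets.UTF_8));
      }
      return baos.toByteArray();
    } catch (IOException ioe) {
      // ERROR-log in real code (e.g. via the class's LOG field) instead of
      // rethrowing, per the review suggestion.
      System.err.println("Couldn't produce hash input bytes, falling back to table location: " + ioe);
      return tableLocation.getBytes(StandardCharsets.UTF_8);
    }
  }
}
```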

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -68,6 +73,23 @@ public Path getPath() {
     return new Path(tableLocation);
   }
 
+  @Override
+  public byte[] getBytesForHash() {
+    Collection<FileScanTask> fileScanTasks = innerSplit.task().files();
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      for (FileScanTask task : fileScanTasks) {
+        baos.write(task.file().path().toString().getBytes());
+        byte[] startBytes = new byte[Long.BYTES];
+        SerDeUtils.writeLong(startBytes, 0, task.start());
+        baos.write(startBytes);

Review comment:
       Using Guava, I think these 3 lines could be just `baos.write(Longs.toByteArray(task.start()))`, but it's up to you.
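For reference, Guava's `Longs.toByteArray` encodes the long big-endian into 8 bytes; a stdlib-only sketch of the same encoding (an assumption worth checking is that `SerDeUtils.writeLong` uses the same byte order before making the swap):

```java
import java.nio.ByteBuffer;

class LongBytes {
  // Big-endian 8-byte encoding of a long, matching what Guava's
  // Longs.toByteArray produces (ByteBuffer is big-endian by default).
  static byte[] toByteArray(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }
}
```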

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
##########
@@ -118,6 +115,13 @@
     if (splitSize > 0) {
       scan = scan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize));
     }
+    // In case of LLAP-based execution we ask Iceberg not to combine multiple fileScanTasks into one split.
+    // This is so that cache affinity can work, and each file (split) is always executed/cached on the same LLAP daemon.
+    MapWork mapWork = LlapHiveUtils.findMapWork((JobConf) conf);
+    if (mapWork != null && mapWork.getCacheAffinity()) {
+      Long openFileCost = splitSize > 0 ? splitSize : TableProperties.SPLIT_SIZE_DEFAULT;
+      scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(openFileCost));

Review comment:
       I would also need to understand that better, so a comment would probably be helpful here.
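One reading of why this setting works (a toy model of bin-packing split planning, an assumption on my part rather than Iceberg's actual planner code): each task's weight is effectively `max(fileLength, openFileCost)`, and tasks are packed into bins of `splitSize`. With an open-file cost at least as large as the split size, every file's weight fills a bin on its own, so no two files are ever combined into one split.

```java
class SplitPackingDemo {
  // Greedy toy model: pack file weights max(len, openFileCost) into bins of
  // splitSize and count the resulting bins (splits). Illustrative only.
  static int binsFor(long[] fileLengths, long openFileCost, long splitSize) {
    int bins = 0;
    long current = 0;
    for (long len : fileLengths) {
      long weight = Math.max(len, openFileCost);
      if (current > 0 && current + weight > splitSize) {
        bins++;       // close the current bin, start a new one
        current = 0;
      }
      current += weight;
    }
    return current > 0 ? bins + 1 : bins;
  }
}
```

With `openFileCost == splitSize`, three small files land in three bins; with a tiny open-file cost, the same files collapse into one combined split.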




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


