marton-bod commented on a change in pull request #2750: URL: https://github.com/apache/hive/pull/2750#discussion_r742770672
##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -68,6 +73,23 @@ public Path getPath() {
     return new Path(tableLocation);
   }
 
+  @Override
+  public byte[] getBytesForHash() {
+    Collection<FileScanTask> fileScanTasks = innerSplit.task().files();
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      for (FileScanTask task : fileScanTasks) {
+        baos.write(task.file().path().toString().getBytes());
+        byte[] startBytes = new byte[Long.BYTES];
+        SerDeUtils.writeLong(startBytes, 0, task.start());
+        baos.write(startBytes);
+      }
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Couldn't produce hash input bytes for HiveIcebergSplit: " + this, ioe);

Review comment:
Would the entire query fail if the hash computation doesn't work here? If so, do we want to consider just ERROR logging it but proceeding with the execution even if it's slower due to suboptimal cache affinity? What do you think?
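One way to sidestep the question: as far as the JDK is concerned, the failure path here looks unreachable in practice. `ByteArrayOutputStream` only writes to memory, and the checked `IOException` comes from the declared signatures of `write(byte[])` and `close()`, not from any real I/O. A minimal sketch that builds the same kind of hash input with no checked exception to handle. The `Task` holder is a hypothetical stand-in for `FileScanTask`, and UTF-8 is an assumption (the code under review uses the platform default charset):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

final class HashBytesSketch {

  // Hypothetical stand-in for the (file path, start offset) pair of a FileScanTask.
  static final class Task {
    final String path;
    final long start;

    Task(String path, long start) {
      this.path = path;
      this.start = start;
    }
  }

  // Concatenates path bytes and the 8-byte big-endian start offset of every task.
  static byte[] bytesForHash(List<Task> tasks) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    for (Task task : tasks) {
      byte[] pathBytes = task.path.getBytes(StandardCharsets.UTF_8);
      // The three-argument write does not declare IOException, unlike write(byte[]).
      baos.write(pathBytes, 0, pathBytes.length);
      byte[] startBytes = ByteBuffer.allocate(Long.BYTES).putLong(task.start).array();
      baos.write(startBytes, 0, startBytes.length);
    }
    // No try-with-resources: close() declares IOException, and there is nothing to release.
    return baos.toByteArray();
  }

  public static void main(String[] args) {
    List<Task> tasks = Arrays.asList(new Task("/warehouse/t/a.orc", 0L), new Task("/warehouse/t/a.orc", 4096L));
    System.out.println(bytesForHash(tasks).length);
  }
}
```

If the exception handling stays as it is in the PR, the log-and-continue question still applies; this only shows that the throwing branch may not need to exist at all.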
##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -68,6 +73,23 @@ public Path getPath() {
     return new Path(tableLocation);
   }
 
+  @Override
+  public byte[] getBytesForHash() {
+    Collection<FileScanTask> fileScanTasks = innerSplit.task().files();
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      for (FileScanTask task : fileScanTasks) {
+        baos.write(task.file().path().toString().getBytes());
+        byte[] startBytes = new byte[Long.BYTES];
+        SerDeUtils.writeLong(startBytes, 0, task.start());
+        baos.write(startBytes);

Review comment:
Using Guava, I think these 3 lines could be just `baos.write(Longs.toByteArray(task.start()))`, but it's up to you

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
##########
@@ -118,6 +115,13 @@
       if (splitSize > 0) {
         scan = scan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize));
       }
+      // In case of LLAP-based execution we ask Iceberg not to combine multiple fileScanTasks into one split.
+      // This is so that cache affinity can work, and each file(split) is executed/cached on always the same LLAP daemon.
+      MapWork mapWork = LlapHiveUtils.findMapWork((JobConf) conf);
+      if (mapWork != null && mapWork.getCacheAffinity()) {
+        Long openFileCost = splitSize > 0 ? splitSize : TableProperties.SPLIT_SIZE_DEFAULT;
+        scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(openFileCost));

Review comment:
I would also need to understand that better, so a comment would probably be helpful here
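On the open-file-cost question: as far as I understand Iceberg's split planning, each file scan task is weighted as the larger of its length and the open file cost, and tasks are then bin-packed up to the target split size. Setting the cost equal to the split size would make every task fill a bin on its own, which is presumably how the change keeps tasks from being combined. A simplified model of that effect follows; `pack` and its greedy strategy are illustrative stand-ins, not Iceberg's actual planner:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OpenFileCostSketch {

  // Greedy packer: start a new bin when the next task's weight would exceed the target.
  // Each task weighs max(length, openFileCost), mirroring the weighting described above.
  static List<List<Long>> pack(List<Long> taskLengths, long targetSplitSize, long openFileCost) {
    List<List<Long>> bins = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentWeight = 0;
    for (long length : taskLengths) {
      long weight = Math.max(length, openFileCost);
      if (!current.isEmpty() && currentWeight + weight > targetSplitSize) {
        bins.add(current);
        current = new ArrayList<>();
        currentWeight = 0;
      }
      current.add(length);
      currentWeight += weight;
    }
    if (!current.isEmpty()) {
      bins.add(current);
    }
    return bins;
  }

  public static void main(String[] args) {
    List<Long> lengths = Arrays.asList(10L, 20L, 30L);
    long splitSize = 128L;
    // Small open file cost: all three tasks fit into one split.
    System.out.println(pack(lengths, splitSize, 4L).size());        // prints 1
    // Open file cost equal to the split size: one task per split.
    System.out.println(pack(lengths, splitSize, splitSize).size()); // prints 3
  }
}
```

A one-line code comment stating that the open file cost is raised to the split size so that no two tasks fit in the same split would probably cover what is being asked for here.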
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org