ayushtkn commented on code in PR #5386:
URL: https://github.com/apache/hive/pull/5386#discussion_r1753108049


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -601,46 +602,52 @@ private void commitCompaction(Table table, Long snapshotId, long startTime, File
     List<DeleteFile> existingDeleteFiles = IcebergTableUtil.getDeleteFiles(table, partitionPath);

     RewriteFiles rewriteFiles = table.newRewrite();
-    rewriteFiles.validateFromSnapshot(getSnapshotId(table, null));
-    if (snapshotId != null) {
-      rewriteFiles.validateFromSnapshot(snapshotId);
-    }
-
     existingDataFiles.forEach(rewriteFiles::deleteFile);
     existingDeleteFiles.forEach(rewriteFiles::deleteFile);
     results.dataFiles().forEach(rewriteFiles::addFile);

+    if (snapshotId != null) {
+      rewriteFiles.validateFromSnapshot(snapshotId);
+    }
     rewriteFiles.commit();
     LOG.info("Compaction commit took {} ms for table: {} partition: {} with {} file(s)",
-        System.currentTimeMillis() - startTime, table, partitionPath == null ? "N/A" : partitionPath,
+        System.currentTimeMillis() - startTime, table, StringUtils.defaultString(partitionPath, "N/A"),
         results.dataFiles().size());
   }
 
   /**
    * Creates and commits an Iceberg insert overwrite change with the provided data files.
-   * For unpartitioned tables the table content is replaced with the new data files. If not data files are provided
-   * then the unpartitioned table is truncated.
-   * For partitioned tables the relevant partitions are replaced with the new data files. If no data files are provided
-   * then the unpartitioned table remains unchanged.
+   * For unpartitioned tables the table content is replaced with the new data files. Table is truncated
+   * if no data files are provided.
+   * For partitioned tables the relevant partitions are replaced with the new data files. Table remains unchanged
+   * unless data files are provided.
    *
-   * @param table                   The table we are changing
-   * @param startTime               The start time of the commit - used only for logging
-   * @param results                 The object containing the new files
+   * @param table         The table we are changing
+   * @param startTime     The start time of the commit - used only for logging
+   * @param results       The object containing the new files

Review Comment:
   ``snapshotId`` needs to be added as a param here
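   For illustration, the updated ``@param`` block could look roughly like this; the placement and description of the new tag are only a guess, not taken from the PR:

   ```java
   * @param table         The table we are changing
   * @param snapshotId    The snapshot id the overwrite is validated against; may be null (wording is a guess)
   * @param startTime     The start time of the commit - used only for logging
   * @param results       The object containing the new files
   ```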



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -1443,6 +1449,25 @@ static void overlayTableProperties(Configuration configuration, TableDesc tableD
     map.remove("columns.comments");
   }
 
+  @Override
+  public void validateCurrentSnapshot(TableDesc tableDesc) {
+    if (conf.getBoolean(ConfigProperties.LOCK_HIVE_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT) ||
+        !HiveConf.getBoolVar(conf, ConfVars.HIVE_TXN_EXT_LOCKING_ENABLED)) {
+      return;
+    }
+    Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());

Review Comment:
   This would be fetching the table from the cache, since `skipCache` is `false` by default. Can that lead to getting an outdated snapshot id in any case?
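   A minimal sketch of what forcing a fresh load might look like, assuming `IcebergTableUtil.getTable` exposes an overload with a `skipCache` flag, as the comment implies:

   ```java
   // Sketch only: bypass the query-level table cache so the freshest snapshot id is read.
   // Assumes an overload of getTable that accepts a skipCache flag.
   Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties(), true /* skipCache */);
   ```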



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -645,7 +648,10 @@ private void checkAndMergeColStats(ColumnStatistics statsObjNew, Table tbl) thro
    */
   @Override

Review Comment:
   Need to update the javadoc to reflect the change.



##########
ql/src/java/org/apache/hadoop/hive/ql/Driver.java:
##########
@@ -184,7 +186,17 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command
       DriverUtils.checkInterrupted(driverState, driverContext, "at acquiring the lock.", null, null);
 
       lockAndRespond();
-
+      try {
+        context.getLoadTableOutputMap().forEach(
+          (ltd, we) -> {
+            HiveStorageHandler handler = we.getTable().getStorageHandler();
+            if (handler != null) {
+              handler.validateCurrentSnapshot(ltd.getTable());
+            }
+          });
+      } catch (ReCompileException ex) {
+        compileInternal(context.getCmd(), true);

Review Comment:
   Question -> If my validateCurrentSnapshot() fails, we will be compiling again and, under the hood, fetching the snapshot again. Does ``handler.validateCurrentSnapshot(ltd.getTable());`` get called again?
   
   Just thinking: what happens if the snapshot gets outdated in that other attempt as well?
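   For illustration, one possible shape of re-validating after every recompile with a bound on retries; this is a hypothetical sketch, not what the PR does, and ``MAX_VALIDATE_ATTEMPTS`` is an invented constant:

   ```java
   // Hypothetical sketch: re-run the snapshot validation after each recompile and bound the
   // number of attempts so a snapshot that keeps going stale cannot loop forever.
   int attempts = 0;
   while (true) {
     try {
       context.getLoadTableOutputMap().forEach((ltd, we) -> {
         HiveStorageHandler handler = we.getTable().getStorageHandler();
         if (handler != null) {
           handler.validateCurrentSnapshot(ltd.getTable());
         }
       });
       break;
     } catch (ReCompileException ex) {
       if (++attempts >= MAX_VALIDATE_ATTEMPTS) {  // MAX_VALIDATE_ATTEMPTS is hypothetical
         throw ex;
       }
       compileInternal(context.getCmd(), true);
     }
   }
   ```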



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

