deniskuzZ commented on a change in pull request #1087:
URL: https://github.com/apache/hive/pull/1087#discussion_r447468545



##########
File path: 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
##########
@@ -229,102 +239,168 @@ public int repair(MsckInfo msckInfo) {
             throw new MetastoreException(e);
           }
         }
+        if (transactionalTable && !MetaStoreServerUtils.isPartitioned(table)) {
+          if (result.getMaxWriteId() > 0) {
+            if (txnId < 0) {
+              // We need the txnId to check against even if we didn't do the 
locking
+              txnId = getMsc().openTxn(getUserName());
+            }
+
+            validateAndAddMaxTxnIdAndWriteId(result.getMaxWriteId(), 
result.getMaxTxnId(),
+                table.getDbName(), table.getTableName(), txnId);
+          }
+        }
       }
       success = true;
     } catch (Exception e) {
       LOG.warn("Failed to run metacheck: ", e);
       success = false;
-      ret = 1;
     } finally {
-      if (msckInfo.getResFile() != null) {
-        BufferedWriter resultOut = null;
-        try {
-          Path resFile = new Path(msckInfo.getResFile());
-          FileSystem fs = resFile.getFileSystem(getConf());
-          resultOut = new BufferedWriter(new 
OutputStreamWriter(fs.create(resFile)));
-
-          boolean firstWritten = false;
-          firstWritten |= writeMsckResult(result.getTablesNotInMs(),
-            "Tables not in metastore:", resultOut, firstWritten);
-          firstWritten |= writeMsckResult(result.getTablesNotOnFs(),
-            "Tables missing on filesystem:", resultOut, firstWritten);
-          firstWritten |= writeMsckResult(result.getPartitionsNotInMs(),
-            "Partitions not in metastore:", resultOut, firstWritten);
-          firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(),
-            "Partitions missing from filesystem:", resultOut, firstWritten);
-          firstWritten |= writeMsckResult(result.getExpiredPartitions(),
-            "Expired partitions (retention period: " + partitionExpirySeconds 
+ "s) :", resultOut, firstWritten);
-          // sorting to stabilize qfile output (msck_repair_drop.q)
-          Collections.sort(repairOutput);
-          for (String rout : repairOutput) {
-            if (firstWritten) {
-              resultOut.write(terminator);
-            } else {
-              firstWritten = true;
-            }
-            resultOut.write(rout);
-          }
-        } catch (IOException e) {
-          LOG.warn("Failed to save metacheck output: ", e);
-          ret = 1;
-        } finally {
-          if (resultOut != null) {
-            try {
-              resultOut.close();
-            } catch (IOException e) {
-              LOG.warn("Failed to close output file: ", e);
-              ret = 1;
-            }
-          }
+      if (result!=null) {
+        logResult(result);
+        if (msckInfo.getResFile() != null) {
+          success = writeResultToFile(msckInfo, result, repairOutput, 
partitionExpirySeconds) && success;
         }
       }
 
-      LOG.info("Tables not in metastore: {}", result.getTablesNotInMs());
-      LOG.info("Tables missing on filesystem: {}", result.getTablesNotOnFs());
-      LOG.info("Partitions not in metastore: {}", 
result.getPartitionsNotInMs());
-      LOG.info("Partitions missing from filesystem: {}", 
result.getPartitionsNotOnFs());
-      LOG.info("Expired partitions: {}", result.getExpiredPartitions());
-      if (acquireLock && txnId > 0) {
-          if (success) {
-            try {
-              LOG.info("txnId: {} succeeded. Committing..", txnId);
-              getMsc().commitTxn(txnId);
-            } catch (Exception e) {
-              LOG.warn("Error while committing txnId: {} for table: {}", 
txnId, qualifiedTableName, e);
-              ret = 1;
-            }
-          } else {
-            try {
-              LOG.info("txnId: {} failed. Aborting..", txnId);
-              getMsc().abortTxns(Lists.newArrayList(txnId));
-            } catch (Exception e) {
-              LOG.warn("Error while aborting txnId: {} for table: {}", txnId, 
qualifiedTableName, e);
-              ret = 1;
-            }
-          }
+      if (txnId > 0) {
+        success = closeTxn(qualifiedTableName, success, txnId) && success;
       }
       if (getMsc() != null) {
         getMsc().close();
         msc = null;
       }
     }
+    return success ? 0 : 1;
+  }
 
+  private boolean closeTxn(String qualifiedTableName, boolean success, long 
txnId) {
+    boolean ret = true;
+    if (success) {
+      try {
+        LOG.info("txnId: {} succeeded. Committing..", txnId);
+        getMsc().commitTxn(txnId);
+      } catch (Exception e) {
+        LOG.warn("Error while committing txnId: {} for table: {}", txnId, 
qualifiedTableName, e);
+        ret = false;
+      }
+    } else {
+      try {
+        LOG.info("txnId: {} failed. Aborting..", txnId);
+        getMsc().abortTxns(Lists.newArrayList(txnId));
+      } catch (Exception e) {
+        LOG.warn("Error while aborting txnId: {} for table: {}", txnId, 
qualifiedTableName, e);
+        ret = false;
+      }
+    }
     return ret;
   }
 
-  private LockRequest createLockRequest(final String dbName, final String 
tableName) throws TException {
-    UserGroupInformation loggedInUser = null;
-    String username;
+  private void logResult(CheckResult result) {
+    LOG.info("Tables not in metastore: {}", result.getTablesNotInMs());
+    LOG.info("Tables missing on filesystem: {}", result.getTablesNotOnFs());
+    LOG.info("Partitions not in metastore: {}", result.getPartitionsNotInMs());
+    LOG.info("Partitions missing from filesystem: {}", 
result.getPartitionsNotOnFs());
+    LOG.info("Expired partitions: {}", result.getExpiredPartitions());
+  }
+
+  private boolean writeResultToFile(MsckInfo msckInfo, CheckResult result, 
List<String> repairOutput,
+      long partitionExpirySeconds) {
+    boolean success = true;
+    BufferedWriter resultOut = null;
     try {
-      loggedInUser = UserGroupInformation.getLoginUser();
+      Path resFile = new Path(msckInfo.getResFile());
+      FileSystem fs = resFile.getFileSystem(getConf());
+      resultOut = new BufferedWriter(new 
OutputStreamWriter(fs.create(resFile)));
+
+      boolean firstWritten = false;
+      firstWritten |= writeMsckResult(result.getTablesNotInMs(),
+        "Tables not in metastore:", resultOut, firstWritten);
+      firstWritten |= writeMsckResult(result.getTablesNotOnFs(),
+        "Tables missing on filesystem:", resultOut, firstWritten);
+      firstWritten |= writeMsckResult(result.getPartitionsNotInMs(),
+        "Partitions not in metastore:", resultOut, firstWritten);
+      firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(),
+        "Partitions missing from filesystem:", resultOut, firstWritten);
+      firstWritten |= writeMsckResult(result.getExpiredPartitions(),
+        "Expired partitions (retention period: " + partitionExpirySeconds + 
"s) :", resultOut, firstWritten);
+      // sorting to stabilize qfile output (msck_repair_drop.q)
+      Collections.sort(repairOutput);
+      for (String rout : repairOutput) {
+        if (firstWritten) {
+          resultOut.write(terminator);
+        } else {
+          firstWritten = true;
+        }
+        resultOut.write(rout);
+      }
     } catch (IOException e) {
-      LOG.warn("Unable to get logged in user via UGI. err: {}", 
e.getMessage());
+      LOG.warn("Failed to save metacheck output: ", e);
+      success = false;
+    } finally {
+      if (resultOut != null) {
+        try {
+          resultOut.close();
+        } catch (IOException e) {
+          LOG.warn("Failed to close output file: ", e);
+          success = false;
+        }
+      }
     }
-    if (loggedInUser == null) {
-      username = System.getProperty("user.name");
-    } else {
-      username = loggedInUser.getShortUserName();
+    return success;
+  }
+
+  /**
+   * When we add new partitions to a transactional table, we have check the 
writeIds.
+   * For every newly added partitions, we read the maximum writeId form the 
directory structure
+   * and compare it to the maximum allocated writeId in the metastore.
+   * If the metastore has never allocated any were are good, the use case 
would be initialize a table with
+   * existing data. The HMS will be initialized with the maximum writeId. The 
system will handle every delta directory
+   * as committed ones.
+   * If the writeId is higher in the metastore we can still accept the data, 
the use case would be after some dataloss
+   * some older data backup was used. The system would able to read the old 
data.
+   * If however the writeId in the new partition is greater than the maximum 
allocated in the HMS
+   * we must raise an error. The writedId in the HMS should be increased to 
match the writeIds in the data files,
+   * but it would most likely cause a lot of problem since the transactional 
data would become inconsistent
+   * between the HMS and the filesystem.
+   * Further more we need to check for the visibilityTransactionIds written by 
the compaction.
+   * If we have a higher visibilityId in the directory structure than the 
current transactionid we need to set
+   * the transactionId sequence higher in the HMS so the next reads may read 
the content of the
+   * compacted base/delta folders.
+   * @param partsNotInMs partitions only in the FileSystem
+   * @param dbName database name
+   * @param tableName table name
+   * @param txnId actual transactionId
+   */
+  private void 
validateAndAddMaxTxnIdAndWriteId(Set<CheckResult.PartitionResult> partsNotInMs, 
String dbName,
+      String tableName, long txnId) throws TException {
+    long maxWriteIdOnFilesystem =
+        
partsNotInMs.stream().map(CheckResult.PartitionResult::getMaxWriteId).max(Long::compareTo).orElse(0L);
+    long maxVisibilityTxnId =
+        
partsNotInMs.stream().map(CheckResult.PartitionResult::getMaxTxnId).max(Long::compareTo).orElse(0L);
+    validateAndAddMaxTxnIdAndWriteId(maxWriteIdOnFilesystem, 
maxVisibilityTxnId, dbName, tableName, txnId);
+  }
+
+  private void validateAndAddMaxTxnIdAndWriteId(long maxWriteIdOnFilesystem, 
long maxVisibilityTxnId, String dbName,
+      String tableName, long txnId) throws TException {
+    long maxAllocatedWriteId = getMsc().getMaxAllocatedWriteId(dbName, 
tableName);
+    if (maxAllocatedWriteId > 0 && maxWriteIdOnFilesystem > 
maxAllocatedWriteId) {

Review comment:
       what if we would like to ship FS deltas to the backup HMS due to some 
replication issue? could you please elaborate what issues do you see in 
increasing writeId in HMS to match FS?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to