HIVE-11998 - Improve Compaction process logging (Eugene Koifman, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e4937c03 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e4937c03 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e4937c03 Branch: refs/heads/branch-1 Commit: e4937c0337037ee37d3b0879f259bd7840d8bc39 Parents: a4ff7ad Author: Eugene Koifman <[email protected]> Authored: Fri Oct 2 10:44:28 2015 -0700 Committer: Eugene Koifman <[email protected]> Committed: Fri Oct 2 10:44:28 2015 -0700 ---------------------------------------------------------------------- .../metastore/txn/CompactionTxnHandler.java | 36 ++++++++++++++------ .../hive/ql/txn/AcidHouseKeeperService.java | 5 +-- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 3 +- .../hive/ql/txn/compactor/CompactorThread.java | 9 ++--- .../hadoop/hive/ql/txn/compactor/Initiator.java | 5 ++- 5 files changed, 38 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e4937c03/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 328a65c..44ee5c6 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -122,8 +122,9 @@ public class CompactionTxnHandler extends TxnHandler { stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to update compaction record"); + int updCnt = stmt.executeUpdate(s); + if (updCnt != 1) { + 
LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ". updCnt=" + updCnt); LOG.debug("Going to rollback"); dbConn.rollback(); } @@ -182,8 +183,10 @@ public class CompactionTxnHandler extends TxnHandler { s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to update compaction record"); + int updCount = stmt.executeUpdate(s); + if (updCount != 1) { + LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " + + info + ". updCnt=" + updCount); LOG.debug("Going to rollback"); dbConn.rollback(); } @@ -221,8 +224,9 @@ public class CompactionTxnHandler extends TxnHandler { String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + "cq_worker_id = null where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to update compaction record"); + int updCnt = stmt.executeUpdate(s); + if (updCnt != 1) { + LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". updCnt=" + updCnt); LOG.debug("Going to rollback"); dbConn.rollback(); } @@ -298,6 +302,17 @@ public class CompactionTxnHandler extends TxnHandler { /** * This will remove an entry from the queue after * it has been compacted. + * + * todo: possibly a problem? Worker will start with DB in state X (wrt this partition). + * while it's working more txns will happen, against partition it's compacting. + * then this will delete state up to X and since then. There may be new delta files created + * between compaction starting and cleaning. These will not be compacted until more + * transactions happen. So this ideally should only delete + * up to TXN_ID that was compacted (i.e. HWM in Worker?) 
Then this can also run + * at READ_COMMITTED + * + * Also, by using this method when Worker fails, we prevent future compactions from + * running until more data is written to table or compaction is invoked explicitly * @param info info on the compaction entry to remove */ public void markCleaned(CompactionInfo info) throws MetaException { @@ -309,8 +324,9 @@ public class CompactionTxnHandler extends TxnHandler { stmt = dbConn.createStatement(); String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to delete compaction record"); + int updCount = stmt.executeUpdate(s); + if (updCount != 1) { + LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount); LOG.debug("Going to rollback"); dbConn.rollback(); } @@ -348,7 +364,7 @@ public class CompactionTxnHandler extends TxnHandler { else buf.append(", "); buf.append(id); } - + //because 1 txn may include different partitions/tables even in auto commit mode buf.append(") and tc_database = '"); buf.append(info.dbname); buf.append("' and tc_table = '"); @@ -415,7 +431,7 @@ public class CompactionTxnHandler extends TxnHandler { String bufStr = buf.toString(); LOG.debug("Going to execute update <" + bufStr + ">"); int rc = stmt.executeUpdate(bufStr); - LOG.debug("Removed " + rc + " records from txns"); + LOG.info("Removed " + rc + " empty Aborted transactions: " + txnids + " from TXNS"); LOG.debug("Going to commit"); dbConn.commit(); } http://git-wip-us.apache.org/repos/asf/hive/blob/e4937c03/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java index d22ca8d..23a77e6 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java @@ -84,9 +84,10 @@ public class AcidHouseKeeperService implements HouseKeeperService { @Override public void run() { try { + long startTime = System.currentTimeMillis(); txnHandler.performTimeOuts(); - owner.isAliveCounter.incrementAndGet(); - LOG.info("timeout reaper ran"); + int count = owner.isAliveCounter.incrementAndGet(); + LOG.info("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count); } catch(Throwable t) { LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t); http://git-wip-us.apache.org/repos/asf/hive/blob/e4937c03/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 16d2c81..622bf54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -212,7 +212,7 @@ public class Cleaner extends CompactorThread { if (runJobAsSelf(ci.runAs)) { removeFiles(location, txnList); } else { - LOG.info("Cleaning as user " + ci.runAs); + LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName()); UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, UserGroupInformation.getLoginUser()); ugi.doAs(new PrivilegedExceptionAction<Object>() { @@ -245,6 +245,7 @@ public class Cleaner extends CompactorThread { ", that hardly seems right."); return; } + LOG.info("About to remove " + filesToDelete.size() + " obsolete directories from " + location); FileSystem fs = filesToDelete.get(0).getFileSystem(conf); for (Path dead : filesToDelete) { 
http://git-wip-us.apache.org/repos/asf/hive/blob/e4937c03/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 38cd95e..c956f58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -119,8 +119,8 @@ abstract class CompactorThread extends Thread implements MetaStoreThread { throw e; } if (parts.size() != 1) { - LOG.error(ci.getFullPartitionName() + " does not refer to a single partition"); - throw new MetaException("Too many partitions"); + LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " + parts); + throw new MetaException("Too many partitions for : " + ci.getFullPartitionName()); } return parts.get(0); } else { @@ -179,8 +179,9 @@ abstract class CompactorThread extends Thread implements MetaStoreThread { return wrapper.get(0); } } - LOG.error("Unable to stat file as either current user or table owner, giving up"); - throw new IOException("Unable to stat file"); + LOG.error("Unable to stat file " + p + " as either current user(" + UserGroupInformation.getLoginUser() + + ") or table owner(" + t.getOwner() + "), giving up"); + throw new IOException("Unable to stat file: " + p); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/e4937c03/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 73715c6..ff40600 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -98,7 +98,7 @@ public class Initiator extends CompactorThread { // check if no compaction set for this table if (noAutoCompactSet(t)) { - LOG.info("Table " + tableName(t) + " marked true so we will not compact it."); + LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + "=true so we will not compact it."); continue; } @@ -296,11 +296,10 @@ public class Initiator extends CompactorThread { } private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) throws MetaException { - String s = "Requesting " + type.toString() + " compaction for " + ci.getFullPartitionName(); - LOG.info(s); CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, type); if (ci.partName != null) rqst.setPartitionname(ci.partName); rqst.setRunas(runAs); + LOG.info("Requesting compaction: " + rqst); txnHandler.compact(rqst); }
