Repository: hive Updated Branches: refs/heads/master 3726ce590 -> 4959ff5bb
HIVE-13691 No record with CQ_ID=0 found in COMPACTION_QUEUE (Eugene Koifman, reviewed by Wei Zheng) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4959ff5b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4959ff5b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4959ff5b Branch: refs/heads/master Commit: 4959ff5bb7dc590e21f680b9d9be0f2270414309 Parents: 3726ce5 Author: Eugene Koifman <[email protected]> Authored: Wed May 18 10:36:45 2016 -0700 Committer: Eugene Koifman <[email protected]> Committed: Wed May 18 10:36:45 2016 -0700 ---------------------------------------------------------------------- .../metastore/txn/CompactionTxnHandler.java | 27 +++++-- .../hadoop/hive/metastore/txn/TxnHandler.java | 36 +++++---- .../hadoop/hive/ql/txn/compactor/Initiator.java | 5 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 84 +++++++++++++------- 4 files changed, 99 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index ab7da68..d2d6462 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -691,7 +691,7 @@ class CompactionTxnHandler extends TxnHandler { } /** - * For any given compactable entity (partition, table if not partitioned) the history of compactions + * For any given compactable entity (partition; table if not partitioned) the history of compactions * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the * history such that a configurable number of each type of state is present. Any other entries * can be purged. This scheme has advantage of always retaining the last failure/success even if @@ -793,7 +793,7 @@ class CompactionTxnHandler extends TxnHandler { "CC_DATABASE = " + quoteString(ci.dbname) + " and " + "CC_TABLE = " + quoteString(ci.tableName) + (ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") + - " order by CC_ID desc"); + " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); int numFailed = 0; int numTotal = 0; int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); @@ -824,8 +824,8 @@ class CompactionTxnHandler extends TxnHandler { /** * If there is an entry in compaction_queue with ci.id, remove it * Make entry in completed_compactions with status 'f'. - * - * but what abount markCleaned() which is called when table is had been deleted... + * If there is no entry in compaction_queue, it means Initiator failed to even schedule a compaction, + * which we record as ATTEMPTED_STATE entry in history. */ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure @@ -845,12 +845,27 @@ class CompactionTxnHandler extends TxnHandler { int updCnt = stmt.executeUpdate(s); } else { - throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE"); + if(ci.id > 0) { + //the record with valid CQ_ID has disappeared - this is a sign of something wrong + throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE"); + } + } + if(ci.id == 0) { + //The failure occurred before we even made an entry in COMPACTION_QUEUE + //generate ID so that we can make an entry in COMPLETED_COMPACTIONS + ci.id = generateCompactionQueueId(stmt); + //mostly this indicates that the Initiator is paying attention to some table even though + //compactions are not happening. + ci.state = ATTEMPTED_STATE; + //this is not strictly accurate, but 'type' cannot be null. + ci.type = CompactionType.MINOR; + } + else { + ci.state = FAILED_STATE; } close(rs, stmt, null); pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)"); - ci.state = FAILED_STATE; CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); int updCount = pStmt.executeUpdate(); LOG.debug("Going to commit"); http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index f061767..bc818e0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore.txn; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Service; import com.jolbox.bonecp.BoneCPConfig; import com.jolbox.bonecp.BoneCPDataSource; import org.apache.commons.dbcp.ConnectionFactory; @@ -1252,6 +1253,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } + long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException { + // Get the id for the next entry in the queue + String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID"); + LOG.debug("going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new IllegalStateException("Transaction tables not properly initiated, " + + "no record found in next_compaction_queue_id"); + } + long id = rs.getLong(1); + s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + return id; + } public long compact(CompactionRequest rqst) throws MetaException { // Put a compaction request in the queue. try { @@ -1261,21 +1277,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - - // Get the id for the next entry in the queue - String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID"); - LOG.debug("going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("Transaction tables not properly initiated, " + - "no record found in next_compaction_queue_id"); - } - long id = rs.getLong(1); - s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + + long id = generateCompactionQueueId(stmt); StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + "cq_table, "); @@ -1315,7 +1318,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { buf.append(rqst.getRunas()); } buf.append("')"); - s = buf.toString(); + String s = buf.toString(); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1366,6 +1369,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break; case FAILED_STATE: e.setState(FAILED_RESPONSE); break; case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break; + case ATTEMPTED_STATE: e.setState(ATTEMPTED_RESPONSE); break; default: //do nothing to handle RU/D if we add another status } http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 949cbd5..a55fa1c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -126,8 +126,9 @@ public class Initiator extends CompactorThread { continue; } if(txnHandler.checkFailedCompactions(ci)) { - //todo: make 'a' state entry in completed_compactions - LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last 3 attempts to compact it failed."); + LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last " + + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed."); + txnHandler.markFailed(ci); continue; } http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 903337d..d80a03e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -492,6 +492,15 @@ public class TestTxnCommands2 { Assert.assertEquals("Insert overwrite partition failed", stringifyValues(updatedData), rs2); //insert overwrite not supported for ACID tables } + private static void checkCompactionState(CompactionsByState expected, CompactionsByState actual) { + Assert.assertEquals(TxnStore.ATTEMPTED_RESPONSE, expected.attempted, actual.attempted); + Assert.assertEquals(TxnStore.FAILED_RESPONSE, expected.failed, actual.failed); + Assert.assertEquals(TxnStore.INITIATED_RESPONSE, expected.initiated, actual.initiated); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, expected.readyToClean, actual.readyToClean); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, expected.succeeded, actual.succeeded); + Assert.assertEquals(TxnStore.WORKING_RESPONSE, expected.working, actual.working); + Assert.assertEquals("total", expected.total, actual.total); + } /** * HIVE-12353 * @throws Exception @@ -519,58 +528,60 @@ public class TestTxnCommands2 { txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); runWorker(hiveConf); } - //this should not schedule a new compaction due to prior failures + //this should not schedule a new compaction due to prior failures, but will create Attempted entry Initiator init = new Initiator(); init.setThreadId((int)init.getId()); init.setHiveConf(hiveConf); init.init(stop, new AtomicBoolean()); init.run(); - - CompactionsByState cbs = countCompacts(txnHandler); - Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed); - Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total); + int numAttemptedCompactions = 1; + checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler)); hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS); AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService(); runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history - cbs = countCompacts(txnHandler); - Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed); - Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total); + checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler)); txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR)); runWorker(hiveConf);//will fail txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); runWorker(hiveConf);//will fail - cbs = countCompacts(txnHandler); - Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed); - Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total); + init.run(); + numAttemptedCompactions++; + init.run(); + numAttemptedCompactions++; + checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions + 2,0,0,0,0,numFailedCompactions + 2 + numAttemptedCompactions), countCompacts(txnHandler)); + runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other since we only have failed ones here) - cbs = countCompacts(txnHandler); - Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); - Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total); - + checkCompactionState(new CompactionsByState( + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,0,0, + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)), countCompacts(txnHandler)); hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false); txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); - //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated - cbs = countCompacts(txnHandler); - Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); - Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated); - Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total); + //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated (explicitly by user) + checkCompactionState(new CompactionsByState( + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),1,0,0,0, + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler)); runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning - cbs = countCompacts(txnHandler); - Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); - Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean); - Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total); - + checkCompactionState(new CompactionsByState( + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,1,0,0, + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler)); + runCleaner(hiveConf); // transition to Success state runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes - cbs = countCompacts(txnHandler); - Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); - Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded); - Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total); + checkCompactionState(new CompactionsByState( + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,1,0, + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler)); } /** @@ -624,6 +635,18 @@ public class TestTxnCommands2 { private int succeeded; private int working; private int total; + CompactionsByState() { + this(0,0,0,0,0,0,0); + } + CompactionsByState(int attempted, int failed, int initiated, int readyToClean, int succeeded, int working, int total) { + this.attempted = attempted; + this.failed = failed; + this.initiated = initiated; + this.readyToClean = readyToClean; + this.succeeded = succeeded; + this.working = working; + this.total = total; + } } private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException { ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); @@ -648,6 +671,9 @@ public class TestTxnCommands2 { else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) { compactionsByState.attempted++; } + else { + throw new IllegalStateException("Unexpected state: " + compact.getState()); + } } return compactionsByState; }
