Repository: hive Updated Branches: refs/heads/master 6a4e0806a -> ce457a496
HIVE-13493 - Fix TransactionBatchImpl.getCurrentTxnId() and misc logging fixes (Eugene Koifman, reviewed by Wei Zheng) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ce457a49 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ce457a49 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ce457a49 Branch: refs/heads/master Commit: ce457a4962a8f7a43c0da7647d958e5cc87b5dd8 Parents: 6a4e080 Author: Eugene Koifman <[email protected]> Authored: Wed Apr 27 15:45:39 2016 -0700 Committer: Eugene Koifman <[email protected]> Committed: Wed Apr 27 15:45:39 2016 -0700 ---------------------------------------------------------------------- .../src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java | 3 ++- .../test/org/apache/hive/hcatalog/streaming/TestStreaming.java | 2 ++ .../apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java | 3 ++- .../src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java | 2 ++ 4 files changed, 8 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ce457a49/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index baeafad..db9fd72 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -622,7 +622,7 @@ public class HiveEndPoint { private void beginNextTransactionImpl() throws TransactionError { state = TxnState.INACTIVE;//clear state from previous txn - if ( currentTxnIndex >= txnIds.size() ) + if ( currentTxnIndex + 1 >= txnIds.size() ) throw new 
InvalidTrasactionState("No more transactions available in" + " current batch for end point : " + endPt); ++currentTxnIndex; @@ -874,6 +874,7 @@ public class HiveEndPoint { currentTxnIndex < txnIds.size(); currentTxnIndex++) { msClient.rollbackTxn(txnIds.get(currentTxnIndex)); } + currentTxnIndex--;//since the loop left it == txnId.size() } else { if (getCurrentTxnId() > 0) { http://git-wip-us.apache.org/repos/asf/hive/blob/ce457a49/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index bde78e4..f4ee208 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -1714,6 +1714,8 @@ public class TestStreaming { } catch(StreamingIOFailure ex) { expectedEx = ex; + txnBatch.getCurrentTransactionState(); + txnBatch.getCurrentTxnId();//test it doesn't throw ArrayIndexOutOfBounds... } Assert.assertTrue("Wrong exception: " + (expectedEx != null ? 
expectedEx.getMessage() : "?"), expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred")); http://git-wip-us.apache.org/repos/asf/hive/blob/ce457a49/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 67e661f..ab7da68 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -30,6 +30,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -451,7 +452,7 @@ class CompactionTxnHandler extends TxnHandler { if(txnids.size() <= 0) { return; } - + Collections.sort(txnids);//easier to read logs List<String> queries = new ArrayList<String>(); StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/hive/blob/ce457a49/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index df6591f..c32b0b0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -2345,6 +2345,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { deletedLocks += stmt.executeUpdate(query); } if(deletedLocks > 0) { + Collections.sort(extLockIDs);////easier to read logs 
LOG.info("Deleted " + deletedLocks + " ext locks from HIVE_LOCKS due to timeout (vs. " + extLockIDs.size() + " found. List: " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime); } @@ -2444,6 +2445,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn.commit(); numTxnsAborted += batchToAbort.size(); //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout' + Collections.sort(batchToAbort);//easier to read logs LOG.info("Aborted the following transactions due to timeout: " + batchToAbort.toString()); } else {
