Repository: incubator-sentry Updated Branches: refs/heads/sentry-hdfs-plugin d94bdb23e -> f00a8f251
SENTRY-432: Fixing updateLog bug that can cause a missed entry during first few updates Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/f00a8f25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/f00a8f25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/f00a8f25 Branch: refs/heads/sentry-hdfs-plugin Commit: f00a8f2516388252c5ab6d6fbd4740083ded89b0 Parents: d94bdb2 Author: Arun Suresh <[email protected]> Authored: Sun Nov 9 18:09:10 2014 -0800 Committer: Arun Suresh <[email protected]> Committed: Mon Nov 10 10:28:03 2014 -0800 ---------------------------------------------------------------------- .../sentry/hdfs/SentryHDFSServiceProcessor.java | 15 +++ .../org/apache/sentry/hdfs/UpdateForwarder.java | 34 +++--- .../apache/sentry/hdfs/TestUpdateForwarder.java | 111 ++++++++++++------- 3 files changed, 105 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/f00a8f25/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java index 5fe89a8..cc849b9 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java @@ -58,6 +58,21 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { } retVal.getAuthzPermUpdate().add(update.toThrift()); } + if (LOGGER.isDebugEnabled()) { + StringBuilder permSeq = new StringBuilder("<"); + for 
(PermissionsUpdate permUpdate : permUpdates) { + permSeq.append(permUpdate.getSeqNum()).append(","); + } + permSeq.append(">"); + StringBuilder pathSeq = new StringBuilder("<"); + for (PathsUpdate pathUpdate : pathUpdates) { + pathSeq.append(pathUpdate.getSeqNum()).append(","); + } + pathSeq.append(">"); + LOGGER.debug("#### Updates requested from HDFS [" + + "permReq=" + permSeqNum + ", permResp=" + permSeq + "] " + + "[pathReq=" + pathSeqNum + ", pathResp=" + pathSeq + "]"); + } } catch (Exception e) { LOGGER.error("Error Sending updates to downstream Cache", e); throw new TException(e); http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/f00a8f25/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java index 2815880..f321d3d 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java @@ -169,17 +169,26 @@ public class UpdateForwarder<K extends Updateable.Update> implements private void appendToUpdateLog(K update) { synchronized (updateLog) { + boolean logCompacted = false; if (updateLogSize > 0) { if (update.hasFullImage() || (updateLog.size() == updateLogSize)) { // Essentially a log compaction updateLog.clear(); updateLog.add(update.hasFullImage() ? 
update : createFullImageUpdate(update.getSeqNum())); + logCompacted = true; } else { updateLog.add(update); } } lastCommittedSeqNum.set(update.getSeqNum()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("#### Appending to Update Log [" + + "type=" + update.getClass() + ", " + + "lastCommit=" + lastCommittedSeqNum.get() + ", " + + "lastSeen=" + lastSeenSeqNum.get() + ", " + + "logCompacted=" + logCompacted + "]"); + } } } @@ -192,6 +201,14 @@ public class UpdateForwarder<K extends Updateable.Update> implements List<K> retVal = new LinkedList<K>(); synchronized (updateLog) { long currSeqNum = lastCommittedSeqNum.get(); + if (LOGGER.isDebugEnabled() && (updateable != null)) { + LOGGER.debug("#### GetAllUpdatesFrom [" + + "type=" + updateable.getClass() + ", " + + "reqSeqNum=" + seqNum + ", " + + "lastCommit=" + lastCommittedSeqNum.get() + ", " + + "lastSeen=" + lastSeenSeqNum.get() + ", " + + "updateLogSize=" + updateLog.size() + "]"); + } if (updateLogSize == 0) { // no updatelog configured.. 
return retVal; @@ -227,21 +244,10 @@ public class UpdateForwarder<K extends Updateable.Update> implements } else { // increment iterator to requested seqNum Iterator<K> iter = updateLog.iterator(); - K u = null; while (iter.hasNext()) { - u = iter.next(); - if (u.getSeqNum() == seqNum) { - break; - } - } - // add all updates from requestedSeq - // to committedSeqNum - for (long seq = seqNum; seq <= currSeqNum; seq ++) { - retVal.add(u); - if (iter.hasNext()) { - u = iter.next(); - } else { - break; + K elem = iter.next(); + if (elem.getSeqNum() >= seqNum) { + retVal.add(elem); } } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/f00a8f25/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java index d571df8..0c55bb1 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java +++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java @@ -38,16 +38,16 @@ public class TestUpdateForwarder { static class DummyUpdate implements Update { private long seqNum = 0; private boolean hasFullUpdate = false; - private String stuff; + private String state; public DummyUpdate(long seqNum, boolean hasFullUpdate) { this.seqNum = seqNum; this.hasFullUpdate = hasFullUpdate; } - public String getStuff() { - return stuff; + public String getState() { + return state; } - public DummyUpdate setStuff(String stuff) { - this.stuff = stuff; + public DummyUpdate setState(String stuff) { + this.state = stuff; return this; } @Override @@ -72,7 +72,7 @@ public class TestUpdateForwarder { @Override public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) { for (DummyUpdate u 
: update) { - state.add(u.getStuff()); + state.add(u.getState()); lastUpdatedSeqNum = u.seqNum; } } @@ -81,7 +81,7 @@ public class TestUpdateForwarder { public Updateable<DummyUpdate> updateFull(DummyUpdate update) { DummyUpdatable retVal = new DummyUpdatable(); retVal.lastUpdatedSeqNum = update.seqNum; - retVal.state = Lists.newArrayList(update.stuff.split(",")); + retVal.state = Lists.newArrayList(update.state.split(",")); return retVal; } @@ -93,7 +93,7 @@ public class TestUpdateForwarder { @Override public DummyUpdate createFullImageUpdate(long currSeqNum) { DummyUpdate retVal = new DummyUpdate(currSeqNum, true); - retVal.stuff = Joiner.on(",").join(state); + retVal.state = Joiner.on(",").join(state); return retVal; } @@ -111,7 +111,7 @@ public class TestUpdateForwarder { @Override public DummyUpdate retrieveFullImage(long currSeqNum) { DummyUpdate retVal = new DummyUpdate(currSeqNum, true); - retVal.stuff = state; + retVal.state = state; return retVal; } } @@ -125,12 +125,12 @@ public class TestUpdateForwarder { Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum()); List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); Assert.assertTrue(allUpdates.size() == 1); - Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff()); + Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); // If the current process has restarted the input seqNum will be > currSeq allUpdates = updateForwarder.getAllUpdatesFrom(100); Assert.assertTrue(allUpdates.size() == 1); - Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff()); + Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); Assert.assertEquals(-2, allUpdates.get(0).getSeqNum()); allUpdates = updateForwarder.getAllUpdatesFrom(-1); Assert.assertEquals(0, allUpdates.size()); @@ -142,15 +142,44 @@ public class TestUpdateForwarder { imageRetreiver.setState("a,b,c"); UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( new DummyUpdatable(), imageRetreiver, 5); - 
updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d")); + updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); } Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum()); List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); Assert.assertEquals(2, allUpdates.size()); - Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff()); - Assert.assertEquals("d", allUpdates.get(1).getStuff()); + Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); + Assert.assertEquals("d", allUpdates.get(1).getState()); + } + + // This happens when the first update from HMS is a -1 (If the heartbeat + // thread checks Sentry's current seqNum before any update has come in).. + // This will lead the first and second entries in the updatelog to differ + // by more than +1.. + @Test + public void testUpdateReceiveWithNullImageRetriver() throws Exception { + UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( + new DummyUpdatable(), null, 5); + updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(1); + Assert.assertEquals("a", allUpdates.get(0).getState()); + updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("b")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("c")); + while(!updateForwarder.areAllUpdatesCommited()) { + Thread.sleep(100); + } + Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum()); + allUpdates = updateForwarder.getAllUpdatesFrom(0); + Assert.assertEquals(2, allUpdates.size()); + Assert.assertEquals("b", allUpdates.get(0).getState()); + Assert.assertEquals("c", 
allUpdates.get(1).getState()); } @Test @@ -159,7 +188,7 @@ public class TestUpdateForwarder { imageRetreiver.setState("a,b,c"); UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( new DummyUpdatable(), imageRetreiver, 5); - updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d")); + updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); } @@ -167,8 +196,8 @@ public class TestUpdateForwarder { List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); Assert.assertEquals(2, allUpdates.size()); - updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e")); - updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f")); + updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e")); + updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f")); while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); @@ -176,23 +205,23 @@ public class TestUpdateForwarder { Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum()); allUpdates = updateForwarder.getAllUpdatesFrom(0); Assert.assertEquals(4, allUpdates.size()); - Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff()); + Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); Assert.assertEquals(4, allUpdates.get(0).getSeqNum()); - Assert.assertEquals("d", allUpdates.get(1).getStuff()); + Assert.assertEquals("d", allUpdates.get(1).getState()); Assert.assertEquals(5, allUpdates.get(1).getSeqNum()); - Assert.assertEquals("e", allUpdates.get(2).getStuff()); + Assert.assertEquals("e", allUpdates.get(2).getState()); Assert.assertEquals(6, allUpdates.get(2).getSeqNum()); - Assert.assertEquals("f", allUpdates.get(3).getStuff()); + Assert.assertEquals("f", allUpdates.get(3).getState()); Assert.assertEquals(7, allUpdates.get(3).getSeqNum()); - 
updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g")); + updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g")); while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); } Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum()); allUpdates = updateForwarder.getAllUpdatesFrom(8); Assert.assertEquals(1, allUpdates.size()); - Assert.assertEquals("g", allUpdates.get(0).getStuff()); + Assert.assertEquals("g", allUpdates.get(0).getState()); } @Test @@ -201,13 +230,13 @@ public class TestUpdateForwarder { imageRetreiver.setState("a,b,c"); UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( new DummyUpdatable(), imageRetreiver, 5); - updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d")); + updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); } - updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e")); - updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f")); + updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e")); + updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f")); while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); @@ -215,29 +244,29 @@ public class TestUpdateForwarder { Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum()); List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); Assert.assertEquals(4, allUpdates.size()); - Assert.assertEquals("f", allUpdates.get(3).getStuff()); + Assert.assertEquals("f", allUpdates.get(3).getState()); Assert.assertEquals(7, allUpdates.get(3).getSeqNum()); - updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g")); + updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g")); 
while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); } Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum()); allUpdates = updateForwarder.getAllUpdatesFrom(8); Assert.assertEquals(1, allUpdates.size()); - Assert.assertEquals("g", allUpdates.get(0).getStuff()); + Assert.assertEquals("g", allUpdates.get(0).getState()); imageRetreiver.setState("a,b,c,d,e,f,g,h"); // New update comes with SeqNum = 1 - updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setStuff("h")); + updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setState("h")); while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); } // NN plugin asks for next update allUpdates = updateForwarder.getAllUpdatesFrom(9); Assert.assertEquals(1, allUpdates.size()); - Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getStuff()); + Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState()); Assert.assertEquals(1, allUpdates.get(0).getSeqNum()); } @@ -247,7 +276,7 @@ public class TestUpdateForwarder { imageRetreiver.setState("a,b,c"); UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>( new DummyUpdatable(), imageRetreiver, 5); - updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d")); + updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); } @@ -255,12 +284,12 @@ public class TestUpdateForwarder { List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); Assert.assertEquals(2, allUpdates.size()); - updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e")); - updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f")); - updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g")); - updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setStuff("h")); - 
updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setStuff("i")); - updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setStuff("j")); + updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e")); + updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f")); + updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g")); + updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setState("h")); + updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setState("i")); + updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setState("j")); while(!updateForwarder.areAllUpdatesCommited()) { Thread.sleep(100); @@ -268,11 +297,11 @@ public class TestUpdateForwarder { Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum()); allUpdates = updateForwarder.getAllUpdatesFrom(0); Assert.assertEquals(3, allUpdates.size()); - Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getStuff()); + Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState()); Assert.assertEquals(9, allUpdates.get(0).getSeqNum()); - Assert.assertEquals("i", allUpdates.get(1).getStuff()); + Assert.assertEquals("i", allUpdates.get(1).getState()); Assert.assertEquals(10, allUpdates.get(1).getSeqNum()); - Assert.assertEquals("j", allUpdates.get(2).getStuff()); + Assert.assertEquals("j", allUpdates.get(2).getState()); Assert.assertEquals(11, allUpdates.get(2).getSeqNum()); } }
