Repository: kafka Updated Branches: refs/heads/trunk 0fba52960 -> b380a82d5
MINOR: improve MinTimestampTrackerTest and fix NPE when null element removed Author: Damian Guy <[email protected]> Reviewers: Matthias J. Sax, Guozhang Wang Closes #2611 from dguy/testing Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b380a82d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b380a82d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b380a82d Branch: refs/heads/trunk Commit: b380a82d5be7c68141590467911ecb61db45ed1e Parents: 0fba529 Author: Damian Guy <[email protected]> Authored: Wed Mar 1 12:07:46 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Mar 1 12:07:46 2017 -0800 ---------------------------------------------------------------------- .../internals/MinTimestampTracker.java | 15 ++- .../internals/MinTimestampTrackerTest.java | 102 ++++++++----------- 2 files changed, 55 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b380a82d/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java index ef7d990..a67675c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java @@ -34,7 +34,7 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> { /** * @throws NullPointerException if the element is null */ - public void addElement(Stamped<E> elem) { + public void addElement(final Stamped<E> elem) { if (elem == null) throw new NullPointerException(); Stamped<E> minElem = descendingSubsequence.peekLast(); @@ -45,12 +45,19 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> { descendingSubsequence.offerLast(elem); } - public void removeElement(Stamped<E> elem) { - if (elem != null && descendingSubsequence.peekFirst() == elem) + public void removeElement(final Stamped<E> elem) { + if (elem == null) { + return; + } + + if (descendingSubsequence.peekFirst() == elem) { descendingSubsequence.removeFirst(); + } - if (descendingSubsequence.isEmpty()) + if (descendingSubsequence.isEmpty()) { lastKnownTime = elem.timestamp; + } + } public int size() { http://git-wip-us.apache.org/repos/asf/kafka/blob/b380a82d/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java index c398dc5..f6a1518 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java @@ -16,77 +16,63 @@ */ package org.apache.kafka.streams.processor.internals; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import org.junit.Test; public class MinTimestampTrackerTest { - private Stamped<String> elem(long timestamp) { - return new Stamped<>("", timestamp); - } + private MinTimestampTracker<String> tracker = new MinTimestampTracker<>(); - @SuppressWarnings("unchecked") @Test - public void testTracking() { - TimestampTracker<String> tracker = new MinTimestampTracker<>(); - - Object[] elems = new Object[]{ - elem(100), elem(101), elem(102), elem(98), elem(99), elem(100) - }; - - int insertionIndex = 0; - int removalIndex = 0; - - // add 100 - tracker.addElement((Stamped<String>) elems[insertionIndex++]); - assertEquals(100L, tracker.get()); - - // add 101 - tracker.addElement((Stamped<String>) elems[insertionIndex++]); - assertEquals(100L, tracker.get()); - - // remove 100 - tracker.removeElement((Stamped<String>) elems[removalIndex++]); - assertEquals(101L, tracker.get()); - - // add 102 - tracker.addElement((Stamped<String>) elems[insertionIndex++]); - assertEquals(101L, tracker.get()); - - // add 98 - tracker.addElement((Stamped<String>) elems[insertionIndex++]); - assertEquals(98L, tracker.get()); - - // add 99 - tracker.addElement((Stamped<String>) elems[insertionIndex++]); - assertEquals(98L, tracker.get()); - - // add 100 - tracker.addElement((Stamped<String>) elems[insertionIndex++]); - assertEquals(98L, tracker.get()); + public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() throws Exception { + assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN)); + } - // remove 101 - tracker.removeElement((Stamped<String>) elems[removalIndex++]); - assertEquals(98L, tracker.get()); + @Test + public void shouldReturnTimestampOfOnlyRecord() throws Exception { + tracker.addElement(elem(100)); + assertThat(tracker.get(), equalTo(100L)); + } - // remove 102 - tracker.removeElement((Stamped<String>) elems[removalIndex++]); - assertEquals(98L, tracker.get()); + @Test + public void shouldReturnLowestAvailableTimestampFromAllInputs() throws Exception { + tracker.addElement(elem(100)); + tracker.addElement(elem(99)); + tracker.addElement(elem(102)); + assertThat(tracker.get(), equalTo(99L)); + } - // remove 98 - tracker.removeElement((Stamped<String>) elems[removalIndex++]); - assertEquals(99L, tracker.get()); + @Test + public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() throws Exception { + final Stamped<String> lowest = elem(88); + tracker.addElement(lowest); + tracker.addElement(elem(101)); + tracker.addElement(elem(99)); + tracker.removeElement(lowest); + assertThat(tracker.get(), equalTo(99L)); + } - // remove 99 - tracker.removeElement((Stamped<String>) elems[removalIndex++]); - assertEquals(100L, tracker.get()); + @Test + public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() throws Exception { + final Stamped<String> record = elem(98); + tracker.addElement(record); + tracker.removeElement(record); + assertThat(tracker.get(), equalTo(98L)); + } - // remove 100 - tracker.removeElement((Stamped<String>) elems[removalIndex++]); - assertEquals(100L, tracker.get()); + @Test + public void shouldIgnoreNullRecordOnRemove() throws Exception { + tracker.removeElement(null); + } - assertEquals(insertionIndex, removalIndex); + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() throws Exception { + tracker.addElement(null); } + private Stamped<String> elem(final long timestamp) { + return new Stamped<>("", timestamp); + } } \ No newline at end of file
