GitHub user srdo opened a pull request:
https://github.com/apache/storm/pull/1832
STORM-2250: Refactor new Kafka spout so it is easier to read, and easier to
test without breaking encapsulation.
I'm making these changes based on a discussion in
https://github.com/apache/storm/pull/1826 and
https://github.com/apache/storm/pull/1825 about the spout getting a bit large.
This PR makes the following changes:
* OffsetEntry was moved into its own class and renamed OffsetManager.
* OffsetManager.commit now returns numCommittedOffsets, so it doesn't have
to refer to a KafkaSpout internal variable.
* KafkaSpout.commitOffsetsForAckedTuples no longer iterates over acked
twice. The second iteration was used to commit offsets to OffsetEntries, but we
may as well use nextCommitOffsets for that iteration, since those were the
offsets committed to Kafka. This also saves us a null check in
OffsetManager.commit.
* OffsetManager.commit no longer checks whether the parameter is null. It
should no longer be possible for it to be null unless there's a bug, in which
case we want an NPE to be thrown so we can fix it.
* Timer was moved into a separate class and renamed
PeriodicallyExpiringTimer.
* PeriodicallyExpiringTimer supports Storm's time simulation. This also
means that it only supports time units down to milliseconds. If we need
nanosecond precision, we need to update Storm's Utils.Time.
* Cleaned up a few redundant version declarations in the pom. Also switched
out hamcrest-all for hamcrest-core + hamcrest-library, since tests were
throwing NoSuchMethodError when an assertThat failed.
* Updated the tests to use time simulation, and to verify that the spout calls
KafkaConsumer.commitSync rather than reading the spout's internal state.
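The OffsetManager and commitOffsetsForAckedTuples bullets can be illustrated with a minimal, self-contained sketch. It is not the PR's code: Kafka's TopicPartition and OffsetAndMetadata are replaced by String keys and plain long offsets, the commitSync call is left out, and the method names addToAckMsgs and findNextCommitOffset, as well as the rule that a committed offset is the next offset to consume, are assumptions made for illustration. It shows commit returning the number of offsets it removed, and the second loop walking nextCommitOffsets, so every lookup hits an existing manager and commit never sees a null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

class OffsetManagerSketch {

    /** Hypothetical per-partition bookkeeping, loosely modeled on the PR's OffsetManager. */
    static final class OffsetManager {
        private final NavigableSet<Long> acked = new TreeSet<>();
        private long committedOffset; // assumed: next offset to consume, Kafka-style

        OffsetManager(long initialCommittedOffset) {
            this.committedOffset = initialCommittedOffset;
        }

        void addToAckMsgs(long offset) {
            acked.add(offset);
        }

        /** Returns the offset to commit, or -1 if no contiguous run of acks starts at committedOffset. */
        long findNextCommitOffset() {
            long next = committedOffset;
            while (acked.contains(next)) {
                next++;
            }
            return next == committedOffset ? -1 : next;
        }

        /** Drops acked offsets below the new committed offset and returns how many were dropped. */
        long commit(long newCommittedOffset) {
            NavigableSet<Long> done = acked.headSet(newCommittedOffset, false);
            long numCommittedOffsets = done.size();
            done.clear(); // clearing the view removes the entries from the backing set
            committedOffset = newCommittedOffset;
            return numCommittedOffsets;
        }
    }

    /** First pass collects nextCommitOffsets; second pass iterates those, not every partition. */
    static long commitOffsetsForAckedTuples(Map<String, OffsetManager> managers) {
        Map<String, Long> nextCommitOffsets = new HashMap<>();
        for (Map.Entry<String, OffsetManager> e : managers.entrySet()) {
            long next = e.getValue().findNextCommitOffset();
            if (next >= 0) {
                nextCommitOffsets.put(e.getKey(), next);
            }
        }
        // In the spout, this is where nextCommitOffsets would be handed to KafkaConsumer.commitSync.
        long totalCommitted = 0;
        for (Map.Entry<String, Long> e : nextCommitOffsets.entrySet()) {
            // Every key came from managers, so the lookup cannot return null.
            totalCommitted += managers.get(e.getKey()).commit(e.getValue());
        }
        return totalCommitted;
    }

    public static void main(String[] args) {
        Map<String, OffsetManager> managers = new HashMap<>();
        OffsetManager p0 = new OffsetManager(0);
        p0.addToAckMsgs(0);
        p0.addToAckMsgs(1);
        p0.addToAckMsgs(3); // gap at 2, so 3 stays uncommitted
        managers.put("topic-0", p0);
        managers.put("topic-1", new OffsetManager(5)); // nothing acked yet
        System.out.println(commitOffsetsForAckedTuples(managers)); // prints 2
    }
}
```

Running main prints 2: offsets 0 and 1 are committed, while offset 3 stays pending until offset 2 is acked. Because commit reports its own count, a caller can adjust its tally of uncommitted offsets without the manager touching spout fields.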
I think it would be nice if RetryService also supported time simulation; let
me know what you think, @hmcl.
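Both the PeriodicallyExpiringTimer change and the RetryService suggestion come down to reading the current time through a clock that tests can advance by hand. The sketch below is a hedged illustration, not Storm's API: SimulatedClock is a hypothetical stand-in for Storm's time-simulation utility, and the timer's constructor arguments and its isExpiredResetOnTrue method are assumptions chosen for the example.

```java
import java.util.concurrent.TimeUnit;

final class TimerSketch {

    /** Hypothetical stand-in for Storm's time utility: real time unless simulation is on. */
    static final class SimulatedClock {
        private static volatile boolean simulating = false;
        private static volatile long simulatedMillis = 0;

        static void startSimulating() {
            simulatedMillis = 0;
            simulating = true;
        }

        static void stopSimulating() {
            simulating = false;
        }

        static void advanceTime(long ms) {
            if (!simulating) {
                throw new IllegalStateException("advanceTime requires simulated time");
            }
            simulatedMillis += ms;
        }

        static long currentTimeMillis() {
            return simulating ? simulatedMillis : System.currentTimeMillis();
        }
    }

    /** Expires after an initial delay, then once per period, at millisecond resolution. */
    static final class PeriodicallyExpiringTimer {
        private final long periodMs;
        private long delayMs; // time to wait from startMs before the next expiry
        private long startMs;

        PeriodicallyExpiringTimer(long delay, long period, TimeUnit unit) {
            // TimeUnit.toMillis truncates, so sub-millisecond values collapse to whole millis.
            this.delayMs = unit.toMillis(delay);
            this.periodMs = unit.toMillis(period);
            this.startMs = SimulatedClock.currentTimeMillis();
        }

        /** Returns true if the current wait has elapsed, and if so starts a new period from now. */
        boolean isExpiredResetOnTrue() {
            long now = SimulatedClock.currentTimeMillis();
            boolean expired = now - startMs >= delayMs;
            if (expired) {
                startMs = now;
                delayMs = periodMs; // after the first expiry, wait a full period each time
            }
            return expired;
        }
    }

    public static void main(String[] args) {
        SimulatedClock.startSimulating();
        try {
            PeriodicallyExpiringTimer timer =
                new PeriodicallyExpiringTimer(500, 1000, TimeUnit.MILLISECONDS);
            System.out.println(timer.isExpiredResetOnTrue()); // false: no time has passed
            SimulatedClock.advanceTime(500);
            System.out.println(timer.isExpiredResetOnTrue()); // true: initial delay elapsed
            SimulatedClock.advanceTime(999);
            System.out.println(timer.isExpiredResetOnTrue()); // false: one ms short of a period
            SimulatedClock.advanceTime(1);
            System.out.println(timer.isExpiredResetOnTrue()); // true: full period elapsed
        } finally {
            SimulatedClock.stopSimulating();
        }
    }
}
```

A test can step through several commit periods instantly instead of sleeping, and the truncation in toMillis is the millisecond precision limit mentioned above. A retry schedule that read its "now" from the same clock would be testable the same way.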
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/srdo/storm STORM-2250
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/1832.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1832
----
commit 119e35611ad841e703db6ce4b77cf483b9859df7
Author: Stig Rohde Døssing <[email protected]>
Date: 2016-12-17T13:48:44Z
STORM-2250: Put Timer and OffsetEntry into internal package classes,
slightly adjust KafkaSpout.commitOffsetsForAckedTuples to iterate over
nextCommitOffsets instead of acked for second loop
commit 7e59df243f4d633c801f250da4ceddb6b583fc64
Author: Stig Rohde Døssing <[email protected]>
Date: 2016-12-17T16:33:48Z
STORM-2250: Rewrite single topic kafka tests to use simulated time + a spy
to check if offsets are committed, rather than using a package private field.
Get rid of Timer factories again, since we can just use Storm's time simulation
in tests. Remove some redundant version declarations in poms
----