fix #2015, do not send AR or LP when no pending messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/bcdbfc9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/bcdbfc9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/bcdbfc9f Branch: refs/heads/master Commit: bcdbfc9f662aa21e76b4385cbb91874c41cd80db Parents: 47d867d Author: manuzhang <[email protected]> Authored: Thu Mar 24 15:46:39 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:23:31 2016 +0800 ---------------------------------------------------------------------- .../main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala | 2 +- .../src/main/scala/io/gearpump/streaming/task/Subscription.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bcdbfc9f/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala index 42d6ca7..03bfa81 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -50,7 +50,7 @@ class JavaStream[T](val stream: Stream[T]) { stream.log() } - def merge(other: JavaStream[T], description: String = null): JavaStream[T] = { + def merge(other: JavaStream[T], description: String): JavaStream[T] = { new JavaStream[T](stream.merge(other.stream, description)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bcdbfc9f/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala index 5f321e4..0b1fa29 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala @@ -187,7 +187,7 @@ class Subscription( def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = { minClockValue.indices.foreach { i => - if (minClockValue(i) == stallingTime && allowSendingMoreMessages) { + if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0 && allowSendingMoreMessages) { sendAckRequest(i) sendLatencyProbe(i) }
