wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743352175
##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
// Decryption decryption related fields to decrypt the encrypted message
Decryption *MessageDecryptionInfo
+
+ // If enabled, the default implementation of NackBackoffPolicy will be
used to calculate the delay time of
+ // nack backoff, Default: false.
+ EnableDefaultNackBackoffPolicy bool
Review comment:
Without the `EnableDefaultNackBackoffPolicy` flag, this change would intrude
on the existing code logic. If we fell back to the default NackBackoffPolicy
whenever no NackBackoffPolicy is set, then any user calling the Nack(Message)
interface would get the new implementation.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
Review comment:
ok, will fix this
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
+ maxNackTimeMs := 1000 * 60 * 10 // 10min
+
+ if redeliveryCount < 0 {
+ return minNackTimeMs
+ }
+
+ return
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)),
float64(maxNackTimeMs)))
Review comment:
Sure, will add comment for this change
##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
doneOnce sync.Once
negativeAcks map[messageID]time.Time
rc redeliveryConsumer
- tick *time.Ticker
+ nackBackoff NackBackoffPolicy
+ trackFlag bool
delay time.Duration
log log.Logger
}
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger
log.Logger) *negativeAcksTracker {
- t := &negativeAcksTracker{
- doneCh: make(chan interface{}),
- negativeAcks: make(map[messageID]time.Time),
- rc: rc,
- tick: time.NewTicker(delay / 3),
- delay: delay,
- log: logger,
- }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+ nackBackoffPolicy NackBackoffPolicy, logger log.Logger)
*negativeAcksTracker {
+
+ t := new(negativeAcksTracker)
Review comment:
They have the same effect.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
Review comment:
ok, will fix this
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
Review comment:
Because the << operation has to be applied together with redeliveryCount,
the unit conversion is still required; the value will be converted to
time.Duration where it is subsequently used.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
Review comment:
Because the `<<` operation has to be applied together with
redeliveryCount, the unit conversion is still required; the value will be
converted to time.Duration where it is subsequently used.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
+ maxNackTimeMs := 1000 * 60 * 10 // 10min
+
+ if redeliveryCount < 0 {
+ return minNackTimeMs
+ }
+
+ return
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)),
float64(maxNackTimeMs)))
Review comment:
Here we first get the redeliveryCount from the CommandMessage, then
left-shift minNackTimeMs by it (the << operation) to calculate how long the
nack should be delayed, and then compare the result with maxNackTimeMs and take
the smaller of the two (math.Min) as the nack duration. The nack delay
therefore grows from minNackTimeMs up to maxNackTimeMs according to this rule.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
+ maxNackTimeMs := 1000 * 60 * 10 // 10min
+
+ if redeliveryCount < 0 {
+ return minNackTimeMs
+ }
+
+ return
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)),
float64(maxNackTimeMs)))
Review comment:
Here we first get the redeliveryCount from the CommandMessage, then
left-shift minNackTimeMs by it (the << operation) to calculate how long the
nack should be delayed, and then compare the result with maxNackTimeMs and take
the smaller of the two (math.Min) as the nack duration. The nack delay
therefore grows from minNackTimeMs up to maxNackTimeMs according to this rule.
##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
doneOnce sync.Once
negativeAcks map[messageID]time.Time
rc redeliveryConsumer
- tick *time.Ticker
+ nackBackoff NackBackoffPolicy
+ trackFlag bool
delay time.Duration
log log.Logger
}
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger
log.Logger) *negativeAcksTracker {
- t := &negativeAcksTracker{
- doneCh: make(chan interface{}),
- negativeAcks: make(map[messageID]time.Time),
- rc: rc,
- tick: time.NewTicker(delay / 3),
- delay: delay,
- log: logger,
- }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+ nackBackoffPolicy NackBackoffPolicy, logger log.Logger)
*negativeAcksTracker {
+
+ t := new(negativeAcksTracker)
+
+ // When using NackBackoffPolicy, the delay time needs to be calculated
based on the RedeliveryCount field in
+ // the CommandMessage, so for the original default Nack() logic, we
still keep the negativeAcksTracker created
+ // when we open a gorutine to execute the logic of `t.track()`. But for
the NackBackoffPolicy method, we need
+ // to execute the logic of `t.track()` when AddMessage().
+ if nackBackoffPolicy != nil {
Review comment:
Yes, I agree with your point. The problem is that, for nack backoff, we
can't get the corresponding nackDelayTime directly: we need to get the
redeliveryCount from the CommandMessage and calculate the nackDelayTime from
it, and only then can we create the time.NewTicker based on that
nackDelayTime. That dependency is precisely why the if statement was
added.
##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
t.negativeAcks[batchMsgID] = targetTime
}
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {
Review comment:
Because we need to get the redeliveryCount through the Message interface.
##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
t.rc.Redeliver(msgIds)
}
}
-
}
}
}
func (t *negativeAcksTracker) Close() {
// allow Close() to be invoked multiple times by consumer_partition to
avoid panic
t.doneOnce.Do(func() {
- t.tick.Stop()
Review comment:
In the current implementation, using the ticker stored in the struct
(`t.tick`) would cause a data race, so we now use a temporary ticker variable
instead; I don't yet see a good way to stop that temporarily created
ticker.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
+ maxNackTimeMs := 1000 * 60 * 10 // 10min
+
+ if redeliveryCount < 0 {
+ return minNackTimeMs
+ }
+
+ return
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)),
float64(maxNackTimeMs)))
Review comment:
Here we first get the redeliveryCount from the CommandMessage, then
left-shift minNackTimeMs by it (the << operation) to calculate how long the
nack should be delayed, and then compare the result with maxNackTimeMs and take
the smaller of the two (math.Min) as the nack duration. The nack delay
therefore grows from minNackTimeMs up to maxNackTimeMs according to this rule.
##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
// Decryption decryption related fields to decrypt the encrypted message
Decryption *MessageDecryptionInfo
+
+ // If enabled, the default implementation of NackBackoffPolicy will be
used to calculate the delay time of
+ // nack backoff, Default: false.
+ EnableDefaultNackBackoffPolicy bool
Review comment:
Without the `EnableDefaultNackBackoffPolicy` flag, this change would intrude
on the existing code logic. If we fell back to the default NackBackoffPolicy
whenever no NackBackoffPolicy is set, then any user calling the Nack(Message)
interface would get the new implementation.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
Review comment:
ok, will fix this
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
+ maxNackTimeMs := 1000 * 60 * 10 // 10min
+
+ if redeliveryCount < 0 {
+ return minNackTimeMs
+ }
+
+ return
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)),
float64(maxNackTimeMs)))
Review comment:
Sure, will add comment for this change
##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
doneOnce sync.Once
negativeAcks map[messageID]time.Time
rc redeliveryConsumer
- tick *time.Ticker
+ nackBackoff NackBackoffPolicy
+ trackFlag bool
delay time.Duration
log log.Logger
}
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger
log.Logger) *negativeAcksTracker {
- t := &negativeAcksTracker{
- doneCh: make(chan interface{}),
- negativeAcks: make(map[messageID]time.Time),
- rc: rc,
- tick: time.NewTicker(delay / 3),
- delay: delay,
- log: logger,
- }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+ nackBackoffPolicy NackBackoffPolicy, logger log.Logger)
*negativeAcksTracker {
+
+ t := new(negativeAcksTracker)
Review comment:
They have the same effect.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
Review comment:
ok, will fix this
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
Review comment:
Because the << operation has to be applied together with redeliveryCount,
the unit conversion is still required; the value will be converted to
time.Duration where it is subsequently used.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
Review comment:
Because the `<<` operation has to be applied together with
redeliveryCount, the unit conversion is still required; the value will be
converted to time.Duration where it is subsequently used.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
+ maxNackTimeMs := 1000 * 60 * 10 // 10min
+
+ if redeliveryCount < 0 {
+ return minNackTimeMs
+ }
+
+ return
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)),
float64(maxNackTimeMs)))
Review comment:
Here we first get the redeliveryCount from the CommandMessage, then shift
minNackTimeMs left by it (`<<`) to calculate how long the nack should currently
be delayed. That value is compared with maxNackTimeMs and the smaller of the
two is used as the nack delay, so the delay grows from minNackTimeMs up to
maxNackTimeMs according to this rule.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
+ maxNackTimeMs := 1000 * 60 * 10 // 10min
+
+ if redeliveryCount < 0 {
+ return minNackTimeMs
+ }
+
+ return
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)),
float64(maxNackTimeMs)))
Review comment:
Here we first get the redeliveryCount from the CommandMessage, then shift
minNackTimeMs left by it (`<<`) to calculate how long the nack should currently
be delayed. That value is compared with maxNackTimeMs and the smaller of the
two is used as the nack delay, so the delay grows from minNackTimeMs up to
maxNackTimeMs according to this rule.
##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
doneOnce sync.Once
negativeAcks map[messageID]time.Time
rc redeliveryConsumer
- tick *time.Ticker
+ nackBackoff NackBackoffPolicy
+ trackFlag bool
delay time.Duration
log log.Logger
}
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger
log.Logger) *negativeAcksTracker {
- t := &negativeAcksTracker{
- doneCh: make(chan interface{}),
- negativeAcks: make(map[messageID]time.Time),
- rc: rc,
- tick: time.NewTicker(delay / 3),
- delay: delay,
- log: logger,
- }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+ nackBackoffPolicy NackBackoffPolicy, logger log.Logger)
*negativeAcksTracker {
+
+ t := new(negativeAcksTracker)
+
+ // When using NackBackoffPolicy, the delay time needs to be calculated
based on the RedeliveryCount field in
+ // the CommandMessage, so for the original default Nack() logic, we
still keep the negativeAcksTracker created
+ // when we open a gorutine to execute the logic of `t.track()`. But for
the NackBackoffPolicy method, we need
+ // to execute the logic of `t.track()` when AddMessage().
+ if nackBackoffPolicy != nil {
Review comment:
Yes, I agree with your point. The problem is that with nack backoff we can't
get the nackDelayTime directly: we first need to get the redeliveryCount from
the CommandMessage and calculate the nackDelayTime from it, and only then can
we create the time.NewTicker based on that nackDelayTime. That dependency is
why the if statement was added.
##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
t.negativeAcks[batchMsgID] = targetTime
}
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {
Review comment:
Because we need to get redeliveryCount through the Message interface
##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
t.rc.Redeliver(msgIds)
}
}
-
}
}
}
func (t *negativeAcksTracker) Close() {
// allow Close() to be invoked multiple times by consumer_partition to
avoid panic
t.doneOnce.Do(func() {
- t.tick.Stop()
Review comment:
In the current implementation, using the ticker stored in the struct (t.tick)
causes a data race, so we now use a temporary, locally created ticker instead.
I have not found a good way to stop that temporarily created ticker.
##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy,
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+ // The redeliveryCount indicates the number of times the message was
redelivered.
+ // We can get the redeliveryCount from the CommandMessage.
+ Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+ minNackTimeMs := int64(1000 * 10) // 10sec
+ maxNackTimeMs := 1000 * 60 * 10 // 10min
+
+ if redeliveryCount < 0 {
+ return minNackTimeMs
+ }
+
+ return
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)),
float64(maxNackTimeMs)))
Review comment:
Here we first get the redeliveryCount from the CommandMessage, then shift
minNackTimeMs left by it (`<<`) to calculate how long the nack should currently
be delayed. That value is compared with maxNackTimeMs and the smaller of the
two is used as the nack delay, so the delay grows from minNackTimeMs up to
maxNackTimeMs according to this rule.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]