This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new fc390a6 Expose EventTime consistently as a non-pointer (#186)
fc390a6 is described below
commit fc390a6a37f3cbd94ac46b3b5e4239b3ca5df875
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Feb 14 10:44:51 2020 -0800
Expose EventTime consistently as a non-pointer (#186)
* Expose EventTime consistently as a non-pointer
* Expanded docs
---
pulsar/consumer_test.go | 2 +-
pulsar/dlq_router.go | 3 +--
pulsar/message.go | 8 ++++++--
pulsar/producer_partition.go | 4 ++--
pulsar/producer_test.go | 2 +-
5 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 08c994b..3e256f8 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -503,7 +503,7 @@ func TestConsumerEventTime(t *testing.T) {
et := timeFromUnixTimestampMillis(uint64(5))
_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("test"),
- EventTime: &et,
+ EventTime: et,
})
assert.Nil(t, err)
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 3373a9f..69b45d7 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -92,12 +92,11 @@ func (r *dlqRouter) run() {
msg := cm.Message.(*message)
msgID := msg.ID()
- eventTime := msg.EventTime()
producer.SendAsync(context.Background(),
&ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
Properties: msg.Properties(),
- EventTime: &eventTime,
+ EventTime: msg.EventTime(),
ReplicationClusters: msg.replicationClusters,
}, func(MessageID, *ProducerMessage, error) {
r.log.WithField("msgID", msgID).Debug("Sent
message to DLQ")
diff --git a/pulsar/message.go b/pulsar/message.go
index 735ca41..bc38ebb 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -34,7 +34,11 @@ type ProducerMessage struct {
Properties map[string]string
// EventTime set the event time for a given message
- EventTime *time.Time
+ // By default, messages don't have an event time associated, while the
publish
+ // time will always be present.
+ // Set the event time to a non-zero timestamp to explicitly declare the
time
+ // that the event "happened", as opposed to when the message is being
published.
+ EventTime time.Time
// ReplicationClusters override the replication clusters for this
message.
ReplicationClusters []string
@@ -77,7 +81,7 @@ type Message interface {
// EventTime get the event time associated with this message. It is
typically set by the applications via
// `ProducerMessage.EventTime`.
- // If there isn't any event time associated with this event, it will be
nil.
+ // If EventTime is 0, it means there isn't any event time associated
with this message.
EventTime() time.Time
// Key get the key of the message, if any
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index a24ff66..f10adcf 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -249,8 +249,8 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
PayloadSize: proto.Int(len(msg.Payload)),
}
- if msg.EventTime != nil {
- smm.EventTime =
proto.Uint64(internal.TimestampMillis(*msg.EventTime))
+ if msg.EventTime.UnixNano() != 0 {
+ smm.EventTime =
proto.Uint64(internal.TimestampMillis(msg.EventTime))
}
if msg.Key != "" {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 5e40ec2..66fa8a0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -244,7 +244,7 @@ func TestEventTime(t *testing.T) {
eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("test-event-time")),
- EventTime: &eventTime,
+ EventTime: eventTime,
})
assert.Nil(t, err)
assert.NotNil(t, ID)