This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new fc390a6  Expose EventTime consistently as a non-pointer (#186)
fc390a6 is described below

commit fc390a6a37f3cbd94ac46b3b5e4239b3ca5df875
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Feb 14 10:44:51 2020 -0800

    Expose EventTime consistently as a non-pointer (#186)
    
    * Expose EventTime consistently as a non-pointer
    
    * Expanded docs
---
 pulsar/consumer_test.go      | 2 +-
 pulsar/dlq_router.go         | 3 +--
 pulsar/message.go            | 8 ++++++--
 pulsar/producer_partition.go | 4 ++--
 pulsar/producer_test.go      | 2 +-
 5 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 08c994b..3e256f8 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -503,7 +503,7 @@ func TestConsumerEventTime(t *testing.T) {
        et := timeFromUnixTimestampMillis(uint64(5))
        _, err = producer.Send(ctx, &ProducerMessage{
                Payload:   []byte("test"),
-               EventTime: &et,
+               EventTime: et,
        })
        assert.Nil(t, err)
 
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 3373a9f..69b45d7 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -92,12 +92,11 @@ func (r *dlqRouter) run() {
 
                        msg := cm.Message.(*message)
                        msgID := msg.ID()
-                       eventTime := msg.EventTime()
                        producer.SendAsync(context.Background(), &ProducerMessage{
                                Payload:             msg.Payload(),
                                Key:                 msg.Key(),
                                Properties:          msg.Properties(),
-                               EventTime:           &eventTime,
+                               EventTime:           msg.EventTime(),
                                ReplicationClusters: msg.replicationClusters,
                        }, func(MessageID, *ProducerMessage, error) {
                                r.log.WithField("msgID", msgID).Debug("Sent message to DLQ")
diff --git a/pulsar/message.go b/pulsar/message.go
index 735ca41..bc38ebb 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -34,7 +34,11 @@ type ProducerMessage struct {
        Properties map[string]string
 
        // EventTime set the event time for a given message
-       EventTime *time.Time
+       // By default, messages don't have an event time associated, while the publish
+       // time will always be present.
+       // Set the event time to a non-zero timestamp to explicitly declare the time
+       // that the event "happened", as opposed to when the message is being published.
+       EventTime time.Time
 
        // ReplicationClusters override the replication clusters for this message.
        ReplicationClusters []string
@@ -77,7 +81,7 @@ type Message interface {
 
        // EventTime get the event time associated with this message. It is typically set by the applications via
        // `ProducerMessage.EventTime`.
-       // If there isn't any event time associated with this event, it will be nil.
+       // If EventTime is 0, it means there isn't any event time associated with this message.
        EventTime() time.Time
 
        // Key get the key of the message, if any
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index a24ff66..f10adcf 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -249,8 +249,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
                PayloadSize: proto.Int(len(msg.Payload)),
        }
 
-       if msg.EventTime != nil {
-               smm.EventTime = proto.Uint64(internal.TimestampMillis(*msg.EventTime))
+       if msg.EventTime.UnixNano() != 0 {
+               smm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
        }
 
        if msg.Key != "" {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 5e40ec2..66fa8a0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -244,7 +244,7 @@ func TestEventTime(t *testing.T) {
        eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
        ID, err := producer.Send(context.Background(), &ProducerMessage{
                Payload:   []byte(fmt.Sprintf("test-event-time")),
-               EventTime: &eventTime,
+               EventTime: eventTime,
        })
        assert.Nil(t, err)
        assert.NotNil(t, ID)

Reply via email to