This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 90e79ae373a [Go SDK]: Implement natsio.Read transform for reading from NATS (#29410)
90e79ae373a is described below

commit 90e79ae373ab38cf4e48e9854c28aaffb0938458
Author: Johanna Öjeling <[email protected]>
AuthorDate: Tue Dec 12 22:19:06 2023 +0100

    [Go SDK]: Implement natsio.Read transform for reading from NATS (#29410)
---
 CHANGES.md                                         |   1 +
 sdks/go/pkg/beam/io/natsio/common.go               |   4 +
 sdks/go/pkg/beam/io/natsio/end_estimator.go        |  77 ++++++
 sdks/go/pkg/beam/io/natsio/end_estimator_test.go   |  78 ++++++
 sdks/go/pkg/beam/io/natsio/example_test.go         |  18 ++
 sdks/go/pkg/beam/io/natsio/helper_test.go          |  48 +++-
 sdks/go/pkg/beam/io/natsio/read.go                 | 289 +++++++++++++++++++++
 sdks/go/pkg/beam/io/natsio/read_option.go          |  98 +++++++
 sdks/go/pkg/beam/io/natsio/read_test.go            | 212 +++++++++++++++
 .../beam/io/natsio/{common.go => time_policy.go}   |  50 ++--
 .../io/natsio/{common.go => time_policy_test.go}   |  55 ++--
 .../natsio/{common.go => watermark_estimator.go}   |  43 +--
 .../pkg/beam/io/natsio/watermark_estimator_test.go |  67 +++++
 sdks/go/pkg/beam/io/natsio/write_test.go           |   4 +-
 14 files changed, 943 insertions(+), 101 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 60b5a820cf3..70ce1a70b7e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 * Adding support for LowCardinality DataType in ClickHouse (Java) 
([#29533](https://github.com/apache/beam/pull/29533)).
 * Added support for handling bad records to KafkaIO (Java) 
([#29546](https://github.com/apache/beam/pull/29546))
 * Add support for generating text embeddings in MLTransform for Vertex AI and 
Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564))
+* NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)).
 
 ## New Features / Improvements
 
diff --git a/sdks/go/pkg/beam/io/natsio/common.go 
b/sdks/go/pkg/beam/io/natsio/common.go
index 53f59551698..72640894c76 100644
--- a/sdks/go/pkg/beam/io/natsio/common.go
+++ b/sdks/go/pkg/beam/io/natsio/common.go
@@ -31,6 +31,10 @@ type natsFn struct {
 }
 
 func (fn *natsFn) Setup() error {
+       if fn.nc != nil && fn.js != nil {
+               return nil
+       }
+
        var opts []nats.Option
        if fn.CredsFile != "" {
                opts = append(opts, nats.UserCredentials(fn.CredsFile))
diff --git a/sdks/go/pkg/beam/io/natsio/end_estimator.go 
b/sdks/go/pkg/beam/io/natsio/end_estimator.go
new file mode 100644
index 00000000000..6b2f18e10ce
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/end_estimator.go
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+       "context"
+       "errors"
+       "fmt"
+
+       "github.com/nats-io/nats.go/jetstream"
+)
+
+// endEstimator estimates the exclusive end of the sequence-number range for a
+// subject by looking up the last message stored for that subject in a stream.
+type endEstimator struct {
+       js      jetstream.JetStream
+       stream  string
+       subject string
+}
+
+// newEndEstimator returns an endEstimator that queries stream for the last
+// message published on subject.
+func newEndEstimator(js jetstream.JetStream, stream, subject string) *endEstimator {
+       e := endEstimator{js: js, stream: stream, subject: subject}
+       return &e
+}
+
+// Estimate returns one past the stream sequence number of the last message on
+// the subject. It is handed to the growable offset-range tracker as its range
+// end estimator; that method has no error return, so a failed lookup panics.
+func (e *endEstimator) Estimate() int64 {
+       end, err := e.getEndSeqNo(context.Background())
+       if err == nil {
+               return end
+       }
+       panic(err)
+}
+
+// getEndSeqNo returns the exclusive end sequence number for the subject: the
+// stream sequence of the subject's last message plus one, or 1 when the subject
+// has no stored message. Errors are wrapped with %w so callers can still
+// inspect the underlying JetStream error with errors.Is / errors.As.
+func (e *endEstimator) getEndSeqNo(ctx context.Context) (int64, error) {
+       str, err := e.js.Stream(ctx, e.stream)
+       if err != nil {
+               return -1, fmt.Errorf("error getting stream: %w", err)
+       }
+
+       msg, err := str.GetLastMsgForSubject(ctx, e.subject)
+       if err != nil {
+               if isMessageNotFound(err) {
+                       // Sequence numbers start at 1, so an exclusive end of 1 denotes
+                       // an empty range.
+                       return 1, nil
+               }
+
+               return -1, fmt.Errorf("error getting last message: %w", err)
+       }
+
+       return int64(msg.Sequence) + 1, nil
+}
+
+// isMessageNotFound reports whether err (or any error it wraps) is a JetStream
+// error whose API error code is JSErrCodeMessageNotFound.
+func isMessageNotFound(err error) bool {
+       var jsErr jetstream.JetStreamError
+       if !errors.As(err, &jsErr) {
+               return false
+       }
+
+       // APIError() can be nil for JetStream errors that did not originate from a
+       // server API response (see the nats.go jetstream error definitions), so
+       // check it before reading ErrorCode instead of risking a nil dereference.
+       apiErr := jsErr.APIError()
+       return apiErr != nil && apiErr.ErrorCode == jetstream.JSErrCodeMessageNotFound
+}
diff --git a/sdks/go/pkg/beam/io/natsio/end_estimator_test.go 
b/sdks/go/pkg/beam/io/natsio/end_estimator_test.go
new file mode 100644
index 00000000000..855547a0e29
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/end_estimator_test.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+       "context"
+       "fmt"
+       "testing"
+
+       "github.com/nats-io/nats.go"
+)
+
+func Test_endEstimator_Estimate(t *testing.T) {
+       // Every case publishes msgs to a fresh stream that captures "subject.*" and
+       // compares the estimated exclusive end for subject against want.
+       cases := []struct {
+               name    string
+               msgs    []*nats.Msg
+               subject string
+               want    int64
+       }{
+               {
+                       name: "Estimate end for published messages",
+                       msgs: []*nats.Msg{
+                               {Subject: "subject.1", Data: []byte("msg1")},
+                               {Subject: "subject.1", Data: []byte("msg2")},
+                               {Subject: "subject.2", Data: []byte("msg3")},
+                       },
+                       // The last subject.1 message has stream sequence 2.
+                       subject: "subject.1",
+                       want:    3,
+               },
+               {
+                       name:    "Estimate end for no published messages",
+                       subject: "subject.1",
+                       want:    1,
+               },
+       }
+
+       for idx, tc := range cases {
+               streamName := fmt.Sprintf("STREAM-%d", idx)
+
+               t.Run(tc.name, func(t *testing.T) {
+                       ctx := context.Background()
+                       js := newJetStream(t, newConn(t, newServer(t).ClientURL()))
+
+                       createStream(ctx, t, js, streamName, []string{"subject.*"})
+                       publishMessages(ctx, t, js, tc.msgs)
+
+                       estimator := newEndEstimator(js, streamName, tc.subject)
+                       if got := estimator.Estimate(); got != tc.want {
+                               t.Fatalf("Estimate() = %v, want %v", got, tc.want)
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/io/natsio/example_test.go 
b/sdks/go/pkg/beam/io/natsio/example_test.go
index 0516b8efa92..984261a3686 100644
--- a/sdks/go/pkg/beam/io/natsio/example_test.go
+++ b/sdks/go/pkg/beam/io/natsio/example_test.go
@@ -22,9 +22,27 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/natsio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
        "github.com/nats-io/nats.go"
 )
 
+func ExampleRead() {
+       beam.Init()
+
+       p, s := beam.NewPipelineWithRoot()
+
+       uri := "nats://localhost:4222"
+       stream := "EVENTS"
+       subject := "events.*"
+
+       col := natsio.Read(s, uri, stream, subject)
+       debug.Print(s, col)
+
+       if err := beamx.Run(context.Background(), p); err != nil {
+               log.Fatalf("Failed to execute job: %v", err)
+       }
+}
+
 func ExampleWrite() {
        beam.Init()
 
diff --git a/sdks/go/pkg/beam/io/natsio/helper_test.go 
b/sdks/go/pkg/beam/io/natsio/helper_test.go
index cd47ed331de..ac7eedac1d4 100644
--- a/sdks/go/pkg/beam/io/natsio/helper_test.go
+++ b/sdks/go/pkg/beam/io/natsio/helper_test.go
@@ -18,6 +18,7 @@ package natsio
 import (
        "context"
        "testing"
+       "time"
 
        "github.com/nats-io/nats-server/v2/server"
        "github.com/nats-io/nats-server/v2/test"
@@ -62,8 +63,8 @@ func newJetStream(t *testing.T, conn *nats.Conn) 
jetstream.JetStream {
 }
 
 func createStream(
-       t *testing.T,
        ctx context.Context,
+       t *testing.T,
        js jetstream.JetStream,
        stream string,
        subjects []string,
@@ -89,8 +90,8 @@ func createStream(
 }
 
 func createConsumer(
-       t *testing.T,
        ctx context.Context,
+       t *testing.T,
        js jetstream.JetStream,
        stream string,
        subjects []string,
@@ -128,3 +129,46 @@ func fetchMessages(t *testing.T, cons jetstream.Consumer, 
size int) []jetstream.
 
        return result
 }
+
+// publishMessages publishes msgs to JetStream in order and fails the test on the
+// first publish error.
+func publishMessages(ctx context.Context, t *testing.T, js jetstream.JetStream, msgs []*nats.Msg) {
+       t.Helper()
+
+       for i := range msgs {
+               _, err := js.PublishMsg(ctx, msgs[i])
+               if err != nil {
+                       t.Fatalf("Failed to publish message: %v", err)
+               }
+       }
+}
+
+// messagesWithPublishingTime returns a copy of want in which every
+// ConsumerMessage has PublishingTime set to the metadata timestamp of its
+// published message: pubIndices[i] is the index into pubMsgs of the message
+// expected at want[i]. want itself is left untouched.
+//
+// A misconfigured test case (too few indices, an index out of range, or a want
+// element that is not a ConsumerMessage) fails the test with a descriptive
+// message instead of panicking.
+func messagesWithPublishingTime(
+       t *testing.T,
+       pubMsgs []jetstream.Msg,
+       pubIndices []int,
+       want []any,
+) []any {
+       t.Helper()
+
+       if len(pubIndices) < len(want) {
+               t.Fatalf("got %d publishing indices for %d expected messages", len(pubIndices), len(want))
+       }
+
+       wantWTime := make([]any, len(want))
+
+       for i := range want {
+               pubIdx := pubIndices[i]
+               if pubIdx < 0 || pubIdx >= len(pubMsgs) {
+                       t.Fatalf("publishing index %d out of range for %d published messages", pubIdx, len(pubMsgs))
+               }
+
+               // wantMsg is a copy, so setting PublishingTime does not modify want.
+               wantMsg, ok := want[i].(ConsumerMessage)
+               if !ok {
+                       t.Fatalf("want[%d] has type %T, expected ConsumerMessage", i, want[i])
+               }
+
+               wantMsg.PublishingTime = messageTimestamp(t, pubMsgs[pubIdx])
+               wantWTime[i] = wantMsg
+       }
+
+       return wantWTime
+}
+
+// messageTimestamp returns the timestamp recorded in msg's JetStream metadata,
+// failing the test if the metadata cannot be read.
+func messageTimestamp(t *testing.T, msg jetstream.Msg) time.Time {
+       t.Helper()
+
+       md, err := msg.Metadata()
+       if err != nil {
+               t.Fatalf("Failed to retrieve metadata: %v", err)
+       }
+       return md.Timestamp
+}
diff --git a/sdks/go/pkg/beam/io/natsio/read.go 
b/sdks/go/pkg/beam/io/natsio/read.go
new file mode 100644
index 00000000000..df5a53accbe
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/read.go
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       "github.com/nats-io/nats.go"
+       "github.com/nats-io/nats.go/jetstream"
+)
+
+func init() {
+       // Register the read DoFn with its ProcessElement signature, the emitter it
+       // uses, and the ConsumerMessage element type with the Beam SDK.
+       register.DoFn5x2[
+               context.Context, *watermarkEstimator, *sdf.LockRTracker, []byte,
+               func(beam.EventTime, ConsumerMessage), sdf.ProcessContinuation, error,
+       ](&readFn{})
+       register.Emitter2[beam.EventTime, ConsumerMessage]()
+
+       consumerMessageType := reflect.TypeOf((*ConsumerMessage)(nil)).Elem()
+       beam.RegisterType(consumerMessageType)
+}
+
+const (
+       // Settings Read uses when no ReadOptionFn overrides them.
+       defaultFetchSize  = 100
+       defaultStartSeqNo = 1
+       defaultEndSeqNo   = math.MaxInt64
+
+       // Timing used while reading: maximum wait per fetch, lag assumed when the
+       // watermark is advanced manually, and delay before resuming after an
+       // empty fetch.
+       fetchTimeout = 3 * time.Second
+       assumedLag   = 1 * time.Second
+       resumeDelay  = 5 * time.Second
+)
+
+// ConsumerMessage is the element type of the PCollection returned by Read; each
+// value represents one message consumed from a NATS JetStream stream.
+type ConsumerMessage struct {
+       // Subject is the subject the message was published on.
+       Subject        string
+       // PublishingTime is the timestamp taken from the message's JetStream metadata.
+       PublishingTime time.Time
+       // ID is the value of the message's nats.MsgIdHdr header.
+       ID             string
+       // Headers holds all headers of the message.
+       Headers        map[string][]string
+       // Data is the message payload.
+       Data           []byte
+}
+
+// Read reads messages from NATS JetStream and returns a PCollection<ConsumerMessage>.
+// It connects to the NATS server at uri and consumes the messages of stream whose
+// subject matches subject, which may contain wildcards (e.g. "events.*").
+//
+// Read takes a variable number of ReadOptionFn to configure the read operation:
+//   - ReadUserCredentials: path to the user credentials file. Defaults to empty.
+//   - ReadProcessingTimePolicy: use the pipeline processing time of the messages as
+//     the event time. This is the default time policy.
+//   - ReadPublishingTimePolicy: use the publishing time of the messages as the event
+//     time instead.
+//   - ReadFetchSize: the maximum number of messages to retrieve at a time. Defaults
+//     to 100.
+//   - ReadStartSeqNo: the start sequence number of messages to read. Defaults to 1.
+//   - ReadEndSeqNo: the end sequence number of messages to read (exclusive). Defaults
+//     to math.MaxInt64, in which case the end of the range is re-estimated from the
+//     last message on the subject as new messages arrive.
+//
+// Read panics during pipeline construction if an option is invalid, for example a
+// non-positive fetch size or sequence number.
+func Read(
+       s beam.Scope,
+       uri string,
+       stream string,
+       subject string,
+       opts ...ReadOptionFn,
+) beam.PCollection {
+       s = s.Scope("natsio.Read")
+
+       option := &readOption{
+               TimePolicy: processingTimePolicy,
+               FetchSize:  defaultFetchSize,
+               StartSeqNo: defaultStartSeqNo,
+               EndSeqNo:   defaultEndSeqNo,
+       }
+
+       for _, opt := range opts {
+               if err := opt(option); err != nil {
+                       panic(fmt.Sprintf("natsio.Read: invalid option: %v", err))
+               }
+       }
+
+       imp := beam.Impulse(s)
+       return beam.ParDo(s, newReadFn(uri, stream, subject, option), imp)
+}
+
+// readFn is the splittable DoFn behind Read. Its restriction is a range of stream
+// sequence numbers [StartSeqNo, EndSeqNo); the exported fields carry the
+// configuration built by newReadFn.
+type readFn struct {
+       natsFn
+       Stream      string
+       Subject     string
+       TimePolicy  timePolicy
+       FetchSize   int
+       StartSeqNo  int64
+       EndSeqNo    int64
+       // timestampFn is derived from TimePolicy in Setup.
+       timestampFn timestampFn
+}
+
+// newReadFn builds a readFn for the given server URI, stream and subject,
+// copying the remaining settings from option.
+func newReadFn(uri string, stream string, subject string, option *readOption) *readFn {
+       fn := &readFn{
+               Stream:     stream,
+               Subject:    subject,
+               TimePolicy: option.TimePolicy,
+               FetchSize:  option.FetchSize,
+               StartSeqNo: option.StartSeqNo,
+               EndSeqNo:   option.EndSeqNo,
+       }
+       // Connection settings live on the embedded natsFn.
+       fn.URI = uri
+       fn.CredsFile = option.CredsFile
+       return fn
+}
+
+// Setup initializes the embedded natsFn (its NATS connection and JetStream
+// handle) and then resolves the timestamp function for the time policy.
+func (fn *readFn) Setup() error {
+       err := fn.natsFn.Setup()
+       if err == nil {
+               fn.timestampFn = fn.TimePolicy.TimestampFn()
+       }
+       return err
+}
+
+func (fn *readFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction {
+       return offsetrange.Restriction{
+               Start: fn.StartSeqNo,
+               End:   fn.EndSeqNo,
+       }
+}
+
+func (fn *readFn) SplitRestriction(
+       _ []byte,
+       rest offsetrange.Restriction,
+) []offsetrange.Restriction {
+       return []offsetrange.Restriction{rest}
+}
+
+func (fn *readFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) 
(float64, error) {
+       if err := fn.natsFn.Setup(); err != nil {
+               return -1, err
+       }
+
+       rt, err := fn.createRTracker(rest)
+       if err != nil {
+               return -1, err
+       }
+
+       _, remaining := rt.GetProgress()
+       return remaining, nil
+}
+
+// CreateTracker builds the tracker for rest and wraps it in an
+// sdf.LockRTracker, the tracker type ProcessElement is registered with.
+func (fn *readFn) CreateTracker(rest offsetrange.Restriction) (*sdf.LockRTracker, error) {
+       tracker, err := fn.createRTracker(rest)
+       if err != nil {
+               return nil, err
+       }
+       return sdf.NewLockRTracker(tracker), nil
+}
+
+// TruncateRestriction replaces the current restriction with an empty range that
+// begins at its start, leaving nothing further to process.
+func (fn *readFn) TruncateRestriction(rt *sdf.LockRTracker, _ []byte) offsetrange.Restriction {
+       current := rt.GetRestriction().(offsetrange.Restriction)
+       return offsetrange.Restriction{Start: current.Start, End: current.Start}
+}
+
+// InitialWatermarkEstimatorState seeds the watermark estimator state with the
+// element's event time in milliseconds.
+func (fn *readFn) InitialWatermarkEstimatorState(et beam.EventTime, _ offsetrange.Restriction, _ []byte) int64 {
+       ms := et.Milliseconds()
+       return ms
+}
+
+// CreateWatermarkEstimator restores a watermarkEstimator from its saved state.
+func (fn *readFn) CreateWatermarkEstimator(ms int64) *watermarkEstimator {
+       we := &watermarkEstimator{}
+       we.state = ms
+       return we
+}
+
+// WatermarkEstimatorState returns the state to save for we.
+func (fn *readFn) WatermarkEstimatorState(we *watermarkEstimator) int64 {
+       state := we.state
+       return state
+}
+
+func (fn *readFn) ProcessElement(
+       ctx context.Context,
+       we *watermarkEstimator,
+       rt *sdf.LockRTracker,
+       _ []byte,
+       emit func(beam.EventTime, ConsumerMessage),
+) (sdf.ProcessContinuation, error) {
+       startSeqNo := rt.GetRestriction().(offsetrange.Restriction).Start
+       cons, err := fn.createConsumer(ctx, startSeqNo)
+       if err != nil {
+               return sdf.StopProcessing(), err
+       }
+
+       for {
+               msgs, err := cons.Fetch(fn.FetchSize, 
jetstream.FetchMaxWait(fetchTimeout))
+               if err != nil {
+                       return nil, fmt.Errorf("error fetching messages: %v", 
err)
+               }
+
+               count := 0
+               for msg := range msgs.Messages() {
+                       metadata, err := msg.Metadata()
+                       if err != nil {
+                               return sdf.StopProcessing(), fmt.Errorf("error 
retrieving metadata: %v", err)
+                       }
+
+                       seqNo := int64(metadata.Sequence.Stream)
+                       if !rt.TryClaim(seqNo) {
+                               return sdf.StopProcessing(), nil
+                       }
+
+                       et := fn.timestampFn(metadata.Timestamp)
+                       consMsg := createConsumerMessage(msg, 
metadata.Timestamp)
+                       emit(et, consMsg)
+
+                       count++
+               }
+
+               if err := msgs.Error(); err != nil {
+                       return sdf.StopProcessing(), fmt.Errorf("error in 
message batch: %v", err)
+               }
+
+               if count == 0 {
+                       fn.updateWatermarkManually(we)
+                       return sdf.ResumeProcessingIn(resumeDelay), nil
+               }
+       }
+}
+
+// createRTracker returns a plain offset-range tracker when rest has a finite end.
+// When rest ends at math.MaxInt64 (no end configured), it returns a growable
+// tracker whose end is estimated from the last message on the subject; that
+// estimator uses fn.js, so the NATS connection must already be set up.
+func (fn *readFn) createRTracker(rest offsetrange.Restriction) (sdf.RTracker, error) {
+       if rest.End < math.MaxInt64 {
+               return offsetrange.NewTracker(rest), nil
+       }
+
+       estimator := newEndEstimator(fn.js, fn.Stream, fn.Subject)
+       rt, err := offsetrange.NewGrowableTracker(rest, estimator)
+       if err != nil {
+               return nil, fmt.Errorf("error creating growable tracker: %w", err)
+       }
+
+       return rt, nil
+}
+
+func (fn *readFn) createConsumer(
+       ctx context.Context,
+       startSeqNo int64,
+) (jetstream.Consumer, error) {
+       cfg := jetstream.OrderedConsumerConfig{
+               FilterSubjects:   []string{fn.Subject},
+               DeliverPolicy:    jetstream.DeliverByStartSequencePolicy,
+               OptStartSeq:      uint64(startSeqNo),
+               MaxResetAttempts: 5,
+       }
+
+       cons, err := fn.js.OrderedConsumer(ctx, fn.Stream, cfg)
+       if err != nil {
+               return nil, fmt.Errorf("error creating consumer: %v", err)
+       }
+
+       return cons, nil
+}
+
+// createConsumerMessage converts a JetStream message into a ConsumerMessage,
+// using publishingTime as its publishing time and the nats.MsgIdHdr header as
+// its ID.
+func createConsumerMessage(msg jetstream.Msg, publishingTime time.Time) ConsumerMessage {
+       headers := msg.Headers()
+       return ConsumerMessage{
+               Subject:        msg.Subject(),
+               PublishingTime: publishingTime,
+               ID:             headers.Get(nats.MsgIdHdr),
+               Headers:        headers,
+               Data:           msg.Data(),
+       }
+}
+
+// updateWatermarkManually reports to the watermark estimator the time-policy
+// timestamp for "now minus assumedLag". ProcessElement calls it when a fetch
+// returns no messages.
+func (fn *readFn) updateWatermarkManually(we *watermarkEstimator) {
+       lagged := time.Now().Add(-assumedLag)
+       we.ObserveTimestamp(fn.timestampFn(lagged).ToTime())
+}
diff --git a/sdks/go/pkg/beam/io/natsio/read_option.go 
b/sdks/go/pkg/beam/io/natsio/read_option.go
new file mode 100644
index 00000000000..f9d715be102
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/read_option.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import "errors"
+
+// Errors returned by the Read options when given an out-of-range value.
+var (
+       // errInvalidFetchSize is returned by ReadFetchSize for a size <= 0.
+       errInvalidFetchSize  = errors.New("fetch size must be greater than 0")
+       // errInvalidStartSeqNo is returned by ReadStartSeqNo for a sequence number <= 0.
+       errInvalidStartSeqNo = errors.New("start sequence number must be greater than 0")
+       // errInvalidEndSeqNo is returned by ReadEndSeqNo for a sequence number <= 0.
+       errInvalidEndSeqNo   = errors.New("end sequence number must be greater than 0")
+)
+
+// readOption collects the settings configured through ReadOptionFn values. Read
+// fills it with defaults before applying the caller's options.
+type readOption struct {
+       CredsFile  string
+       TimePolicy timePolicy
+       FetchSize  int
+       StartSeqNo int64
+       EndSeqNo   int64
+}
+
+// ReadOptionFn is a function that can be passed to Read to configure options for reading
+// from NATS. It returns an error when the configured value is invalid.
+type ReadOptionFn func(option *readOption) error
+
+// ReadUserCredentials returns an option that sets the path of the user
+// credentials file used when connecting to NATS.
+func ReadUserCredentials(credsFile string) ReadOptionFn {
+       apply := func(o *readOption) error {
+               o.CredsFile = credsFile
+               return nil
+       }
+       return apply
+}
+
+// ReadProcessingTimePolicy returns an option that derives the event time, and
+// with it the watermark estimate, from the pipeline processing time of the
+// messages.
+func ReadProcessingTimePolicy() ReadOptionFn {
+       apply := func(o *readOption) error {
+               o.TimePolicy = processingTimePolicy
+               return nil
+       }
+       return apply
+}
+
+// ReadPublishingTimePolicy returns an option that derives the event time, and
+// with it the watermark estimate, from the publishing time of the messages.
+func ReadPublishingTimePolicy() ReadOptionFn {
+       apply := func(o *readOption) error {
+               o.TimePolicy = publishingTimePolicy
+               return nil
+       }
+       return apply
+}
+
+// ReadFetchSize returns an option that sets the maximum number of messages to
+// retrieve per fetch. The option fails with errInvalidFetchSize unless size > 0.
+func ReadFetchSize(size int) ReadOptionFn {
+       return func(o *readOption) error {
+               if size > 0 {
+                       o.FetchSize = size
+                       return nil
+               }
+               return errInvalidFetchSize
+       }
+}
+
+// ReadStartSeqNo returns an option that sets the (inclusive) start sequence
+// number of messages to read. The option fails with errInvalidStartSeqNo unless
+// seqNo > 0.
+func ReadStartSeqNo(seqNo int64) ReadOptionFn {
+       return func(o *readOption) error {
+               if seqNo > 0 {
+                       o.StartSeqNo = seqNo
+                       return nil
+               }
+               return errInvalidStartSeqNo
+       }
+}
+
+// ReadEndSeqNo returns an option that sets the exclusive end sequence number of
+// messages to read. The option fails with errInvalidEndSeqNo unless seqNo > 0.
+func ReadEndSeqNo(seqNo int64) ReadOptionFn {
+       return func(o *readOption) error {
+               if seqNo > 0 {
+                       o.EndSeqNo = seqNo
+                       return nil
+               }
+               return errInvalidEndSeqNo
+       }
+}
diff --git a/sdks/go/pkg/beam/io/natsio/read_test.go 
b/sdks/go/pkg/beam/io/natsio/read_test.go
new file mode 100644
index 00000000000..faf0b0540c8
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/read_test.go
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+       "context"
+       "fmt"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       "github.com/nats-io/nats.go"
+)
+
+func TestRead(t *testing.T) {
+       tests := []struct {
+               name       string
+               input      []*nats.Msg
+               subject    string
+               opts       []ReadOptionFn
+               pubIndices []int
+               want       []any
+       }{
+               {
+                       name: "Read messages from bounded stream with single 
subject",
+                       input: []*nats.Msg{
+                               {
+                                       Subject: "subject.1",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"123"}},
+                                       Data:    []byte("msg1"),
+                               },
+                               {
+                                       Subject: "subject.2",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"124"}},
+                                       Data:    []byte("msg2"),
+                               },
+                               {
+                                       Subject: "subject.1",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"125"}},
+                                       Data:    []byte("msg3"),
+                               },
+                       },
+                       subject: "subject.1",
+                       opts: []ReadOptionFn{
+                               ReadEndSeqNo(4),
+                       },
+                       pubIndices: []int{0, 2},
+                       want: []any{
+                               ConsumerMessage{
+                                       Subject: "subject.1",
+                                       ID:      "123",
+                                       Headers: 
map[string][]string{nats.MsgIdHdr: {"123"}},
+                                       Data:    []byte("msg1"),
+                               },
+                               ConsumerMessage{
+                                       Subject: "subject.1",
+                                       ID:      "125",
+                                       Headers: 
map[string][]string{nats.MsgIdHdr: {"125"}},
+                                       Data:    []byte("msg3"),
+                               },
+                       },
+               },
+               {
+                       name: "Read messages from bounded stream with wildcard 
subject",
+                       input: []*nats.Msg{
+                               {
+                                       Subject: "subject.1",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"123"}},
+                                       Data:    []byte("msg1"),
+                               },
+                               {
+                                       Subject: "subject.2",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"124"}},
+                                       Data:    []byte("msg2"),
+                               },
+                               {
+                                       Subject: "subject.1",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"125"}},
+                                       Data:    []byte("msg3"),
+                               },
+                       },
+                       subject: "subject.*",
+                       opts: []ReadOptionFn{
+                               ReadEndSeqNo(4),
+                       },
+                       pubIndices: []int{0, 1, 2},
+                       want: []any{
+                               ConsumerMessage{
+                                       Subject: "subject.1",
+                                       ID:      "123",
+                                       Headers: 
map[string][]string{nats.MsgIdHdr: {"123"}},
+                                       Data:    []byte("msg1"),
+                               },
+                               ConsumerMessage{
+                                       Subject: "subject.2",
+                                       ID:      "124",
+                                       Headers: 
map[string][]string{nats.MsgIdHdr: {"124"}},
+                                       Data:    []byte("msg2"),
+                               },
+                               ConsumerMessage{
+                                       Subject: "subject.1",
+                                       ID:      "125",
+                                       Headers: 
map[string][]string{nats.MsgIdHdr: {"125"}},
+                                       Data:    []byte("msg3"),
+                               },
+                       },
+               },
+               {
+                       name: "Read messages from bounded stream with custom 
fetch size",
+                       input: []*nats.Msg{
+                               {
+                                       Subject: "subject.1",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"123"}},
+                                       Data:    []byte("msg1"),
+                               },
+                               {
+                                       Subject: "subject.1",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"124"}},
+                                       Data:    []byte("msg2"),
+                               },
+                       },
+                       subject: "subject.1",
+                       opts: []ReadOptionFn{
+                               ReadFetchSize(1),
+                               ReadEndSeqNo(3),
+                       },
+                       pubIndices: []int{0, 1},
+                       want: []any{
+                               ConsumerMessage{
+                                       Subject: "subject.1",
+                                       ID:      "123",
+                                       Headers: 
map[string][]string{nats.MsgIdHdr: {"123"}},
+                                       Data:    []byte("msg1"),
+                               },
+                               ConsumerMessage{
+                                       Subject: "subject.1",
+                                       ID:      "124",
+                                       Headers: 
map[string][]string{nats.MsgIdHdr: {"124"}},
+                                       Data:    []byte("msg2"),
+                               },
+                       },
+               },
+               {
+                       name: "Read messages from bounded stream with custom 
start seq no",
+                       input: []*nats.Msg{
+                               {
+                                       Subject: "subject.1",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"123"}},
+                                       Data:    []byte("msg1"),
+                               },
+                               {
+                                       Subject: "subject.1",
+                                       Header:  nats.Header{nats.MsgIdHdr: 
[]string{"124"}},
+                                       Data:    []byte("msg2"),
+                               },
+                       },
+                       subject: "subject.1",
+                       opts: []ReadOptionFn{
+                               ReadStartSeqNo(2),
+                               ReadEndSeqNo(3),
+                       },
+                       pubIndices: []int{1},
+                       want: []any{
+                               ConsumerMessage{
+                                       Subject: "subject.1",
+                                       ID:      "124",
+                                       Headers: 
map[string][]string{nats.MsgIdHdr: {"124"}},
+                                       Data:    []byte("msg2"),
+                               },
+                       },
+               },
+       }
+       for i, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       ctx := context.Background()
+                       srv := newServer(t)
+                       url := srv.ClientURL()
+                       conn := newConn(t, url)
+                       js := newJetStream(t, conn)
+
+                       stream := fmt.Sprintf("STREAM-%d", i)
+                       subjectFilter := "subject.*"
+
+                       createStream(ctx, t, js, stream, 
[]string{subjectFilter})
+                       publishMessages(ctx, t, js, tt.input)
+
+                       cons := createConsumer(ctx, t, js, stream, 
[]string{subjectFilter})
+                       pubMsgs := fetchMessages(t, cons, len(tt.input))
+                       wantWTime := messagesWithPublishingTime(t, pubMsgs, 
tt.pubIndices, tt.want)
+
+                       p, s := beam.NewPipelineWithRoot()
+                       got := Read(s, url, stream, tt.subject, tt.opts...)
+
+                       passert.Equals(s, got, wantWTime...)
+                       ptest.RunAndValidate(t, p)
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/io/natsio/common.go 
b/sdks/go/pkg/beam/io/natsio/time_policy.go
similarity index 53%
copy from sdks/go/pkg/beam/io/natsio/common.go
copy to sdks/go/pkg/beam/io/natsio/time_policy.go
index 53f59551698..1c2dbf5165f 100644
--- a/sdks/go/pkg/beam/io/natsio/common.go
+++ b/sdks/go/pkg/beam/io/natsio/time_policy.go
@@ -13,46 +13,38 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package natsio contains transforms for interacting with NATS.
 package natsio
 
 import (
-       "fmt"
+       "time"
 
-       "github.com/nats-io/nats.go"
-       "github.com/nats-io/nats.go/jetstream"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 )
 
-type natsFn struct {
-       URI       string
-       CredsFile string
-       nc        *nats.Conn
-       js        jetstream.JetStream
-}
+type timePolicy int
 
-func (fn *natsFn) Setup() error {
-       var opts []nats.Option
-       if fn.CredsFile != "" {
-               opts = append(opts, nats.UserCredentials(fn.CredsFile))
-       }
+const (
+       processingTimePolicy timePolicy = iota
+       publishingTimePolicy
+)
 
-       conn, err := nats.Connect(fn.URI, opts...)
-       if err != nil {
-               return fmt.Errorf("error connecting to NATS: %v", err)
-       }
-       fn.nc = conn
+type timestampFn func(time.Time) mtime.Time
 
-       js, err := jetstream.New(fn.nc)
-       if err != nil {
-               return fmt.Errorf("error creating JetStream context: %v", err)
-       }
-       fn.js = js
+func processingTime(_ time.Time) mtime.Time {
+       return mtime.Now()
+}
 
-       return nil
+func publishingTime(t time.Time) mtime.Time {
+       return mtime.FromTime(t)
 }
 
-func (fn *natsFn) Teardown() {
-       if fn.nc != nil {
-               fn.nc.Close()
+func (p timePolicy) TimestampFn() timestampFn {
+       switch p {
+       case processingTimePolicy:
+               return processingTime
+       case publishingTimePolicy:
+               return publishingTime
+       default:
+               panic("unsupported time policy")
        }
 }
diff --git a/sdks/go/pkg/beam/io/natsio/common.go 
b/sdks/go/pkg/beam/io/natsio/time_policy_test.go
similarity index 52%
copy from sdks/go/pkg/beam/io/natsio/common.go
copy to sdks/go/pkg/beam/io/natsio/time_policy_test.go
index 53f59551698..2452334c8cd 100644
--- a/sdks/go/pkg/beam/io/natsio/common.go
+++ b/sdks/go/pkg/beam/io/natsio/time_policy_test.go
@@ -13,46 +13,33 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package natsio contains transforms for interacting with NATS.
 package natsio
 
 import (
-       "fmt"
+       "testing"
+       "time"
 
-       "github.com/nats-io/nats.go"
-       "github.com/nats-io/nats.go/jetstream"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 )
 
-type natsFn struct {
-       URI       string
-       CredsFile string
-       nc        *nats.Conn
-       js        jetstream.JetStream
-}
+func Test_timePolicy_TimestampFn(t *testing.T) {
+       t.Run("processingTime", func(t *testing.T) {
+               pubTime := time.Date(2020, 1, 2, 3, 4, 5, 6e6, time.UTC)
 
-func (fn *natsFn) Setup() error {
-       var opts []nats.Option
-       if fn.CredsFile != "" {
-               opts = append(opts, nats.UserCredentials(fn.CredsFile))
-       }
-
-       conn, err := nats.Connect(fn.URI, opts...)
-       if err != nil {
-               return fmt.Errorf("error connecting to NATS: %v", err)
-       }
-       fn.nc = conn
-
-       js, err := jetstream.New(fn.nc)
-       if err != nil {
-               return fmt.Errorf("error creating JetStream context: %v", err)
-       }
-       fn.js = js
-
-       return nil
-}
+               t1 := mtime.Now()
+               got := processingTimePolicy.TimestampFn()(pubTime)
+               t2 := mtime.Now()
+
+               if got < t1 || got > t2 {
+                       t.Errorf("timestamp = %v, want between %v and %v", got, 
t1, t2)
+               }
+       })
+
+       t.Run("publishingTime", func(t *testing.T) {
+               pubTime := time.Date(2020, 1, 2, 3, 4, 5, 6e6, time.UTC)
 
-func (fn *natsFn) Teardown() {
-       if fn.nc != nil {
-               fn.nc.Close()
-       }
+               if got, want := publishingTimePolicy.TimestampFn()(pubTime), 
mtime.FromTime(pubTime); got != want {
+                       t.Errorf("timestamp = %v, want %v", got, want)
+               }
+       })
 }
diff --git a/sdks/go/pkg/beam/io/natsio/common.go 
b/sdks/go/pkg/beam/io/natsio/watermark_estimator.go
similarity index 52%
copy from sdks/go/pkg/beam/io/natsio/common.go
copy to sdks/go/pkg/beam/io/natsio/watermark_estimator.go
index 53f59551698..b23eb37ac85 100644
--- a/sdks/go/pkg/beam/io/natsio/common.go
+++ b/sdks/go/pkg/beam/io/natsio/watermark_estimator.go
@@ -13,46 +13,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package natsio contains transforms for interacting with NATS.
 package natsio
 
-import (
-       "fmt"
+import "time"
 
-       "github.com/nats-io/nats.go"
-       "github.com/nats-io/nats.go/jetstream"
-)
-
-type natsFn struct {
-       URI       string
-       CredsFile string
-       nc        *nats.Conn
-       js        jetstream.JetStream
+type watermarkEstimator struct {
+       state int64
 }
 
-func (fn *natsFn) Setup() error {
-       var opts []nats.Option
-       if fn.CredsFile != "" {
-               opts = append(opts, nats.UserCredentials(fn.CredsFile))
-       }
-
-       conn, err := nats.Connect(fn.URI, opts...)
-       if err != nil {
-               return fmt.Errorf("error connecting to NATS: %v", err)
-       }
-       fn.nc = conn
-
-       js, err := jetstream.New(fn.nc)
-       if err != nil {
-               return fmt.Errorf("error creating JetStream context: %v", err)
-       }
-       fn.js = js
-
-       return nil
+func (e *watermarkEstimator) CurrentWatermark() time.Time {
+       return time.UnixMilli(e.state)
 }
 
-func (fn *natsFn) Teardown() {
-       if fn.nc != nil {
-               fn.nc.Close()
+func (e *watermarkEstimator) ObserveTimestamp(t time.Time) {
+       ms := t.UnixMilli()
+       if ms > e.state {
+               e.state = ms
        }
 }
diff --git a/sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go 
b/sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go
new file mode 100644
index 00000000000..91a9a840a6e
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+       "testing"
+       "time"
+)
+
+func Test_watermarkEstimator_CurrentWatermark(t *testing.T) {
+       ms := int64(1577934245000)
+       we := &watermarkEstimator{
+               state: ms,
+       }
+       if got, want := we.CurrentWatermark(), time.UnixMilli(ms); got != want {
+               t.Errorf("CurrentWatermark() = %v, want %v", got, want)
+       }
+}
+
+func Test_watermarkEstimator_ObserveTimestamp(t *testing.T) {
+       t1 := time.Date(2020, 1, 2, 3, 4, 5, 6e6, time.UTC)
+       t2 := time.Date(2020, 1, 2, 3, 4, 5, 7e6, time.UTC)
+
+       tests := []struct {
+               name  string
+               state int64
+               t     time.Time
+               want  int64
+       }{
+               {
+                       name:  "Update watermark when the time is greater than 
the current state",
+                       state: t1.UnixMilli(),
+                       t:     t2,
+                       want:  t2.UnixMilli(),
+               },
+               {
+                       name:  "Keep existing watermark when the time is not 
greater than the current state",
+                       state: t2.UnixMilli(),
+                       t:     t1,
+                       want:  t2.UnixMilli(),
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       we := &watermarkEstimator{
+                               state: tt.state,
+                       }
+                       we.ObserveTimestamp(tt.t)
+                       if got, want := we.state, tt.want; got != want {
+                               t.Errorf("state = %v, want %v", got, want)
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/io/natsio/write_test.go 
b/sdks/go/pkg/beam/io/natsio/write_test.go
index 5e9387ece5f..678874ce11e 100644
--- a/sdks/go/pkg/beam/io/natsio/write_test.go
+++ b/sdks/go/pkg/beam/io/natsio/write_test.go
@@ -187,8 +187,8 @@ func TestWrite(t *testing.T) {
                        js := newJetStream(t, conn)
 
                        subjects := []string{subject}
-                       createStream(t, ctx, js, stream, subjects)
-                       cons := createConsumer(t, ctx, js, stream, subjects)
+                       createStream(ctx, t, js, stream, subjects)
+                       cons := createConsumer(ctx, t, js, stream, subjects)
 
                        p, s := beam.NewPipelineWithRoot()
 


Reply via email to