This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 90e79ae373a [Go SDK]: Implement natsio.Read transform for reading from NATS (#29410)
90e79ae373a is described below
commit 90e79ae373ab38cf4e48e9854c28aaffb0938458
Author: Johanna Öjeling <[email protected]>
AuthorDate: Tue Dec 12 22:19:06 2023 +0100
[Go SDK]: Implement natsio.Read transform for reading from NATS (#29410)
---
CHANGES.md | 1 +
sdks/go/pkg/beam/io/natsio/common.go | 4 +
sdks/go/pkg/beam/io/natsio/end_estimator.go | 77 ++++++
sdks/go/pkg/beam/io/natsio/end_estimator_test.go | 78 ++++++
sdks/go/pkg/beam/io/natsio/example_test.go | 18 ++
sdks/go/pkg/beam/io/natsio/helper_test.go | 48 +++-
sdks/go/pkg/beam/io/natsio/read.go | 289 +++++++++++++++++++++
sdks/go/pkg/beam/io/natsio/read_option.go | 98 +++++++
sdks/go/pkg/beam/io/natsio/read_test.go | 212 +++++++++++++++
.../beam/io/natsio/{common.go => time_policy.go} | 50 ++--
.../io/natsio/{common.go => time_policy_test.go} | 55 ++--
.../natsio/{common.go => watermark_estimator.go} | 43 +--
.../pkg/beam/io/natsio/watermark_estimator_test.go | 67 +++++
sdks/go/pkg/beam/io/natsio/write_test.go | 4 +-
14 files changed, 943 insertions(+), 101 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 60b5a820cf3..70ce1a70b7e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
* Adding support for LowCardinality DataType in ClickHouse (Java)
([#29533](https://github.com/apache/beam/pull/29533)).
* Added support for handling bad records to KafkaIO (Java)
([#29546](https://github.com/apache/beam/pull/29546))
* Add support for generating text embeddings in MLTransform for Vertex AI and
Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564))
+* NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)).
## New Features / Improvements
diff --git a/sdks/go/pkg/beam/io/natsio/common.go
b/sdks/go/pkg/beam/io/natsio/common.go
index 53f59551698..72640894c76 100644
--- a/sdks/go/pkg/beam/io/natsio/common.go
+++ b/sdks/go/pkg/beam/io/natsio/common.go
@@ -31,6 +31,10 @@ type natsFn struct {
}
func (fn *natsFn) Setup() error {
+ if fn.nc != nil && fn.js != nil {
+ return nil
+ }
+
var opts []nats.Option
if fn.CredsFile != "" {
opts = append(opts, nats.UserCredentials(fn.CredsFile))
diff --git a/sdks/go/pkg/beam/io/natsio/end_estimator.go
b/sdks/go/pkg/beam/io/natsio/end_estimator.go
new file mode 100644
index 00000000000..6b2f18e10ce
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/end_estimator.go
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "github.com/nats-io/nats.go/jetstream"
+)
+
+// endEstimator estimates the exclusive end of the sequence-number range of a
+// subject in a JetStream stream. createRTracker hands it to
+// offsetrange.NewGrowableTracker for open-ended restrictions.
+type endEstimator struct {
+	js      jetstream.JetStream
+	stream  string
+	subject string
+}
+
+// newEndEstimator returns an endEstimator that queries stream for the last
+// message published on subject through the JetStream context js.
+func newEndEstimator(js jetstream.JetStream, stream string, subject string) *endEstimator {
+	est := &endEstimator{js: js}
+	est.stream = stream
+	est.subject = subject
+	return est
+}
+
+// Estimate returns the current exclusive end sequence number of the subject
+// (see getEndSeqNo). It is called by the growable offset range tracker, whose
+// estimator method has no error return, so a failure to query JetStream
+// panics.
+func (e *endEstimator) Estimate() int64 {
+	end, err := e.getEndSeqNo(context.Background())
+	if err == nil {
+		return end
+	}
+	panic(err)
+}
+
+// getEndSeqNo returns the exclusive end sequence number for e.subject in
+// e.stream, i.e. the sequence number of the last message on the subject plus
+// one. If the subject has no messages yet it returns 1 (matching
+// defaultStartSeqNo), which describes an empty range.
+//
+// On failure it returns -1 and an error that wraps the underlying JetStream
+// error, so callers can inspect it with errors.Is / errors.As.
+func (e *endEstimator) getEndSeqNo(ctx context.Context) (int64, error) {
+	str, err := e.js.Stream(ctx, e.stream)
+	if err != nil {
+		return -1, fmt.Errorf("error getting stream: %w", err)
+	}
+
+	msg, err := str.GetLastMsgForSubject(ctx, e.subject)
+	if err != nil {
+		if isMessageNotFound(err) {
+			// Nothing published on the subject yet: the range ends before seq 1.
+			return 1, nil
+		}
+
+		return -1, fmt.Errorf("error getting last message: %w", err)
+	}
+
+	return int64(msg.Sequence) + 1, nil
+}
+
+// isMessageNotFound reports whether err is, or wraps, a JetStream error whose
+// API error code is JSErrCodeMessageNotFound.
+func isMessageNotFound(err error) bool {
+	var jsErr jetstream.JetStreamError
+	if !errors.As(err, &jsErr) {
+		return false
+	}
+
+	// APIError returns a pointer, which may be nil for JetStream errors that
+	// carry no API error; guard against dereferencing it.
+	apiErr := jsErr.APIError()
+	return apiErr != nil && apiErr.ErrorCode == jetstream.JSErrCodeMessageNotFound
+}
diff --git a/sdks/go/pkg/beam/io/natsio/end_estimator_test.go
b/sdks/go/pkg/beam/io/natsio/end_estimator_test.go
new file mode 100644
index 00000000000..855547a0e29
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/end_estimator_test.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/nats-io/nats.go"
+)
+
+func Test_endEstimator_Estimate(t *testing.T) {
+	testCases := []struct {
+		name    string
+		msgs    []*nats.Msg
+		subject string
+		want    int64
+	}{
+		{
+			name: "Estimate end for published messages",
+			msgs: []*nats.Msg{
+				{Subject: "subject.1", Data: []byte("msg1")},
+				{Subject: "subject.1", Data: []byte("msg2")},
+				{Subject: "subject.2", Data: []byte("msg3")},
+			},
+			subject: "subject.1",
+			// The last "subject.1" message has sequence 2, so the exclusive end is 3.
+			want: 3,
+		},
+		{
+			name:    "Estimate end for no published messages",
+			subject: "subject.1",
+			want:    1,
+		},
+	}
+
+	for idx, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx := context.Background()
+			js := newJetStream(t, newConn(t, newServer(t).ClientURL()))
+
+			stream := fmt.Sprintf("STREAM-%d", idx)
+			createStream(ctx, t, js, stream, []string{"subject.*"})
+			publishMessages(ctx, t, js, tc.msgs)
+
+			got := newEndEstimator(js, stream, tc.subject).Estimate()
+			if got != tc.want {
+				t.Fatalf("Estimate() = %v, want %v", got, tc.want)
+			}
+		})
+	}
+}
diff --git a/sdks/go/pkg/beam/io/natsio/example_test.go
b/sdks/go/pkg/beam/io/natsio/example_test.go
index 0516b8efa92..984261a3686 100644
--- a/sdks/go/pkg/beam/io/natsio/example_test.go
+++ b/sdks/go/pkg/beam/io/natsio/example_test.go
@@ -22,9 +22,27 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/natsio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"github.com/nats-io/nats.go"
)
+func ExampleRead() {
+ beam.Init()
+
+ p, s := beam.NewPipelineWithRoot()
+
+ uri := "nats://localhost:4222"
+ stream := "EVENTS"
+ subject := "events.*"
+
+ col := natsio.Read(s, uri, stream, subject)
+ debug.Print(s, col)
+
+ if err := beamx.Run(context.Background(), p); err != nil {
+ log.Fatalf("Failed to execute job: %v", err)
+ }
+}
+
func ExampleWrite() {
beam.Init()
diff --git a/sdks/go/pkg/beam/io/natsio/helper_test.go
b/sdks/go/pkg/beam/io/natsio/helper_test.go
index cd47ed331de..ac7eedac1d4 100644
--- a/sdks/go/pkg/beam/io/natsio/helper_test.go
+++ b/sdks/go/pkg/beam/io/natsio/helper_test.go
@@ -18,6 +18,7 @@ package natsio
import (
"context"
"testing"
+ "time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-server/v2/test"
@@ -62,8 +63,8 @@ func newJetStream(t *testing.T, conn *nats.Conn)
jetstream.JetStream {
}
func createStream(
- t *testing.T,
ctx context.Context,
+ t *testing.T,
js jetstream.JetStream,
stream string,
subjects []string,
@@ -89,8 +90,8 @@ func createStream(
}
func createConsumer(
- t *testing.T,
ctx context.Context,
+ t *testing.T,
js jetstream.JetStream,
stream string,
subjects []string,
@@ -128,3 +129,46 @@ func fetchMessages(t *testing.T, cons jetstream.Consumer,
size int) []jetstream.
return result
}
+
+// publishMessages publishes msgs to JetStream in order and fails the test on
+// the first publish error.
+func publishMessages(ctx context.Context, t *testing.T, js jetstream.JetStream, msgs []*nats.Msg) {
+	t.Helper()
+
+	for i := range msgs {
+		_, err := js.PublishMsg(ctx, msgs[i])
+		if err != nil {
+			t.Fatalf("Failed to publish message: %v", err)
+		}
+	}
+}
+
+// messagesWithPublishingTime returns a new slice holding a copy of each
+// ConsumerMessage in want, with PublishingTime set to the metadata timestamp
+// of pubMsgs[pubIndices[i]]. Every element of want must be a ConsumerMessage,
+// and pubIndices must have at least len(want) entries.
+func messagesWithPublishingTime(
+	t *testing.T,
+	pubMsgs []jetstream.Msg,
+	pubIndices []int,
+	want []any,
+) []any {
+	t.Helper()
+
+	result := make([]any, len(want))
+	for i := range want {
+		published := pubMsgs[pubIndices[i]]
+		expected := want[i].(ConsumerMessage)
+		expected.PublishingTime = messageTimestamp(t, published)
+		result[i] = expected
+	}
+
+	return result
+}
+
+// messageTimestamp returns the timestamp recorded in msg's JetStream
+// metadata, failing the test if the metadata cannot be read.
+func messageTimestamp(t *testing.T, msg jetstream.Msg) time.Time {
+	t.Helper()
+
+	md, err := msg.Metadata()
+	if err != nil {
+		t.Fatalf("Failed to retrieve metadata: %v", err)
+	}
+	return md.Timestamp
+}
diff --git a/sdks/go/pkg/beam/io/natsio/read.go
b/sdks/go/pkg/beam/io/natsio/read.go
new file mode 100644
index 00000000000..df5a53accbe
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/read.go
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "reflect"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+ "github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
+)
+
+// init registers readFn, its emitter type and the ConsumerMessage element
+// type with Beam. The DoFn5x2 type arguments mirror the parameter and result
+// types of readFn.ProcessElement.
+func init() {
+	register.DoFn5x2[context.Context, *watermarkEstimator, *sdf.LockRTracker, []byte, func(beam.EventTime, ConsumerMessage), sdf.ProcessContinuation, error](&readFn{})
+	register.Emitter2[beam.EventTime, ConsumerMessage]()
+
+	beam.RegisterType(reflect.TypeOf(ConsumerMessage{}))
+}
+
+const (
+	// defaultFetchSize is the default maximum number of messages per fetch.
+	defaultFetchSize = 100
+	// defaultStartSeqNo is the default first sequence number to read.
+	defaultStartSeqNo = 1
+	// defaultEndSeqNo is the default exclusive end sequence number. A
+	// restriction ending here gets a growable tracker in createRTracker.
+	defaultEndSeqNo = math.MaxInt64
+	// fetchTimeout bounds how long a single fetch waits for messages.
+	fetchTimeout = 3 * time.Second
+	// assumedLag is subtracted from the current time when the watermark is
+	// advanced after an empty fetch.
+	assumedLag = time.Second
+	// resumeDelay is how long ProcessElement asks to be resumed after an
+	// empty fetch.
+	resumeDelay = 5 * time.Second
+)
+
+// ConsumerMessage is a message read from NATS JetStream by Read.
+type ConsumerMessage struct {
+	// Subject is the subject the message was published on.
+	Subject string
+	// PublishingTime is the timestamp from the message's JetStream metadata.
+	PublishingTime time.Time
+	// ID is the value of the message's nats.MsgIdHdr header.
+	ID string
+	// Headers holds all headers of the message.
+	Headers map[string][]string
+	// Data is the message payload.
+	Data []byte
+}
+
+// Read reads messages from NATS JetStream and returns a
+// PCollection<ConsumerMessage>.
+//
+// Read consumes the messages of the given stream whose subject matches
+// subject (which may contain wildcards such as "events.*"). It reads from a
+// start sequence number up to an exclusive end sequence number. When no end
+// sequence number is set, the read keeps polling for new messages.
+//
+// Read takes a variable number of ReadOptionFn to configure the read
+// operation:
+//   - ReadUserCredentials: path to the user credentials file. Defaults to
+//     empty (no credentials).
+//   - ReadProcessingTimePolicy: use the pipeline processing time as the event
+//     time of the messages. This is the default time policy.
+//   - ReadPublishingTimePolicy: use the publishing time of the messages as
+//     their event time.
+//   - ReadFetchSize: the maximum number of messages to retrieve at a time.
+//     Defaults to 100.
+//   - ReadStartSeqNo: the start sequence number of messages to read.
+//     Defaults to 1.
+//   - ReadEndSeqNo: the end sequence number of messages to read (exclusive).
+//     Defaults to math.MaxInt64.
+//
+// Read panics if an option rejects its value.
+func Read(
+	s beam.Scope,
+	uri string,
+	stream string,
+	subject string,
+	opts ...ReadOptionFn,
+) beam.PCollection {
+	s = s.Scope("natsio.Read")
+
+	option := &readOption{
+		TimePolicy: processingTimePolicy,
+		FetchSize:  defaultFetchSize,
+		StartSeqNo: defaultStartSeqNo,
+		EndSeqNo:   defaultEndSeqNo,
+	}
+
+	for _, opt := range opts {
+		if err := opt(option); err != nil {
+			panic(fmt.Sprintf("natsio.Read: invalid option: %v", err))
+		}
+	}
+
+	imp := beam.Impulse(s)
+	return beam.ParDo(s, newReadFn(uri, stream, subject, option), imp)
+}
+
+// readFn is the splittable DoFn that implements Read. Its exported fields hold
+// the configuration set by newReadFn; timestampFn is derived from TimePolicy
+// in Setup.
+// NOTE(review): the exported fields are presumably what Beam serializes with
+// the DoFn, so keep their names and types stable.
+type readFn struct {
+	// natsFn provides the NATS connection settings (URI, CredsFile) and the
+	// connection/JetStream context created in its Setup.
+	natsFn
+	// Stream is the JetStream stream to read from.
+	Stream string
+	// Subject is the subject filter; it may contain wildcards.
+	Subject string
+	// TimePolicy selects how event times are assigned to messages.
+	TimePolicy timePolicy
+	// FetchSize is the maximum number of messages per fetch.
+	FetchSize int
+	// StartSeqNo is the start of the initial restriction.
+	StartSeqNo int64
+	// EndSeqNo is the exclusive end of the initial restriction.
+	EndSeqNo int64
+	// timestampFn maps a publishing time to an event time; set in Setup.
+	timestampFn timestampFn
+}
+
+// newReadFn builds a readFn for the given connection URI, stream and subject,
+// copying the remaining configuration from option.
+func newReadFn(uri string, stream string, subject string, option *readOption) *readFn {
+	fn := &readFn{
+		Stream:  stream,
+		Subject: subject,
+	}
+	fn.URI = uri
+	fn.CredsFile = option.CredsFile
+	fn.TimePolicy = option.TimePolicy
+	fn.FetchSize = option.FetchSize
+	fn.StartSeqNo = option.StartSeqNo
+	fn.EndSeqNo = option.EndSeqNo
+	return fn
+}
+
+// Setup connects to NATS through the embedded natsFn and, on success, resolves
+// the timestamp function for the configured time policy.
+func (fn *readFn) Setup() error {
+	err := fn.natsFn.Setup()
+	if err == nil {
+		fn.timestampFn = fn.TimePolicy.TimestampFn()
+	}
+	return err
+}
+
+func (fn *readFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction {
+ return offsetrange.Restriction{
+ Start: fn.StartSeqNo,
+ End: fn.EndSeqNo,
+ }
+}
+
+// SplitRestriction performs no initial split. The whole restriction is read
+// by a single ordered consumer (see createConsumer).
+func (fn *readFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction {
+	splits := make([]offsetrange.Restriction, 1)
+	splits[0] = rest
+	return splits
+}
+
+func (fn *readFn) RestrictionSize(_ []byte, rest offsetrange.Restriction)
(float64, error) {
+ if err := fn.natsFn.Setup(); err != nil {
+ return -1, err
+ }
+
+ rt, err := fn.createRTracker(rest)
+ if err != nil {
+ return -1, err
+ }
+
+ _, remaining := rt.GetProgress()
+ return remaining, nil
+}
+
+// CreateTracker wraps the tracker built by createRTracker in an
+// sdf.LockRTracker.
+func (fn *readFn) CreateTracker(rest offsetrange.Restriction) (*sdf.LockRTracker, error) {
+	tracker, err := fn.createRTracker(rest)
+	if err != nil {
+		return nil, err
+	}
+	locked := sdf.NewLockRTracker(tracker)
+	return locked, nil
+}
+
+// TruncateRestriction returns an empty restriction positioned at the start of
+// the tracker's current restriction, so the truncated work covers no sequence
+// numbers.
+// NOTE(review): this also discards the remainder of a bounded range (one with
+// an explicit EndSeqNo); confirm that is intended.
+func (fn *readFn) TruncateRestriction(rt *sdf.LockRTracker, _ []byte) offsetrange.Restriction {
+	current := rt.GetRestriction().(offsetrange.Restriction)
+	return offsetrange.Restriction{Start: current.Start, End: current.Start}
+}
+
+// InitialWatermarkEstimatorState seeds the watermark estimator state with the
+// element's event time, in milliseconds.
+func (fn *readFn) InitialWatermarkEstimatorState(et beam.EventTime, _ offsetrange.Restriction, _ []byte) int64 {
+	ms := et.Milliseconds()
+	return ms
+}
+
+// CreateWatermarkEstimator restores a watermarkEstimator from its state, a
+// watermark in Unix milliseconds.
+func (fn *readFn) CreateWatermarkEstimator(ms int64) *watermarkEstimator {
+	we := new(watermarkEstimator)
+	we.state = ms
+	return we
+}
+
+// WatermarkEstimatorState returns the estimator's state (its watermark in
+// Unix milliseconds), the same value CreateWatermarkEstimator accepts.
+func (fn *readFn) WatermarkEstimatorState(we *watermarkEstimator) int64 {
+	state := we.state
+	return state
+}
+
+func (fn *readFn) ProcessElement(
+ ctx context.Context,
+ we *watermarkEstimator,
+ rt *sdf.LockRTracker,
+ _ []byte,
+ emit func(beam.EventTime, ConsumerMessage),
+) (sdf.ProcessContinuation, error) {
+ startSeqNo := rt.GetRestriction().(offsetrange.Restriction).Start
+ cons, err := fn.createConsumer(ctx, startSeqNo)
+ if err != nil {
+ return sdf.StopProcessing(), err
+ }
+
+ for {
+ msgs, err := cons.Fetch(fn.FetchSize,
jetstream.FetchMaxWait(fetchTimeout))
+ if err != nil {
+ return nil, fmt.Errorf("error fetching messages: %v",
err)
+ }
+
+ count := 0
+ for msg := range msgs.Messages() {
+ metadata, err := msg.Metadata()
+ if err != nil {
+ return sdf.StopProcessing(), fmt.Errorf("error
retrieving metadata: %v", err)
+ }
+
+ seqNo := int64(metadata.Sequence.Stream)
+ if !rt.TryClaim(seqNo) {
+ return sdf.StopProcessing(), nil
+ }
+
+ et := fn.timestampFn(metadata.Timestamp)
+ consMsg := createConsumerMessage(msg,
metadata.Timestamp)
+ emit(et, consMsg)
+
+ count++
+ }
+
+ if err := msgs.Error(); err != nil {
+ return sdf.StopProcessing(), fmt.Errorf("error in
message batch: %v", err)
+ }
+
+ if count == 0 {
+ fn.updateWatermarkManually(we)
+ return sdf.ResumeProcessingIn(resumeDelay), nil
+ }
+ }
+}
+
+// createRTracker returns a restriction tracker for rest:
+//   - a restriction with a finite end (below math.MaxInt64) gets a plain
+//     offset range tracker;
+//   - an open-ended restriction gets a growable tracker, whose end is
+//     estimated from the last message on fn.Subject. This needs fn.js to be
+//     set up.
+// Errors from building the growable tracker are wrapped with %w.
+func (fn *readFn) createRTracker(rest offsetrange.Restriction) (sdf.RTracker, error) {
+	if rest.End < math.MaxInt64 {
+		return offsetrange.NewTracker(rest), nil
+	}
+
+	estimator := newEndEstimator(fn.js, fn.Stream, fn.Subject)
+	rt, err := offsetrange.NewGrowableTracker(rest, estimator)
+	if err != nil {
+		return nil, fmt.Errorf("error creating growable tracker: %w", err)
+	}
+
+	return rt, nil
+}
+
+// createConsumer creates an ordered JetStream consumer on fn.Stream that
+// delivers messages matching fn.Subject, starting at startSeqNo. Errors are
+// wrapped with %w so callers can inspect the JetStream error.
+func (fn *readFn) createConsumer(
+	ctx context.Context,
+	startSeqNo int64,
+) (jetstream.Consumer, error) {
+	cfg := jetstream.OrderedConsumerConfig{
+		FilterSubjects: []string{fn.Subject},
+		DeliverPolicy:  jetstream.DeliverByStartSequencePolicy,
+		OptStartSeq:    uint64(startSeqNo),
+		// NOTE(review): presumably bounds how often the ordered consumer is
+		// recreated after a failure; confirm against the nats.go docs.
+		MaxResetAttempts: 5,
+	}
+
+	cons, err := fn.js.OrderedConsumer(ctx, fn.Stream, cfg)
+	if err != nil {
+		return nil, fmt.Errorf("error creating consumer: %w", err)
+	}
+
+	return cons, nil
+}
+
+// createConsumerMessage converts a JetStream message into a ConsumerMessage.
+// The ID is taken from the nats.MsgIdHdr header, and the message's headers
+// are carried over as-is.
+func createConsumerMessage(msg jetstream.Msg, publishingTime time.Time) ConsumerMessage {
+	hdr := msg.Headers()
+	return ConsumerMessage{
+		Subject:        msg.Subject(),
+		PublishingTime: publishingTime,
+		ID:             hdr.Get(nats.MsgIdHdr),
+		Headers:        hdr,
+		Data:           msg.Data(),
+	}
+}
+
+// updateWatermarkManually advances the watermark when no messages arrived. It
+// observes the current time minus assumedLag, mapped through the configured
+// timestamp function.
+func (fn *readFn) updateWatermarkManually(we *watermarkEstimator) {
+	lagged := time.Now().Add(-assumedLag)
+	we.ObserveTimestamp(fn.timestampFn(lagged).ToTime())
+}
diff --git a/sdks/go/pkg/beam/io/natsio/read_option.go
b/sdks/go/pkg/beam/io/natsio/read_option.go
new file mode 100644
index 00000000000..f9d715be102
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/read_option.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import "errors"
+
+// Errors returned by the ReadOptionFn constructors for out-of-range values.
+var (
+	errInvalidFetchSize  = errors.New("fetch size must be greater than 0")
+	errInvalidStartSeqNo = errors.New("start sequence number must be greater than 0")
+	errInvalidEndSeqNo   = errors.New("end sequence number must be greater than 0")
+)
+
+// readOption collects the configuration that Read applies on top of its
+// defaults.
+type readOption struct {
+	// CredsFile is the path to a NATS user credentials file; empty means none.
+	CredsFile string
+	// TimePolicy selects how event times are assigned to messages.
+	TimePolicy timePolicy
+	// FetchSize is the maximum number of messages retrieved per fetch.
+	FetchSize int
+	// StartSeqNo is the first sequence number to read.
+	StartSeqNo int64
+	// EndSeqNo is the exclusive end sequence number.
+	EndSeqNo int64
+}
+
+// ReadOptionFn configures Read. Pass any number of them to Read; each one
+// updates the read options and returns an error if its value is invalid.
+type ReadOptionFn func(option *readOption) error
+
+// ReadUserCredentials returns a ReadOptionFn that sets the path of the user
+// credentials file used when connecting to NATS.
+func ReadUserCredentials(credsFile string) ReadOptionFn {
+	setCreds := func(o *readOption) error {
+		o.CredsFile = credsFile
+		return nil
+	}
+	return setCreds
+}
+
+// ReadProcessingTimePolicy returns a ReadOptionFn that makes Read use the
+// pipeline processing time as the event time of each emitted message. The
+// same policy is used when the watermark is advanced while no messages
+// arrive. This is Read's default time policy.
+func ReadProcessingTimePolicy() ReadOptionFn {
+	setPolicy := func(o *readOption) error {
+		o.TimePolicy = processingTimePolicy
+		return nil
+	}
+	return setPolicy
+}
+
+// ReadPublishingTimePolicy returns a ReadOptionFn that makes Read use each
+// message's JetStream publishing time as its event time. The same policy is
+// used when the watermark is advanced while no messages arrive.
+func ReadPublishingTimePolicy() ReadOptionFn {
+	setPolicy := func(o *readOption) error {
+		o.TimePolicy = publishingTimePolicy
+		return nil
+	}
+	return setPolicy
+}
+
+// ReadFetchSize returns a ReadOptionFn that sets the maximum number of
+// messages retrieved per fetch. size must be positive.
+func ReadFetchSize(size int) ReadOptionFn {
+	return func(o *readOption) error {
+		if size > 0 {
+			o.FetchSize = size
+			return nil
+		}
+		return errInvalidFetchSize
+	}
+}
+
+// ReadStartSeqNo returns a ReadOptionFn that sets the first sequence number
+// to read. seqNo must be positive.
+func ReadStartSeqNo(seqNo int64) ReadOptionFn {
+	return func(o *readOption) error {
+		if seqNo > 0 {
+			o.StartSeqNo = seqNo
+			return nil
+		}
+		return errInvalidStartSeqNo
+	}
+}
+
+// ReadEndSeqNo returns a ReadOptionFn that sets the exclusive end sequence
+// number to read. seqNo must be positive.
+func ReadEndSeqNo(seqNo int64) ReadOptionFn {
+	return func(o *readOption) error {
+		if seqNo > 0 {
+			o.EndSeqNo = seqNo
+			return nil
+		}
+		return errInvalidEndSeqNo
+	}
+}
diff --git a/sdks/go/pkg/beam/io/natsio/read_test.go
b/sdks/go/pkg/beam/io/natsio/read_test.go
new file mode 100644
index 00000000000..faf0b0540c8
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/read_test.go
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+ "github.com/nats-io/nats.go"
+)
+
+func TestRead(t *testing.T) {
+ tests := []struct {
+ name string
+ input []*nats.Msg
+ subject string
+ opts []ReadOptionFn
+ pubIndices []int
+ want []any
+ }{
+ {
+ name: "Read messages from bounded stream with single
subject",
+ input: []*nats.Msg{
+ {
+ Subject: "subject.1",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"123"}},
+ Data: []byte("msg1"),
+ },
+ {
+ Subject: "subject.2",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"124"}},
+ Data: []byte("msg2"),
+ },
+ {
+ Subject: "subject.1",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"125"}},
+ Data: []byte("msg3"),
+ },
+ },
+ subject: "subject.1",
+ opts: []ReadOptionFn{
+ ReadEndSeqNo(4),
+ },
+ pubIndices: []int{0, 2},
+ want: []any{
+ ConsumerMessage{
+ Subject: "subject.1",
+ ID: "123",
+ Headers:
map[string][]string{nats.MsgIdHdr: {"123"}},
+ Data: []byte("msg1"),
+ },
+ ConsumerMessage{
+ Subject: "subject.1",
+ ID: "125",
+ Headers:
map[string][]string{nats.MsgIdHdr: {"125"}},
+ Data: []byte("msg3"),
+ },
+ },
+ },
+ {
+ name: "Read messages from bounded stream with wildcard
subject",
+ input: []*nats.Msg{
+ {
+ Subject: "subject.1",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"123"}},
+ Data: []byte("msg1"),
+ },
+ {
+ Subject: "subject.2",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"124"}},
+ Data: []byte("msg2"),
+ },
+ {
+ Subject: "subject.1",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"125"}},
+ Data: []byte("msg3"),
+ },
+ },
+ subject: "subject.*",
+ opts: []ReadOptionFn{
+ ReadEndSeqNo(4),
+ },
+ pubIndices: []int{0, 1, 2},
+ want: []any{
+ ConsumerMessage{
+ Subject: "subject.1",
+ ID: "123",
+ Headers:
map[string][]string{nats.MsgIdHdr: {"123"}},
+ Data: []byte("msg1"),
+ },
+ ConsumerMessage{
+ Subject: "subject.2",
+ ID: "124",
+ Headers:
map[string][]string{nats.MsgIdHdr: {"124"}},
+ Data: []byte("msg2"),
+ },
+ ConsumerMessage{
+ Subject: "subject.1",
+ ID: "125",
+ Headers:
map[string][]string{nats.MsgIdHdr: {"125"}},
+ Data: []byte("msg3"),
+ },
+ },
+ },
+ {
+ name: "Read messages from bounded stream with custom
fetch size",
+ input: []*nats.Msg{
+ {
+ Subject: "subject.1",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"123"}},
+ Data: []byte("msg1"),
+ },
+ {
+ Subject: "subject.1",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"124"}},
+ Data: []byte("msg2"),
+ },
+ },
+ subject: "subject.1",
+ opts: []ReadOptionFn{
+ ReadFetchSize(1),
+ ReadEndSeqNo(3),
+ },
+ pubIndices: []int{0, 1},
+ want: []any{
+ ConsumerMessage{
+ Subject: "subject.1",
+ ID: "123",
+ Headers:
map[string][]string{nats.MsgIdHdr: {"123"}},
+ Data: []byte("msg1"),
+ },
+ ConsumerMessage{
+ Subject: "subject.1",
+ ID: "124",
+ Headers:
map[string][]string{nats.MsgIdHdr: {"124"}},
+ Data: []byte("msg2"),
+ },
+ },
+ },
+ {
+ name: "Read messages from bounded stream with custom
start seq no",
+ input: []*nats.Msg{
+ {
+ Subject: "subject.1",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"123"}},
+ Data: []byte("msg1"),
+ },
+ {
+ Subject: "subject.1",
+ Header: nats.Header{nats.MsgIdHdr:
[]string{"124"}},
+ Data: []byte("msg2"),
+ },
+ },
+ subject: "subject.1",
+ opts: []ReadOptionFn{
+ ReadStartSeqNo(2),
+ ReadEndSeqNo(3),
+ },
+ pubIndices: []int{1},
+ want: []any{
+ ConsumerMessage{
+ Subject: "subject.1",
+ ID: "124",
+ Headers:
map[string][]string{nats.MsgIdHdr: {"124"}},
+ Data: []byte("msg2"),
+ },
+ },
+ },
+ }
+ for i, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx := context.Background()
+ srv := newServer(t)
+ url := srv.ClientURL()
+ conn := newConn(t, url)
+ js := newJetStream(t, conn)
+
+ stream := fmt.Sprintf("STREAM-%d", i)
+ subjectFilter := "subject.*"
+
+ createStream(ctx, t, js, stream,
[]string{subjectFilter})
+ publishMessages(ctx, t, js, tt.input)
+
+ cons := createConsumer(ctx, t, js, stream,
[]string{subjectFilter})
+ pubMsgs := fetchMessages(t, cons, len(tt.input))
+ wantWTime := messagesWithPublishingTime(t, pubMsgs,
tt.pubIndices, tt.want)
+
+ p, s := beam.NewPipelineWithRoot()
+ got := Read(s, url, stream, tt.subject, tt.opts...)
+
+ passert.Equals(s, got, wantWTime...)
+ ptest.RunAndValidate(t, p)
+ })
+ }
+}
diff --git a/sdks/go/pkg/beam/io/natsio/common.go
b/sdks/go/pkg/beam/io/natsio/time_policy.go
similarity index 53%
copy from sdks/go/pkg/beam/io/natsio/common.go
copy to sdks/go/pkg/beam/io/natsio/time_policy.go
index 53f59551698..1c2dbf5165f 100644
--- a/sdks/go/pkg/beam/io/natsio/common.go
+++ b/sdks/go/pkg/beam/io/natsio/time_policy.go
@@ -13,46 +13,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Package natsio contains transforms for interacting with NATS.
package natsio
import (
- "fmt"
+ "time"
- "github.com/nats-io/nats.go"
- "github.com/nats-io/nats.go/jetstream"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)
-type natsFn struct {
- URI string
- CredsFile string
- nc *nats.Conn
- js jetstream.JetStream
-}
+type timePolicy int
-func (fn *natsFn) Setup() error {
- var opts []nats.Option
- if fn.CredsFile != "" {
- opts = append(opts, nats.UserCredentials(fn.CredsFile))
- }
+const (
+ processingTimePolicy timePolicy = iota
+ publishingTimePolicy
+)
- conn, err := nats.Connect(fn.URI, opts...)
- if err != nil {
- return fmt.Errorf("error connecting to NATS: %v", err)
- }
- fn.nc = conn
+type timestampFn func(time.Time) mtime.Time
- js, err := jetstream.New(fn.nc)
- if err != nil {
- return fmt.Errorf("error creating JetStream context: %v", err)
- }
- fn.js = js
+func processingTime(_ time.Time) mtime.Time {
+ return mtime.Now()
+}
- return nil
+func publishingTime(t time.Time) mtime.Time {
+ return mtime.FromTime(t)
}
-func (fn *natsFn) Teardown() {
- if fn.nc != nil {
- fn.nc.Close()
+func (p timePolicy) TimestampFn() timestampFn {
+ switch p {
+ case processingTimePolicy:
+ return processingTime
+ case publishingTimePolicy:
+ return publishingTime
+ default:
+ panic("unsupported time policy")
}
}
diff --git a/sdks/go/pkg/beam/io/natsio/common.go
b/sdks/go/pkg/beam/io/natsio/time_policy_test.go
similarity index 52%
copy from sdks/go/pkg/beam/io/natsio/common.go
copy to sdks/go/pkg/beam/io/natsio/time_policy_test.go
index 53f59551698..2452334c8cd 100644
--- a/sdks/go/pkg/beam/io/natsio/common.go
+++ b/sdks/go/pkg/beam/io/natsio/time_policy_test.go
@@ -13,46 +13,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Package natsio contains transforms for interacting with NATS.
package natsio
import (
- "fmt"
+ "testing"
+ "time"
- "github.com/nats-io/nats.go"
- "github.com/nats-io/nats.go/jetstream"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)
-type natsFn struct {
- URI string
- CredsFile string
- nc *nats.Conn
- js jetstream.JetStream
-}
+func Test_timePolicy_TimestampFn(t *testing.T) {
+ t.Run("processingTime", func(t *testing.T) {
+ pubTime := time.Date(2020, 1, 2, 3, 4, 5, 6e6, time.UTC)
-func (fn *natsFn) Setup() error {
- var opts []nats.Option
- if fn.CredsFile != "" {
- opts = append(opts, nats.UserCredentials(fn.CredsFile))
- }
-
- conn, err := nats.Connect(fn.URI, opts...)
- if err != nil {
- return fmt.Errorf("error connecting to NATS: %v", err)
- }
- fn.nc = conn
-
- js, err := jetstream.New(fn.nc)
- if err != nil {
- return fmt.Errorf("error creating JetStream context: %v", err)
- }
- fn.js = js
-
- return nil
-}
+ t1 := mtime.Now()
+ got := processingTimePolicy.TimestampFn()(pubTime)
+ t2 := mtime.Now()
+
+ if got < t1 || got > t2 {
+ t.Errorf("timestamp = %v, want between %v and %v", got,
t1, t2)
+ }
+ })
+
+ t.Run("publishingTime", func(t *testing.T) {
+ pubTime := time.Date(2020, 1, 2, 3, 4, 5, 6e6, time.UTC)
-func (fn *natsFn) Teardown() {
- if fn.nc != nil {
- fn.nc.Close()
- }
+ if got, want := publishingTimePolicy.TimestampFn()(pubTime),
mtime.FromTime(pubTime); got != want {
+ t.Errorf("timestamp = %v, want %v", got, want)
+ }
+ })
}
diff --git a/sdks/go/pkg/beam/io/natsio/common.go
b/sdks/go/pkg/beam/io/natsio/watermark_estimator.go
similarity index 52%
copy from sdks/go/pkg/beam/io/natsio/common.go
copy to sdks/go/pkg/beam/io/natsio/watermark_estimator.go
index 53f59551698..b23eb37ac85 100644
--- a/sdks/go/pkg/beam/io/natsio/common.go
+++ b/sdks/go/pkg/beam/io/natsio/watermark_estimator.go
@@ -13,46 +13,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Package natsio contains transforms for interacting with NATS.
package natsio
-import (
- "fmt"
+import "time"
- "github.com/nats-io/nats.go"
- "github.com/nats-io/nats.go/jetstream"
-)
-
-type natsFn struct {
- URI string
- CredsFile string
- nc *nats.Conn
- js jetstream.JetStream
+type watermarkEstimator struct {
+ state int64
}
-func (fn *natsFn) Setup() error {
- var opts []nats.Option
- if fn.CredsFile != "" {
- opts = append(opts, nats.UserCredentials(fn.CredsFile))
- }
-
- conn, err := nats.Connect(fn.URI, opts...)
- if err != nil {
- return fmt.Errorf("error connecting to NATS: %v", err)
- }
- fn.nc = conn
-
- js, err := jetstream.New(fn.nc)
- if err != nil {
- return fmt.Errorf("error creating JetStream context: %v", err)
- }
- fn.js = js
-
- return nil
+func (e *watermarkEstimator) CurrentWatermark() time.Time {
+ return time.UnixMilli(e.state)
}
-func (fn *natsFn) Teardown() {
- if fn.nc != nil {
- fn.nc.Close()
+func (e *watermarkEstimator) ObserveTimestamp(t time.Time) {
+ ms := t.UnixMilli()
+ if ms > e.state {
+ e.state = ms
}
}
diff --git a/sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go
b/sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go
new file mode 100644
index 00000000000..91a9a840a6e
--- /dev/null
+++ b/sdks/go/pkg/beam/io/natsio/watermark_estimator_test.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package natsio
+
+import (
+ "testing"
+ "time"
+)
+
+// Test_watermarkEstimator_CurrentWatermark checks that the watermark is the
+// estimator's millisecond state interpreted as Unix time.
+func Test_watermarkEstimator_CurrentWatermark(t *testing.T) {
+	ms := int64(1577934245000)
+	we := &watermarkEstimator{
+		state: ms,
+	}
+	// Compare instants with Equal: == on time.Time also compares the location
+	// pointer and monotonic reading, not just the instant.
+	if got, want := we.CurrentWatermark(), time.UnixMilli(ms); !got.Equal(want) {
+		t.Errorf("CurrentWatermark() = %v, want %v", got, want)
+	}
+}
+
+func Test_watermarkEstimator_ObserveTimestamp(t *testing.T) {
+ t1 := time.Date(2020, 1, 2, 3, 4, 5, 6e6, time.UTC)
+ t2 := time.Date(2020, 1, 2, 3, 4, 5, 7e6, time.UTC)
+
+ tests := []struct {
+ name string
+ state int64
+ t time.Time
+ want int64
+ }{
+ {
+ name: "Update watermark when the time is greater than
the current state",
+ state: t1.UnixMilli(),
+ t: t2,
+ want: t2.UnixMilli(),
+ },
+ {
+ name: "Keep existing watermark when the time is not
greater than the current state",
+ state: t2.UnixMilli(),
+ t: t1,
+ want: t2.UnixMilli(),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ we := &watermarkEstimator{
+ state: tt.state,
+ }
+ we.ObserveTimestamp(tt.t)
+ if got, want := we.state, tt.want; got != want {
+ t.Errorf("state = %v, want %v", got, want)
+ }
+ })
+ }
+}
diff --git a/sdks/go/pkg/beam/io/natsio/write_test.go
b/sdks/go/pkg/beam/io/natsio/write_test.go
index 5e9387ece5f..678874ce11e 100644
--- a/sdks/go/pkg/beam/io/natsio/write_test.go
+++ b/sdks/go/pkg/beam/io/natsio/write_test.go
@@ -187,8 +187,8 @@ func TestWrite(t *testing.T) {
js := newJetStream(t, conn)
subjects := []string{subject}
- createStream(t, ctx, js, stream, subjects)
- cons := createConsumer(t, ctx, js, stream, subjects)
+ createStream(ctx, t, js, stream, subjects)
+ cons := createConsumer(ctx, t, js, stream, subjects)
p, s := beam.NewPipelineWithRoot()