damondouglas commented on code in PR #29900:
URL: https://github.com/apache/beam/pull/29900#discussion_r1445492009
##########
sdks/go/pkg/beam/runners/prism/internal/engine/data.go:
##########
@@ -49,6 +51,15 @@ func (d *TentativeData) WriteData(colID string, data []byte) {
d.Raw[colID] = append(d.Raw[colID], data)
}
+// WriteTimers adds timers to the associated transform handler.
+func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte) {
Review Comment:
Related to the question above: what is `timers` the serialized
representation of? Something that acts as a timer, or something that
identifies a timer?
##########
sdks/go/pkg/beam/runners/prism/internal/engine/data.go:
##########
@@ -39,6 +39,8 @@ type TentativeData struct {
// state is a map from transformID + UserStateID, to window, to userKey, to datavalues.
state map[LinkID]map[typex.Window]map[string]StateData
+ // timers is a map from transformID + UserStateID, to window, to userKey, to datavalues.
Review Comment:
Non-blocking, but perhaps I'm a data point on readability: I have rewritten
this question five times and am leaving it as it is. I see `timers` is
plural. Is it correct that we are mapping a LinkID to a slice of serialized
representations of something that either acts as a timer or identifies a
timer?
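To make the reading in this question concrete, here is a minimal sketch of "a LinkID mapped to a slice of serialized timers". Everything in it is an assumption for illustration: `linkID`, `tentativeData`, and `writeTimers` are hypothetical stand-ins rather than the PR's actual types, and the real field may well have a different shape.

```go
package main

import "fmt"

// linkID is a hypothetical stand-in for the engine's LinkID
// (a transform ID plus a local ID such as a timer family).
type linkID struct {
	Transform, Local string
}

// tentativeData is a hypothetical, reduced version of TentativeData.
// Under the reading in the question, each link maps to a slice of
// opaque, still-encoded timer payloads.
type tentativeData struct {
	timers map[linkID][][]byte
}

// writeTimers appends one encoded payload under its transform + family.
func (d *tentativeData) writeTimers(transformID, familyID string, encoded []byte) {
	if d.timers == nil {
		d.timers = map[linkID][][]byte{}
	}
	link := linkID{Transform: transformID, Local: familyID}
	d.timers[link] = append(d.timers[link], encoded)
}

func main() {
	var d tentativeData
	d.writeTimers("pardo1", "flushFamily", []byte{0x01})
	d.writeTimers("pardo1", "flushFamily", []byte{0x02, 0x03})
	// Both payloads are stored under the same link.
	fmt.Println(len(d.timers[linkID{Transform: "pardo1", Local: "flushFamily"}]))
}
```

If that reading is right, the field comment could say so directly (link to encoded timer payloads) rather than reusing the wording of the `state` comment.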
##########
sdks/go/pkg/beam/runners/prism/internal/engine/timers.go:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package engine
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "google.golang.org/protobuf/encoding/protowire"
+)
+
+// DecodeTimer extracts timers to elements for insertion into their keyed queues.
+// Returns the key bytes, tag, window exploded elements, and the hold timestamp.
+// If the timer has been cleared, no elements will be returned. Any existing timers
+// for the tag *must* be cleared from the pending queue.
+func decodeTimer(keyDec func(io.Reader) []byte, globalWindow bool, raw []byte) ([]byte, string, []element) {
Review Comment:
1) Should we have tests for this function?
2) Why does the Go Beam SDK use so many funcs as function arguments and
return types? Is there something inherent about encoding and decoding that
makes interfaces harder or impossible to use?
3) Could we rename globalWindow to isGlobalWindow?
4) What does raw encode?
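For context on question 2, here is a small sketch of the two shapes side by side. It is not taken from the SDK: `keyDecoder`, `keyDecoderFunc`, `fixedLenKey`, and the two `split...` helpers are hypothetical names, and the fixed-length key format is invented for the example. It only shows that Go permits either form, and that an adapter type (in the style of `http.HandlerFunc`) lets a plain func satisfy an interface.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// keyDecoder is a hypothetical interface alternative to a func parameter.
type keyDecoder interface {
	DecodeKey(r io.Reader) []byte
}

// keyDecoderFunc adapts a plain function to keyDecoder.
type keyDecoderFunc func(io.Reader) []byte

// DecodeKey calls the underlying function.
func (f keyDecoderFunc) DecodeKey(r io.Reader) []byte { return f(r) }

// fixedLenKey returns a decoder that reads exactly n bytes as the key.
// It returns nil if fewer than n bytes are available.
func fixedLenKey(n int) func(io.Reader) []byte {
	return func(r io.Reader) []byte {
		key := make([]byte, n)
		if _, err := io.ReadFull(r, key); err != nil {
			return nil
		}
		return key
	}
}

// splitWithFunc takes the decoder as a function value.
func splitWithFunc(keyDec func(io.Reader) []byte, raw []byte) (key, rest []byte) {
	r := bytes.NewReader(raw)
	key = keyDec(r)
	rest, _ = io.ReadAll(r) // whatever follows the key
	return key, rest
}

// splitWithInterface takes the decoder as an interface value.
func splitWithInterface(dec keyDecoder, raw []byte) (key, rest []byte) {
	r := bytes.NewReader(raw)
	key = dec.DecodeKey(r)
	rest, _ = io.ReadAll(r)
	return key, rest
}

func main() {
	raw := []byte("k1payload")
	k, rest := splitWithFunc(fixedLenKey(2), raw)
	fmt.Printf("%s %s\n", k, rest)
	k, rest = splitWithInterface(keyDecoderFunc(fixedLenKey(2)), raw)
	fmt.Printf("%s %s\n", k, rest)
}
```

The func form needs no named type at the call site, which may be part of the appeal for single-method decoders; the interface form gives the parameter a name that can carry documentation.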
##########
sdks/go/pkg/beam/core/timers/timers.go:
##########
@@ -53,6 +53,7 @@ type TimerMap struct {
type timerConfig struct {
Tag string
+ HoldSet bool
Review Comment:
Non-blocking question. I'm assuming that `HoldSet` means holding a set of
something, correct? If so, a set of what? Timers?
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -23,26 +23,46 @@ import (
"context"
"fmt"
"io"
+ "sort"
"strings"
"sync"
"sync/atomic"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "golang.org/x/exp/maps"
"golang.org/x/exp/slog"
)
type element struct {
- window typex.Window
- timestamp mtime.Time
- pane typex.PaneInfo
+ window typex.Window
+ timestamp mtime.Time
+ holdTimestamp mtime.Time // only used for Timers
+ pane typex.PaneInfo
+ transform, family, tag string // only used for Timers.
- elmBytes []byte
+ elmBytes []byte // When nil, indicates this is a timer.
Review Comment:
Do we have to use the same element struct for both timers and data because
they end up in the same elementHeap?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.