lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1150984329
##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
w.buf = append(w.buf, p...)
return len(p), nil
}
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family string) *timerWriter {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ var m map[timerKey]*timerWriter
+ var ok bool
+ if m, ok = c.timerWriters[id.instID]; !ok {
+ m = make(map[timerKey]*timerWriter)
+ c.timerWriters[id.instID] = m
+ }
+ tk := timerKey{ptransformID: id.ptransformID, family: family}
+ if w, ok := m[tk]; ok {
+ return w
+ }
+
+ // We don't check for finished instructions for writers, as writers
+ // can only be created if an instruction is in scope, and aren't
+ // runner or user directed.
+
+ w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+ m[tk] = w
+ return w
+}
+
+type timerWriter struct {
+ id clientID
+ timerFamilyID string
+ ch *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+ recordStreamSend(msg)
+ if err := w.ch.client.Send(msg); err != nil {
+ if err == io.EOF {
+ log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on send; fetching real error", w.id, w.ch.id)
Review Comment:
Done!
Basically, if a setting is functionally "permanent" and decided at
construction time, it's better to have a separate type than an if-statement.
Fewer ifs == faster execution. See also my long-overdue performance work on the
DoFn execution hot path, which is largely blocked until timers are done, so that
timers can be put on the hot path too if appropriate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]