lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1150984329


##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
        w.buf = append(w.buf, p...)
        return len(p), nil
 }
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family 
string) *timerWriter {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       var m map[timerKey]*timerWriter
+       var ok bool
+       if m, ok = c.timerWriters[id.instID]; !ok {
+               m = make(map[timerKey]*timerWriter)
+               c.timerWriters[id.instID] = m
+       }
+       tk := timerKey{ptransformID: id.ptransformID, family: family}
+       if w, ok := m[tk]; ok {
+               return w
+       }
+
+       // We don't check for finished instructions for writers, as writers
+       // can only be created if an instruction is in scope, and aren't
+       // runner or user directed.
+
+       w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+       m[tk] = w
+       return w
+}
+
+type timerWriter struct {
+       id            clientID
+       timerFamilyID string
+       ch            *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+       recordStreamSend(msg)
+       if err := w.ch.client.Send(msg); err != nil {
+               if err == io.EOF {
+                       log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on 
send; fetching real error", w.id, w.ch.id)

Review Comment:
   Done!
   
   Basically, if a setting is functionally "permanent" and decided on 
construction, it's better to have a separate type instead of an if-statement. 
Fewer ifs == faster execution. See also my long overdue performance work on the 
DoFn execution hotpath, which is largely blocked until timers are done, so they 
can be hotpathed if appropriate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to