Copilot commented on code in PR #67154:
URL: https://github.com/apache/airflow/pull/67154#discussion_r3278889575


##########
go-sdk/pkg/execution/frames.go:
##########
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/vmihailenco/msgpack/v5"
+)
+
+// IncomingFrame represents a decoded frame received from the comm socket.
+type IncomingFrame struct {
+       ID   int
+       Body map[string]any
+       Err  map[string]any // non-nil only for response frames (3-element arrays)
+}
+
+// encodeRequest encodes a request frame (2-element msgpack array: [id, body]).
+func encodeRequest(id int, body map[string]any) ([]byte, error) {
+       var buf bytes.Buffer
+       enc := msgpack.NewEncoder(&buf)
+       enc.UseCompactInts(true)
+
+       if err := enc.EncodeArrayLen(2); err != nil {
+               return nil, err
+       }
+       if err := enc.EncodeInt(int64(id)); err != nil {
+               return nil, err
+       }
+       if err := enc.Encode(body); err != nil {
+               return nil, err
+       }
+       return buf.Bytes(), nil
+}
+
+// writeFrame writes a length-prefixed msgpack payload to the writer.
+// Format: [4-byte big-endian length][payload bytes]
+func writeFrame(w io.Writer, payload []byte) error {
+       prefix := make([]byte, 4)
+       binary.BigEndian.PutUint32(prefix, uint32(len(payload)))
+       if _, err := w.Write(prefix); err != nil {
+               return fmt.Errorf("writing length prefix: %w", err)
+       }
+       if _, err := w.Write(payload); err != nil {
+               return fmt.Errorf("writing payload: %w", err)
+       }
+       return nil
+}
+
+// readFrame reads one length-prefixed msgpack frame from the reader and decodes it.
+func readFrame(r io.Reader) (IncomingFrame, error) {
+       // Read 4-byte big-endian length prefix.
+       prefix := make([]byte, 4)
+       if _, err := io.ReadFull(r, prefix); err != nil {
+               return IncomingFrame{}, fmt.Errorf("reading length prefix: %w", err)
+       }
+       payloadLen := binary.BigEndian.Uint32(prefix)
+
+       // Read the payload.
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {

Review Comment:
   readFrame trusts the 4-byte length prefix and allocates a payload slice of 
that size with no upper bound. A malformed/hostile peer (or corrupted stream) 
can trigger huge allocations/OOM; consider enforcing a maximum frame size and 
rejecting/limiting reads when payloadLen exceeds it.



##########
go-sdk/pkg/execution/comms.go:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "fmt"
+       "io"
+       "log/slog"
+       "sync"
+       "sync/atomic"
+)
+
+// CoordinatorComm manages bidirectional communication with the Airflow supervisor
+// over a length-prefixed msgpack socket connection.
+type CoordinatorComm struct {
+       reader io.Reader
+       writer io.Writer
+       nextID atomic.Int32
+       logger *slog.Logger
+
+       wmu sync.Mutex // serialises writes
+       rmu sync.Mutex // serialises reads
+}
+
+// NewCoordinatorComm creates a new communication channel.
+func NewCoordinatorComm(reader io.Reader, writer io.Writer, logger *slog.Logger) *CoordinatorComm {
+       return &CoordinatorComm{
+               reader: reader,
+               writer: writer,
+               logger: logger,
+       }
+}
+
+// ReadMessage reads and decodes one frame from the comm socket.
+// It returns the raw IncomingFrame with decoded map bodies.
+func (c *CoordinatorComm) ReadMessage() (IncomingFrame, error) {
+       c.rmu.Lock()
+       defer c.rmu.Unlock()
+       frame, err := readFrame(c.reader)
+       if err != nil {
+               return IncomingFrame{}, fmt.Errorf("reading frame: %w", err)
+       }
+       c.logger.Debug("Received frame", "id", frame.ID)
+       return frame, nil
+}
+
+// SendRequest sends a request frame (2-element: [id, body]) to the supervisor.
+func (c *CoordinatorComm) SendRequest(id int, body map[string]any) error {
+       payload, err := encodeRequest(id, body)
+       if err != nil {
+               return fmt.Errorf("encoding request: %w", err)
+       }
+       c.logger.Debug("Sending request", "id", id)
+       c.wmu.Lock()
+       defer c.wmu.Unlock()
+       return writeFrame(c.writer, payload)
+}
+
+// Communicate sends a request and waits for the corresponding response.
+// This is a synchronous request-response: the caller sends a request and blocks
+// until the next frame arrives. The protocol is single-threaded on the comm socket.
+//
+// If the response contains an error element, it is returned as an ApiError.
+// Otherwise, the response body map is returned.
+func (c *CoordinatorComm) Communicate(body map[string]any) (map[string]any, error) {
+       id := int(c.nextID.Add(1) - 1)
+
+       if err := c.SendRequest(id, body); err != nil {
+               return nil, err

Review Comment:
   CoordinatorComm.Communicate assumes the next frame read is the response for 
the request it just sent, but it never verifies that frame.ID matches the 
generated id. This will break if calls happen concurrently or if the supervisor 
can send frames out-of-order. Either serialize the full send+read under a single 
mutex or implement a per-id dispatcher; in both cases, validate the response id 
before returning.



##########
go-sdk/pkg/execution/task_runner.go:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "context"
+       "fmt"
+       "log/slog"
+       "runtime/debug"
+       "time"
+
+       "github.com/google/uuid"
+
+       "github.com/apache/airflow/go-sdk/bundle/bundlev1"
+       "github.com/apache/airflow/go-sdk/pkg/api"
+       "github.com/apache/airflow/go-sdk/pkg/sdkcontext"
+       "github.com/apache/airflow/go-sdk/sdk"
+)
+
+// RunTask executes a task based on StartupDetails received from the supervisor.
+//
+// It looks up the task in the bundle, creates a CoordinatorClient for SDK
+// calls, executes the task, and returns a terminal message body
+// (SucceedTaskMsg or TaskStateMsg) ready to ship as the final response frame.
+//
+// The supervisor owns the Execution-API state transitions in coordinator
+// mode, so we deliberately bypass worker.ExecuteTaskWorkload (which drives
+// Run / UpdateState itself) and only invoke the user's task function.
+func RunTask(
+       bundle bundlev1.Bundle,
+       details *StartupDetails,
+       comm *CoordinatorComm,
+       logger *slog.Logger,
+) map[string]any {
+       task, exists := bundle.LookupTask(details.TI.DagID, details.TI.TaskID)
+       if !exists {
+               logger.Error("Task not registered",
+                       "dag_id", details.TI.DagID,
+                       "task_id", details.TI.TaskID,
+               )
+               return TaskStateMsg{State: TaskStateRemoved, EndDate: time.Now().UTC()}.toMap()
+       }
+
+       client := NewCoordinatorClient(comm, details)
+
+       // taskFunction.sendXcom reads the workload from context to get the task
+       // instance ids; populate it the same shape the gRPC path uses.
+       tiUUID, _ := uuid.Parse(details.TI.ID)

Review Comment:
   uuid.Parse(details.TI.ID) ignores the returned error, which means an invalid 
TI id from the supervisor will silently become the zero UUID and be used in the 
workload context (impacting XCom pushes/heartbeats/etc). Please handle the 
parse error and return an appropriate terminal TaskState (or propagate a 
protocol-level error).
   



##########
go-sdk/pkg/execution/server.go:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package execution implements the SDK coordinator-protocol runtime
+// (msgpack-over-IPC). It is the second mode of bundlev1server.Serve: when
+// the bundle binary is launched with --comm/--logs by the Airflow supervisor
+// (Python ExecutableCoordinator), bundlev1server.Serve dispatches here.
+//
+// The first inbound frame on the comm socket selects between two
+// sub-protocols:
+//
+//   - DagFileParseRequest: one-shot, returns DagFileParsingResult and exits.
+//   - StartupDetails:       multi-round task execution.
+//
+// See go-sdk/adr/0003-coordinator-protocol-msgpack-ipc.md.
+package execution
+
+import (
+       "fmt"
+       "log/slog"
+       "net"
+       "sync"
+
+       "github.com/apache/airflow/go-sdk/bundle/bundlev1"
+)
+
+// Serve runs the bundle binary in coordinator mode. It dials the supervisor's
+// comm and logs sockets, installs an slog handler that writes JSON-line
+// records to the logs connection, and dispatches on the first frame.
+//
+// Serve returns nil on a clean shutdown (one-shot DAG parse or task execution
+// completed); a non-nil error indicates a protocol-level failure (connection
+// loss, malformed frames, unknown first message type).
+func Serve(provider bundlev1.BundleProvider, commAddr, logsAddr string) error {
+       if commAddr == "" {
+               return fmt.Errorf("missing --comm=host:port argument")
+       }
+       if logsAddr == "" {
+               return fmt.Errorf("missing --logs=host:port argument")
+       }
+
+       // Buffer log records until the logs socket is connected. Anything the
+       // runtime emits between Connect-time and the first frame still gets
+       // flushed.
+       logHandler := NewSocketLogHandler(nil, slog.LevelDebug)
+       logger := slog.New(logHandler)
+       slog.SetDefault(logger)
+
+       // Connect to both sockets concurrently so the supervisor can accept them
+       // in either order.
+       var commConn, logsConn net.Conn
+       var commErr, logsErr error
+       var wg sync.WaitGroup
+       wg.Add(2)
+       go func() {
+               defer wg.Done()
+               commConn, commErr = net.Dial("tcp", commAddr)
+       }()

Review Comment:
   net.Dial has no timeout and can block indefinitely if the supervisor address 
is unreachable. Consider using net.Dialer with a reasonable Timeout (and/or 
context) for both comm/logs sockets so coordinator-mode startup fails fast 
rather than hanging.



##########
go-sdk/bundle/bundlev1/bundlev1server/server.go:
##########
@@ -52,41 +69,97 @@ func (s serveConfigFunc) ApplyServeOpt(in *ServerConfig) error {
 
 type ServerConfig struct{}
 
-// Serve is the entrypoint for your bundle, and sets it up ready for Airflow's Go Worker to use
+// serveMode tags the protocol the binary will speak this run.
+type serveMode int
+
+const (
+       modePlugin                serveMode = iota // go-plugin gRPC (existing Edge Worker path)
+       modeMetadataDump                           // --bundle-metadata: print BundleInfo JSON
+       modeCoordinator                            // --comm/--logs: msgpack-over-IPC (ADR 0003)
+       modeCoordinatorUsageError                  // misuse: print usage and exit non-zero
+)
+
+// Serve is the entrypoint for your bundle, and sets it up ready for Airflow's
+// Go Worker (go-plugin) or Python supervisor (coordinator protocol) to use.
+//
+// The mode is decided from CLI flags and process environment, so user code is
+// always one line:
 //
-// Zero or more options to configure the server may also be passed. There are no options yet, this is to allow
-// future changes without breaking compatibility
+//     func main() { bundlev1server.Serve(&myBundle{}) }
+//
+// Zero or more options to configure the server may also be passed. There are
+// no options yet; the parameter exists to allow future additions without
+// breaking compatibility.
 func Serve(bundle bundlev1.BundleProvider, opts ...ServeOpt) error {
        config.SetupViper("")
 
-       hcLogger := hclog.New(&hclog.LoggerOptions{
-               Level:                    hclog.Trace,
-               Output:                   os.Stderr,
-               JSONFormat:               true,
-               IncludeLocation:          true,
-               AdditionalLocationOffset: 3,
-       })
-
-       log := slog.New(hclogslog.Adapt(hcLogger))
-       slog.SetDefault(log)
-
        flag.Parse()
 
        serveConfig := &ServerConfig{}
        for _, c := range opts {
                c.ApplyServeOpt(serveConfig)
        }
 
+       switch decideMode() {
+       case modeMetadataDump:
+               return dumpBundleMetadata(bundle)
+       case modeCoordinator:
+               // In coordinator mode the supervisor reads the logs channel for
+               // structured records, so configuring the hclog/stderr default
+               // logger here is unnecessary — execution.Serve installs its own
+               // slog handler against the logs socket before any user code runs.
+               return execution.Serve(bundle, *commAddr, *logsAddr)
+       case modePlugin:
+               installPluginLogger()
+               return servePlugin(bundle)
+       case modeCoordinatorUsageError:
+               fmt.Fprintln(os.Stderr, "error: --comm and --logs must be supplied together")
+               flag.CommandLine.SetOutput(os.Stderr)
+               flag.Usage()
+               os.Exit(2)

Review Comment:
   Serve() has an error-returning signature but calls os.Exit(2) on flag 
misuse, which makes it hard to test and prevents callers from handling the 
error. Consider returning an error (and letting main decide whether to print 
usage/exit) instead of exiting inside the library entrypoint.
   



##########
go-sdk/pkg/execution/messages.go:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "fmt"
+       "time"
+)
+
+// Inbound messages (Supervisor -> Runtime).
+
+// DagFileParseRequest is sent by the supervisor to request DAG parsing.
+type DagFileParseRequest struct {
+       File       string
+       BundlePath string
+}
+
+func decodeDagFileParseRequest(m map[string]any) (*DagFileParseRequest, error) {
+       file, err := mapString(m, "file")
+       if err != nil {
+               return nil, err
+       }
+       bundlePath := mapStringOr(m, "bundle_path", "")
+       return &DagFileParseRequest{File: file, BundlePath: bundlePath}, nil
+}
+
+// TaskInstanceInfo holds task instance details from StartupDetails.
+type TaskInstanceInfo struct {
+       ID             string
+       TaskID         string
+       DagID          string
+       RunID          string
+       TryNumber      int
+       DagVersionID   string
+       MapIndex       int
+       ContextCarrier map[string]any
+}
+
+func decodeTaskInstanceInfo(m map[string]any) (TaskInstanceInfo, error) {
+       if m == nil {
+               return TaskInstanceInfo{}, fmt.Errorf("nil task instance map")
+       }
+       id, err := mapString(m, "id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.id: %w", err)
+       }
+       taskID, err := mapString(m, "task_id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.task_id: %w", err)
+       }
+       dagID, err := mapString(m, "dag_id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.dag_id: %w", err)
+       }
+       runID, err := mapString(m, "run_id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.run_id: %w", err)
+       }
+       tryNumber := mapIntOr(m, "try_number", 1)
+       dagVersionID := mapStringOr(m, "dag_version_id", "")
+       mapIndex := mapIntOr(m, "map_index", -1)
+       contextCarrier := mapMap(m, "context_carrier")
+
+       return TaskInstanceInfo{
+               ID:             id,
+               TaskID:         taskID,
+               DagID:          dagID,
+               RunID:          runID,
+               TryNumber:      tryNumber,
+               DagVersionID:   dagVersionID,
+               MapIndex:       mapIndex,
+               ContextCarrier: contextCarrier,
+       }, nil
+}
+
+// BundleInfoMsg holds bundle identification from StartupDetails.
+type BundleInfoMsg struct {
+       Name    string
+       Version string
+}
+
+func decodeBundleInfo(m map[string]any) BundleInfoMsg {
+       if m == nil {
+               return BundleInfoMsg{}
+       }
+       return BundleInfoMsg{
+               Name:    mapStringOr(m, "name", ""),
+               Version: mapStringOr(m, "version", ""),
+       }
+}
+
+// TIRunContext holds the runtime context for a task instance.
+type TIRunContext struct {
+       LogicalDate       *time.Time
+       DataIntervalStart *time.Time
+       DataIntervalEnd   *time.Time
+}
+
+func decodeTIRunContext(m map[string]any) TIRunContext {
+       if m == nil {
+               return TIRunContext{}
+       }
+       ctx := TIRunContext{}
+       if t, err := asTime(m["logical_date"]); err == nil {
+               ctx.LogicalDate = &t
+       }
+       if t, err := asTime(m["data_interval_start"]); err == nil {
+               ctx.DataIntervalStart = &t
+       }
+       if t, err := asTime(m["data_interval_end"]); err == nil {
+               ctx.DataIntervalEnd = &t
+       }
+       return ctx
+}
+
+// StartupDetails is sent by the supervisor to initiate task execution.
+type StartupDetails struct {
+       TI                TaskInstanceInfo
+       DagRelPath        string
+       BundleInfo        BundleInfoMsg
+       StartDate         time.Time
+       TIContext         TIRunContext
+       SentryIntegration string
+}
+
+func decodeStartupDetails(m map[string]any) (*StartupDetails, error) {
+       tiMap := mapMap(m, "ti")
+       ti, err := decodeTaskInstanceInfo(tiMap)
+       if err != nil {
+               return nil, fmt.Errorf("decoding ti: %w", err)
+       }
+
+       dagRelPath := mapStringOr(m, "dag_rel_path", "")
+       bundleInfo := decodeBundleInfo(mapMap(m, "bundle_info"))
+
+       var startDate time.Time
+       if t, err := asTime(m["start_date"]); err == nil {
+               startDate = t

Review Comment:
   decodeStartupDetails ignores start_date parse errors and leaves StartDate as 
the zero time. If start_date is provided but malformed, this should likely be 
treated as a decoding error (or at least logged) to avoid silently running 
tasks with an incorrect start date.
   



##########
go-sdk/pkg/execution/messages.go:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "fmt"
+       "time"
+)
+
+// Inbound messages (Supervisor -> Runtime).
+
+// DagFileParseRequest is sent by the supervisor to request DAG parsing.
+type DagFileParseRequest struct {
+       File       string
+       BundlePath string
+}
+
+func decodeDagFileParseRequest(m map[string]any) (*DagFileParseRequest, error) {
+       file, err := mapString(m, "file")
+       if err != nil {
+               return nil, err
+       }
+       bundlePath := mapStringOr(m, "bundle_path", "")
+       return &DagFileParseRequest{File: file, BundlePath: bundlePath}, nil
+}
+
+// TaskInstanceInfo holds task instance details from StartupDetails.
+type TaskInstanceInfo struct {
+       ID             string
+       TaskID         string
+       DagID          string
+       RunID          string
+       TryNumber      int
+       DagVersionID   string
+       MapIndex       int
+       ContextCarrier map[string]any
+}
+
+func decodeTaskInstanceInfo(m map[string]any) (TaskInstanceInfo, error) {
+       if m == nil {
+               return TaskInstanceInfo{}, fmt.Errorf("nil task instance map")
+       }
+       id, err := mapString(m, "id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.id: %w", err)
+       }
+       taskID, err := mapString(m, "task_id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.task_id: %w", err)
+       }
+       dagID, err := mapString(m, "dag_id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.dag_id: %w", err)
+       }
+       runID, err := mapString(m, "run_id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.run_id: %w", err)
+       }
+       tryNumber := mapIntOr(m, "try_number", 1)
+       dagVersionID := mapStringOr(m, "dag_version_id", "")
+       mapIndex := mapIntOr(m, "map_index", -1)
+       contextCarrier := mapMap(m, "context_carrier")
+
+       return TaskInstanceInfo{
+               ID:             id,
+               TaskID:         taskID,
+               DagID:          dagID,
+               RunID:          runID,
+               TryNumber:      tryNumber,
+               DagVersionID:   dagVersionID,
+               MapIndex:       mapIndex,
+               ContextCarrier: contextCarrier,
+       }, nil
+}
+
+// BundleInfoMsg holds bundle identification from StartupDetails.
+type BundleInfoMsg struct {
+       Name    string
+       Version string
+}
+
+func decodeBundleInfo(m map[string]any) BundleInfoMsg {
+       if m == nil {
+               return BundleInfoMsg{}
+       }
+       return BundleInfoMsg{
+               Name:    mapStringOr(m, "name", ""),
+               Version: mapStringOr(m, "version", ""),
+       }
+}
+
+// TIRunContext holds the runtime context for a task instance.
+type TIRunContext struct {
+       LogicalDate       *time.Time
+       DataIntervalStart *time.Time
+       DataIntervalEnd   *time.Time
+}
+
+func decodeTIRunContext(m map[string]any) TIRunContext {
+       if m == nil {
+               return TIRunContext{}
+       }
+       ctx := TIRunContext{}
+       if t, err := asTime(m["logical_date"]); err == nil {
+               ctx.LogicalDate = &t
+       }
+       if t, err := asTime(m["data_interval_start"]); err == nil {
+               ctx.DataIntervalStart = &t
+       }
+       if t, err := asTime(m["data_interval_end"]); err == nil {
+               ctx.DataIntervalEnd = &t
+       }
+       return ctx

Review Comment:
   decodeTIRunContext silently drops logical_date/data_interval_* values when 
asTime fails (wrong type/invalid RFC3339). For protocol messages it’s usually 
better to return an explicit decoding error when a field is present but 
malformed, so mismatches don’t get hidden as nil timestamps.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to