jason810496 commented on code in PR #67315:
URL: https://github.com/apache/airflow/pull/67315#discussion_r3296225102


##########
go-sdk/pkg/execution/messages.go:
##########
@@ -0,0 +1,405 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "fmt"
+       "time"
+)
+
+// Inbound messages (Supervisor -> Runtime).
+
+// TaskInstanceInfo holds task instance details from StartupDetails.
+//
+// Values are produced by decodeTaskInstanceInfo from the "ti" map of a
+// StartupDetails message; the trailing comments name the source key.
+type TaskInstanceInfo struct {
+       ID             string // from "id"; decoding fails if mapString returns an error
+       TaskID         string // from "task_id"; decoding fails if mapString returns an error
+       DagID          string // from "dag_id"; decoding fails if mapString returns an error
+       RunID          string // from "run_id"; decoding fails if mapString returns an error
+       TryNumber      int // from "try_number"; the decoder passes 1 as the fallback
+       DagVersionID   string // from "dag_version_id"; the decoder passes "" as the fallback
+       MapIndex       int // from "map_index"; the decoder passes -1 as the fallback
+       ContextCarrier map[string]any // from "context_carrier", as returned by mapMap
+}
+
+func decodeTaskInstanceInfo(m map[string]any) (TaskInstanceInfo, error) {
+       if m == nil {
+               return TaskInstanceInfo{}, fmt.Errorf("nil task instance map")
+       }
+       id, err := mapString(m, "id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.id: %w", err)
+       }
+       taskID, err := mapString(m, "task_id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.task_id: %w", err)
+       }
+       dagID, err := mapString(m, "dag_id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.dag_id: %w", err)
+       }
+       runID, err := mapString(m, "run_id")
+       if err != nil {
+               return TaskInstanceInfo{}, fmt.Errorf("ti.run_id: %w", err)
+       }
+       tryNumber := mapIntOr(m, "try_number", 1)
+       dagVersionID := mapStringOr(m, "dag_version_id", "")
+       mapIndex := mapIntOr(m, "map_index", -1)
+       contextCarrier := mapMap(m, "context_carrier")
+
+       return TaskInstanceInfo{
+               ID:             id,
+               TaskID:         taskID,
+               DagID:          dagID,
+               RunID:          runID,
+               TryNumber:      tryNumber,
+               DagVersionID:   dagVersionID,
+               MapIndex:       mapIndex,
+               ContextCarrier: contextCarrier,
+       }, nil
+}
+
+// BundleInfoMsg holds bundle identification from StartupDetails.
+//
+// decodeBundleInfo fills both fields with mapStringOr, passing "" as the
+// fallback, and returns the zero value for a nil map.
+type BundleInfoMsg struct {
+       Name    string // from "name"
+       Version string // from "version"
+}
+
+func decodeBundleInfo(m map[string]any) BundleInfoMsg {
+       if m == nil {
+               return BundleInfoMsg{}
+       }
+       return BundleInfoMsg{
+               Name:    mapStringOr(m, "name", ""),
+               Version: mapStringOr(m, "version", ""),
+       }
+}
+
+// TIRunContext holds the runtime context for a task instance.
+//
+// Each field is a pointer: decodeTIRunContext leaves it nil when the
+// corresponding key is absent from the map or holds a nil value.
+type TIRunContext struct {
+       LogicalDate       *time.Time // from "logical_date"
+       DataIntervalStart *time.Time // from "data_interval_start"
+       DataIntervalEnd   *time.Time // from "data_interval_end"
+}
+
+func decodeTIRunContext(m map[string]any) (TIRunContext, error) {
+       if m == nil {
+               return TIRunContext{}, nil
+       }
+       ctx := TIRunContext{}
+       for _, f := range []struct {
+               key string
+               dst **time.Time
+       }{
+               {"logical_date", &ctx.LogicalDate},
+               {"data_interval_start", &ctx.DataIntervalStart},
+               {"data_interval_end", &ctx.DataIntervalEnd},
+       } {
+               raw, present := m[f.key]
+               if !present || raw == nil {
+                       continue
+               }
+               t, err := asTime(raw)
+               if err != nil {
+                       return TIRunContext{}, fmt.Errorf("ti_context.%s: %w", 
f.key, err)
+               }
+               *f.dst = &t
+       }
+       return ctx, nil
+}
+
+// StartupDetails is sent by the supervisor to initiate task execution.
+//
+// It is built by decodeStartupDetails; the trailing comments name the map
+// key each field is decoded from.
+type StartupDetails struct {
+       TI                TaskInstanceInfo // from the "ti" sub-map; a decode error aborts decoding
+       DagRelPath        string // from "dag_rel_path"; the decoder passes "" as the fallback
+       BundleInfo        BundleInfoMsg // from the "bundle_info" sub-map
+       StartDate         time.Time // from "start_date"; left as the zero time when absent or nil
+       TIContext         TIRunContext // from the "ti_context" sub-map
+       SentryIntegration string // from "sentry_integration"; the decoder passes "" as the fallback
+}
+
+// decodeStartupDetails builds a StartupDetails from a decoded message map.
+//
+// The "ti" sub-map must decode successfully. "start_date" is parsed with
+// asTime only when present and non-nil, otherwise StartDate stays the zero
+// time. Errors from the "ti", "start_date" and "ti_context" decoders are
+// returned wrapped, with a nil result.
+func decodeStartupDetails(m map[string]any) (*StartupDetails, error) {
+       ti, err := decodeTaskInstanceInfo(mapMap(m, "ti"))
+       if err != nil {
+               return nil, fmt.Errorf("decoding ti: %w", err)
+       }
+
+       details := &StartupDetails{
+               TI:         ti,
+               DagRelPath: mapStringOr(m, "dag_rel_path", ""),
+               BundleInfo: decodeBundleInfo(mapMap(m, "bundle_info")),
+       }
+
+       // A missing key reads as a nil interface, so one nil check covers both
+       // the "absent" and the "explicitly nil" cases.
+       if raw := m["start_date"]; raw != nil {
+               var started time.Time
+               if started, err = asTime(raw); err != nil {
+                       return nil, fmt.Errorf("start_date: %w", err)
+               }
+               details.StartDate = started
+       }
+
+       if details.TIContext, err = decodeTIRunContext(mapMap(m, "ti_context")); err != nil {
+               return nil, fmt.Errorf("decoding ti_context: %w", err)
+       }
+       details.SentryIntegration = mapStringOr(m, "sentry_integration", "")
+
+       return details, nil
+}
+
+// Response types (for runtime-initiated requests).
+
+// ConnectionResult is the response to GetConnection.
+//
+// decodeConnectionResult fills every string field with mapStringOr using ""
+// as the fallback, and Port with mapIntOr using 0 as the fallback.
+type ConnectionResult struct {
+       ConnID   string // from "conn_id"
+       ConnType string // from "conn_type"
+       Host     string // from "host"
+       Schema   string // from "schema"
+       Login    string // from "login"
+       Password string // from "password"
+       Port     int // from "port"
+       Extra    string // from "extra"
+}
+
+// decodeConnectionResult builds a ConnectionResult from a decoded response
+// map. String fields use "" and Port uses 0 as the fallback handed to the
+// map helpers; the returned error is always nil.
+func decodeConnectionResult(m map[string]any) (*ConnectionResult, error) {
+       // str reads a string entry with the "" fallback shared by all fields.
+       str := func(key string) string {
+               return mapStringOr(m, key, "")
+       }
+
+       conn := new(ConnectionResult)
+       conn.ConnID = str("conn_id")
+       conn.ConnType = str("conn_type")
+       conn.Host = str("host")
+       conn.Schema = str("schema")
+       conn.Login = str("login")
+       conn.Password = str("password")
+       conn.Port = mapIntOr(m, "port", 0)
+       conn.Extra = str("extra")
+       return conn, nil
+}
+
+// VariableResult is the response to GetVariable.
+type VariableResult struct {
+       Key   string // from "key"; the decoder passes "" as the fallback
+       Value any // the raw "value" entry of the response map, stored unconverted
+}
+
+// decodeVariableResult builds a VariableResult from a decoded response map.
+// Key uses "" as the mapStringOr fallback and Value is the "value" entry as
+// stored in the map; the returned error is always nil.
+func decodeVariableResult(m map[string]any) (*VariableResult, error) {
+       res := new(VariableResult)
+       res.Key = mapStringOr(m, "key", "")
+       res.Value = m["value"]
+       return res, nil
+}
+
+// XComResult is the response to GetXCom.
+type XComResult struct {
+       Key   string // from "key"; the decoder passes "" as the fallback
+       Value any // the raw "value" entry of the response map, stored unconverted
+}
+
+// decodeXComResult builds an XComResult from a decoded response map.
+// Key uses "" as the mapStringOr fallback and Value is the "value" entry as
+// stored in the map; the returned error is always nil.
+func decodeXComResult(m map[string]any) (*XComResult, error) {
+       res := new(XComResult)
+       res.Key = mapStringOr(m, "key", "")
+       res.Value = m["value"]
+       return res, nil
+}
+
+// ErrorResponse represents an error returned by the supervisor.
+type ErrorResponse struct {
+       Error  string // from "error"; the decoder passes "" as the fallback
+       Detail any // the raw "detail" entry of the response map, stored unconverted
+}
+
+// decodeErrorResponse builds an ErrorResponse from a decoded response map.
+// It returns nil for a nil map; otherwise Error uses "" as the mapStringOr
+// fallback and Detail is the "detail" entry as stored in the map.
+func decodeErrorResponse(m map[string]any) *ErrorResponse {
+       if m != nil {
+               resp := new(ErrorResponse)
+               resp.Error = mapStringOr(m, "error", "")
+               resp.Detail = m["detail"]
+               return resp
+       }
+       return nil
+}
+
+// Outbound messages (Runtime -> Supervisor).
+
+// GetConnectionMsg is sent to request a connection from the supervisor.
+type GetConnectionMsg struct {
+       ConnID string
+}
+
+// toMap renders the message as a map with the keys "type" (always
+// "GetConnection") and "conn_id".
+func (m GetConnectionMsg) toMap() map[string]any {
+       payload := make(map[string]any, 2)
+       payload["type"] = "GetConnection"
+       payload["conn_id"] = m.ConnID
+       return payload
+}
+
+// GetVariableMsg is sent to request a variable from the supervisor.
+type GetVariableMsg struct {
+       Key string
+}
+
+// toMap renders the message as a map with the keys "type" (always
+// "GetVariable") and "key".
+func (m GetVariableMsg) toMap() map[string]any {
+       payload := make(map[string]any, 2)
+       payload["type"] = "GetVariable"
+       payload["key"] = m.Key
+       return payload
+}
+
+// GetXComMsg is sent to request an XCom value from the supervisor.
+type GetXComMsg struct {
+       Key               string
+       DagID             string
+       TaskID            string
+       RunID             string
+       MapIndex          *int
+       IncludePriorDates bool
+}
+
+// toMap renders the message as a map. "type" is always "GetXCom"; "key",
+// "dag_id", "task_id", "run_id" and "include_prior_dates" are always set.
+// "map_index" is added only when MapIndex is non-nil, holding the pointed-to
+// int.
+func (m GetXComMsg) toMap() map[string]any {
+       payload := make(map[string]any, 7)
+       payload["type"] = "GetXCom"
+       payload["key"] = m.Key
+       payload["dag_id"] = m.DagID
+       payload["task_id"] = m.TaskID
+       payload["run_id"] = m.RunID
+       payload["include_prior_dates"] = m.IncludePriorDates
+       if idx := m.MapIndex; idx != nil {
+               payload["map_index"] = *idx
+       }
+       return payload
+}
+
+// SetXComMsg is sent to set an XCom value.
+type SetXComMsg struct {
+       Key          string
+       Value        any
+       DagID        string
+       TaskID       string
+       RunID        string
+       MapIndex     int

Review Comment:
   The rest of the comments were addressed in
https://github.com/apache/airflow/pull/67315/changes/9ca34584b896f60919dbe61bb5a92935d1434f8d.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to