[ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95754
 ]

ASF GitHub Bot logged work on BEAM-3303:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/18 22:29
            Start Date: 26/Apr/18 22:29
    Worklog Time Spent: 10m 
      Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184548981
 
 

 ##########
 File path: sdks/go/pkg/beam/runners/dataflow/translate.go
 ##########
 @@ -454,50 +457,52 @@ func stepID(id int) string {
        return fmt.Sprintf("s%v", id)
 }
 
-func translateWindow(w *window.Window) proto.Message {
-       // TODO: The only windowing strategy we support is the global window.
-       if w.Kind() != window.GlobalWindow {
-               panic(fmt.Sprintf("Unsupported window type supplied: %v", w))
-       }
-       // We compute the fixed content of this message for use in workflows.
-       msg := rnapi_pb.MessageWithComponents{
+func translateWindowingStrategy(w *window.WindowingStrategy) proto.Message {
+       c := graphx.NewCoderMarshaller()
+       ws := graphx.MarshalWindowingStrategy(c, w)
+
+       msg := &rnapi_pb.MessageWithComponents{
                Components: &rnapi_pb.Components{
-                       Coders: map[string]*rnapi_pb.Coder{
-                               "Coder": &rnapi_pb.Coder{
-                                       Spec: &rnapi_pb.SdkFunctionSpec{
-                                               Spec: &rnapi_pb.FunctionSpec{
-                                                       Urn: 
"urn:beam:coders:global_window:0.1",
-                                               },
-                                       },
-                               },
-                       },
+                       Coders: c.Build(),
                },
                Root: &rnapi_pb.MessageWithComponents_WindowingStrategy{
-                       WindowingStrategy: &rnapi_pb.WindowingStrategy{
-                               WindowFn: &rnapi_pb.SdkFunctionSpec{
-                                       Spec: &rnapi_pb.FunctionSpec{
-                                               Urn: 
"beam:windowfn:global_windows:v0.1",
-                                       },
-                               },
-                               MergeStatus:      
rnapi_pb.MergeStatus_NON_MERGING,
-                               AccumulationMode: 
rnapi_pb.AccumulationMode_DISCARDING,
-                               WindowCoderId:    "Coder",
-                               Trigger: &rnapi_pb.Trigger{
-                                       Trigger: &rnapi_pb.Trigger_Default_{
-                                               Default: 
&rnapi_pb.Trigger_Default{},
-                                       },
-                               },
-                               OutputTime:      
rnapi_pb.OutputTime_END_OF_WINDOW,
-                               ClosingBehavior: 
rnapi_pb.ClosingBehavior_EMIT_IF_NONEMPTY,
-                               AllowedLateness: 0,
-                       },
+                       WindowingStrategy: ws,
                },
        }
-
-       return &msg
+       return msg
 }
 
 func encodeSerializedFn(in proto.Message) (string, error) {
-       // The Beam Runner API uses URL query escaping for serialized fn 
messages.
-       return protox.EncodeQueryEscaped(in)
+       // The Beam Runner API uses special escaping for serialized fn messages.
+
+       data, err := proto.Marshal(in)
+       if err != nil {
+               return "", err
+       }
+       return encodeString(data), nil
+}
+
+// encodeString is a custom encoding used in some cases by Dataflow.
+//
+// Uses a simple strategy of converting each byte to a single char,
 
 Review comment:
   Seems you're right. Good catch.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 95754)
    Time Spent: 5h 20m  (was: 5h 10m)

> Go windowing support
> --------------------
>
>                 Key: BEAM-3303
>                 URL: https://issues.apache.org/jira/browse/BEAM-3303
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Henning Rohde
>            Priority: Major
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to