[
https://issues.apache.org/jira/browse/BEAM-3866?focusedWorklogId=82977&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82977
]
ASF GitHub Bot logged work on BEAM-3866:
----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Mar/18 22:47
Start Date: 21/Mar/18 22:47
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #4919: [BEAM-3866] Remove windowed value requirement for Go SDK External
URL: https://github.com/apache/beam/pull/4919
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sdks/go/README.md b/sdks/go/README.md
index ed0a669dd53..f9a08e0b6db 100644
--- a/sdks/go/README.md
+++ b/sdks/go/README.md
@@ -37,39 +37,34 @@ are parameterized by Go flags. For example, to run wordcount do:
$ pwd
[...]/sdks/go
$ go run examples/wordcount/wordcount.go --output=/tmp/result.txt
-2017/12/05 10:46:37 Pipeline:
-2017/12/05 10:46:37 Nodes: {1: W<[]uint8>/GW/W<bytes>!GW}
-{2: W<string>/GW/W<bytes>!GW}
-{3: W<string>/GW/W<bytes>!GW}
-{4: W<string>/GW/W<bytes>!GW}
-{5: W<string>/GW/W<bytes>!GW}
-{6: W<KV<string,int>>/GW/W<KV<bytes,int[json]>>!GW}
-{7: W<GBK<string,int>>/GW/W<GBK<bytes,int[json]>>!GW}
-{8: W<KV<string,int>>/GW/W<KV<bytes,int[json]>>!GW}
-{9: W<string>/GW/W<bytes>!GW}
-Edges: 1: Impulse [] -> [Out: W<[]uint8> -> {1: W<[]uint8>/GW/W<bytes>!GW}]
-2: ParDo [In(Main): W<[]uint8> <- {1: W<[]uint8>/GW/W<bytes>!GW}] -> [Out: W<T> -> {2: W<string>/GW/W<bytes>!GW}]
-3: ParDo [In(Main): W<string> <- {2: W<string>/GW/W<bytes>!GW}] -> [Out: W<string> -> {3: W<string>/GW/W<bytes>!GW}]
-4: ParDo [In(Main): W<string> <- {3: W<string>/GW/W<bytes>!GW}] -> [Out: W<string> -> {4: W<string>/GW/W<bytes>!GW}]
-5: ParDo [In(Main): W<string> <- {4: W<string>/GW/W<bytes>!GW}] -> [Out: W<string> -> {5: W<string>/GW/W<bytes>!GW}]
-6: ParDo [In(Main): W<T> <- {5: W<string>/GW/W<bytes>!GW}] -> [Out: W<KV<T,int>> -> {6: W<KV<string,int>>/GW/W<KV<bytes,int[json]>>!GW}]
-7: GBK [In(Main): KV<T,U> <- {6: W<KV<string,int>>/GW/W<KV<bytes,int[json]>>!GW}] -> [Out: GBK<T,U> -> {7: W<GBK<string,int>>/GW/W<GBK<bytes,int[json]>>!GW}]
-8: Combine [In(Main): W<int> <- {7: W<GBK<string,int>>/GW/W<GBK<bytes,int[json]>>!GW}] -> [Out: W<KV<string,int>> -> {8: W<KV<string,int>>/GW/W<KV<bytes,int[json]>>!GW}]
-9: ParDo [In(Main): W<KV<string,int>> <- {8: W<KV<string,int>>/GW/W<KV<bytes,int[json]>>!GW}] -> [Out: W<string> -> {9: W<string>/GW/W<bytes>!GW}]
-10: ParDo [In(Main): W<string> <- {9: W<string>/GW/W<bytes>!GW}] -> []
-2017/12/05 10:46:37 Execution units:
-2017/12/05 10:46:37 1: Impulse[0]
-2017/12/05 10:46:37 2: ParDo[beam.createFn] Out:[3]
-2017/12/05 10:46:37 3: ParDo[textio.expandFn] Out:[4]
-2017/12/05 10:46:37 4: ParDo[textio.readFn] Out:[5]
-2017/12/05 10:46:37 5: ParDo[main.extractFn] Out:[6]
-2017/12/05 10:46:37 6: ParDo[stats.mapFn] Out:[7]
-2017/12/05 10:46:37 7: GBK. Out:8
-2017/12/05 10:46:37 8: Combine[stats.sumIntFn] Keyed:true (Use:false) Out:[9]
-2017/12/05 10:46:37 9: ParDo[main.formatFn] Out:[10]
-2017/12/05 10:46:37 10: ParDo[textio.writeFileFn] Out:[]
-2017/12/05 10:46:37 Reading from gs://apache-beam-samples/shakespeare/kinglear.txt
-2017/12/05 10:46:38 Writing to /tmp/result.txt
+[{6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
+[{10: KV<int,string>/GW/KV<int[varintz],bytes>}]
+2018/03/21 09:39:03 Pipeline:
+2018/03/21 09:39:03 Nodes: {1: []uint8/GW/bytes}
+{2: string/GW/bytes}
+{3: string/GW/bytes}
+{4: string/GW/bytes}
+{5: string/GW/bytes}
+{6: KV<string,int>/GW/KV<bytes,int[varintz]>}
+{7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}
+{8: KV<string,int>/GW/KV<bytes,int[varintz]>}
+{9: string/GW/bytes}
+{10: KV<int,string>/GW/KV<int[varintz],bytes>}
+{11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}
+Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/GW/bytes}]
+2: ParDo [In(Main): []uint8 <- {1: []uint8/GW/bytes}] -> [Out: T -> {2: string/GW/bytes}]
+3: ParDo [In(Main): string <- {2: string/GW/bytes}] -> [Out: string -> {3: string/GW/bytes}]
+4: ParDo [In(Main): string <- {3: string/GW/bytes}] -> [Out: string -> {4: string/GW/bytes}]
+5: ParDo [In(Main): string <- {4: string/GW/bytes}] -> [Out: string -> {5: string/GW/bytes}]
+6: ParDo [In(Main): T <- {5: string/GW/bytes}] -> [Out: KV<T,int> -> {6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
+7: CoGBK [In(Main): KV<string,int> <- {6: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: CoGBK<string,int> -> {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}]
+8: Combine [In(Main): int <- {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}] -> [Out: KV<string,int> -> {8: KV<string,int>/GW/KV<bytes,int[varintz]>}]
+9: ParDo [In(Main): KV<string,int> <- {8: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: string -> {9: string/GW/bytes}]
+10: ParDo [In(Main): T <- {9: string/GW/bytes}] -> [Out: KV<int,T> -> {10: KV<int,string>/GW/KV<int[varintz],bytes>}]
+11: CoGBK [In(Main): KV<int,string> <- {10: KV<int,string>/GW/KV<int[varintz],bytes>}] -> [Out: CoGBK<int,string> -> {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}]
+12: ParDo [In(Main): CoGBK<int,string> <- {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}] -> []
+2018/03/21 09:39:03 Reading from gs://apache-beam-samples/shakespeare/kinglear.txt
+2018/03/21 09:39:04 Writing to /tmp/result.txt
```
The debugging output is currently quite verbose and likely to change. The
output is a local
diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index db0c06a3991..5081feca062 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -46,8 +46,7 @@ func (c Coder) IsValid() bool {
}
// Type returns the full type 'A' of elements the coder can encode and decode.
-// 'A' must be a concrete Windowed Value type, such as W<int> or
-// W<KV<int,string>>.
+// 'A' must be a concrete full type, such as int or KV<int,string>.
func (c Coder) Type() FullType {
if !c.IsValid() {
panic("Invalid Coder")
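After this change, the full type of a coder is the bare element type, such as int or KV<int,string>, with no W<...> wrapper. Per the issue description below, windowing is carried in the gRPC instructions instead. The stdlib-only sketch that follows models that split; `fullType`, `coderInfo` and `isWindowed` are hypothetical stand-ins for illustration, not the SDK's typex.FullType, beam.Coder or typex.IsW.

```go
package main

import (
	"fmt"
	"strings"
)

// fullType is a hypothetical, simplified full type: a named constructor
// (e.g. "KV") applied to zero or more component types.
type fullType struct {
	name       string
	components []fullType
}

// String renders the type in the notation of the debug output,
// e.g. "int" or "KV<int,string>".
func (t fullType) String() string {
	if len(t.components) == 0 {
		return t.name
	}
	parts := make([]string, len(t.components))
	for i, c := range t.components {
		parts[i] = c.String()
	}
	return t.name + "<" + strings.Join(parts, ",") + ">"
}

// isWindowed reports whether the type uses the old W<...> wrapper
// (hypothetical; it only mirrors the idea behind the removed check).
func isWindowed(t fullType) bool {
	return t.name == "W"
}

// coderInfo keeps the element type and the windowing separate, as in the
// new model: the window is no longer part of the element's full type.
type coderInfo struct {
	elem   fullType
	window string // e.g. "GW" for the global window
}

func main() {
	kv := fullType{name: "KV", components: []fullType{{name: "int"}, {name: "string"}}}
	oldStyle := fullType{name: "W", components: []fullType{kv}}
	newStyle := coderInfo{elem: kv, window: "GW"}

	fmt.Println(oldStyle, isWindowed(oldStyle))                            // W<KV<int,string>> true
	fmt.Println(newStyle.elem, isWindowed(newStyle.elem), newStyle.window) // KV<int,string> false GW
}
```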
diff --git a/sdks/go/pkg/beam/create.go b/sdks/go/pkg/beam/create.go
index 7b9a995a438..ac3fe231ff7 100644
--- a/sdks/go/pkg/beam/create.go
+++ b/sdks/go/pkg/beam/create.go
@@ -30,11 +30,11 @@ func init() {
// TODO(herohde) 7/11/2017: add variants that use coder encoding.
// Create inserts a fixed set of values into the pipeline. The values must
-// be of the same type 'A' and the returned PCollection is of type W<A>.
+// be of the same type 'A' and the returned PCollection is of type A.
// For example:
//
-// foo := beam.Create(s, "a", "b", "c") // foo : W<string>
-// bar := beam.Create(s, 1, 2, 3) // bar : W<int>
+// foo := beam.Create(s, "a", "b", "c") // foo : string
+// bar := beam.Create(s, 1, 2, 3) // bar : int
//
// The returned PCollections can be used as any other PCollections. The values
// are JSON-coded. Each runner may place limits on the sizes of the values and
@@ -47,7 +47,7 @@ func Create(s Scope, values ...interface{}) PCollection {
// array. It is a convenience wrapper over Create. For example:
//
// list := []string{"a", "b", "c"}
-// foo := beam.CreateList(s, list) // foo : W<string>
+// foo := beam.CreateList(s, list) // foo : string
func CreateList(s Scope, list interface{}) PCollection {
var ret []interface{}
val := reflect.ValueOf(list)
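CreateList is documented as a convenience wrapper over Create, and the hunk shows it starting from reflect.ValueOf(list). A minimal sketch of that slice-to-variadic conversion follows; `sliceToValues` is a hypothetical name, and the real function would go on to pass the result to Create(s, ret...). With this change the resulting PCollection is typed as string for a []string input, not W<string>.

```go
package main

import (
	"fmt"
	"reflect"
)

// sliceToValues flattens an arbitrary slice or array into []interface{},
// the form a variadic Create-style function accepts. Hypothetical helper;
// it panics if list is not a slice or array.
func sliceToValues(list interface{}) []interface{} {
	val := reflect.ValueOf(list)
	if val.Kind() != reflect.Slice && val.Kind() != reflect.Array {
		panic(fmt.Sprintf("expected slice or array, got %T", list))
	}
	var ret []interface{}
	for i := 0; i < val.Len(); i++ {
		ret = append(ret, val.Index(i).Interface())
	}
	return ret
}

func main() {
	values := sliceToValues([]string{"a", "b", "c"})
	fmt.Println(len(values), values) // 3 [a b c]
}
```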
diff --git a/sdks/go/pkg/beam/external.go b/sdks/go/pkg/beam/external.go
index 5811b5a8299..337d59b02f2 100644
--- a/sdks/go/pkg/beam/external.go
+++ b/sdks/go/pkg/beam/external.go
@@ -19,7 +19,6 @@ import (
"fmt"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
- "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)
// External defines a Beam external transform. The interpretation of this primitive is runner
@@ -42,11 +41,6 @@ func TryExternal(s Scope, spec string, payload []byte, in []PCollection, out []F
return nil, fmt.Errorf("invalid pcollection to external: index %v", i)
}
}
- for _, t := range out {
- if !typex.IsW(t) {
- return nil, fmt.Errorf("output type to external must be windowed: %v", t)
- }
- }
var ins []*graph.Node
for _, col := range in {
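After this hunk, TryExternal still rejects invalid input PCollections by index, but no longer requires each output type to be windowed. The following sketch isolates the check that remains; `pcollection` and `checkExternalInputs` are hypothetical stand-ins, not the SDK's types, and only the error message is taken from the diff.

```go
package main

import "fmt"

// pcollection is a hypothetical stand-in for beam.PCollection;
// only its validity matters for this check.
type pcollection struct{ valid bool }

// checkExternalInputs mirrors the validation kept in TryExternal: every
// input must be valid. Output types such as KV<int,string> are no longer
// required to be W<...>, so there is no output loop here.
func checkExternalInputs(in []pcollection) error {
	for i, col := range in {
		if !col.valid {
			return fmt.Errorf("invalid pcollection to external: index %v", i)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkExternalInputs([]pcollection{{valid: true}}))                 // <nil>
	fmt.Println(checkExternalInputs([]pcollection{{valid: true}, {valid: false}})) // invalid pcollection to external: index 1
}
```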
diff --git a/sdks/go/pkg/beam/gbk.go b/sdks/go/pkg/beam/gbk.go
index 966c3a3e567..33faffb1633 100644
--- a/sdks/go/pkg/beam/gbk.go
+++ b/sdks/go/pkg/beam/gbk.go
@@ -21,9 +21,9 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
)
-// GroupByKey is a PTransform that takes a PCollection of type W<KV<A,B>,
+// GroupByKey is a PTransform that takes a PCollection of type KV<A,B>,
// groups the values by key and windows, and returns a PCollection of type
-// W<GBK<A,B>> representing a map from each distinct key and window of the
+// GBK<A,B> representing a map from each distinct key and window of the
// input PCollection to an iterable over all the values associated with
// that key in the input per window. Each key in the output PCollection is
// unique within each window.
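The GroupByKey doc comment describes grouping by both key and window, even though the input type is now written KV<A,B> rather than W<KV<A,B>>. The stdlib-only sketch below illustrates those semantics with the window carried alongside each element instead of inside its type; `windowedKV`, `groupKey` and `groupByKey` are hypothetical and do not reflect how a runner actually implements the shuffle.

```go
package main

import (
	"fmt"
	"sort"
)

// windowedKV is a hypothetical element: a key/value pair plus its window.
type windowedKV struct {
	window string
	key    string
	value  int
}

// groupKey identifies one output group: a distinct key within a window.
type groupKey struct {
	window string
	key    string
}

// groupByKey collects values per (window, key), so each key appears at
// most once in each window of the output.
func groupByKey(in []windowedKV) map[groupKey][]int {
	out := make(map[groupKey][]int)
	for _, e := range in {
		k := groupKey{e.window, e.key}
		out[k] = append(out[k], e.value)
	}
	return out
}

func main() {
	in := []windowedKV{{"GW", "king", 1}, {"GW", "lear", 1}, {"GW", "king", 1}}
	groups := groupByKey(in)

	// Print groups in a stable order (map iteration order is random).
	keys := make([]string, 0, len(groups))
	for k := range groups {
		keys = append(keys, k.key)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, groups[groupKey{"GW", k}])
	}
	// king [1 1]
	// lear [1]
}
```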
diff --git a/sdks/go/pkg/beam/impulse.go b/sdks/go/pkg/beam/impulse.go
index b5bb2b8ef6d..a542884393e 100644
--- a/sdks/go/pkg/beam/impulse.go
+++ b/sdks/go/pkg/beam/impulse.go
@@ -22,7 +22,7 @@ import (
// Impulse emits a single empty []byte into the global window. The resulting
// PCollection is a singleton of type []byte. For example:
//
-// foo := beam.Impulse(p) // foo : W<[]byte>
+// foo := beam.Impulse(p) // foo : []byte
//
// The purpose of Impulse is to trigger another transform, such as
// ones that take all information as side inputs.
@@ -33,7 +33,7 @@ func Impulse(s Scope) PCollection {
// ImpulseValue emits the supplied byte slice into the global window. The resulting
// PCollection is a singleton of type []byte. For example:
//
-// foo := beam.ImpulseValue(s, []byte{}) // foo : W<[]byte>
+// foo := beam.ImpulseValue(s, []byte{}) // foo : []byte
//
func ImpulseValue(s Scope, value []byte) PCollection {
if !s.IsValid() {
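Impulse and ImpulseValue are documented as producing a singleton PCollection of type []byte in the global window. The small model below shows what "singleton in the global window" means in element terms; `element`, `impulse` and `impulseValue` are hypothetical, and "GW" is used only because it is the global-window label in the debug output above.

```go
package main

import "fmt"

// element is a hypothetical element together with the window it belongs to.
type element struct {
	window string
	value  []byte
}

// impulseValue models ImpulseValue: exactly one element, holding the
// supplied bytes, in the global window.
func impulseValue(value []byte) []element {
	return []element{{window: "GW", value: value}}
}

// impulse models Impulse: the same singleton, holding an empty []byte.
func impulse() []element {
	return impulseValue([]byte{})
}

func main() {
	out := impulse()
	fmt.Println(len(out), len(out[0].value), out[0].window) // 1 0 GW
}
```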
diff --git a/sdks/go/pkg/beam/util.go b/sdks/go/pkg/beam/util.go
index e730765c61d..08fabf81537 100644
--- a/sdks/go/pkg/beam/util.go
+++ b/sdks/go/pkg/beam/util.go
@@ -16,6 +16,7 @@
package beam
func init() {
+ RegisterFunction(addFixedKeyFn)
RegisterFunction(dropKeyFn)
RegisterFunction(dropValueFn)
RegisterFunction(swapKVFn)
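The diff only registers addFixedKeyFn; its body is not shown. The new README output is consistent with it keying every element with one constant int: edge 10 maps T to KV<int,T>, and edge 11 groups into CoGBK<int,string> before the final ParDo (edge 12). The sketch below assumes the constant key is 0; `kv`, `addFixedKey` and `groupAll` are hypothetical names, not the SDK's implementation.

```go
package main

import "fmt"

// kv is a hypothetical key/value pair matching KV<int,string>.
type kv struct {
	key   int
	value string
}

// addFixedKey pairs every element with the same constant key.
// Assumption: the key is 0; any constant has the same effect.
func addFixedKey(elems []string) []kv {
	out := make([]kv, 0, len(elems))
	for _, e := range elems {
		out = append(out, kv{key: 0, value: e})
	}
	return out
}

// groupAll collects values by key; with a fixed key, all values
// end up in a single group.
func groupAll(in []kv) map[int][]string {
	groups := make(map[int][]string)
	for _, p := range in {
		groups[p.key] = append(groups[p.key], p.value)
	}
	return groups
}

func main() {
	groups := groupAll(addFixedKey([]string{"king: 2", "lear: 1"}))
	fmt.Println(len(groups), groups[0]) // 1 [king: 2 lear: 1]
}
```

Funnelling everything through one key means the step after the grouping sees every formatted line together. That reading is inferred from the pipeline output, not stated in the PR.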
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 82977)
Time Spent: 1h 50m (was: 1h 40m)
> Move Go SDK to not use WindowedValue for PCollections
> -----------------------------------------------------
>
> Key: BEAM-3866
> URL: https://issues.apache.org/jira/browse/BEAM-3866
> Project: Beam
> Issue Type: Bug
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Henning Rohde
> Priority: Major
> Labels: portability
> Fix For: 2.5.0
>
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> The windowing information is part of the gRPC instructions. Dataflow still
> expects the old way.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)