[
https://issues.apache.org/jira/browse/BEAM-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16289830#comment-16289830
]
ASF GitHub Bot commented on BEAM-3294:
--------------------------------------
kennknowles closed pull request #4237: [BEAM-3294] Make Go SDK external
transform a primitive
URL: https://github.com/apache/beam/pull/4237
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index f1fdc237c9c..9405573e8af 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -42,7 +42,7 @@ func (c Coder) IsValid() bool {
// Type returns the full type 'A' of elements the coder can encode and decode.
// 'A' must be a concrete Windowed Value type, such as W<int> or
// W<KV<int,string>>.
-func (c Coder) Type() typex.FullType {
+func (c Coder) Type() FullType {
if !c.IsValid() {
panic("Invalid Coder")
}
@@ -73,7 +73,7 @@ func UnwrapCoder(c Coder) *coder.Coder {
}
// NewCoder infers a Coder for any bound full type.
-func NewCoder(t typex.FullType) Coder {
+func NewCoder(t FullType) Coder {
c, err := inferCoder(t)
if err != nil {
panic(err) // for now
@@ -81,7 +81,7 @@ func NewCoder(t typex.FullType) Coder {
return Coder{c}
}
-func inferCoder(t typex.FullType) (*coder.Coder, error) {
+func inferCoder(t FullType) (*coder.Coder, error) {
switch t.Class() {
case typex.Concrete, typex.Container:
switch t.Type() {
@@ -122,7 +122,7 @@ func inferCoder(t typex.FullType) (*coder.Coder, error) {
}
}
-func inferCoders(list []typex.FullType) ([]*coder.Coder, error) {
+func inferCoders(list []FullType) ([]*coder.Coder, error) {
var ret []*coder.Coder
for _, t := range list {
c, err := inferCoder(t)
@@ -148,7 +148,7 @@ func JSONEnc(in typex.T) ([]byte, error) {
}
// JSONDec decodes the supplied JSON into an instance of the supplied type.
-func JSONDec(t reflect.Type, in []byte) (typex.T, error) {
+func JSONDec(t reflect.Type, in []byte) (T, error) {
val := reflect.New(t)
if err := json.Unmarshal(in, val.Interface()); err != nil {
return nil, err
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go
b/sdks/go/pkg/beam/core/graph/coder/coder.go
index 074d2a7ef99..7286aedd86c 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Package coder contains coder representation and utilities. Coders describe
+// how to serialize and deserialize pipeline data and may be provided by users.
package coder
import (
diff --git a/sdks/go/pkg/beam/core/graph/edge.go
b/sdks/go/pkg/beam/core/graph/edge.go
index e87286f23c4..4044cfaefc4 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -31,15 +31,14 @@ type Opcode string
// Valid opcodes.
const (
+ Impulse Opcode = "Impulse"
ParDo Opcode = "ParDo"
GBK Opcode = "GBK" // TODO: Unify with CoGBK?
- Source Opcode = "Source"
- Sink Opcode = "Sink"
+ External Opcode = "External"
Flatten Opcode = "Flatten"
Combine Opcode = "Combine"
DataSource Opcode = "DataSource"
DataSink Opcode = "DataSink"
- Impulse Opcode = "Impulse"
)
// InputKind represents the role of the input and its shape.
@@ -143,6 +142,12 @@ type Target struct {
Name string
}
+// Payload represents an external payload.
+type Payload struct {
+ URN string
+ Data []byte
+}
+
// TODO(herohde) 5/24/2017: how should we represent/obtain the coder for
Combine
// accumulator types? Coder registry? Assume JSON?
@@ -153,11 +158,12 @@ type MultiEdge struct {
parent *Scope
Op Opcode
- DoFn *DoFn // ParDo, Source.
- CombineFn *CombineFn // Combine.
- Port *Port // DataSource, DataSink.
- Target *Target // DataSource, DataSink.
+ DoFn *DoFn // ParDo
+ CombineFn *CombineFn // Combine
+ Port *Port // DataSource, DataSink
+ Target *Target // DataSource, DataSink
Value []byte // Impulse
+ Payload *Payload // External
Input []*Inbound
Output []*Outbound
@@ -246,21 +252,27 @@ func NewFlatten(g *Graph, s *Scope, in []*Node)
(*MultiEdge, error) {
return edge, nil
}
+// NewExternal inserts an External transform. The system makes no assumptions
about
+// what this transform might do.
+func NewExternal(g *Graph, s *Scope, payload *Payload, in []*Node, out
[]typex.FullType) *MultiEdge {
+ edge := g.NewEdge(s)
+ edge.Op = External
+ edge.Payload = payload
+ for _, n := range in {
+ edge.Input = append(edge.Input, &Inbound{Kind: Main, From: n,
Type: n.Type()})
+ }
+ for _, t := range out {
+ n := g.NewNode(t, inputWindow(in))
+ edge.Output = append(edge.Output, &Outbound{To: n, Type: t})
+ }
+ return edge
+}
+
// NewParDo inserts a new ParDo edge into the graph.
func NewParDo(g *Graph, s *Scope, u *DoFn, in []*Node, typedefs
map[string]reflect.Type) (*MultiEdge, error) {
return newDoFnNode(ParDo, g, s, u, in, typedefs)
}
-// NewSource inserts a Source transform.
-func NewSource(g *Graph, s *Scope, u *DoFn, typedefs map[string]reflect.Type)
(*MultiEdge, error) {
- return newDoFnNode(Source, g, s, u, nil, typedefs)
-}
-
-// NewSink inserts a Sink transform.
-func NewSink(g *Graph, s *Scope, u *DoFn, in *Node) (*MultiEdge, error) {
- return newDoFnNode(Sink, g, s, u, []*Node{in}, nil)
-}
-
func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs
map[string]reflect.Type) (*MultiEdge, error) {
// TODO(herohde) 5/22/2017: revisit choice of ProcessElement as
representative. We should
// perhaps create a synthetic method for binding purposes? The main
question is how to
diff --git a/sdks/go/pkg/beam/core/graph/window/window.go
b/sdks/go/pkg/beam/core/graph/window/window.go
index 306374bbbd6..ee05149eb54 100644
--- a/sdks/go/pkg/beam/core/graph/window/window.go
+++ b/sdks/go/pkg/beam/core/graph/window/window.go
@@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Package window contains window representation and utilities.
package window
import "fmt"
diff --git a/sdks/go/pkg/beam/core/runtime/exec/exec.go
b/sdks/go/pkg/beam/core/runtime/exec/exec.go
index 95e035acbf2..b8dff126081 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/exec.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/exec.go
@@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Package exec contains runtime plan representation and execution. A pipeline
+// must be translated to a runtime plan to be executed.
package exec
import (
diff --git a/sdks/go/pkg/beam/core/runtime/exec/nodes.go
b/sdks/go/pkg/beam/core/runtime/exec/nodes.go
index 57cd6233f93..7b76fab3614 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/nodes.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/nodes.go
@@ -90,84 +90,9 @@ func (m *Multiplex) String() string {
// NOTE(herohde) 5/1/2017: flatten is implicit in the execution model, so there
// is no need for a separate unit.
-// Source is a simplified source. It emits all elements in one invocation.
-type Source struct {
- UID UnitID
- Edge *graph.MultiEdge
- Out []Node
-}
-
-func (n *Source) ID() UnitID {
- return n.UID
-}
-
// TODO(herohde) 5/22/2017: Setup/StartBundle would be separate once we don't
// use purely single-bundle processing.
-func (n *Source) Up(ctx context.Context) error {
- if err := Up(ctx, n.Out...); err != nil {
- return err
- }
- if err := n.invoke(ctx, n.Edge.DoFn.SetupFn()); err != nil {
- return err
- }
- return n.invoke(ctx, n.Edge.DoFn.StartBundleFn())
-}
-
-func (n *Source) Process(ctx context.Context) error {
- return n.invoke(ctx, n.Edge.DoFn.ProcessElementFn())
-}
-
-func (n *Source) invoke(ctx context.Context, fn *funcx.Fn) error {
- if fn == nil {
- return nil
- }
-
- // (1) Populate contexts
-
- args := make([]reflect.Value, len(fn.Param))
-
- if index, ok := fn.Context(); ok {
- args[index] = reflect.ValueOf(ctx)
- }
-
- // NOTE: sources have no main or side input. We do not allow direct
form to
- // support "single value" sources.
-
- // (2) Outputs
-
- out := fn.Params(funcx.FnEmit)
- if len(out) != len(n.Out) {
- return fmt.Errorf("incorrect number of output nodes: %v, want
%v", len(n.Out), len(out))
- }
- for i := 0; i < len(out); i++ {
- param := fn.Param[out[i]]
- args[out[i]] = makeEmit(ctx, param.T, n.Out[i])
- }
-
- // (3) Invoke
-
- ret := fn.Fn.Call(args)
- if index, ok := fn.Error(); ok && ret[index].Interface() != nil {
- return fmt.Errorf("Source %v failed: %v", fn.Name,
ret[index].Interface())
- }
- return nil
-}
-
-func (n *Source) Down(ctx context.Context) error {
- if err := n.invoke(ctx, n.Edge.DoFn.FinishBundleFn()); err != nil {
- return err
- }
- if err := n.invoke(ctx, n.Edge.DoFn.TeardownFn()); err != nil {
- return err
- }
- return Down(ctx, n.Out...)
-}
-
-func (n *Source) String() string {
- return fmt.Sprintf("Source[%v] Out:%v", path.Base(n.Edge.DoFn.Name()),
IDs(n.Out...))
-}
-
// TODO(herohde) 4/26/2017: SideInput representation? We want it to be amenable
// to the State API. For now, just use Stream.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 2fd0d553860..8d629b1154c 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -318,6 +318,14 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
if s, ok := tryEncodeSpecial(t); ok {
return &v1.Type{Kind: v1.Type_SPECIAL, Special: s}, nil
}
+ if k, ok := runtime.TypeKey(t); ok {
+ if _, present := runtime.LookupType(k); present {
+ // External type. Serialize by key and lookup in
registry
+ // on decoding side.
+
+ return &v1.Type{Kind: v1.Type_EXTERNAL, ExternalKey:
k}, nil
+ }
+ }
// The supplied type isn't special, so apply the standard encodings.
switch t.Kind() {
@@ -354,15 +362,6 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
return &v1.Type{Kind: v1.Type_SLICE, Element: elm}, nil
case reflect.Struct:
- if k, ok := runtime.TypeKey(t); ok {
- if _, present := runtime.LookupType(k); present {
- // External type. Serialize by key and lookup
in registry
- // on decoding side.
-
- return &v1.Type{Kind: v1.Type_EXTERNAL,
ExternalKey: k}, nil
- }
- }
-
var fields []*v1.Type_StructField
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
diff --git a/sdks/go/pkg/beam/core/runtime/init.go
b/sdks/go/pkg/beam/core/runtime/init.go
index 54f7a5f0a4b..db9850f9593 100644
--- a/sdks/go/pkg/beam/core/runtime/init.go
+++ b/sdks/go/pkg/beam/core/runtime/init.go
@@ -13,6 +13,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Package runtime contains runtime hooks and utilities for pipeline options
+// and type registration. Most functionality done in init and hence is
+// available both during pipeline-submission and at runtime.
package runtime
var (
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go
b/sdks/go/pkg/beam/core/typex/fulltype.go
index 3a7dcc5769b..8b5c65163c1 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Package typex contains full type representation and utilities for type
checking.
package typex
import (
diff --git a/sdks/go/pkg/beam/external.go b/sdks/go/pkg/beam/external.go
index a7bab8f3ea1..5811b5a8299 100644
--- a/sdks/go/pkg/beam/external.go
+++ b/sdks/go/pkg/beam/external.go
@@ -17,11 +17,9 @@ package beam
import (
"fmt"
- "reflect"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
- "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)
// External defines a Beam external transform. The interpretation of this
primitive is runner
@@ -29,89 +27,38 @@ import (
// spec provided to implement the behavior of the operation. Transform
// libraries should expose an API that captures the user's intent and serialize
// the payload as a byte slice that the runner will deserialize.
-func External(s Scope, spec string, payload []byte, in []PCollection, out
[]reflect.Type) []PCollection {
+func External(s Scope, spec string, payload []byte, in []PCollection, out
[]FullType) []PCollection {
return MustN(TryExternal(s, spec, payload, in, out))
}
// TryExternal attempts to perform the work of External, returning an error
indicating why the operation
-// failed. Failure reasons include the use of side inputs, or an external
transform that has both inputs
-// and outputs.
-func TryExternal(s Scope, spec string, payload []byte, in []PCollection, out
[]reflect.Type) ([]PCollection, error) {
- switch {
- case len(in) == 0 && len(out) == 0:
- return []PCollection{}, fmt.Errorf("External node not
well-formed: out and in both empty")
- case len(in) > 0 && len(out) > 0:
- return []PCollection{}, fmt.Errorf("External DoFns are not
currently supported")
- case len(in) > 1:
- return []PCollection{}, fmt.Errorf("External operations with
side inputs are not currently supported")
- case len(out) > 1:
- return []PCollection{}, fmt.Errorf("External operations with
side outputs are not currently supported")
- case len(out) == 1:
- return tryExternalSource(s, spec, payload, out[0])
- case len(in) == 1:
- return tryExternalSink(s, in[0], spec, payload)
- }
-
- panic(fmt.Errorf("Impossible case: len[in]=%d, len[out]=%d", len(in),
len(out)))
-}
-
-// TODO(wcn): the use of dynamic functions was a creative hack to minimize
changes to the runtime
-// while we design aspects of this feature. While the public API is locked
down as above, the details
-// here about how the payload is conveyed to the runner will certainly change,
as this is a top-level
-// primitive. Runners depending on this coding do so AT THEIR OWN RISK and
will be broken when we convert
-// this implementation to its final internal representation.
-
-func tryExternalSource(s Scope, spec string, payload []byte, out reflect.Type)
([]PCollection, error) {
+// failed.
+func TryExternal(s Scope, spec string, payload []byte, in []PCollection, out
[]FullType) ([]PCollection, error) {
if !s.IsValid() {
return nil, fmt.Errorf("invalid scope")
}
- emit := reflect.FuncOf([]reflect.Type{out}, nil, false)
- fnT := reflect.FuncOf([]reflect.Type{emit},
[]reflect.Type{reflectx.Error}, false)
-
- gen := makeGenWithErrorMessage("ExternalSource node cannot be directly
executed")
- g := &graph.DynFn{Name: "ExternalSource", Data: payload, T: fnT, Gen:
gen}
- fn, err := graph.NewDoFn(g)
-
- if err != nil {
- return []PCollection{}, err
- }
- edge, err := graph.NewSource(s.real, s.scope, fn, nil)
- if err != nil {
- return []PCollection{}, err
- }
- ret := PCollection{edge.Output[0].To}
- ret.SetCoder(NewCoder(ret.Type()))
- return []PCollection{ret}, nil
-}
-
-func tryExternalSink(s Scope, in PCollection, spec string, payload []byte)
([]PCollection, error) {
- if !s.IsValid() {
- return nil, fmt.Errorf("invalid scope")
+ for i, col := range in {
+ if !col.IsValid() {
+ return nil, fmt.Errorf("invalid pcollection to
external: index %v", i)
+ }
}
- if !in.IsValid() {
- return []PCollection{}, fmt.Errorf("invalid main pcollection")
+ for _, t := range out {
+ if !typex.IsW(t) {
+ return nil, fmt.Errorf("output type to external must be
windowed: %v", t)
+ }
}
- fnT := reflect.FuncOf([]reflect.Type{typex.SkipW(in.n.Type()).Type()},
[]reflect.Type{reflectx.Error}, false)
- gen := makeGenWithErrorMessage("ExternalSink node cannot be directly
executed")
- g := &graph.DynFn{Name: "ExternalSink", Data: payload, T: fnT, Gen: gen}
- fn, err := graph.NewDoFn(g)
-
- if err != nil {
- return []PCollection{}, err
- }
- _, err = graph.NewSink(s.real, s.scope, fn, in.n)
- if err != nil {
- return []PCollection{}, err
+ var ins []*graph.Node
+ for _, col := range in {
+ ins = append(ins, col.n)
}
- return []PCollection{}, nil
-}
+ edge := graph.NewExternal(s.real, s.scope, &graph.Payload{URN: spec,
Data: payload}, ins, out)
-func makeGenWithErrorMessage(msg string) func([]byte) func([]reflect.Value)
[]reflect.Value {
- return func(in []byte) func([]reflect.Value) []reflect.Value {
- return func(args []reflect.Value) []reflect.Value {
- ret := reflect.ValueOf(fmt.Errorf(msg))
- return []reflect.Value{ret.Convert(reflectx.Error)}
- }
+ var ret []PCollection
+ for _, out := range edge.Output {
+ c := PCollection{out.To}
+ c.SetCoder(NewCoder(c.Type()))
+ ret = append(ret, c)
}
+ return ret, nil
}
diff --git a/sdks/go/pkg/beam/forward.go b/sdks/go/pkg/beam/forward.go
index 5a47791a8fe..858b3d16d61 100644
--- a/sdks/go/pkg/beam/forward.go
+++ b/sdks/go/pkg/beam/forward.go
@@ -63,6 +63,8 @@ var PipelineOptions = runtime.GlobalOptions
// We forward typex types used in UserFn signatures to avoid having such code
// depend on the typex package directly.
+type FullType = typex.FullType
+
type T = typex.T
type U = typex.U
type V = typex.V
diff --git a/sdks/go/pkg/beam/pcollection.go b/sdks/go/pkg/beam/pcollection.go
index c41918b1b47..de2d56bdfe1 100644
--- a/sdks/go/pkg/beam/pcollection.go
+++ b/sdks/go/pkg/beam/pcollection.go
@@ -54,7 +54,7 @@ func (p PCollection) IsValid() bool {
// Type returns the full type 'A' of the elements. 'A' must be a concrete
// Windowed Value type, such as W<int> or W<KV<int,string>>.
-func (p PCollection) Type() typex.FullType {
+func (p PCollection) Type() FullType {
if !p.IsValid() {
panic("Invalid PCollection")
}
diff --git a/sdks/go/pkg/beam/runners/dataflow/translate.go
b/sdks/go/pkg/beam/runners/dataflow/translate.go
index e896d48c0b7..34cd46badb0 100644
--- a/sdks/go/pkg/beam/runners/dataflow/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/translate.go
@@ -181,17 +181,6 @@ func translateEdge(edge *graph.MultiEdge) (string,
properties, error) {
Element: []string{url.QueryEscape(value)},
}, nil
- case graph.Source:
- fn, err := serializeFn(edge)
- if err != nil {
- return "", properties{}, err
- }
- return "ParallelRead", properties{
- CustomSourceInputStep: newCustomSourceInputStep(fn),
- UserName: buildName(edge.Scope(),
edge.DoFn.Name()),
- Format: "custom_source",
- }, nil
-
case graph.ParDo:
fn, err := serializeFn(edge)
if err != nil {
@@ -227,11 +216,6 @@ func translateEdge(edge *graph.MultiEdge) (string,
properties, error) {
SerializedFn: sfn,
}, nil
- case graph.Sink:
- return "ParallelWrite", properties{
- // TODO
- }, nil
-
case graph.Flatten:
return "Flatten", properties{
UserName: buildName(edge.Scope(), "flatten"), // TODO:
user-defined
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go
b/sdks/go/pkg/beam/runners/direct/direct.go
index 5e171138d9b..85c8463f4cf 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -89,10 +89,6 @@ func build(mgr exec.DataManager, instID string, list
[]*graph.MultiEdge) ([]exec
unit := &Impulse{UID: idgen.New(), Edge: edge}
units = append(units, unit)
- case graph.Source:
- unit := &exec.Source{UID: idgen.New(), Edge: edge}
- units = append(units, unit)
-
case graph.ParDo:
unit := &exec.ParDo{UID: idgen.New(), Edge: edge}
units = append(units, unit)
@@ -226,8 +222,6 @@ func build(mgr exec.DataManager, instID string, list
[]*graph.MultiEdge) ([]exec
func getEdge(unit exec.Unit) (*graph.MultiEdge, bool) {
switch unit.(type) {
- case *exec.Source:
- return unit.(*exec.Source).Edge, true
case *exec.ParDo:
return unit.(*exec.ParDo).Edge, true
case *exec.Combine:
@@ -247,8 +241,6 @@ func getEdge(unit exec.Unit) (*graph.MultiEdge, bool) {
func setOut(unit exec.Unit, out []exec.Node) {
switch unit.(type) {
- case *exec.Source:
- unit.(*exec.Source).Out = out
case *exec.ParDo:
unit.(*exec.ParDo).Out = out
case *exec.Combine:
diff --git a/sdks/go/pkg/beam/util/syscallx/syscall.go
b/sdks/go/pkg/beam/util/syscallx/syscall.go
index ca352ecc5df..61f3a03fa74 100644
--- a/sdks/go/pkg/beam/util/syscallx/syscall.go
+++ b/sdks/go/pkg/beam/util/syscallx/syscall.go
@@ -14,7 +14,7 @@
// limitations under the License.
// Package syscallx provides system call utilities that attempt to hide
platform
-// differences. Operations return UnsupportedErr if not implemented on the
+// differences. Operations return ErrUnsupported if not implemented on the
// given platform. Consumers of this package should generally treat that
// error specially.
package syscallx
@@ -23,5 +23,5 @@ import (
"errors"
)
-// UnsupportedErr is the error returned for unsupported operations.
-var UnsupportedErr = errors.New("not supported on platform")
+// ErrUnsupported is the error returned for unsupported operations.
+var ErrUnsupported = errors.New("not supported on platform")
diff --git a/sdks/go/pkg/beam/util/syscallx/syscall_default.go
b/sdks/go/pkg/beam/util/syscallx/syscall_default.go
index ccc93244a42..a85cd3f0e7e 100644
--- a/sdks/go/pkg/beam/util/syscallx/syscall_default.go
+++ b/sdks/go/pkg/beam/util/syscallx/syscall_default.go
@@ -19,10 +19,10 @@ package syscallx
// PhysicalMemorySize returns the total physical memory size.
func PhysicalMemorySize() (uint64, error) {
- return 0, UnsupportedErr
+ return 0, ErrUnsupported
}
// FreeDiskSpace returns the free disk space for a given path.
func FreeDiskSpace(path string) (uint64, error) {
- return 0, UnsupportedErr
+ return 0, ErrUnsupported
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Move to graph.External and remove Source/Sink
> ---------------------------------------------
>
> Key: BEAM-3294
> URL: https://issues.apache.org/jira/browse/BEAM-3294
> Project: Beam
> Issue Type: Task
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Henning Rohde
> Priority: Minor
>
> Cleanup: eliminate Source/Sink and promote beam.External to the graph level.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)