[ https://issues.apache.org/jira/browse/BEAM-3827?focusedWorklogId=103199&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103199 ]

ASF GitHub Bot logged work on BEAM-3827:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/May/18 01:37
            Start Date: 18/May/18 01:37
    Worklog Time Spent: 10m 
      Work Description: jasonkuster closed pull request #5326: [BEAM-3827] Add Go integration tests
URL: https://github.com/apache/beam/pull/5326
 
 
   

This pull request was merged from a forked repository. Because GitHub hides
the original diff once a pull request from a fork has been merged, the diff
is reproduced below for the sake of provenance:

diff --git a/sdks/go/examples/windowed_wordcount/windowed_wordcount.go 
b/sdks/go/examples/windowed_wordcount/windowed_wordcount.go
index 753a0a12f57..f6748ab2fa9 100644
--- a/sdks/go/examples/windowed_wordcount/windowed_wordcount.go
+++ b/sdks/go/examples/windowed_wordcount/windowed_wordcount.go
@@ -42,13 +42,13 @@ import (
        "reflect"
        "time"
 
-       "github.com/apache/beam/sdks/go/examples/windowed_wordcount/wordcount"
        "github.com/apache/beam/sdks/go/pkg/beam"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+       "github.com/apache/beam/sdks/go/test/integration/wordcount"
 )
 
 var (
diff --git a/sdks/go/pkg/beam/io/filesystem/memfs/memory.go 
b/sdks/go/pkg/beam/io/filesystem/memfs/memory.go
new file mode 100644
index 00000000000..b9d19ccbbde
--- /dev/null
+++ b/sdks/go/pkg/beam/io/filesystem/memfs/memory.go
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package memfs contains a in-memory Beam filesystem. Useful for testing.
+package memfs
+
+import (
+       "bytes"
+       "context"
+       "io"
+       "io/ioutil"
+       "os"
+       "sort"
+       "strings"
+       "sync"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
+)
+
+func init() {
+       filesystem.Register("memfs", New)
+}
+
+var instance = &fs{m: make(map[string][]byte)}
+
+type fs struct {
+       m  map[string][]byte
+       mu sync.Mutex
+}
+
+// New returns the global memory filesystem.
+func New(ctx context.Context) filesystem.Interface {
+       return instance
+}
+
+func (f *fs) Close() error {
+       return nil
+}
+
+func (f *fs) List(ctx context.Context, glob string) ([]string, error) {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+
+       var ret []string
+       for k := range f.m {
+               ret = append(ret, k)
+       }
+       sort.Strings(ret)
+       return ret, nil
+}
+
+func (f *fs) OpenRead(ctx context.Context, filename string) (io.ReadCloser, 
error) {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+
+       if v, ok := f.m[normalize(filename)]; ok {
+               return ioutil.NopCloser(bytes.NewReader(v)), nil
+       }
+       return nil, os.ErrNotExist
+}
+
+func (f *fs) OpenWrite(ctx context.Context, filename string) (io.WriteCloser, 
error) {
+       return &commitWriter{key: filename}, nil
+}
+
+// Write stores the given key and value in the global store.
+func Write(key string, value []byte) {
+       instance.mu.Lock()
+       defer instance.mu.Unlock()
+
+       cp := make([]byte, len(value))
+       copy(cp, value)
+
+       instance.m[normalize(key)] = cp
+}
+
+func normalize(key string) string {
+       if strings.HasPrefix(key, "memfs://") {
+               return key
+       }
+       return "memfs://" + key
+}
+
+type commitWriter struct {
+       key string
+       buf bytes.Buffer
+}
+
+func (w *commitWriter) Write(p []byte) (n int, err error) {
+       return w.buf.Write(p)
+}
+
+func (w *commitWriter) Close() error {
+       Write(w.key, w.buf.Bytes())
+       return nil
+}
diff --git a/sdks/go/pkg/beam/io/filesystem/memfs/memory_test.go 
b/sdks/go/pkg/beam/io/filesystem/memfs/memory_test.go
new file mode 100644
index 00000000000..de3f7f913b3
--- /dev/null
+++ b/sdks/go/pkg/beam/io/filesystem/memfs/memory_test.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memfs
+
+import (
+       "context"
+       "os"
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
+)
+
+// TestReadWrite tests that read and write from the memory filesystem
+// works as expected.
+func TestReadWrite(t *testing.T) {
+       ctx := context.Background()
+       fs := New(ctx)
+
+       if err := filesystem.Write(ctx, fs, "foo", []byte("foo")); err != nil {
+               t.Error(err)
+       }
+       if err := filesystem.Write(ctx, fs, "bar", []byte("bar")); err != nil {
+               t.Error(err)
+       }
+
+       foo, err := filesystem.Read(ctx, fs, "foo")
+       if err != nil {
+               t.Errorf("Read(foo) failed: %v", err)
+       }
+       if string(foo) != "foo" {
+               t.Errorf("Read(foo) = %v, want foo", string(foo))
+       }
+
+       bar, err := filesystem.Read(ctx, fs, "bar")
+       if err != nil {
+               t.Errorf("Read(bar) failed: %v", err)
+       }
+       if string(bar) != "bar" {
+               t.Errorf("Read(bar) = %v, want bar", string(bar))
+       }
+
+       if _, err := filesystem.Read(ctx, fs, "baz"); err != os.ErrNotExist {
+               t.Errorf("Read(baz) = %v, want os.ErrNotExist", err)
+       }
+}
+
+// TestDirectWrite tests that that direct writes to the memory filesystem 
works.
+func TestDirectWrite(t *testing.T) {
+       ctx := context.Background()
+       fs := New(ctx)
+
+       Write("foo2", []byte("foo"))
+
+       foo, err := filesystem.Read(ctx, fs, "foo2")
+       if err != nil {
+               t.Errorf("Read(foo2) failed: %v", err)
+       }
+       if string(foo) != "foo" {
+               t.Errorf("Read(foo2) = %v, want foo", string(foo))
+       }
+}
diff --git a/sdks/go/pkg/beam/io/filesystem/util.go 
b/sdks/go/pkg/beam/io/filesystem/util.go
new file mode 100644
index 00000000000..c22165bd037
--- /dev/null
+++ b/sdks/go/pkg/beam/io/filesystem/util.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package filesystem
+
+import (
+       "context"
+       "io/ioutil"
+)
+
+// Read fully reads the given file from the file system.
+func Read(ctx context.Context, fs Interface, filename string) ([]byte, error) {
+       r, err := fs.OpenRead(ctx, filename)
+       if err != nil {
+               return nil, err
+       }
+       defer r.Close()
+
+       return ioutil.ReadAll(r)
+}
+
+// Write writes the given content to the file system.
+func Write(ctx context.Context, fs Interface, filename string, data []byte) 
error {
+       w, err := fs.OpenWrite(ctx, filename)
+       if err != nil {
+               return err
+       }
+
+       if _, err := w.Write(data); err != nil {
+               return err
+       }
+       return w.Close()
+}
diff --git a/sdks/go/pkg/beam/options/jobopts/options.go 
b/sdks/go/pkg/beam/options/jobopts/options.go
index 765849b238d..99df6b1b076 100644
--- a/sdks/go/pkg/beam/options/jobopts/options.go
+++ b/sdks/go/pkg/beam/options/jobopts/options.go
@@ -25,6 +25,8 @@ import (
        "strings"
        "time"
 
+       "sync/atomic"
+
        "github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -62,11 +64,14 @@ func GetEndpoint() (string, error) {
        return *Endpoint, nil
 }
 
-// GetJobName returns the specified job name or, if not present, an
+var unique int32
+
+// GetJobName returns the specified job name or, if not present, a fresh
 // autogenerated name. Convenience function.
 func GetJobName() string {
        if *JobName == "" {
-               *JobName = fmt.Sprintf("go-job-%v", time.Now().UnixNano())
+               id := atomic.AddInt32(&unique, 1)
+               return fmt.Sprintf("go-job-%v-%v", id, time.Now().UnixNano())
        }
        return *JobName
 }
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 38d04fee143..605f584ca1d 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -24,7 +24,6 @@ import (
        "fmt"
        "io"
        "os"
-       "os/user"
        "path"
        "time"
 
@@ -97,7 +96,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        var jobLabels map[string]string
        if *labels != "" {
                if err := json.Unmarshal([]byte(*labels), &jobLabels); err != 
nil {
-                       return fmt.Errorf("Error reading --label flag as JSON: 
%v", err)
+                       return fmt.Errorf("error reading --label flag as JSON: 
%v", err)
                }
        }
        jobName := jobopts.GetJobName()
@@ -126,21 +125,22 @@ func Execute(ctx context.Context, p *beam.Pipeline) error 
{
 
        // (1) Upload Go binary and model to GCS.
 
-       if *jobopts.WorkerBinary == "" {
+       bin := *jobopts.WorkerBinary
+       if bin == "" {
                worker, err := runnerlib.BuildTempWorkerBinary(ctx)
                if err != nil {
                        return err
                }
                defer os.Remove(worker)
 
-               *jobopts.WorkerBinary = worker
+               bin = worker
        } else {
-               log.Infof(ctx, "Using specified worker binary: '%v'", 
*jobopts.WorkerBinary)
+               log.Infof(ctx, "Using specified worker binary: '%v'", bin)
        }
 
-       log.Infof(ctx, "Staging worker binary: %v", *jobopts.WorkerBinary)
+       log.Infof(ctx, "Staging worker binary: %v", bin)
 
-       binary, err := stageWorker(ctx, project, *stagingLocation, 
*jobopts.WorkerBinary)
+       binary, err := stageWorker(ctx, project, *stagingLocation, bin)
        if err != nil {
                return err
        }
@@ -209,7 +209,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                        Experiments:       append(jobopts.GetExperiments(), 
"beam_fn_api"),
                },
                Labels: jobLabels,
-               Steps: steps,
+               Steps:  steps,
        }
 
        if *numWorkers > 0 {
@@ -331,13 +331,6 @@ func stageWorker(ctx context.Context, project, location, 
worker string) (string,
        return gcsx.Upload(client, project, bucket, obj, fd)
 }
 
-func username() string {
-       if u, err := user.Current(); err == nil {
-               return u.Username
-       }
-       return "anon"
-}
-
 func findPipelineFlags() []*displayData {
        var ret []*displayData
 
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
index 2549a8bc14d..52cf4682d21 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
@@ -45,18 +45,18 @@ func BuildTempWorkerBinary(ctx context.Context) (string, 
error) {
 //
 //   
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/runners/beamexec/main.go
 (skip: 2)
 // * 
/Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go
 (skip: 3)
-//   /usr/local/go/src/runtime/proc.go (skip: 4)
-//   /usr/local/go/src/runtime/asm_amd64.s (skip: 5)
+//   /usr/local/go/src/runtime/proc.go (skip: 4)      // not always present
+//   /usr/local/go/src/runtime/asm_amd64.s (skip: 4 or 5)
 func BuildWorkerBinary(ctx context.Context, filename string) error {
        program := ""
        for i := 3; ; i++ {
                _, file, _, ok := runtime.Caller(i)
-               if !ok || strings.HasSuffix(file, "runtime/proc.go") {
+               if !ok || !strings.HasSuffix(file, ".go") || 
strings.HasSuffix(file, "runtime/proc.go") {
                        break
                }
                program = file
        }
-       if program == "" {
+       if !strings.HasSuffix(program, ".go") {
                return fmt.Errorf("could not detect user main")
        }
 
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index 39065a9cff8..0eb6bcd53cb 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -48,19 +48,20 @@ func Execute(ctx context.Context, p *pb.Pipeline, endpoint 
string, opt *JobOptio
 
        // (2) Stage artifacts.
 
-       if opt.Worker == "" {
+       bin := opt.Worker
+       if bin == "" {
                worker, err := BuildTempWorkerBinary(ctx)
                if err != nil {
                        return "", err
                }
                defer os.Remove(worker)
 
-               opt.Worker = worker
+               bin = worker
        } else {
-               log.Infof(ctx, "Using specified worker binary: '%v'", 
opt.Worker)
+               log.Infof(ctx, "Using specified worker binary: '%v'", bin)
        }
 
-       token, err := Stage(ctx, prepID, artifactEndpoint, opt.Worker)
+       token, err := Stage(ctx, prepID, artifactEndpoint, bin)
        if err != nil {
                return "", err
        }
diff --git a/sdks/go/pkg/beam/testing/passert/hash.go 
b/sdks/go/pkg/beam/testing/passert/hash.go
new file mode 100644
index 00000000000..2f4f5743e4c
--- /dev/null
+++ b/sdks/go/pkg/beam/testing/passert/hash.go
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package passert
+
+import (
+       "crypto/md5"
+       "encoding/base64"
+       "fmt"
+       "reflect"
+       "sort"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*hashFn)(nil)).Elem())
+}
+
+// Hash validates that the incoming PCollection<string> has the given size and
+// base64-encoded MD5 hash code. It buffers the entire PCollection in memory
+// and sorts it for determinism.
+func Hash(s beam.Scope, col beam.PCollection, name, hash string, size int) {
+       s = s.Scope(fmt.Sprintf("passert.Hash(%v)", name))
+
+       keyed := beam.AddFixedKey(s, col)
+       grouped := beam.GroupByKey(s, keyed)
+       beam.ParDo0(s, &hashFn{Name: name, Size: size, Hash: hash}, grouped)
+}
+
+type hashFn struct {
+       Name string `json:"name,omitempty"`
+       Size int    `json:"size,omitempty"`
+       Hash string `json:"hash,omitempty"`
+}
+
+func (f *hashFn) ProcessElement(_ int, lines func(*string) bool) error {
+       var col []string
+       var str string
+       for lines(&str) {
+               col = append(col, str)
+       }
+       sort.Strings(col)
+
+       md5W := md5.New()
+       for _, str := range col {
+               if _, err := md5W.Write([]byte(str)); err != nil {
+                       panic(err) // cannot fail
+               }
+       }
+       hash := base64.StdEncoding.EncodeToString(md5W.Sum(nil))
+
+       if f.Size != len(col) || f.Hash != hash {
+               return fmt.Errorf("passert.Hash(%v) = (%v,%v), want (%v,%v)", 
f.Name, len(col), hash, f.Size, f.Hash)
+       }
+       return nil
+}
diff --git a/sdks/go/pkg/beam/util.go b/sdks/go/pkg/beam/util.go
index 7ad1e053a41..4cabc44acf9 100644
--- a/sdks/go/pkg/beam/util.go
+++ b/sdks/go/pkg/beam/util.go
@@ -32,6 +32,12 @@ func init() {
 // look exactly like the more primitive sources/sinks, but be picked at
 // pipeline construction time.
 
+// NewPipelineWithRoot creates a new empty pipeline and its root scope.
+func NewPipelineWithRoot() (*Pipeline, Scope) {
+       p := NewPipeline()
+       return p, p.Root()
+}
+
 // Seq is a convenience helper to chain single-input/single-output ParDos 
together
 // in a sequence.
 func Seq(s Scope, col PCollection, dofns ...interface{}) PCollection {
diff --git a/sdks/go/pkg/beam/x/hooks/perf/perf.go 
b/sdks/go/pkg/beam/x/hooks/perf/perf.go
index e0a816284e6..6890ec022b2 100644
--- a/sdks/go/pkg/beam/x/hooks/perf/perf.go
+++ b/sdks/go/pkg/beam/x/hooks/perf/perf.go
@@ -49,7 +49,7 @@ func init() {
                enabled := len(enabledProfCaptureHooks) > 0
                var cpuProfBuf bytes.Buffer
                return hooks.Hook{
-                       Req: func(ctx context.Context,  req 
*fnpb.InstructionRequest) (context.Context, error) {
+                       Req: func(ctx context.Context, req 
*fnpb.InstructionRequest) (context.Context, error) {
                                if !enabled || req.GetProcessBundle() == nil {
                                        return ctx, nil
                                }
diff --git a/sdks/go/test/integration/driver.go 
b/sdks/go/test/integration/driver.go
new file mode 100644
index 00000000000..308be63ef71
--- /dev/null
+++ b/sdks/go/test/integration/driver.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// The integration driver provides a suite of tests to run against a 
registered runner.
+package main
+
+import (
+       "context"
+       "flag"
+       "regexp"
+       "sync"
+       "sync/atomic"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/memfs"
+       "github.com/apache/beam/sdks/go/pkg/beam/log"
+       "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+       "github.com/apache/beam/sdks/go/test/integration/wordcount"
+)
+
+var (
+       cpus   = flag.Int("cpus", 10, "Number of tests to run in parallel")
+       filter = flag.String("filter", ".*", "Test filer to run a subset of 
tests")
+)
+
+const old_pond = "memfs://old_pond"
+
+func init() {
+       memfs.Write(old_pond, []byte("old pond \na frog leaps in\nwater's 
sound\n"))
+}
+
+type namedPipeline struct {
+       name string
+       p    *beam.Pipeline
+}
+
+func main() {
+       flag.Parse()
+       beam.Init()
+
+       if *cpus < 1 {
+               *cpus = 1
+       }
+
+       pipelines := []namedPipeline{
+               {"wordcount:memfs", wordcount.WordCount(old_pond, 
"+Qj8iAnV5BI2A4sbzUbb6Q==", 8)},
+               {"wordcount:kinglear", 
wordcount.WordCount("gs://apache-beam-samples/shakespeare/kinglear.txt", 
"7ZCh5ih9m8IW1w+iS8sRKg==", 4749)},
+       }
+
+       re := regexp.MustCompile(*filter)
+
+       ch := make(chan namedPipeline, len(pipelines))
+       for _, np := range pipelines {
+               if re.MatchString(np.name) {
+                       ch <- np
+               }
+       }
+       close(ch)
+
+       ctx := context.Background()
+
+       var failures int32
+       var wg sync.WaitGroup
+       for i := 0; i < *cpus; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for np := range ch {
+                               log.Infof(ctx, "Running %v test ..", np.name)
+
+                               if err := beamx.Run(ctx, np.p); err != nil {
+                                       atomic.AddInt32(&failures, 1)
+                                       log.Errorf(ctx, "Test %v failed: %v", 
np.name, err)
+                               } else {
+                                       log.Infof(ctx, "Test %v completed", 
np.name)
+                               }
+                       }
+               }()
+       }
+       wg.Wait()
+
+       if failures > 0 {
+               log.Exitf(ctx, "Result: %v tests failed", failures)
+       }
+       log.Infof(ctx, "Result: all tests passed!")
+}
diff --git a/sdks/go/examples/windowed_wordcount/wordcount/wordcount.go 
b/sdks/go/test/integration/wordcount/wordcount.go
similarity index 73%
rename from sdks/go/examples/windowed_wordcount/wordcount/wordcount.go
rename to sdks/go/test/integration/wordcount/wordcount.go
index 8fed0d1213e..109b94769bd 100644
--- a/sdks/go/examples/windowed_wordcount/wordcount/wordcount.go
+++ b/sdks/go/test/integration/wordcount/wordcount.go
@@ -13,9 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package wordcount contains the Wordcount transform. It is
-// identical to the wordcount example, but in a separate package
-// to allow sharing and unit testing.
+// Package wordcount contains transforms for wordcount.
 package wordcount
 
 import (
@@ -23,7 +21,11 @@ import (
        "regexp"
        "strings"
 
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
        "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
 )
 
@@ -33,6 +35,20 @@ var (
        lineLen = beam.NewDistribution("extract", "lineLenDistro")
 )
 
+// CountWords is a composite transform that counts the words of a PCollection
+// of lines. It expects a PCollection of type string and returns a PCollection
+// of type KV<string,int>. The Beam type checker enforces these constraints
+// during pipeline construction.
+func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
+       s = s.Scope("CountWords")
+
+       // Convert lines of text into individual words.
+       col := beam.ParDo(s, extractFn, lines)
+
+       // Count the number of times each word occurs.
+       return stats.Count(s, col)
+}
+
 // extractFn is a DoFn that emits the words in a given line.
 func extractFn(ctx context.Context, line string, emit func(string)) {
        lineLen.Update(ctx, int64(len(line)))
@@ -44,16 +60,22 @@ func extractFn(ctx context.Context, line string, emit 
func(string)) {
        }
 }
 
-// CountWords is a composite transform that counts the words of a PCollection
-// of lines. It expects a PCollection of type string and returns a PCollection
-// of type KV<string,int>. The Beam type checker enforces these constraints
-// during pipeline construction.
-func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
-       s = s.Scope("CountWords")
+// Format formats a KV of a word and its count as a string.
+func Format(s beam.Scope, counted beam.PCollection) beam.PCollection {
+       return beam.ParDo(s, formatFn, counted)
+}
 
-       // Convert lines of text into individual words.
-       col := beam.ParDo(s, extractFn, lines)
+func formatFn(w string, c int) string {
+       return fmt.Sprintf("%s: %v", w, c)
+}
 
-       // Count the number of times each word occurs.
-       return stats.Count(s, col)
+// WordCount returns a self-validating wordcount pipeline.
+func WordCount(glob, hash string, size int) *beam.Pipeline {
+       p, s := beam.NewPipelineWithRoot()
+
+       in := textio.Read(s, glob)
+       out := Format(s, CountWords(s, in))
+       passert.Hash(s, out, "out", hash, size)
+
+       return p
 }
diff --git a/sdks/go/test/integration/wordcount/wordcount_test.go 
b/sdks/go/test/integration/wordcount/wordcount_test.go
new file mode 100644
index 00000000000..e86810d63f0
--- /dev/null
+++ b/sdks/go/test/integration/wordcount/wordcount_test.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wordcount
+
+import (
+       "strings"
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/memfs"
+       "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+)
+
+func TestWordCount(t *testing.T) {
+       tests := []struct {
+               lines []string
+               words int
+               hash  string
+       }{
+               {
+                       []string{
+                               "foo",
+                       },
+                       1,
+                       "6zZtmVTet7aIhR3wmPE8BA==",
+               },
+               {
+                       []string{
+                               "foo foo foo",
+                               "foo foo",
+                               "foo",
+                       },
+                       1,
+                       "jAk8+k4BOH7vQDUiUZdfWg==",
+               },
+               {
+                       []string{
+                               "bar bar foo bar foo foo",
+                       },
+                       2,
+                       "Nz70m/sn3Ep9o484r7MalQ==",
+               },
+               {
+                       []string{
+                               "foo bar foo bar foo bar",
+                       },
+                       2,
+                       "Nz70m/sn3Ep9o484r7MalQ==", // ordering doesn't matter: 
same hash as above
+               },
+               {
+                       []string{
+                               "",
+                               "bar foo bar",
+                               "  \t ",
+                               " \n\n\n ",
+                               "foo bar",
+                               "       foo",
+                       },
+                       2,
+                       "Nz70m/sn3Ep9o484r7MalQ==", // whitespace doesn't 
matter: same hash as above
+               },
+       }
+
+       for _, test := range tests {
+               const filename = "memfs://input"
+               memfs.Write(filename, []byte(strings.Join(test.lines, "\n")))
+
+               p := WordCount(filename, test.hash, test.words)
+               if err := ptest.Run(p); err != nil {
+                       t.Errorf("WordCount(\"%v\") failed: %v", 
strings.Join(test.lines, "|"), err)
+               }
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 103199)
    Time Spent: 50m  (was: 40m)

> Add Go SDK integration tests
> ----------------------------
>
>                 Key: BEAM-3827
>                 URL: https://issues.apache.org/jira/browse/BEAM-3827
>             Project: Beam
>          Issue Type: Task
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Henning Rohde
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> We should add post-commit testing, similar to the ValidatesRunner tests, to
> ensure that the Beam model is implemented correctly.
> Proposal: 
> https://docs.google.com/document/d/1jy6EE7D4RjgfNV0FhD3rMsT1YKhnUfcHRZMAlC6ygXw/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to