[ 
https://issues.apache.org/jira/browse/BEAM-5378?focusedWorklogId=144539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144539
 ]

ASF GitHub Bot logged work on BEAM-5378:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Sep/18 06:16
            Start Date: 15/Sep/18 06:16
    Worklog Time Spent: 10m 
      Work Description: herohde commented on a change in pull request #6395: 
[BEAM-5378] Update go wordcap example to work on Dataflow runner
URL: https://github.com/apache/beam/pull/6395#discussion_r217877228
 
 

 ##########
 File path: sdks/go/examples/wordcap/wordcap.go
 ##########
 @@ -31,7 +30,7 @@ import (
 )
 
 var (
-	input = flag.String("input", os.ExpandEnv("$GOPATH/src/github.com/apache/beam/sdks/go/data/haiku/old_pond.txt"), "Files to read.")
+	input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")
 
 Review comment:
   Let me try to clarify the code here and the purpose of this example. It 
already runs on Dataflow. This flag is deliberately local and super-small 
(old_pond). textio.Immediate embeds it into a transform, so this is a 
straight-line, IO-free pipeline that just logs the output of that local file. 
Its purpose was initially to debug the Go SDK and later the Flink portable runner.
   
   Multiout is in a similar bucket. It may rather make sense to move them out 
of the way of the normal examples?
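
   For context, here is a minimal sketch of the textio.Immediate pattern 
described above (illustrative only, not the actual wordcap.go code; the 
pipeline body is assumed): the file is read while the pipeline is being 
constructed and its lines are embedded directly, so execution needs no 
runner-side IO.

{code}
package main

import (
	"context"
	"flag"
	"log"
	"os"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
)

var input = flag.String("input",
	os.ExpandEnv("$GOPATH/src/github.com/apache/beam/sdks/go/data/haiku/old_pond.txt"),
	"Local file to embed at construction time.")

func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()

	// Immediate reads the (small, local) file at pipeline-construction time
	// and embeds its lines into the pipeline, so execution is straight-line
	// and IO-free.
	lines, err := textio.Immediate(s, *input)
	if err != nil {
		log.Fatalf("failed to read %v: %v", *input, err)
	}
	debug.Print(s, lines)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("failed to execute job: %v", err)
	}
}
{code}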

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 144539)
    Time Spent: 1h  (was: 50m)

> Ensure all Go SDK examples run successfully
> -------------------------------------------
>
>                 Key: BEAM-5378
>                 URL: https://issues.apache.org/jira/browse/BEAM-5378
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>    Affects Versions: Not applicable
>            Reporter: Tomas Roos
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> I've spent a day or so running through the examples available for the 
> Go SDK to see what works (and on which runner: direct, dataflow) and 
> what doesn't. Here are the results for all available examples for the Go SDK. 
> For me as a new developer on Apache Beam and Dataflow it would be of tremendous 
> value to have all the examples running, because many of them have legitimate 
> use cases behind them. 
> {code:java}
> ├── complete
> │   └── autocomplete
> │       └── autocomplete.go
> ├── contains
> │   └── contains.go
> ├── cookbook
> │   ├── combine
> │   │   └── combine.go
> │   ├── filter
> │   │   └── filter.go
> │   ├── join
> │   │   └── join.go
> │   ├── max
> │   │   └── max.go
> │   └── tornadoes
> │       └── tornadoes.go
> ├── debugging_wordcount
> │   └── debugging_wordcount.go
> ├── forest
> │   └── forest.go
> ├── grades
> │   └── grades.go
> ├── minimal_wordcount
> │   └── minimal_wordcount.go
> ├── multiout
> │   └── multiout.go
> ├── pingpong
> │   └── pingpong.go
> ├── streaming_wordcap
> │   └── wordcap.go
> ├── windowed_wordcount
> │   └── windowed_wordcount.go
> ├── wordcap
> │   └── wordcap.go
> ├── wordcount
> │   └── wordcount.go
> └── yatzy
>     └── yatzy.go
> {code}
> All examples that are supposed to be runnable by the direct runner (not 
> depending on GCP platform services) are runnable.
> On the other hand, the examples below need to be updated because they are not 
> runnable on the Dataflow platform, for various reasons.
> I tried to figure them out, but all I can do is pinpoint at least where each 
> one fails, since my knowledge of the Beam / Dataflow internals is so far limited.
> .
> ├── complete
> │   └── autocomplete
> │       └── autocomplete.go
> Runs successfully if the input is swapped to one of the Shakespeare data files 
> from gs://.
> But running it yields an error from the top.Largest func (discussed 
> in another issue: top.Largest needs to have a serializable combiner / 
> accumulator):
> ➜  autocomplete git:(master) ✗ ./autocomplete --project fair-app-213019 
> --runner dataflow --staging_location=gs://fair-app-213019/staging-test2 
> --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
>  
> 2018/09/11 15:35:26 Running autocomplete
> Unable to encode combiner for lifting: failed to encode custom coder: bad underlying type: bad field type: bad element: unencodable type: interface {}
> 2018/09/11 15:35:26 Using running binary as worker binary: './autocomplete'
> 2018/09/11 15:35:26 Staging worker binary: ./autocomplete
> ├── contains
> │   └── contains.go
> Fails when running debug.Head, for some mysterious reason; it might have to do 
> with the parameters passed into the x,y iterator. Frankly, I don't know and could 
> not figure it out.
> But after removing the debug.Head call, everything works as expected and succeeds.
> ├── cookbook
> │   ├── combine
> │   │   └── combine.go
> Fails because extractFn, which is a struct, is not registered through 
> beam.RegisterType (is this a must or not?).
> It works as a workaround at least (a registration sketch follows the log below).
> ➜  combine git:(master) ✗ ./combine 
> --output=fair-app-213019:combineoutput.test --project=fair-app-213019 
> --runner=dataflow --staging_location=gs://203019-staging/ 
> --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
>  
> 2018/09/11 15:40:50 Running combine
> panic: Failed to serialize 3: ParDo [In(Main): main.WordRow <- {2: 
> main.WordRow/main.WordRow[json] GLO}] -> [Out: KV<string,string> -> {3: 
> KV<string,string>/KV<bytes,bytes> GLO}]: encode: bad userfn: recv type must 
> be registered: *main.extractFn
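> For reference, here is a minimal sketch of the beam.RegisterType idiom the 
> panic above asks for (illustrative only; the shape of extractFn is assumed 
> here and is not the actual combine.go code):
> {code}
> package main
> 
> import (
> 	"context"
> 	"log"
> 	"reflect"
> 	"strings"
> 
> 	"github.com/apache/beam/sdks/go/pkg/beam"
> 	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
> 	"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
> )
> 
> // extractFn is a hypothetical structural DoFn standing in for the real one.
> type extractFn struct {
> 	MinLength int `json:"min_length"`
> }
> 
> func (f *extractFn) ProcessElement(line string, emit func(string)) {
> 	for _, w := range strings.Fields(line) {
> 		if len(w) >= f.MinLength {
> 			emit(w)
> 		}
> 	}
> }
> 
> func init() {
> 	// This registration is what "encode: bad userfn: recv type must be
> 	// registered: *main.extractFn" is asking for.
> 	beam.RegisterType(reflect.TypeOf((*extractFn)(nil)).Elem())
> }
> 
> func main() {
> 	beam.Init()
> 	p, s := beam.NewPipelineWithRoot()
> 	lines := beam.Create(s, "a rose is a rose is a rose")
> 	words := beam.ParDo(s, &extractFn{MinLength: 2}, lines)
> 	debug.Print(s, words)
> 	if err := beamx.Run(context.Background(), p); err != nil {
> 		log.Fatalf("failed to execute job: %v", err)
> 	}
> }
> {code}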
> │   ├── filter
> │   │   └── filter.go
> Fails with job go-job-1-1536673624017210012:
> 2018-09-11 (15:47:13) Output i0 for step was not found. 
> │   ├── join
> │   │   └── join.go
> Working as expected! Whey!
> │   ├── max
> │   │   └── max.go
> Working!
> │   └── tornadoes
> │       └── tornadoes.go
> Working!
> ├── debugging_wordcount
> │   └── debugging_wordcount.go
> Runs on the direct runner, but on Dataflow this fails with 
> go-job-1-1536840754314770217:
> Workflow failed. Causes: 
> S12:AsViewe14_i2/GroupByKeyHashAndSortByKeyAndWindow/Read+AsViewe14_i2/ToIsmRecordForMultimap
>  failed., Unable to rename output files from 
> gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/tmp-e83a2d3d2da2...@dax.ism
>  to gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2@*.ism., Unable 
> to rename 
> "gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/tmp-e83a2d3d2da29b2-shard--try-2a30b4c674ca50f3-endshard.ism"
>  to 
> "gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2-00002-of-00003.ism".
> RESP: instruction_id: "-189" process_bundle: < metrics: < ptransforms: < key: 
> "-159" value: < processed_elements: < measured: < output_element_counts: < 
> key: "-157" value: 1657 > > > > > ptransforms: < key: "main.filterFn" value: 
> < > > ptransforms: < key: "main.formatFn" value: < > > > >
> RESP: instruction_id: "-191" error: "execution plan for -189 not found" 
> register: < >
> Most likely this error comes from the stats.Count call.
> ├── forest
> │   └── forest.go
> Bazinga, all good!
> ├── grades
> │   └── grades.go
> Fails and yields an error in combiner-lifting encoding when invoked on Dataflow 
> (a general registration sketch follows the stack trace below): go-job-1-1536841991979521665
> 2018/09/13 14:33:11 Running grades
> Unable to encode combiner for lifting: failed to encode custom coder: bad underlying type: bad field type: bad element: unencodable type: interface {}
> Unable to encode combiner for lifting: failed to encode custom coder: bad underlying type: bad field type: bad element: unencodable type: interface {}
> Unable to encode combiner for lifting: failed to encode custom coder: bad underlying type: bad field type: bad element: unencodable type: interface {}
> Unable to encode combiner for lifting: failed to encode custom coder: bad underlying type: bad field type: bad element: unencodable type: interface {}
> 2018/09/13 14:33:11 Cross-compiling /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/examples/grades/grades.go as /tmp/worker-1-1536841991982910418
>  java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> received from SDK harness for instruction -303: execute failed: panic: 
> incompatible func type goroutine 47 [running]:
> runtime/debug.Stack(0xc420244a38, 0xc3b160, 0xe96e80)
>       /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420244c00)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39
>  +0x6e
> panic(0xc3b160, 0xe96e80)
>       /usr/local/go/src/runtime/panic.go:502 +0x229
> github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.ToFunc2x1(0xea2420,
>  0xc42100f380, 0xdd37ed, 0x11)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:429
>  +0x15d
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*MergeAccumulators).Up(0xc420b1a770,
>  0xea6120, 0xc420f4c240, 0x0, 0xc420244bc0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/combine.go:368
>  +0xb2
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Unit).Up-fm(0xea6120,
>  0xc420f4c240, 0xc420244c00, 0x0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
>  +0x43
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0xea6120,
>  0xc420f4c240, 0xc420244ca0, 0x0, 0x0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
>  +0x6c
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420031030,
>  0xea5f60, 0xc420f4c240, 0xc4210f7648, 0x4, 0xe9c680, 0xc420f4c210, 0xe994c0, 
> 0xc4201e4e00, 0xdfc7c0, ...)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
>  +0x202
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201f2bd0,
>  0xea5f60, 0xc420f4c1b0, 0xc4201e4d80, 0xc420149920)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:197
>  +0x823
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0xea5f60,
>  0xc420149920, 0xc4201e4d80)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
>  +0x167
> created by github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:127
>  +0x608
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>       at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>       at 
> com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
>       at 
> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>       at 
> com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
>       at 
> com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:393)
>       at 
> com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:362)
>       at 
> com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:290)
>       at 
> com.google.cloud.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:179)
>       at 
> com.google.cloud.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:107)
>       Suppressed: java.lang.IllegalStateException: Already closed.
>               at 
> org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:95)
>               at 
> com.google.cloud.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:93)
>               at 
> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:89)
>               ... 6 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for 
> instruction -303: execute failed: panic: incompatible func type goroutine 47 
> [running]:
> runtime/debug.Stack(0xc420244a38, 0xc3b160, 0xe96e80)
>       /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420244c00)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39
>  +0x6e
> panic(0xc3b160, 0xe96e80)
>       /usr/local/go/src/runtime/panic.go:502 +0x229
> github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.ToFunc2x1(0xea2420,
>  0xc42100f380, 0xdd37ed, 0x11)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:429
>  +0x15d
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*MergeAccumulators).Up(0xc420b1a770,
>  0xea6120, 0xc420f4c240, 0x0, 0xc420244bc0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/combine.go:368
>  +0xb2
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Unit).Up-fm(0xea6120,
>  0xc420f4c240, 0xc420244c00, 0x0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
>  +0x43
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0xea6120,
>  0xc420f4c240, 0xc420244ca0, 0x0, 0x0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
>  +0x6c
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420031030,
>  0xea5f60, 0xc420f4c240, 0xc4210f7648, 0x4, 0xe9c680, 0xc420f4c210, 0xe994c0, 
> 0xc4201e4e00, 0xdfc7c0, ...)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
>  +0x202
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201f2bd0,
>  0xea5f60, 0xc420f4c1b0, 0xc4201e4d80, 0xc420149920)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:197
>  +0x823
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0xea5f60,
>  0xc420149920, 0xc4201e4d80)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
>  +0x167
> created by github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:127
>  +0x608
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>     
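> As a general reference (a sketch only, under assumed names, and not 
> necessarily the fix for grades.go): a structural CombineFn whose accumulator 
> uses only concrete, registered types is the kind of combiner that lifting 
> can encode.
> {code}
> package main
> 
> import (
> 	"context"
> 	"log"
> 	"reflect"
> 
> 	"github.com/apache/beam/sdks/go/pkg/beam"
> 	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
> 	"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
> )
> 
> // meanAccum is a hypothetical accumulator; all fields are concrete (no interface{}).
> type meanAccum struct {
> 	Sum   float64 `json:"sum"`
> 	Count int     `json:"count"`
> }
> 
> // meanFn is a hypothetical CombineFn computing the mean of float64 grades.
> type meanFn struct{}
> 
> func (*meanFn) CreateAccumulator() meanAccum { return meanAccum{} }
> 
> func (*meanFn) AddInput(a meanAccum, v float64) meanAccum {
> 	return meanAccum{Sum: a.Sum + v, Count: a.Count + 1}
> }
> 
> func (*meanFn) MergeAccumulators(a, b meanAccum) meanAccum {
> 	return meanAccum{Sum: a.Sum + b.Sum, Count: a.Count + b.Count}
> }
> 
> func (*meanFn) ExtractOutput(a meanAccum) float64 {
> 	if a.Count == 0 {
> 		return 0
> 	}
> 	return a.Sum / float64(a.Count)
> }
> 
> func init() {
> 	// Register both the CombineFn and its accumulator type so non-direct
> 	// runners can encode them for lifting.
> 	beam.RegisterType(reflect.TypeOf((*meanFn)(nil)).Elem())
> 	beam.RegisterType(reflect.TypeOf((*meanAccum)(nil)).Elem())
> }
> 
> func main() {
> 	beam.Init()
> 	p, s := beam.NewPipelineWithRoot()
> 	grades := beam.Create(s, 70.0, 85.5, 92.0)
> 	mean := beam.Combine(s, &meanFn{}, grades)
> 	debug.Print(s, mean)
> 	if err := beamx.Run(context.Background(), p); err != nil {
> 		log.Fatalf("failed to execute job: %v", err)
> 	}
> }
> {code}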
> ├── minimal_wordcount
> │   └── minimal_wordcount.go
> Runs only on direct, implemented PR https://github.com/apache/beam/pull/6386
> ├── multiout
> │   └── multiout.go
> Runs like a boss!
> ├── pingpong
> │   └── pingpong.go
> States that it can't run on Dataflow:
> // NOTE(herohde) 2/23/2017: Dataflow does not allow cyclic composite 
> structures.
> ├── streaming_wordcap
> │   └── wordcap.go
> Brilliant!
> ├── windowed_wordcount
> │   └── windowed_wordcount.go
> All good!
> ├── wordcap
> │   └── wordcap.go
> Runs fine on the direct runner but not on Dataflow, because the input is local 
> and is read via textio.Immediate, hence it is not possible to pass in a 
> gs:// path (see the sketch below).
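> A minimal sketch of the alternative (illustrative; not the actual change in 
> the pull request above): using textio.Read instead of textio.Immediate defers 
> the read to the workers, so a gs:// path can be passed on Dataflow.
> {code}
> package main
> 
> import (
> 	"context"
> 	"flag"
> 	"log"
> 
> 	"github.com/apache/beam/sdks/go/pkg/beam"
> 	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
> 	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
> 	"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
> )
> 
> var input = flag.String("input",
> 	"gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")
> 
> func main() {
> 	flag.Parse()
> 	beam.Init()
> 
> 	p, s := beam.NewPipelineWithRoot()
> 
> 	// textio.Read is executed on the workers, so gs:// globs work on Dataflow.
> 	lines := textio.Read(s, *input)
> 	debug.Print(s, lines)
> 
> 	if err := beamx.Run(context.Background(), p); err != nil {
> 		log.Fatalf("failed to execute job: %v", err)
> 	}
> }
> {code}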
> ├── wordcount
> │   └── wordcount.go
> All good!
> └── yatzy
>     └── yatzy.go
> Fails on Dataflow:
> go-job-1-1536847665315762634
> error: "execution plan for -50 not found" register: < >



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
