[ 
https://issues.apache.org/jira/browse/BEAM-5378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625483#comment-16625483
 ] 

Tomas Roos commented on BEAM-5378:
----------------------------------

Just want to mention that the last SideInput fix, 
https://github.com/apache/beam/commit/e5ef9b50bf539cc035da9eaafe78ba22956dbf2c#diff-f3be3d8ed61ef20108b76e1a0c59541e
  fixed debugging_wordcount.go, so I'm updating the source issue to reflect that.

> Ensure all Go SDK examples run successfully
> -------------------------------------------
>
>                 Key: BEAM-5378
>                 URL: https://issues.apache.org/jira/browse/BEAM-5378
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>    Affects Versions: Not applicable
>            Reporter: Tomas Roos
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> I've spent a day or so running through the examples available for the 
> Go SDK to see what works on which runner (direct, dataflow) and what 
> doesn't. Here are the results.
> These are all the available examples for the Go SDK. For me, as a new 
> developer on Apache Beam and Dataflow, it would be of tremendous value to 
> have all examples running, because many of them have legitimate use cases 
> behind them. 
> {code:java}
> ├── complete
> │   └── autocomplete
> │       └── autocomplete.go
> ├── contains
> │   └── contains.go
> ├── cookbook
> │   ├── combine
> │   │   └── combine.go
> │   ├── filter
> │   │   └── filter.go
> │   ├── join
> │   │   └── join.go
> │   ├── max
> │   │   └── max.go
> │   └── tornadoes
> │       └── tornadoes.go
> ├── debugging_wordcount
> │   └── debugging_wordcount.go
> ├── forest
> │   └── forest.go
> ├── grades
> │   └── grades.go
> ├── minimal_wordcount
> │   └── minimal_wordcount.go
> ├── multiout
> │   └── multiout.go
> ├── pingpong
> │   └── pingpong.go
> ├── streaming_wordcap
> │   └── wordcap.go
> ├── windowed_wordcount
> │   └── windowed_wordcount.go
> ├── wordcap
> │   └── wordcap.go
> ├── wordcount
> │   └── wordcount.go
> └── yatzy
>     └── yatzy.go
> {code}
> All examples that are supposed to be runnable by the direct runner (not 
> depending on GCP platform services) are runnable.
> On the other hand, these are the examples that need to be updated because 
> they are not runnable on the Dataflow platform, for various reasons.
> I tried to figure out the causes, but all I can do is pinpoint where each 
> one fails, since my knowledge of the Beam / Dataflow internals is limited 
> so far.
> .
> ├── complete
> │   └── autocomplete
> │       └── autocomplete.go
> Runs successfully when the input is swapped for one of the Shakespeare data 
> files from gs://
> But running it yields an error from the top.Largest func (discussed 
> in another issue: top.Largest needs a serializable combiner / 
> accumulator)
> ➜  autocomplete git:(master) ✗ ./autocomplete --project fair-app-213019 
> --runner dataflow --staging_location=gs://fair-app-213019/staging-test2 
> --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
>  
> 2018/09/11 15:35:26 Running autocomplete
> Unable to encode combiner for lifting: failed to encode custom coder: bad 
> underlying type: bad field type: bad element: unencodable type: interface {}
> 2018/09/11 15:35:26 Using running binary as worker binary: './autocomplete'
> 2018/09/11 15:35:26 Staging worker binary: ./autocomplete
> ├── contains
> │   └── contains.go
> Fails when running debug.Head for an unknown reason; it might have to do 
> with the parameter passing into the x,y iterator. Frankly, I don't know and 
> could not figure it out.
> With the debug.Head call removed, everything works as expected and succeeds.
> ├── cookbook
> │   ├── combine
> │   │   └── combine.go
> Fails because extractFn, which is a struct, is not registered through 
> beam.RegisterType (is this a must or not?)
> Registering it works as a workaround at least
> ➜  combine git:(master) ✗ ./combine 
> --output=fair-app-213019:combineoutput.test --project=fair-app-213019 
> --runner=dataflow --staging_location=gs://203019-staging/ 
> --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
>  
> 2018/09/11 15:40:50 Running combine
> panic: Failed to serialize 3: ParDo [In(Main): main.WordRow <- {2: 
> main.WordRow/main.WordRow[json] GLO}] -> [Out: KV<string,string> -> {3: 
> KV<string,string>/KV<bytes,bytes> GLO}]: encode: bad userfn: recv type must 
> be registered: *main.extractFn
> │   ├── filter
> │   │   └── filter.go
> Fails: go-job-1-1536673624017210012
> 2018-09-11 (15:47:13) Output i0 for step was not found. 
> │   ├── join
> │   │   └── join.go
> Working as expected! Whey!
> │   ├── max
> │   │   └── max.go
> Working!
> │   └── tornadoes
> │       └── tornadoes.go
> Working!
> ├── debugging_wordcount
> │   └── debugging_wordcount.go
> Runs on the direct runner, but on Dataflow this fails with 
> go-job-1-1536840754314770217
> Workflow failed. Causes: 
> S12:AsViewe14_i2/GroupByKeyHashAndSortByKeyAndWindow/Read+AsViewe14_i2/ToIsmRecordForMultimap
>  failed., Unable to rename output files from 
> gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/tmp-e83a2d3d2da2...@dax.ism
>  to gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2@*.ism., Unable 
> to rename 
> "gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/tmp-e83a2d3d2da29b2-shard--try-2a30b4c674ca50f3-endshard.ism"
>  to 
> "gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2-00002-of-00003.ism".
> RESP: instruction_id: "-189" process_bundle: < metrics: < ptransforms: < key: 
> "-159" value: < processed_elements: < measured: < output_element_counts: < 
> key: "-157" value: 1657 > > > > > ptransforms: < key: "main.filterFn" value: 
> < > > ptransforms: < key: "main.formatFn" value: < > > > >
> RESP: instruction_id: "-191" error: "execution plan for -189 not found" 
> register: < >
> Most likely this error comes from the stats.Count call
> ├── forest
> │   └── forest.go
> Bazinga, all good!
> ├── grades
> │   └── grades.go
> Fails on Dataflow with an error in combiner lifting encoding: 
> go-job-1-1536841991979521665
> 2018/09/13 14:33:11 Running grades
> Unable to encode combiner for lifting: failed to encode custom coder: bad 
> underlying type: bad field type: bad element: unencodable type: interface {}
> Unable to encode combiner for lifting: failed to encode custom coder: bad 
> underlying type: bad field type: bad element: unencodable type: interface {}
> Unable to encode combiner for lifting: failed to encode custom coder: bad 
> underlying type: bad field type: bad element: unencodable type: interface {}
> Unable to encode combiner for lifting: failed to encode custom coder: bad 
> underlying type: bad field type: bad element: unencodable type: interface {}
> 2018/09/13 14:33:11 Cross-compiling 
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/examples/grades/grades.go
>  as /tmp/worker-1-1536841991982910418
>  java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> received from SDK harness for instruction -303: execute failed: panic: 
> incompatible func type goroutine 47 [running]:
> runtime/debug.Stack(0xc420244a38, 0xc3b160, 0xe96e80)
>       /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420244c00)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39
>  +0x6e
> panic(0xc3b160, 0xe96e80)
>       /usr/local/go/src/runtime/panic.go:502 +0x229
> github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.ToFunc2x1(0xea2420,
>  0xc42100f380, 0xdd37ed, 0x11)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:429
>  +0x15d
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*MergeAccumulators).Up(0xc420b1a770,
>  0xea6120, 0xc420f4c240, 0x0, 0xc420244bc0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/combine.go:368
>  +0xb2
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Unit).Up-fm(0xea6120,
>  0xc420f4c240, 0xc420244c00, 0x0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
>  +0x43
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0xea6120,
>  0xc420f4c240, 0xc420244ca0, 0x0, 0x0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
>  +0x6c
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420031030,
>  0xea5f60, 0xc420f4c240, 0xc4210f7648, 0x4, 0xe9c680, 0xc420f4c210, 0xe994c0, 
> 0xc4201e4e00, 0xdfc7c0, ...)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
>  +0x202
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201f2bd0,
>  0xea5f60, 0xc420f4c1b0, 0xc4201e4d80, 0xc420149920)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:197
>  +0x823
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0xea5f60,
>  0xc420149920, 0xc4201e4d80)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
>  +0x167
> created by github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:127
>  +0x608
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>       at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>       at 
> com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
>       at 
> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>       at 
> com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
>       at 
> com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:393)
>       at 
> com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:362)
>       at 
> com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:290)
>       at 
> com.google.cloud.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:179)
>       at 
> com.google.cloud.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:107)
>       Suppressed: java.lang.IllegalStateException: Already closed.
>               at 
> org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:95)
>               at 
> com.google.cloud.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:93)
>               at 
> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:89)
>               ... 6 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for 
> instruction -303: execute failed: panic: incompatible func type goroutine 47 
> [running]:
> runtime/debug.Stack(0xc420244a38, 0xc3b160, 0xe96e80)
>       /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420244c00)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39
>  +0x6e
> panic(0xc3b160, 0xe96e80)
>       /usr/local/go/src/runtime/panic.go:502 +0x229
> github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.ToFunc2x1(0xea2420,
>  0xc42100f380, 0xdd37ed, 0x11)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:429
>  +0x15d
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*MergeAccumulators).Up(0xc420b1a770,
>  0xea6120, 0xc420f4c240, 0x0, 0xc420244bc0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/combine.go:368
>  +0xb2
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Unit).Up-fm(0xea6120,
>  0xc420f4c240, 0xc420244c00, 0x0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
>  +0x43
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0xea6120,
>  0xc420f4c240, 0xc420244ca0, 0x0, 0x0)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
>  +0x6c
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420031030,
>  0xea5f60, 0xc420f4c240, 0xc4210f7648, 0x4, 0xe9c680, 0xc420f4c210, 0xe994c0, 
> 0xc4201e4e00, 0xdfc7c0, ...)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
>  +0x202
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201f2bd0,
>  0xea5f60, 0xc420f4c1b0, 0xc4201e4d80, 0xc420149920)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:197
>  +0x823
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0xea5f60,
>  0xc420149920, 0xc4201e4d80)
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
>  +0x167
> created by github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
>       
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:127
>  +0x608
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
>       at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>     
> ├── minimal_wordcount
> │   └── minimal_wordcount.go
> Runs only on the direct runner; PR implemented: https://github.com/apache/beam/pull/6386
> ├── multiout
> │   └── multiout.go
> Runs like a boss!
> ├── pingpong
> │   └── pingpong.go
> The source states that it can't run on Dataflow:
> // NOTE(herohde) 2/23/2017: Dataflow does not allow cyclic composite 
> structures.
> ├── streaming_wordcap
> │   └── wordcap.go
> Brilliant!
> ├── windowed_wordcount
> │   └── windowed_wordcount.go
> All good!
> ├── wordcap
> │   └── wordcap.go
> Runs fine on the direct runner but not on Dataflow, because the input is 
> local and is read using textio.Immediate, hence a gs:// path cannot be 
> passed in
> ├── wordcount
> │   └── wordcount.go
> All good!
> └── yatzy
>     └── yatzy.go
> Fails on dataflow
> go-job-1-1536847665315762634
> error: "execution plan for -50 not found" register: < >



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
