[ https://issues.apache.org/jira/browse/BEAM-5378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625483#comment-16625483 ]
Tomas Roos commented on BEAM-5378:
----------------------------------

Just want to mention that the latest side-input fix, https://github.com/apache/beam/commit/e5ef9b50bf539cc035da9eaafe78ba22956dbf2c#diff-f3be3d8ed61ef20108b76e1a0c59541e, fixed debugging_wordcount.go, so I'm updating the source issue to reflect that.

> Ensure all Go SDK examples run successfully
> -------------------------------------------
>
>                 Key: BEAM-5378
>                 URL: https://issues.apache.org/jira/browse/BEAM-5378
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>    Affects Versions: Not applicable
>            Reporter: Tomas Roos
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> I've been spending a day or so running through the examples available for the
> Go SDK in order to see what works on which runner (direct, dataflow) and what
> doesn't. Here are the results.
> For me, as a new developer on Apache Beam and Dataflow, it would be of
> tremendous value to have all examples running, because many of them have
> legitimate use cases behind them. All available examples for the Go SDK:
> {code:java}
> ├── complete
> │   └── autocomplete
> │       └── autocomplete.go
> ├── contains
> │   └── contains.go
> ├── cookbook
> │   ├── combine
> │   │   └── combine.go
> │   ├── filter
> │   │   └── filter.go
> │   ├── join
> │   │   └── join.go
> │   ├── max
> │   │   └── max.go
> │   └── tornadoes
> │       └── tornadoes.go
> ├── debugging_wordcount
> │   └── debugging_wordcount.go
> ├── forest
> │   └── forest.go
> ├── grades
> │   └── grades.go
> ├── minimal_wordcount
> │   └── minimal_wordcount.go
> ├── multiout
> │   └── multiout.go
> ├── pingpong
> │   └── pingpong.go
> ├── streaming_wordcap
> │   └── wordcap.go
> ├── windowed_wordcount
> │   └── windowed_wordcount.go
> ├── wordcap
> │   └── wordcap.go
> ├── wordcount
> │   └── wordcount.go
> └── yatzy
>     └── yatzy.go
> {code}
> All examples that are supposed to be runnable by the direct runner (i.e. not
> depending on GCP platform services) run successfully.
> On the other hand, the examples below need to be updated, because they are
> not runnable on the Dataflow platform for various reasons. I tried to figure
> them out, but all I can do is pinpoint where each one fails, since my
> knowledge of the Beam / Dataflow internals is so far limited.
>
> ├── complete
> │   └── autocomplete
> │       └── autocomplete.go
> Runs successfully if the input is swapped to one of the Shakespeare data
> files from gs://
> But when run on Dataflow it yields an error from the top.Largest func
> (discussed in another issue: top.Largest needs a serializable combinator /
> accumulator):
> ➜ autocomplete git:(master) ✗ ./autocomplete --project fair-app-213019
>   --runner dataflow --staging_location=gs://fair-app-213019/staging-test2
>   --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
> 2018/09/11 15:35:26 Running autocomplete
> Unable to encode combiner for lifting: failed to encode custom coder: bad
> underlying type: bad field type: bad element: unencodable type: interface {}
> 2018/09/11 15:35:26 Using running binary as worker binary: './autocomplete'
> 2018/09/11 15:35:26 Staging worker binary: ./autocomplete
>
> ├── contains
> │   └── contains.go
> Fails when running debug.Head, for some mysterious reason; it might have to
> do with the params passed into the x, y iterator. Frankly, I don't know and
> could not figure it out.
> But after removing the debug.Head call, everything works as expected and
> succeeds.
>
> ├── cookbook
> │   ├── combine
> │   │   └── combine.go
> Fails because extractFn, which is a struct, is not registered through
> beam.RegisterType (is this a must or not?)
> Registering it works, as a workaround at least:
> ➜ combine git:(master) ✗ ./combine
>   --output=fair-app-213019:combineoutput.test --project=fair-app-213019
>   --runner=dataflow --staging_location=gs://203019-staging/
>   --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
> 2018/09/11 15:40:50 Running combine
> panic: Failed to serialize 3: ParDo [In(Main): main.WordRow <- {2:
> main.WordRow/main.WordRow[json] GLO}] -> [Out: KV<string,string> -> {3:
> KV<string,string>/KV<bytes,bytes> GLO}]: encode: bad userfn: recv type must
> be registered: *main.extractFn
>
> │   ├── filter
> │   │   └── filter.go
> Fails: go-job-1-1536673624017210012
> 2018-09-11 (15:47:13) Output i0 for step was not found.
>
> │   ├── join
> │   │   └── join.go
> Working as expected. Yay!
>
> │   ├── max
> │   │   └── max.go
> Working!
>
> │   └── tornadoes
> │       └── tornadoes.go
> Working!
>
> ├── debugging_wordcount
> │   └── debugging_wordcount.go
> Runs on the direct runner, but on Dataflow this fails with
> go-job-1-1536840754314770217
> Workflow failed. Causes:
> S12:AsViewe14_i2/GroupByKeyHashAndSortByKeyAndWindow/Read+AsViewe14_i2/ToIsmRecordForMultimap
> failed., Unable to rename output files from
> gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/tmp-e83a2d3d2da2...@dax.ism
> to gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2@*.ism., Unable
> to rename
> "gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/tmp-e83a2d3d2da29b2-shard--try-2a30b4c674ca50f3-endshard.ism"
> to
> "gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2-00002-of-00003.ism".
> RESP: instruction_id: "-189" process_bundle: < metrics: < ptransforms: < key:
> "-159" value: < processed_elements: < measured: < output_element_counts: <
> key: "-157" value: 1657 > > > > > ptransforms: < key: "main.filterFn" value:
> < > > ptransforms: < key: "main.formatFn" value: < > > > >
> RESP: instruction_id: "-191" error: "execution plan for -189 not found"
> register: < >
> Most likely this error belongs to the stats.Count call.
>
> ├── forest
> │   └── forest.go
> Bazinga, all good!
>
> ├── grades
> │   └── grades.go
> Fails, yielding an error in combiner-lifting encoding when invoked on
> Dataflow: go-job-1-1536841991979521665
> 2018/09/13 14:33:11 Running grades
> Unable to encode combiner for lifting: failed to encode custom coder: bad
> underlying type: bad field type: bad element: unencodable type: interface {}
> (the message above is repeated four times)
> 2018/09/13 14:33:11 Cross-compiling
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/examples/grades/grades.go
> as /tmp/worker-1-1536841991982910418
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error
> received from SDK harness for instruction -303: execute failed: panic:
> incompatible func type
> goroutine 47 [running]:
> runtime/debug.Stack(0xc420244a38, 0xc3b160, 0xe96e80)
>         /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420244c00)
>         /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39 +0x6e
> panic(0xc3b160, 0xe96e80)
>         /usr/local/go/src/runtime/panic.go:502 +0x229
> github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.ToFunc2x1(0xea2420, 0xc42100f380, 0xdd37ed, 0x11)
>         /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:429 +0x15d
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*MergeAccumulators).Up(0xc420b1a770, 0xea6120, 0xc420f4c240, 0x0, 0xc420244bc0)
>         /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/combine.go:368 +0xb2
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Unit).Up-fm(0xea6120, 0xc420f4c240, 0xc420244c00, 0x0)
>         /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90 +0x43
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0xea6120, 0xc420f4c240, 0xc420244ca0, 0x0, 0x0)
>         /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42 +0x6c
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420031030, 0xea5f60, 0xc420f4c240, 0xc4210f7648, 0x4, 0xe9c680, 0xc420f4c210, 0xe994c0, 0xc4201e4e00, 0xdfc7c0, ...)
>         /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90 +0x202
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201f2bd0, 0xea5f60, 0xc420f4c1b0, 0xc4201e4d80, 0xc420149920)
>         /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:197 +0x823
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0xea5f60, 0xc420149920, 0xc4201e4d80)
>         /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:114 +0x167
> created by github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
>         /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:127 +0x608
>         at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
>         at com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
>         at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>         at com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
>         at com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:393)
>         at com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:362)
>         at com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:290)
>         at com.google.cloud.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:179)
>         at com.google.cloud.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:107)
> Suppressed: java.lang.IllegalStateException: Already closed.
>         at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:95)
>         at com.google.cloud.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:93)
>         at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:89)
>         ... 6 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction -303: execute failed: panic: incompatible func type
> goroutine 47 [running]:
> (followed by the same Go goroutine stack as above, from runtime/debug.Stack
> through harness.Main at harness.go:127)
>         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
>         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
>         at org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
>         at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>         at org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>         at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
>         at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
>         at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>         at org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> ├── minimal_wordcount
> │   └── minimal_wordcount.go
> Runs only on the direct runner; implemented PR
> https://github.com/apache/beam/pull/6386
>
> ├── multiout
> │   └── multiout.go
> Runs like a boss!
>
> ├── pingpong
> │   └── pingpong.go
> States that it can't run on Dataflow:
> // NOTE(herohde) 2/23/2017: Dataflow does not allow cyclic composite structures.
>
> ├── streaming_wordcap
> │   └── wordcap.go
> Brilliant!
>
> ├── windowed_wordcount
> │   └── windowed_wordcount.go
> All good!
>
> ├── wordcap
> │   └── wordcap.go
> Runs fine on the direct runner, but not on Dataflow, because the input is
> local and it uses textio.Immediate; hence it cannot take a gs:// path.
>
> ├── wordcount
> │   └── wordcount.go
> All good!
>
> └── yatzy
>     └── yatzy.go
> Fails on Dataflow:
> go-job-1-1536847665315762634
> error: "execution plan for -50 not found" register: < >

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)