[
https://issues.apache.org/jira/browse/BEAM-5378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Robert Burke reassigned BEAM-5378:
----------------------------------
Assignee: (was: Robert Burke)
> Ensure all Go SDK examples run on all runners
> ---------------------------------------------
>
> Key: BEAM-5378
> URL: https://issues.apache.org/jira/browse/BEAM-5378
> Project: Beam
> Issue Type: Bug
> Components: sdk-go
> Affects Versions: Not applicable
> Reporter: Tomas Roos
> Priority: Major
>
> I've spent a day or so running through the examples available for the Go SDK
> to see what works on which runner (direct, dataflow) and what doesn't. Here
> are the results.
> Below are all available examples for the Go SDK. For me, as a new developer
> on Apache Beam and Dataflow, it would be of tremendous value to have all
> examples running, because many of them have legitimate use cases behind them.
> {code:java}
> ├── complete
> │ └── autocomplete
> │ └── autocomplete.go
> ├── contains
> │ └── contains.go
> ├── cookbook
> │ ├── combine
> │ │ └── combine.go
> │ ├── filter
> │ │ └── filter.go
> │ ├── join
> │ │ └── join.go
> │ ├── max
> │ │ └── max.go
> │ └── tornadoes
> │ └── tornadoes.go
> ├── debugging_wordcount
> │ └── debugging_wordcount.go
> ├── forest
> │ └── forest.go
> ├── grades
> │ └── grades.go
> ├── minimal_wordcount
> │ └── minimal_wordcount.go
> ├── multiout
> │ └── multiout.go
> ├── pingpong
> │ └── pingpong.go
> ├── streaming_wordcap
> │ └── wordcap.go
> ├── windowed_wordcount
> │ └── windowed_wordcount.go
> ├── wordcap
> │ └── wordcap.go
> ├── wordcount
> │ └── wordcount.go
> └── yatzy
> └── yatzy.go
> {code}
> All examples that are supposed to be runnable by the direct runner (not
> depending on GCP platform services) are runnable.
> On the other hand, these are the examples that need to be updated because
> they are not runnable on the Dataflow platform, for various reasons.
> I tried to figure out the causes, but all I can do is pinpoint where each one
> fails, since my knowledge of the Beam / Dataflow internals is limited so far.
> .
> ├── complete
> │ └── autocomplete
> │ └── autocomplete.go
> Runs successfully if the input is swapped to one of the Shakespeare data
> files from gs://
> But running it yields an error from the top.Largest func (discussed in
> another issue: top.Largest needs to have a serializable combinator /
> accumulator)
> ➜ autocomplete git:(master) ✗ ./autocomplete --project fair-app-213019
> --runner dataflow --staging_location=gs://fair-app-213019/staging-test2
> --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
>
> 2018/09/11 15:35:26 Running autocomplete
> Unable to encode combiner for lifting: failed to encode custom coder: bad
> underlying type: bad field type: bad element: unencodable type: interface {}
> 2018/09/11 15:35:26 Using running binary as worker binary: './autocomplete'
> 2018/09/11 15:35:26 Staging worker binary: ./autocomplete
> ├── contains
> │ └── contains.go
> Fails when running debug.Head for some unknown reason; it might have to do
> with the parameters passed into the x,y iterator. Frankly, I don't know and
> could not figure it out.
> After removing the debug.Head call, everything works as expected and
> succeeds.
> ├── cookbook
> │ ├── combine
> │ │ └── combine.go
> Fails because extractFn, which is a struct, is not registered through
> beam.RegisterType (is this a must or not?)
> Registering it works as a workaround, at least.
> ➜ combine git:(master) ✗ ./combine
> --output=fair-app-213019:combineoutput.test --project=fair-app-213019
> --runner=dataflow --staging_location=gs://203019-staging/
> --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
>
> 2018/09/11 15:40:50 Running combine
> panic: Failed to serialize 3: ParDo [In(Main): main.WordRow <- {2:
> main.WordRow/main.WordRow[json] GLO}] -> [Out: KV<string,string> -> {3:
> KV<string,string>/KV<bytes,bytes> GLO}]: encode: bad userfn: recv type must
> be registered: *main.extractFn
> │ ├── filter
> │ │ └── filter.go
> Fails go-job-1-1536673624017210012
> 2018-09-11 (15:47:13) Output i0 for step was not found.
> │ ├── join
> │ │ └── join.go
> Working as expected! Whey!
> │ ├── max
> │ │ └── max.go
> Working!
> │ └── tornadoes
> │ └── tornadoes.go
> Working!
> ├── debugging_wordcount
> │ └── debugging_wordcount.go
> Runs on the direct runner, but on dataflow this fails with
> go-job-1-1536840754314770217
> Workflow failed. Causes:
> S12:AsViewe14_i2/GroupByKeyHashAndSortByKeyAndWindow/Read+AsViewe14_i2/ToIsmRecordForMultimap
> failed., Unable to rename output files from
> gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/[email protected]
> to gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2@*.ism., Unable
> to rename
> "gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/tmp-e83a2d3d2da29b2-shard--try-2a30b4c674ca50f3-endshard.ism"
> to
> "gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2-00002-of-00003.ism".
> RESP: instruction_id: "-189" process_bundle: < metrics: < ptransforms: < key:
> "-159" value: < processed_elements: < measured: < output_element_counts: <
> key: "-157" value: 1657 > > > > > ptransforms: < key: "main.filterFn" value:
> < > > ptransforms: < key: "main.formatFn" value: < > > > >
> RESP: instruction_id: "-191" error: "execution plan for -189 not found"
> register: < >
> Most likely this error comes from the stats.Count call.
> ├── forest
> │ └── forest.go
> Bazinga, all good!
> ├── grades
> │ └── grades.go
> Fails on dataflow with an error in combiner lifting encoding:
> go-job-1-1536841991979521665
> 2018/09/13 14:33:11 Running grades
> Unable to encode combiner for lifting: failed to encode custom coder: bad
> underlying type: bad field type: bad element: unencodable type: interface {}
> Unable to encode combiner for lifting: failed to encode custom coder: bad
> underlying type: bad field type: bad element: unencodable type: interface {}
> Unable to encode combiner for lifting: failed to encode custom coder: bad
> underlying type: bad field type: bad element: unencodable type: interface {}
> Unable to encode combiner for lifting: failed to encode custom coder: bad
> underlying type: bad field type: bad element: unencodable type: interface {}
> 2018/09/13 14:33:11 Cross-compiling
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/examples/grades/grades.go
> as /tmp/worker-1-1536841991982910418
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error
> received from SDK harness for instruction -303: execute failed: panic:
> incompatible func type goroutine 47 [running]:
> runtime/debug.Stack(0xc420244a38, 0xc3b160, 0xe96e80)
> /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420244c00)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39
> +0x6e
> panic(0xc3b160, 0xe96e80)
> /usr/local/go/src/runtime/panic.go:502 +0x229
> github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.ToFunc2x1(0xea2420,
> 0xc42100f380, 0xdd37ed, 0x11)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:429
> +0x15d
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*MergeAccumulators).Up(0xc420b1a770,
> 0xea6120, 0xc420f4c240, 0x0, 0xc420244bc0)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/combine.go:368
> +0xb2
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Unit).Up-fm(0xea6120,
> 0xc420f4c240, 0xc420244c00, 0x0)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
> +0x43
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0xea6120,
> 0xc420f4c240, 0xc420244ca0, 0x0, 0x0)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
> +0x6c
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420031030,
> 0xea5f60, 0xc420f4c240, 0xc4210f7648, 0x4, 0xe9c680, 0xc420f4c210, 0xe994c0,
> 0xc4201e4e00, 0xdfc7c0, ...)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
> +0x202
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201f2bd0,
> 0xea5f60, 0xc420f4c1b0, 0xc4201e4d80, 0xc420149920)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:197
> +0x823
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0xea5f60,
> 0xc420149920, 0xc4201e4d80)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
> +0x167
> created by github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:127
> +0x608
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
> at
> com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
> at
> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
> at
> com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
> at
> com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:393)
> at
> com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:362)
> at
> com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:290)
> at
> com.google.cloud.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:179)
> at
> com.google.cloud.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:107)
> Suppressed: java.lang.IllegalStateException: Already closed.
> at
> org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:95)
> at
> com.google.cloud.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:93)
> at
> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:89)
> ... 6 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction -303: execute failed: panic: incompatible func type goroutine 47
> [running]:
> runtime/debug.Stack(0xc420244a38, 0xc3b160, 0xe96e80)
> /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420244c00)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39
> +0x6e
> panic(0xc3b160, 0xe96e80)
> /usr/local/go/src/runtime/panic.go:502 +0x229
> github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.ToFunc2x1(0xea2420,
> 0xc42100f380, 0xdd37ed, 0x11)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:429
> +0x15d
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*MergeAccumulators).Up(0xc420b1a770,
> 0xea6120, 0xc420f4c240, 0x0, 0xc420244bc0)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/combine.go:368
> +0xb2
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Unit).Up-fm(0xea6120,
> 0xc420f4c240, 0xc420244c00, 0x0)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
> +0x43
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0xea6120,
> 0xc420f4c240, 0xc420244ca0, 0x0, 0x0)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
> +0x6c
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420031030,
> 0xea5f60, 0xc420f4c240, 0xc4210f7648, 0x4, 0xe9c680, 0xc420f4c210, 0xe994c0,
> 0xc4201e4e00, 0xdfc7c0, ...)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
> +0x202
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201f2bd0,
> 0xea5f60, 0xc420f4c1b0, 0xc4201e4d80, 0xc420149920)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:197
> +0x823
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0xea5f60,
> 0xc420149920, 0xc4201e4d80)
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
> +0x167
> created by github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
>
> /home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:127
> +0x608
> at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
> at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
> at
> org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
> at
> org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
> at
> org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
> at
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
> at
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
> at
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at
> org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> ├── minimal_wordcount
> │ └── minimal_wordcount.go
> Runs only on direct; addressed in PR https://github.com/apache/beam/pull/6386
> ├── multiout
> │ └── multiout.go
> Runs like a boss!
> ├── pingpong
> │ └── pingpong.go
> The source states that it can't run on dataflow:
> // NOTE(herohde) 2/23/2017: Dataflow does not allow cyclic composite
> structures.
> ├── streaming_wordcap
> │ └── wordcap.go
> Brilliant!
> ├── windowed_wordcount
> │ └── windowed_wordcount.go
> All good!
> ├── wordcap
> │ └── wordcap.go
> Runs fine on the direct runner but not on dataflow, because the input is
> local and is read using textio.Immediate, so a gs:// path cannot be passed
> in.
> ├── wordcount
> │ └── wordcount.go
> All good!
> └── yatzy
> └── yatzy.go
> Fails on dataflow
> go-job-1-1536847665315762634
> error: "execution plan for -50 not found" register: < >
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)