Tomas Roos created BEAM-5378:
--------------------------------

             Summary: Make examples great again
                 Key: BEAM-5378
                 URL: https://issues.apache.org/jira/browse/BEAM-5378
             Project: Beam
          Issue Type: Bug
          Components: sdk-go
    Affects Versions: Not applicable
            Reporter: Tomas Roos
            Assignee: Robert Burke


I've been spending a day or so running through the example available for the Go 
SDK in order to see what works and on what runner (direct, dataflow), and what 
doesn't and here's the results.

All available examples for the go sdk. For me as a new developer on apache beam 
and dataflow it would be a tremendous value to have all examples running 
because many of them have legitimate use-cases behind them. 


{code:java}
├── complete
│   └── autocomplete
│       └── autocomplete.go
├── contains
│   └── contains.go
├── cookbook
│   ├── combine
│   │   └── combine.go
│   ├── filter
│   │   └── filter.go
│   ├── join
│   │   └── join.go
│   ├── max
│   │   └── max.go
│   └── tornadoes
│       └── tornadoes.go
├── debugging_wordcount
│   └── debugging_wordcount.go
├── forest
│   └── forest.go
├── grades
│   └── grades.go
├── minimal_wordcount
│   └── minimal_wordcount.go
├── multiout
│   └── multiout.go
├── pingpong
│   └── pingpong.go
├── streaming_wordcap
│   └── wordcap.go
├── windowed_wordcount
│   └── windowed_wordcount.go
├── wordcap
│   └── wordcap.go
├── wordcount
│   └── wordcount.go
└── yatzy
    └── yatzy.go

{code}

All examples that are supposed to be runnable by the direct driver (not 
depending on gcp platform services) are runnable.

On the otherhand these are the tests that needs to be updated because its not 
runnable on the dataflow platform for various reasons.
I tried to figure them out and all I can do is to pin point at least where it 
fails since my knowledge so far in the beam / dataflow internals is limited.

.
├── complete
│   └── autocomplete
│       └── autocomplete.go

Runs successfully if swapping the input to one of the shakespear data files 
from gs://
But when running this it yields a error from the top.Largest func (discussed in 
another issue that top.Largest needs to have a serializeable combinator / 
accumulator)

➜  autocomplete git:(master) ✗ ./autocomplete --project fair-app-213019 
--runner dataflow --staging_location=gs://fair-app-213019/staging-test2 
--worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
 
2018/09/11 15:35:26 Running autocomplete
Unable to encode combiner for lifting: failed to encode custom coder: bad 
underlying type: bad field type: bad element: unencodable type: interface 
{}2018/09/11 15:35:26 Using running binary as worker binary: './autocomplete'
2018/09/11 15:35:26 Staging worker binary: ./autocomplete


├── contains
│   └── contains.go

Fails when running debug.Head for some mysterious reason, might have to do with 
the param passing into the x,y iterator. Frankly I dont know and could not 
figure.
But removing the debug.Head call everything works as expected and succeeds.

├── cookbook
│   ├── combine
│   │   └── combine.go

Fails because of extractFn which is a struct is not registered through the 
beam.RegisterType (is this a must or not?)
It works as a work around at least

➜  combine git:(master) ✗ ./combine --output=fair-app-213019:combineoutput.test 
--project=fair-app-213019 --runner=dataflow 
--staging_location=gs://203019-staging/ 
--worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
 
2018/09/11 15:40:50 Running combine
panic: Failed to serialize 3: ParDo [In(Main): main.WordRow <- {2: 
main.WordRow/main.WordRow[json] GLO}] -> [Out: KV<string,string> -> {3: 
KV<string,string>/KV<bytes,bytes> GLO}]: encode: bad userfn: recv type must be 
registered: *main.extractFn


│   ├── filter
│   │   └── filter.go

Fails go-job-1-1536673624017210012
2018-09-11 (15:47:13) Output i0 for step was not found. 

│   ├── join
│   │   └── join.go

Working as expected! Whey!

│   ├── max
│   │   └── max.go

Working!

│   └── tornadoes
│       └── tornadoes.go

Working!

├── debugging_wordcount
│   └── debugging_wordcount.go

Runs on direct runner but at dataflow this fails with  
go-job-1-1536840754314770217
Workflow failed. Causes: 
S12:AsViewe14_i2/GroupByKeyHashAndSortByKeyAndWindow/Read+AsViewe14_i2/ToIsmRecordForMultimap
 failed., Unable to rename output files from 
gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/[email protected]
 to gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2@*.ism., Unable 
to rename 
"gs://fair-app-213019/staging-test2/tmp/dax-tmp-2018-09-13_05_12_44-15927590761710593971-S12-0-e83a2d3d2da24ef/tmp-e83a2d3d2da29b2-shard--try-2a30b4c674ca50f3-endshard.ism"
 to 
"gs://fair-app-213019/staging-test2/tmp/tmp-e83a2d3d2da29b2-00002-of-00003.ism".
RESP: instruction_id: "-189" process_bundle: < metrics: < ptransforms: < key: 
"-159" value: < processed_elements: < measured: < output_element_counts: < key: 
"-157" value: 1657 > > > > > ptransforms: < key: "main.filterFn" value: < > > 
ptransforms: < key: "main.formatFn" value: < > > > >
RESP: instruction_id: "-191" error: "execution plan for -189 not found" 
register: < >

most likely this error belongs to the stats.Count call

├── forest
│   └── forest.go

Bazinga, all good!

├── grades
│   └── grades.go

Fails and is yield a error when invoking this on dataflow in combiner lifting 
encoding  go-job-1-1536841991979521665

2018/09/13 14:33:11 Running grades
Unable to encode combiner for lifting: failed to encode custom coder: bad 
underlying type: bad field type: bad element: unencodable type: interface 
{}Unable to encode combiner for lifting: failed to encode custom coder: bad 
underlying type: bad field type: bad element: unencodable type: interface 
{}Unable to encode combiner for lifting: failed to encode custom coder: bad 
underlying type: bad field type: bad element: unencodable type: interface 
{}Unable to encode combiner for lifting: failed to encode custom coder: bad 
underlying type: bad field type: bad element: unencodable type: interface 
{}2018/09/13 14:33:11 Cross-compiling 
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/examples/grades/grades.go 
as /tmp/worker-1-1536841991982910418

 java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
received from SDK harness for instruction -303: execute failed: panic: 
incompatible func type goroutine 47 [running]:
runtime/debug.Stack(0xc420244a38, 0xc3b160, 0xe96e80)
        /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420244c00)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39
 +0x6e
panic(0xc3b160, 0xe96e80)
        /usr/local/go/src/runtime/panic.go:502 +0x229
github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.ToFunc2x1(0xea2420, 
0xc42100f380, 0xdd37ed, 0x11)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:429
 +0x15d
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*MergeAccumulators).Up(0xc420b1a770,
 0xea6120, 0xc420f4c240, 0x0, 0xc420244bc0)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/combine.go:368
 +0xb2
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Unit).Up-fm(0xea6120,
 0xc420f4c240, 0xc420244c00, 0x0)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
 +0x43
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0xea6120, 
0xc420f4c240, 0xc420244ca0, 0x0, 0x0)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
 +0x6c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420031030,
 0xea5f60, 0xc420f4c240, 0xc4210f7648, 0x4, 0xe9c680, 0xc420f4c210, 0xe994c0, 
0xc4201e4e00, 0xdfc7c0, ...)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
 +0x202
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201f2bd0,
 0xea5f60, 0xc420f4c1b0, 0xc4201e4d80, 0xc420149920)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:197
 +0x823
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0xea5f60,
 0xc420149920, 0xc4201e4d80)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
 +0x167
created by github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:127
 +0x608

        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at 
com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
        at 
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        at 
com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
        at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:393)
        at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:362)
        at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:290)
        at 
com.google.cloud.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:179)
        at 
com.google.cloud.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:107)
        Suppressed: java.lang.IllegalStateException: Already closed.
                at 
org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:95)
                at 
com.google.cloud.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:93)
                at 
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:89)
                ... 6 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for 
instruction -303: execute failed: panic: incompatible func type goroutine 47 
[running]:
runtime/debug.Stack(0xc420244a38, 0xc3b160, 0xe96e80)
        /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420244c00)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39
 +0x6e
panic(0xc3b160, 0xe96e80)
        /usr/local/go/src/runtime/panic.go:502 +0x229
github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.ToFunc2x1(0xea2420, 
0xc42100f380, 0xdd37ed, 0x11)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:429
 +0x15d
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*MergeAccumulators).Up(0xc420b1a770,
 0xea6120, 0xc420f4c240, 0x0, 0xc420244bc0)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/combine.go:368
 +0xb2
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Unit).Up-fm(0xea6120,
 0xc420f4c240, 0xc420244c00, 0x0)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
 +0x43
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0xea6120, 
0xc420f4c240, 0xc420244ca0, 0x0, 0x0)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
 +0x6c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420031030,
 0xea5f60, 0xc420f4c240, 0xc4210f7648, 0x4, 0xe9c680, 0xc420f4c210, 0xe994c0, 
0xc4201e4e00, 0xdfc7c0, ...)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:90
 +0x202
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201f2bd0,
 0xea5f60, 0xc420f4c1b0, 0xc4201e4d80, 0xc420149920)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:197
 +0x823
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0xea5f60,
 0xc420149920, 0xc4201e4d80)
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
 +0x167
created by github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
        
/home/tomas/dev/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:127
 +0x608

        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
        at 
org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
        at 
org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at 
org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
        at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
        at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    
├── minimal_wordcount
│   └── minimal_wordcount.go

Runs only on direct, implemented PR https://github.com/apache/beam/pull/6386

├── multiout
│   └── multiout.go

Runs like a boss!

├── pingpong
│   └── pingpong.go

Stating it can't run on dataflow
// NOTE(herohde) 2/23/2017: Dataflow does not allow cyclic composite structures.

├── streaming_wordcap
│   └── wordcap.go

Brilliant!

├── windowed_wordcount
│   └── windowed_wordcount.go

All good!

├── wordcap
│   └── wordcap.go

Runs fine on direct runner but not on dataflow because of input is local and is 
Using
textio.Immediate, hence not able to pass in a gs:// path 

├── wordcount
│   └── wordcount.go

All good!

└── yatzy
    └── yatzy.go

Fails on dataflow

go-job-1-1536847665315762634

error: "execution plan for -50 not found" register: < >



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to