ronoaldo commented on issue #22931: URL: https://github.com/apache/beam/issues/22931#issuecomment-1234779435
I'm indeed curious about what I am missing from the docs regarding calling xlang transforms. Following chapter 13 of the Beam Programming Guide, I wrote three pipelines, one in each language. The Python and Java ones expose their SplitWords implementation through an expansion service. I can run each pipeline, both locally and on Google Cloud Dataflow remote workers, so each one works properly on its own. Each main accepts some arguments to configure a cross-language call.

I then tested several combinations of xlang calls, and only one worked:

* Calling the Python implementation from Go fails.
* Calling the Java implementation from Go also fails, but with a different error.
* Calling the Java implementation from Python works!

I have not yet tested calling the Python implementation from Java. To summarize, my experiments yielded these results (rows are the calling pipeline, columns are the SplitWords implementation being called):

| Caller \ Implementation | Java  | Python   | Go          |
| ----------------------- | ----- | -------- | ----------- |
| Java                    | -     | Untested | Unsupported |
| Python                  | Works | -        | Unsupported |
| Go                      | Fails | Fails    | -           |

* Go pipeline: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/go/pipeline.go
* Python pipeline: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/pipeline.py
* Java pipeline: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/java/src/main/java/com/ronoaldo/WordCountPipeline.java
* Java exported PTransform: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/java/src/main/java/com/ronoaldo/SplitWordsFromJava.java

Am I missing something?

This is the Go output after I launch the Java expansion server:

```
2022/09/01 18:02:49 Using external transform SplitWordsFromJava at localhost:12345
2022/09/01 18:02:50 Executing pipeline with the direct runner.
2022/09/01 18:02:50 Pipeline:
2022/09/01 18:02:50 Nodes: {1: []uint8/bytes GLO}
{2: string/string GLO}
{3: string/string GLO}
{4: KV<string,int64>/KV<string,varint> GLO}
{5: string/string GLO}
{6: string/string GLO}
{7: KV<string,int>/KV<string,int[varintz]> GLO}
{8: CoGBK<string,int>/CoGBK<string,int[varintz]> GLO}
{9: KV<string,int>/KV<string,int[varintz]> GLO}
{10: main.CountedWord/R[main.CountedWord] GLO}
{11: KV<int,main.CountedWord>/KV<int[varintz],R[main.CountedWord]> GLO}
{12: CoGBK<int,main.CountedWord>/CoGBK<int[varintz],R[main.CountedWord]> GLO}
{13: KV<int,[]main.CountedWord>/KV<int[varintz],[]main.CountedWord[json]> GLO}
{14: []main.CountedWord/[]main.CountedWord[json] GLO}
{15: string/string GLO}
{16: KV<int,string>/KV<int[varintz],string> GLO}
{17: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: string/string GLO}]
3: ParDo [In(Main): string <- {2: string/string GLO}] -> [Out: string -> {3: string/string GLO}]
4: ParDo [In(Main): string <- {3: string/string GLO}] -> [Out: KV<string,int64> -> {4: KV<string,int64>/KV<string,varint> GLO}]
5: ParDo [In(Main): KV<string,int64> <- {4: KV<string,int64>/KV<string,varint> GLO}] -> [Out: string -> {5: string/string GLO}]
6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
7: ParDo [In(Main): T <- {6: string/string GLO}] -> [Out: KV<T,int> -> {7: KV<string,int>/KV<string,int[varintz]> GLO}]
8: CoGBK [In(Main): KV<string,int> <- {7: KV<string,int>/KV<string,int[varintz]> GLO}] -> [Out: CoGBK<string,int> -> {8: CoGBK<string,int>/CoGBK<string,int[varintz]> GLO}]
9: Combine [In(Main): int <- {8: CoGBK<string,int>/CoGBK<string,int[varintz]> GLO}] -> [Out: KV<string,int> -> {9: KV<string,int>/KV<string,int[varintz]> GLO}]
10: ParDo [In(Main): KV<string,int> <- {9: KV<string,int>/KV<string,int[varintz]> GLO}] -> [Out: main.CountedWord -> {10: main.CountedWord/R[main.CountedWord] GLO}]
11: ParDo [In(Main): T <- {10: main.CountedWord/R[main.CountedWord] GLO}] -> [Out: KV<int,T> -> {11: KV<int,main.CountedWord>/KV<int[varintz],R[main.CountedWord]> GLO}]
12: CoGBK [In(Main): KV<int,main.CountedWord> <- {11: KV<int,main.CountedWord>/KV<int[varintz],R[main.CountedWord]> GLO}] -> [Out: CoGBK<int,main.CountedWord> -> {12: CoGBK<int,main.CountedWord>/CoGBK<int[varintz],R[main.CountedWord]> GLO}]
13: Combine [In(Main): T <- {12: CoGBK<int,main.CountedWord>/CoGBK<int[varintz],R[main.CountedWord]> GLO}] -> [Out: KV<int,[]T> -> {13: KV<int,[]main.CountedWord>/KV<int[varintz],[]main.CountedWord[json]> GLO}]
14: ParDo [In(Main): KV<X,Y> <- {13: KV<int,[]main.CountedWord>/KV<int[varintz],[]main.CountedWord[json]> GLO}] -> [Out: Y -> {14: []main.CountedWord/[]main.CountedWord[json] GLO}]
15: ParDo [In(Main): []main.CountedWord <- {14: []main.CountedWord/[]main.CountedWord[json] GLO}] -> [Out: string -> {15: string/string GLO}]
16: ParDo [In(Main): T <- {15: string/string GLO}] -> [Out: KV<int,T> -> {16: KV<int,string>/KV<int[varintz],string> GLO}]
17: CoGBK [In(Main): KV<int,string> <- {16: KV<int,string>/KV<int[varintz],string> GLO}] -> [Out: CoGBK<int,string> -> {17: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}]
18: ParDo [In(Main): CoGBK<int,string> <- {17: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}] -> []
2022/09/01 18:02:50 Failed to execute job: translation failed caused by: unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
panic: Failed to execute job: translation failed caused by: unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]

goroutine 1 [running]:
github.com/apache/beam/sdks/v2/go/pkg/beam/log.Fatalf({0x114d0e0, 0xc0000420e8}, {0xfeb8b2?, 0x18?}, {0xc0006bff60?, 0x0?, 0x464f1b?})
	/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/log/log.go:153 +0xa5
main.main()
	/home/ronoaldo/workspace/micro-beam/05_xlang/go/pipeline.go:144 +0x2a8
exit status 2
```
