waj334 opened a new issue, #21945:
URL: https://github.com/apache/beam/issues/21945
### What happened?
I tried an extremely simple Beam pipeline:
```
package main
import (
"context"
"flag"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"log"
)
// main builds a pipeline that contains a single Impulse transform and
// executes it with beamx.Run, exiting with a non-zero status if the job fails.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	p, s := beam.NewPipelineWithRoot()
	beam.Impulse(s)

	if err := beamx.Run(ctx, p); err != nil {
		// Use Fatalf here: the standard library's log.Fatal has Print
		// semantics, so it takes no context argument and does not interpret
		// format verbs. Passing ctx and a "%v" format string to it prints the
		// context value and the literal "%v" instead of a formatted message.
		log.Fatalf("Failed to execute job: %v", err)
	}
}
```
It fails when using the Flink runner, with the following error:
```
C:\Users\waj33\AppData\Local\Temp\GoLand\___2Telemetry_Beam_Pipeline.exe
--runner=flink --endpoint=localhost:8099
2022/06/18 19:29:36 No environment config specified. Using default config:
'apache/beam_go_sdk:2.41.0.dev'
2022/06/18 19:29:36 components: <
transforms: <
key: "e1"
value: <
unique_name: "Impulse"
spec: <
urn: "beam:transform:impulse:v1"
>
outputs: <
key: "i0"
value: "n1"
>
>
>
pcollections: <
key: "n1"
value: <
unique_name: "n1"
coder_id: "c0"
is_bounded: BOUNDED
windowing_strategy_id: "w0"
>
>
windowing_strategies: <
key: "w0"
value: <
window_fn: <
urn: "beam:window_fn:global_windows:v1"
>
merge_status: NON_MERGING
window_coder_id: "c1"
trigger: <
default: <
>
>
accumulation_mode: DISCARDING
output_time: END_OF_WINDOW
closing_behavior: EMIT_IF_NONEMPTY
on_time_behavior: FIRE_IF_NONEMPTY
environment_id: "go"
>
>
coders: <
key: "c0"
value: <
spec: <
urn: "beam:coder:bytes:v1"
>
>
>
coders: <
key: "c1"
value: <
spec: <
urn: "beam:coder:global_window:v1"
>
>
>
environments: <
key: "go"
value: <
urn: "beam:env:docker:v1"
payload: "\n\035apache/beam_go_sdk:2.41.0.dev"
capabilities: "beam:protocol:progress_reporting:v1"
capabilities: "beam:protocol:multi_core_bundle_processing:v1"
capabilities: "beam:transform:sdf_truncate_sized_restrictions:v1"
capabilities: "beam:protocol:worker_status:v1"
capabilities: "beam:protocol:monitoring_info_short_ids:v1"
capabilities: "beam:version:sdk_base:go"
capabilities: "beam:coder:bytes:v1"
capabilities: "beam:coder:bool:v1"
capabilities: "beam:coder:varint:v1"
capabilities: "beam:coder:double:v1"
capabilities: "beam:coder:string_utf8:v1"
capabilities: "beam:coder:length_prefix:v1"
capabilities: "beam:coder:kv:v1"
capabilities: "beam:coder:iterable:v1"
capabilities: "beam:coder:state_backed_iterable:v1"
capabilities: "beam:coder:windowed_value:v1"
capabilities: "beam:coder:global_window:v1"
capabilities: "beam:coder:interval_window:v1"
capabilities: "beam:coder:row:v1"
capabilities: "beam:coder:nullable:v1"
dependencies: <
type_urn: "beam:artifact:type:file:v1"
role_urn: "beam:artifact:role:go_worker_binary:v1"
>
>
>
>
root_transform_ids: "e1"
2022/06/18 19:29:39 Prepared job with id:
go-job-1-1655598576054300700_3356a1e1-23e9-4522-8839-f5547d5b18cb and staging
token: go-job-1-1655598576054300700_3356a1e1-23e9-4522-8839-f5547d5b18cb
2022/06/18 19:29:39 Staged binary artifact with token:
2022/06/18 19:29:39 Submitted job:
go0job0101655598576054300700-root-0619002939-c5a32241_10511e7a-30ce-4f6e-872a-5023c28ba4d9
2022/06/18 19:29:39 Job state: STOPPED
2022/06/18 19:29:39 Job state: STARTING
2022/06/18 19:29:39 Job state: RUNNING
2022/06/18 19:29:45 (): java.lang.RuntimeException:
java.util.concurrent.ExecutionException: java.lang.RuntimeException:
org.apache.flink.runtime.client.JobInitializationException: Could not start the
JobMaster.
at
org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
at
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:958)
at
org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:195)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:118)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)
at
org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException:
org.apache.flink.runtime.client.JobInitializationException: Could not start the
JobMaster.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
... 11 more
Caused by: java.lang.RuntimeException:
org.apache.flink.runtime.client.JobInitializationException: Could not start the
JobMaster.
at
org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
... 1 more
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could
not start the JobMaster.
at
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
Source)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.util.concurrent.CompletableFuture.postComplete(Unknown
Source)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.api.common.InvalidProgramException: The job graph is cyclic.
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
Source)
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown
Source)
... 4 more
Caused by: org.apache.flink.api.common.InvalidProgramException: The job
graph is cyclic.
at
org.apache.flink.runtime.jobgraph.JobGraph.getVerticesSortedTopologicallyFromSources(JobGraph.java:442)
at
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:186)
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:149)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363)
at
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:191)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:139)
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:135)
at
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
at
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
... 4 more
2022/06/18 19:29:45 ():
org.apache.flink.api.common.InvalidProgramException: The job graph is cyclic.
2022/06/18 19:29:45 Job state: FAILED
2022/06/18 19:29:45 context.BackgroundFailed to execute job: %vjob
go0job0101655598576054300700-root-0619002939-c5a32241_10511e7a-30ce-4f6e-872a-5023c28ba4d9
failed
Process finished with the exit code 1
```
### Issue Priority
Priority: 3
### Issue Component
Component: sdk-go
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]