boyuanzz commented on pull request #13014:
URL: https://github.com/apache/beam/pull/13014#issuecomment-706412811


   It seems like this change causes 
`flink_runner_test.py::FlinkRunnerTestOptimized` fails on error message 
like(it's taken from test_sdf case):
   ```
   [grpc-default-executor-0] WARN 
org.apache.beam.runners.jobsubmission.InMemoryJobService - Encountered 
Unexpected Exception during validation
   java.lang.RuntimeException: Failed to validate transform 
((((ref_PCollection_PCollection_1/Read)+(ref_AppliedPTransform_Create/FlatMap(<lambda
 at 
core.py:2945>)_4))+(ref_AppliedPTransform_Create/MaybeReshuffle/Reshuffle/AddRandomKeys_7))+(ref_AppliedPTransform_Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/Map(reify_timestamps)_9))+(ref_PCollection_PCollection_4/Write)
        at 
org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:221)
        at 
org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:120)
        at 
org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:100)
        at 
org.apache.beam.runners.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:234)
        at 
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:961)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
   Caused by: java.lang.IllegalArgumentException: ExecutableStage 
((((ref_PCollection_PCollection_1/Read)+(ref_AppliedPTransform_Create/FlatMap(<lambda
 at 
core.py:2945>)_4))+(ref_AppliedPTransform_Create/MaybeReshuffle/Reshuffle/AddRandomKeys_7))+(ref_AppliedPTransform_Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/Map(reify_timestamps)_9))+(ref_PCollection_PCollection_4/Write)
 uses unknown output ref_PCollection_PCollection_4
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
        at 
org.apache.beam.runners.core.construction.graph.PipelineValidator.validateExecutableStage(PipelineValidator.java:306)
        at 
org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:219)
        ... 16 more
   
   ```
   
   How to reproduce:
   ```
   ./gradlew runners:flink:1.10:job-server:shadowJar
   pytest 
sdks/python/apache_beam/runners/portability/flink_runner_test.py::FlinkRunnerTestOptimized::test_sdf
 --test-pipeline-options " --environment_type=LOOPBACK"
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to