Alex Amato created BEAM-7452:
--------------------------------

             Summary: Ensure all RunnerHarnesses provide a valid 
RunnerApi.IsBounded value on all PCollections
                 Key: BEAM-7452
                 URL: https://issues.apache.org/jira/browse/BEAM-7452
             Project: Beam
          Issue Type: New Feature
          Components: runner-core
            Reporter: Alex Amato


Fixing this requires updating 4 locations.
 * Dataflow RunnerHarness
 * FNAPDoFnRunner
 * UnifiedWorker
 * Shared libraries for this proto generation, which should cover OSS runners
In addition, remove the workaround in ProcessBundleHandler.java, which assumes 
that any PCollection without the value set is bounded.

See PCollectionTranslation.fromProto, which should always be passed a valid 
value, so that it never has to fall back to raising an error or assuming that 
the PCollection is bounded.
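
The difference between the workaround and the intended contract can be sketched 
as follows. This is a minimal, self-contained illustration and not Beam's actual 
code: the two enums are hypothetical stand-ins for RunnerApi.IsBounded and 
PCollection.IsBounded, and the method names fromProtoLenient and 
fromProtoStrict are invented for this example.

```java
class IsBoundedTranslationSketch {
  // Hypothetical stand-in for org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded.
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  // Hypothetical stand-in for org.apache.beam.sdk.values.PCollection.IsBounded.
  enum SdkIsBounded { BOUNDED, UNBOUNDED }

  // Workaround-style conversion: anything that is not UNBOUNDED, including an
  // unset value, is silently treated as BOUNDED.
  static SdkIsBounded fromProtoLenient(ProtoIsBounded isBounded) {
    return isBounded == ProtoIsBounded.UNBOUNDED
        ? SdkIsBounded.UNBOUNDED
        : SdkIsBounded.BOUNDED;
  }

  // Strict conversion: only explicitly set values are accepted, so the
  // producer of the proto is responsible for always setting the field.
  static SdkIsBounded fromProtoStrict(ProtoIsBounded isBounded) {
    switch (isBounded) {
      case BOUNDED:
        return SdkIsBounded.BOUNDED;
      case UNBOUNDED:
        return SdkIsBounded.UNBOUNDED;
      default:
        throw new IllegalArgumentException(
            "Cannot convert unknown IsBounded value: " + isBounded);
    }
  }

  public static void main(String[] args) {
    // The lenient path hides the missing value.
    System.out.println(fromProtoLenient(ProtoIsBounded.UNSPECIFIED));
    // The strict path surfaces it as an error.
    try {
      fromProtoStrict(ProtoIsBounded.UNSPECIFIED);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Once every harness sets the value, only the strict form is needed and the 
lenient fallback can be deleted.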

 

Context
===

When I was updating the Java SDK to conditionally serialize some elements to 
report a sampled byte size metric, I encountered this error.

 
It is due to the refactoring in my 
[PR/8416|https://github.com/apache/beam/pull/8416]: the RehydratedComponents 
was pulled up a level and is now shared among all the calls to 
createRunnerForPTransform in the various PTransformRunnerFactories.
 
This now triggers some code paths which were not previously triggered for all 
types of PTransforms/PCollections, causing this error to occur.
 
 jsonPayload: {
  exception:  "org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Cannot convert unknown org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
 at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
 at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
 at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
 at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
 at org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:144)
 at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry.getMultiplexingConsumer(PCollectionConsumerRegistry.java:145)
 at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory$Context.<init>(DoFnPTransformRunnerFactory.java:284)
 at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:97)
 at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:63)
 at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:198)
 at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
 at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
 at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:306)
 at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
 at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Cannot convert unknown org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
 at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:88)
 at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:56)
 at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:103)
 at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:93)
 at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
 at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
 at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
 at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
 ... 17 more
"   
  job:  "2019-05-29_03_31_14-4799355109250203557"   
  logger:  "org.apache.beam.fn.harness.control.BeamFnControlClient"   
  *message:  "Exception while trying to handle InstructionRequest -28"*   
  portability_worker_id:  "1"   
  thread:  "16"   
  worker:  "testpipeline-pabloem-0529-05290331-75o8-harness-htz8"   
 }
 
The root of the issue is that the ProcessBundleDescriptors are invalid. The 
RunnerHarnesses are not setting the 
org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded value on their 
PCollections, which breaks the specification and leads to this error.
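
One way to catch such invalid descriptors early is to check every PCollection 
before any translation is attempted. The following is a rough sketch under 
stated assumptions: the descriptor is reduced to a plain map from PCollection 
id to a hypothetical stand-in enum for RunnerApi.IsBounded, and the class and 
method names (DescriptorBoundednessCheckSketch, findUnspecified, validate) are 
invented for this illustration rather than taken from Beam.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DescriptorBoundednessCheckSketch {
  // Hypothetical stand-in for org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded.
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  // Returns the sorted ids of all PCollections whose IsBounded value is
  // missing (null) or left at UNSPECIFIED.
  static List<String> findUnspecified(Map<String, ProtoIsBounded> pcollections) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, ProtoIsBounded> entry : pcollections.entrySet()) {
      ProtoIsBounded value = entry.getValue();
      if (value == null || value == ProtoIsBounded.UNSPECIFIED) {
        missing.add(entry.getKey());
      }
    }
    Collections.sort(missing);
    return missing;
  }

  // Fails with a single message that names every offending PCollection,
  // instead of failing on the first one during translation.
  static void validate(Map<String, ProtoIsBounded> pcollections) {
    List<String> missing = findUnspecified(pcollections);
    if (!missing.isEmpty()) {
      throw new IllegalArgumentException(
          "PCollections without a valid IsBounded value: " + missing);
    }
  }

  public static void main(String[] args) {
    // Illustrative ids only; real descriptors use runner-assigned ids.
    Map<String, ProtoIsBounded> pcollections = new LinkedHashMap<>();
    pcollections.put("pc-read", ProtoIsBounded.BOUNDED);
    pcollections.put("pc-parse", ProtoIsBounded.UNSPECIFIED);
    pcollections.put("pc-window", ProtoIsBounded.UNBOUNDED);
    System.out.println(findUnspecified(pcollections));
  }
}
```

Reporting all offending ids at once makes it easier to tell which harness 
produced the invalid descriptor.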
 
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
