[ https://issues.apache.org/jira/browse/BEAM-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alex Amato updated BEAM-7452:
-----------------------------
    Labels: portable-metrics-bugs  (was: )

> Ensure all RunnerHarnesses provide a valid RunnerApi.IsBounded value on all 
> PCollections
> ----------------------------------------------------------------------------------------
>
>                 Key: BEAM-7452
>                 URL: https://issues.apache.org/jira/browse/BEAM-7452
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Alex Amato
>            Priority: Major
>              Labels: portable-metrics-bugs
>
> Fixing this requires updating 4 locations.
>  * Dataflow RunnerHarness
>  * FNAPDoFnRunner
>  * UnifiedWorker
>  * Shared libraries for this proto generation, which should cover OSS runners
> + Remove the workaround in ProcessBundleHandler.java that assumes all 
> PCollections are bounded when IsBounded is not set.
> See PCollectionTranslation.fromProto, which should always be passed a valid 
> value, so that it never falls back to either raising an error or assuming 
> the PCollection is bounded.
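A minimal sketch of the two behaviours described above, written in plain Java so that it compiles without Beam on the classpath. `ProtoIsBounded` and `IsBounded` are hypothetical stand-ins for `RunnerApi.IsBounded` and `PCollection.IsBounded`, and `IsBoundedTranslation` is illustrative only, not Beam's `PCollectionTranslation`. The strict `fromProto` rejects `UNSPECIFIED`, matching the error in the stack trace; `fromProtoAssumingBounded` models the kind of ProcessBundleHandler workaround this issue asks to remove.

```java
/** Hypothetical stand-in for the proto enum RunnerApi.IsBounded.Enum. */
enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

/** Hypothetical stand-in for the SDK enum PCollection.IsBounded. */
enum IsBounded { BOUNDED, UNBOUNDED }

final class IsBoundedTranslation {
  private IsBoundedTranslation() {}

  /** Strict conversion: a valid descriptor always carries BOUNDED or UNBOUNDED. */
  static IsBounded fromProto(ProtoIsBounded proto) {
    switch (proto) {
      case BOUNDED:
        return IsBounded.BOUNDED;
      case UNBOUNDED:
        return IsBounded.UNBOUNDED;
      default:
        // An unset field lands here; this mirrors the failure in the stack trace.
        throw new IllegalArgumentException(
            "Cannot convert unknown RunnerApi.IsBounded to PCollection.IsBounded: " + proto);
    }
  }

  /** Models the workaround to remove: silently treats an unset value as BOUNDED. */
  static IsBounded fromProtoAssumingBounded(ProtoIsBounded proto) {
    return proto == ProtoIsBounded.UNSPECIFIED ? IsBounded.BOUNDED : fromProto(proto);
  }
}
```

The lenient variant hides invalid descriptors: an unbounded PCollection whose field was never set would be treated as bounded, which is why the issue prefers fixing the RunnerHarnesses over keeping the default.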
>  
> Context
> ===
> I encountered this while updating the Java SDK to conditionally serialize some 
> elements in order to report a sampled byte-size metric.
>  
> It's due to the refactoring in my 
> [PR/8416|https://github.com/apache/beam/pull/8416]: the RehydratedComponents 
> was pulled up a level and is now shared among all the calls to 
> createRunnerForPTransform in the various PTransformRunnerFactories.
>  
> It is now triggering some code paths that were not previously triggered for 
> all types of PTransforms/PCollections, which causes this error.
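The failure mode can be illustrated with a minimal sketch, assuming a lazily populated cache like the one visible in the stack trace (RehydratedComponents.getPCollection loading through a Guava LocalCache). Here `ConcurrentHashMap.computeIfAbsent` is swapped in for Guava's cache, and `LazyComponents` is a hypothetical class, not Beam's RehydratedComponents. Because translation happens on first lookup, sharing one instance across all runner factories means the conversion error surfaces in whichever factory first looks up a PCollection whose IsBounded field is unset.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical sketch of a shared, lazily rehydrated component cache. Each component
 * proto is translated on first lookup and memoized; a failed translation is not cached.
 */
final class LazyComponents<P, V> {
  private final Map<String, P> protosById;
  private final Function<P, V> translate;
  private final Map<String, V> rehydrated = new ConcurrentHashMap<>();

  LazyComponents(Map<String, P> protosById, Function<P, V> translate) {
    this.protosById = protosById;
    this.translate = translate;
  }

  /** Translates the component on first access; translation errors propagate to the caller. */
  V get(String id) {
    P proto = protosById.get(id);
    if (proto == null) {
      throw new IllegalArgumentException("No component with id " + id);
    }
    return rehydrated.computeIfAbsent(id, unused -> translate.apply(proto));
  }

  /** Number of components successfully translated so far. */
  int rehydratedCount() {
    return rehydrated.size();
  }
}
```

Since a failed translation leaves no entry behind, every later lookup of the same invalid PCollection fails again, so the error is not confined to one runner factory.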
>  
>  jsonPayload: {
>   exception:  
> "org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.IllegalArgumentException: Cannot convert unknown 
> org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to 
> org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
>  at org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:144)
>  at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry.getMultiplexingConsumer(PCollectionConsumerRegistry.java:145)
>  at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory$Context.<init>(DoFnPTransformRunnerFactory.java:284)
>  at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:97)
>  at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:63)
>  at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:198)
>  at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
>  at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
>  at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:306)
>  at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
>  at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Cannot convert unknown 
> org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to 
> org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
>  at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:88)
>  at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:56)
>  at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:103)
>  at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:93)
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
>  at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
>  ... 17 more
> "   
>   job:  "2019-05-29_03_31_14-4799355109250203557"   
>   logger:  "org.apache.beam.fn.harness.control.BeamFnControlClient"   
>   *message:  "Exception while trying to handle InstructionRequest -28"*   
>   portability_worker_id:  "1"   
>   thread:  "16"   
>   worker:  "testpipeline-pabloem-0529-05290331-75o8-harness-htz8"   
>  }
>  
> The root of the issue is that the ProcessBundleDescriptors are invalid: the 
> RunnerHarnesses do not set 
> org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded on their PCollections, 
> which breaks the specification and leads to this error.
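A runner-side check could catch invalid descriptors before they reach the SDK harness. This is a minimal sketch assuming a descriptor's PCollections can be viewed as a map from PCollection id to IsBounded value; `ProtoIsBounded` and `DescriptorValidation` are hypothetical, not Beam or Dataflow APIs. In proto3, an enum field that was never set reads as its zero value, which for this enum is `UNSPECIFIED`, so an unset field and an explicit `UNSPECIFIED` look the same here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical mirror of RunnerApi.IsBounded.Enum; UNSPECIFIED is the proto3 zero value. */
enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

final class DescriptorValidation {
  private DescriptorValidation() {}

  /** Returns the ids, sorted, of PCollections whose IsBounded value is missing or UNSPECIFIED. */
  static List<String> pcollectionsMissingIsBounded(Map<String, ProtoIsBounded> isBoundedById) {
    List<String> missing = new ArrayList<>();
    // Copy into a TreeMap so the reported ids come out in a stable order.
    for (Map.Entry<String, ProtoIsBounded> entry : new TreeMap<>(isBoundedById).entrySet()) {
      ProtoIsBounded value = entry.getValue();
      if (value == null || value == ProtoIsBounded.UNSPECIFIED) {
        missing.add(entry.getKey());
      }
    }
    return missing;
  }

  /** Fails fast in the runner instead of letting the SDK harness hit the conversion error. */
  static void requireIsBoundedSet(Map<String, ProtoIsBounded> isBoundedById) {
    List<String> missing = pcollectionsMissingIsBounded(isBoundedById);
    if (!missing.isEmpty()) {
      throw new IllegalStateException("IsBounded not set for PCollections: " + missing);
    }
  }
}
```

A guard like this is only a safety net; the fix proposed in this issue is for each RunnerHarness to set the field on every PCollection it puts into a ProcessBundleDescriptor.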
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
