[
https://issues.apache.org/jira/browse/BEAM-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alex Amato updated BEAM-7452:
-----------------------------
Labels: portable-metrics-bugs (was: )
> Ensure all RunnerHarnesses provide a valid RunnerApi.IsBounded value on all
> PCollections
> ----------------------------------------------------------------------------------------
>
> Key: BEAM-7452
> URL: https://issues.apache.org/jira/browse/BEAM-7452
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Alex Amato
> Priority: Major
> Labels: portable-metrics-bugs
>
> Fixing this requires updating 4 locations.
> * Dataflow RunnerHarness
> * FnApiDoFnRunner
> * UnifiedWorker
> * Shared libraries for this proto generation, which should cover OSS runners
> + Remove the workaround in ProcessBundleHandler.java, which assumes that
> all PCollections are bounded if the value is not set.
> See PCollectionTranslation.fromProto, which should always be passed a valid
> value rather than falling back to an error or assuming the PCollection is bounded.
>
> Context
> ===
> I encountered this while updating the Java SDK to conditionally serialize
> some elements in order to report a sampled byte size metric.
>
> It is due to the refactoring in my
> [PR/8416|https://github.com/apache/beam/pull/8416]: the RehydratedComponents
> was pulled up a level and is now shared among all the calls to
> createRunnerForPTransform in the various PTransformRunnerFactories.
>
> It is now triggering some code paths that were not previously triggered for
> all types of PTransforms/PCollections, causing this error to occur.
>
> jsonPayload: {
> exception:
> "org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.IllegalArgumentException: Cannot convert unknown
> org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to
> org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
> at
> org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:144)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry.getMultiplexingConsumer(PCollectionConsumerRegistry.java:145)
> at
> org.apache.beam.fn.harness.DoFnPTransformRunnerFactory$Context.<init>(DoFnPTransformRunnerFactory.java:284)
> at
> org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:97)
> at
> org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:63)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:198)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:306)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Cannot convert unknown
> org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to
> org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
> at
> org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:88)
> at
> org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:56)
> at
> org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:103)
> at
> org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:93)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
> ... 17 more
> "
> job: "2019-05-29_03_31_14-4799355109250203557"
> logger: "org.apache.beam.fn.harness.control.BeamFnControlClient"
> *message: "Exception while trying to handle InstructionRequest -28"*
> portability_worker_id: "1"
> thread: "16"
> worker: "testpipeline-pabloem-0529-05290331-75o8-harness-htz8"
> }
>
> The root of the issue is that the ProcessBundleDescriptors are invalid. The
> RunnerHarnesses are not setting the
> org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded value on their
> PCollections, which breaks the specification and leads to this error.
>
>
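The failure mode described above can be illustrated with a minimal, self-contained sketch. It does not use the real Beam classes: ProtoIsBounded and SdkIsBounded are hypothetical stand-ins for RunnerApi.IsBounded and PCollection.IsBounded, and validate is an assumed runner-side check, not an existing Beam API. The sketch shows a strict conversion that rejects UNSPECIFIED instead of assuming "bounded", plus a check a RunnerHarness could run before sending a descriptor.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IsBoundedSketch {
  /** Hypothetical stand-in for the proto enum RunnerApi.IsBounded. */
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  /** Hypothetical stand-in for the SDK enum PCollection.IsBounded. */
  enum SdkIsBounded { BOUNDED, UNBOUNDED }

  /** Strict conversion: an unset value is an error, never silently "bounded". */
  static SdkIsBounded fromProto(ProtoIsBounded isBounded) {
    switch (isBounded) {
      case BOUNDED:
        return SdkIsBounded.BOUNDED;
      case UNBOUNDED:
        return SdkIsBounded.UNBOUNDED;
      default:
        throw new IllegalArgumentException(
            "Cannot convert unknown IsBounded: " + isBounded);
    }
  }

  /**
   * Assumed runner-side check (not a real Beam method): reject a descriptor
   * whose PCollections, keyed by id, include one with no boundedness set.
   */
  static void validate(Map<String, ProtoIsBounded> pcollections) {
    for (Map.Entry<String, ProtoIsBounded> entry : pcollections.entrySet()) {
      if (entry.getValue() == ProtoIsBounded.UNSPECIFIED) {
        throw new IllegalStateException(
            "PCollection " + entry.getKey() + " has no IsBounded value");
      }
    }
  }

  public static void main(String[] args) {
    Map<String, ProtoIsBounded> pcollections = new LinkedHashMap<>();
    pcollections.put("pc1", ProtoIsBounded.BOUNDED);
    pcollections.put("pc2", ProtoIsBounded.UNSPECIFIED); // what the harness received
    try {
      validate(pcollections);
    } catch (IllegalStateException e) {
      System.out.println("Invalid descriptor: " + e.getMessage());
    }
  }
}
```

Checking on the runner side surfaces the problem where the descriptor is built, rather than deep inside a cache load in the SDK harness as in the stack trace above.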