Alex Amato created BEAM-7452:
--------------------------------
Summary: Ensure all RunnerHarnesses provide a valid
RunnerApi.IsBounded value on all PCollections
Key: BEAM-7452
URL: https://issues.apache.org/jira/browse/BEAM-7452
Project: Beam
Issue Type: New Feature
Components: runner-core
Reporter: Alex Amato
Fixing this requires updating 4 locations.
* Dataflow RunnerHarness
* FNAPDoFnRunner
* UnifiedWorker
* Shared libraries used for this proto generation, which should cover the OSS runners
+ Remove the workaround in ProcessBundleHandler.java, which assumes that all
PCollections are bounded if IsBounded is not set.
See PCollectionTranslation.fromProto, which should always be passed a valid
value, so that it never has to either throw an error or assume the PCollection is bounded.
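To make the two behaviours concrete, here is a minimal sketch of a strict conversion that rejects UNSPECIFIED (as in the error logged further down) next to the lenient default-to-bounded workaround. It assumes hypothetical stand-in enums (ProtoIsBounded, SdkIsBounded) in place of the generated RunnerApi.IsBounded.Enum and PCollection.IsBounded types so it compiles without Beam on the classpath. The class and method names are illustrative and are not Beam's API.

```java
import java.util.Objects;

/**
 * Illustrative sketch only. ProtoIsBounded and SdkIsBounded are hypothetical
 * stand-ins for RunnerApi.IsBounded.Enum and PCollection.IsBounded.
 */
class IsBoundedConversion {
  // Mirrors the proto enum, which includes UNSPECIFIED for "not set".
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  // Mirrors the SDK enum, which has no "unspecified" state.
  enum SdkIsBounded { BOUNDED, UNBOUNDED }

  /** Strict conversion: the runner must always supply a concrete value. */
  static SdkIsBounded fromProto(ProtoIsBounded value) {
    Objects.requireNonNull(value, "value");
    switch (value) {
      case BOUNDED:
        return SdkIsBounded.BOUNDED;
      case UNBOUNDED:
        return SdkIsBounded.UNBOUNDED;
      default:
        // UNSPECIFIED means the runner harness left the field unset.
        throw new IllegalArgumentException(
            "Cannot convert unknown IsBounded to SdkIsBounded: " + value);
    }
  }

  /** Lenient workaround (hypothetical name): silently treats an unset value as bounded. */
  static SdkIsBounded fromProtoAssumingBounded(ProtoIsBounded value) {
    return value == ProtoIsBounded.UNSPECIFIED ? SdkIsBounded.BOUNDED : fromProto(value);
  }

  public static void main(String[] args) {
    System.out.println(fromProto(ProtoIsBounded.UNBOUNDED)); // UNBOUNDED
    System.out.println(fromProtoAssumingBounded(ProtoIsBounded.UNSPECIFIED)); // BOUNDED
    try {
      fromProto(ProtoIsBounded.UNSPECIFIED);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The lenient variant is what makes the workaround risky: an unbounded PCollection whose IsBounded was left unset would be silently treated as bounded.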
Context
===
I encountered this while updating the Java SDK to conditionally serialize some
elements in order to report a sampled byte size metric.
It's due to the refactoring in my
[PR/8416|https://github.com/apache/beam/pull/8416]: the RehydratedComponents
was pulled up a level and is now shared among all the calls to
createRunnerForPTransform in the various PTransformRunnerFactories.
This now triggers code paths that were not previously exercised for all
types of PTransforms/PCollections, causing the following error:
jsonPayload: {
exception:
"org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Cannot convert unknown
org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to
org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
at org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:144)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry.getMultiplexingConsumer(PCollectionConsumerRegistry.java:145)
at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory$Context.<init>(DoFnPTransformRunnerFactory.java:284)
at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:97)
at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:63)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:198)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:166)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:306)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Cannot convert unknown
org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded to
org.apache.beam.sdk.values.PCollection.IsBounded: UNSPECIFIED
at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:88)
at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:56)
at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:103)
at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:93)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
... 17 more
"
job: "2019-05-29_03_31_14-4799355109250203557"
logger: "org.apache.beam.fn.harness.control.BeamFnControlClient"
*message: "Exception while trying to handle InstructionRequest -28"*
portability_worker_id: "1"
thread: "16"
worker: "testpipeline-pabloem-0529-05290331-75o8-harness-htz8"
}
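The stack trace above shows PCollectionConsumerRegistry.getMultiplexingConsumer calling RehydratedComponents.getPCollection, which lazily loads entries through a Guava LoadingCache. A rough analogue of that pattern is sketched below, assuming ConcurrentHashMap.computeIfAbsent in place of Guava and hypothetical class names throughout. It shows why a shared instance surfaces the problem: whichever factory first asks for a PCollection triggers its conversion, so an unset IsBounded fails that caller, and (because a failed load is not cached) every later caller too. Unlike Guava, this sketch does not wrap the failure in UncheckedExecutionException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative analogue of a shared, lazily populated components cache; all names are hypothetical. */
class SharedRehydration {
  // Stand-in for RunnerApi.IsBounded.Enum.
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  // Stand-in for a rehydrated SDK PCollection.
  static final class RehydratedPCollection {
    final String id;
    final boolean bounded;

    RehydratedPCollection(String id, boolean bounded) {
      this.id = id;
      this.bounded = bounded;
    }
  }

  static final class ComponentsCache {
    private final Map<String, ProtoIsBounded> protoPCollections;
    private final Map<String, RehydratedPCollection> cache = new ConcurrentHashMap<>();
    private final AtomicInteger conversions = new AtomicInteger();

    ComponentsCache(Map<String, ProtoIsBounded> protoPCollections) {
      this.protoPCollections = protoPCollections;
    }

    /** Converts on first request; later callers, from any factory, reuse the result. */
    RehydratedPCollection getPCollection(String id) {
      return cache.computeIfAbsent(id, this::convert);
    }

    int conversionCount() {
      return conversions.get();
    }

    private RehydratedPCollection convert(String id) {
      conversions.incrementAndGet();
      ProtoIsBounded value = protoPCollections.get(id);
      if (value == null) {
        throw new IllegalArgumentException("Unknown PCollection " + id);
      }
      if (value == ProtoIsBounded.UNSPECIFIED) {
        // computeIfAbsent records no mapping when this throws, so retries fail the same way.
        throw new IllegalArgumentException("PCollection " + id + " has UNSPECIFIED IsBounded");
      }
      return new RehydratedPCollection(id, value == ProtoIsBounded.BOUNDED);
    }
  }

  public static void main(String[] args) {
    Map<String, ProtoIsBounded> protos = new ConcurrentHashMap<>();
    protos.put("pc1", ProtoIsBounded.BOUNDED);
    protos.put("pc2", ProtoIsBounded.UNSPECIFIED);

    // One cache shared by every (hypothetical) runner factory.
    ComponentsCache shared = new ComponentsCache(protos);
    shared.getPCollection("pc1"); // first factory: converts pc1
    shared.getPCollection("pc1"); // second factory: cache hit
    System.out.println(shared.conversionCount()); // 1
    try {
      shared.getPCollection("pc2"); // any factory touching pc2 now fails
    } catch (IllegalArgumentException e) {
      System.out.println("failed: " + e.getMessage());
    }
  }
}
```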
The root of the issue is that the ProcessBundleDescriptors are invalid: the
RunnerHarnesses are not setting the
org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded value on their
PCollections, which violates the specification and leads to this error.
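One way a runner harness could enforce this is a validation pass over the PCollections in each descriptor before sending it, failing fast on the runner side rather than in the SDK harness. The sketch below assumes a simplified descriptor modelled as a map from PCollection id to a stand-in IsBounded enum; the real descriptor is the generated ProcessBundleDescriptor proto, and the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical runner-side check; not part of Beam. */
class DescriptorValidation {
  // Stand-in for RunnerApi.IsBounded.Enum.
  enum ProtoIsBounded { UNSPECIFIED, UNBOUNDED, BOUNDED }

  /** Returns the sorted ids of PCollections whose IsBounded was left unset. */
  static List<String> missingIsBounded(Map<String, ProtoIsBounded> pcollections) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, ProtoIsBounded> e : pcollections.entrySet()) {
      if (e.getValue() == null || e.getValue() == ProtoIsBounded.UNSPECIFIED) {
        missing.add(e.getKey());
      }
    }
    Collections.sort(missing);
    return missing;
  }

  /** Throws if any PCollection in the (simplified) descriptor lacks a concrete IsBounded. */
  static void requireIsBoundedSet(Map<String, ProtoIsBounded> pcollections) {
    List<String> missing = missingIsBounded(pcollections);
    if (!missing.isEmpty()) {
      throw new IllegalStateException("PCollections missing IsBounded: " + missing);
    }
  }

  public static void main(String[] args) {
    Map<String, ProtoIsBounded> descriptor = new TreeMap<>();
    descriptor.put("pc1", ProtoIsBounded.BOUNDED);
    descriptor.put("pc2", ProtoIsBounded.UNSPECIFIED);
    System.out.println(missingIsBounded(descriptor)); // [pc2]
  }
}
```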
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)