[ https://issues.apache.org/jira/browse/BEAM-4745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismaël Mejía resolved BEAM-4745.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 2.6.0

> SDF tests broken by innocent change due to Dataflow worker dependencies
> -----------------------------------------------------------------------
>
>                 Key: BEAM-4745
>                 URL: https://issues.apache.org/jira/browse/BEAM-4745
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Critical
>             Fix For: 2.6.0
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/pull/5894 broke SDF support in the Dataflow
> streaming runner; using SDFs fails with the error below.
> The reason is that the Dataflow worker has a staged copy of several artifacts,
> including runners-core-construction, and that copy comes before user code on
> the classpath. So the pipeline contains a SplittableParDo serialized with the
> class from master, but the worker deserializes it against a stale class file.
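> To illustrate the failure mode (this is a hypothetical stand-in, not Beam
> code): the trace below shows two different serialVersionUID values for
> SplittableParDo$PairWithRestrictionFn, which suggests the class does not
> declare an explicit serialVersionUID, so each copy of the class computes its
> own UID from the class structure, and a stale staged copy therefore cannot
> deserialize objects written by the newer class. A minimal Java sketch of that
> mechanism:
>
>   import java.io.ObjectStreamClass;
>   import java.io.Serializable;
>
>   public class SerialVersionUidDemo {
>
>     // Hypothetical stand-in for a Serializable DoFn-like class; the name
>     // and field are illustrative only.
>     static class PairWithRestrictionLikeFn implements Serializable {
>       // No explicit serialVersionUID: the JVM derives one from the class
>       // structure, so any change to fields or methods changes the UID.
>       private String restriction;
>     }
>
>     public static void main(String[] args) {
>       long uid =
>           ObjectStreamClass.lookup(PairWithRestrictionLikeFn.class)
>               .getSerialVersionUID();
>       System.out.println("Computed serialVersionUID: " + uid);
>       // If another JVM loads a structurally different (stale) version of
>       // this class, its computed UID differs and readObject() fails with
>       // java.io.InvalidClassException: ... local class incompatible.
>     }
>   }
>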
> This needs to be fixed on the Dataflow side. Filing this JIRA just to track
> the externally facing issue.
> Meanwhile, to stop the bleeding, I'm going to revert the change. The change
> itself is correct, but it's better to have SDFs not invoke setup/teardown
> than to have them not work at all.
> CC: [~iemejia]
> java.lang.RuntimeException: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:192)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
>         com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
>         com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
>         com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
>         com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90)
>         com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
>         com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
>         com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
>         com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
>         org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>         com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:61)
>         com.google.cloud.dataflow.worker.UserParDoFnFactory.lambda$create$0(UserParDoFnFactory.java:92)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
>         com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90)
>         com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
>         com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
>         com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
>         com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: org.apache.beam.runners.core.construction.SplittableParDo$PairWithRestrictionFn; local class incompatible: stream classdesc serialVersionUID = -2216501394657530686, local class serialVersionUID = -6277163835950193211
>         java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
>         java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)
>         java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>         java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>         org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
>         com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:61)
>         com.google.cloud.dataflow.worker.UserParDoFnFactory.lambda$create$0(UserParDoFnFactory.java:92)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
>         com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
>         com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90)
>         com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
>         com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
>         com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
>         com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
>         com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
>         com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
