[ https://issues.apache.org/jira/browse/BEAM-4745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía resolved BEAM-4745. -------------------------------- Resolution: Fixed Fix Version/s: 2.6.0 > SDF tests broken by innocent change due to Dataflow worker dependencies > ----------------------------------------------------------------------- > > Key: BEAM-4745 > URL: https://issues.apache.org/jira/browse/BEAM-4745 > Project: Beam > Issue Type: Bug > Components: runner-dataflow > Reporter: Eugene Kirpichov > Assignee: Eugene Kirpichov > Priority: Critical > Fix For: 2.6.0 > > Time Spent: 1h > Remaining Estimate: 0h > > https://github.com/apache/beam/pull/5894 broke SDF in the Dataflow streaming > runner: using SDFs fails with the error below. > The reason is that the Dataflow worker has a staged copy of some dependencies, including > runners-core-construction, and it comes before user code in the classpath. So > the pipeline includes a serialized SplittableParDo from master, but the > worker deserializes it using a stale class file. > This needs to be fixed on the Dataflow side. Filing this JIRA just to track the > externally facing issue. > Meanwhile, to stop the bleeding, I'm going to revert the change. By > itself it's a correct change, but it's better to have SDFs not invoke > setup/teardown than to have them not work at all. 
> CC: [~iemejia] > java.lang.RuntimeException: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:192) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) > > com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) > Caused by: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899) > > 
com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90) > > com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) > > com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalArgumentException: unable to deserialize > Serialized DoFnInfo > > org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74) > > com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:61) > > 
com.google.cloud.dataflow.worker.UserParDoFnFactory.lambda$create$0(UserParDoFnFactory.java:92) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899) > > com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90) > > com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) > > com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) > > 
com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) > Caused by: java.io.InvalidClassException: > org.apache.beam.runners.core.construction.SplittableParDo$PairWithRestrictionFn; > local class incompatible: stream classdesc serialVersionUID = > -2216501394657530686, local class serialVersionUID = -6277163835950193211 > java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) > > org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71) > > com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:61) > > com.google.cloud.dataflow.worker.UserParDoFnFactory.lambda$create$0(UserParDoFnFactory.java:92) > > 
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899) > > com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90) > > com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) > > com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123) > > 
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v7.6.3#76005)