[ https://issues.apache.org/jira/browse/BEAM-1053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15961931#comment-15961931 ]
ASF GitHub Bot commented on BEAM-1053:
--------------------------------------

GitHub user tweise opened a pull request:

    https://github.com/apache/beam/pull/2473

    [BEAM-1053] ApexGroupByKeyOperator serialization issues

    Be sure to do all of the following to help us incorporate your contribution
    quickly and easily:

     - [ ] Make sure the PR title is formatted like:
       `[BEAM-<Jira issue #>] Description of pull request`
     - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
           Travis-CI on your fork and ensure the whole test matrix passes).
     - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
           number, if there is one.
     - [ ] If this contribution is large, please file an Apache
           [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

    ---

    R: @jkff

    Probably this code will go away after the SDF work but this bug is trivial
    to fix and has been there for a while, so let's close it out separately.
    For stateInternals it was already addressed.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tweise/beam BEAM-1053_ApexGroupByKeySerialization

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/2473.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2473

----
commit adf378568e7c676c3b2780c07caae679ea461f01
Author: Thomas Weise <t...@apache.org>
Date:   2017-04-08T20:01:01Z

    BEAM-1053 ApexGroupByKeyOperator serialization issues

----


> ApexGroupByKeyOperator serialization issues
> -------------------------------------------
>
>                 Key: BEAM-1053
>                 URL: https://issues.apache.org/jira/browse/BEAM-1053
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-apex
>    Affects Versions: 0.3.0-incubating
>            Reporter: Sandeep Deshmukh
>            Assignee: Thomas Weise
>
> While trying to use Apex Runner for wordcount program, the
> ApexGroupByKeyOperator fails with
> following exception.
> 2016-11-28 20:54:47,696 INFO com.datatorrent.stram.engine.StreamingContainer: Deploy request:
> [OperatorDeployInfo[id=9,name=ExtractPayload,type=GENERIC,checkpoint={583c4b11000000b3, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=stream14,sourceNodeId=8,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream0,bufferServer=<null>]]],
> OperatorDeployInfo[id=8,name=ReadFromHDFS,type=INPUT,checkpoint={583c4b11000000b3, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream14,bufferServer=<null>]]],
> OperatorDeployInfo[id=11,name=Application.CountWords/Count.PerElement/Init/Map,type=GENERIC,checkpoint={583c4b11000000b3, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=stream17,sourceNodeId=10,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream18,bufferServer=<null>]]],
> OperatorDeployInfo[id=10,name=Application.CountWords/ParDo(ExtractWords),type=GENERIC,checkpoint={583c4b11000000b3, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=stream0,sourceNodeId=9,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream17,bufferServer=<null>]]],
> OperatorDeployInfo[id=13,name=Application.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo,type=GENERIC,checkpoint={583c4b11000000b3, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=stream12,sourceNodeId=12,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream3,bufferServer=<null>]]],
> OperatorDeployInfo[id=14,name=WriteToHDFS/Window.Into()/ApexRunner.AssignWindowsAndSetStrategy/AssignWindows/AssignWindows,type=GENERIC,checkpoint={583c4b11000000b3, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=stream3,sourceNodeId=13,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream10,bufferServer=goofy]]],
> OperatorDeployInfo[id=12,name=Application.CountWords/Count.PerElement/Count.PerKey/GroupByKey,type=GENERIC,checkpoint={583c4b11000000b3, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=stream18,sourceNodeId=11,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=stream12,bufferServer=<null>]]]]
> 2016-11-28 20:54:48,922 ERROR com.datatorrent.stram.engine.StreamingContainer: deploy request failed
> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): java.nio.HeapByteBuffer
> Serialization trace:
> activeTimers (org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator)
>     at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:192)
>     at com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:137)
>     at com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:915)
>     at com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:863)
>     at com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:821)
>     at com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:706)
>     at com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:311)
> 2016-11-28 20:54:48,932 ERROR com.datatorrent.stram.engine.StreamingContainer: Fatal exception in container!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
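
For readers of the trace: the KryoException says that java.nio.HeapByteBuffer
cannot be instantiated because it has no no-arg constructor, and the
serialization trace points at the activeTimers map in ApexGroupByKeyOperator.
The sketch below uses only the JDK (no Kryo, Beam or Apex classes) to show the
missing constructor and one possible way around it, assuming the buffers are
used as map keys: keep the bytes in a small class that does have a no-arg
constructor. ByteBufferKeyDemo and BytesKey are hypothetical names made up for
this illustration; this is not necessarily the change made in pull request
#2473.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; nothing here is taken from the Beam or Apex code base.
final class ByteBufferKeyDemo {

  /** Returns true if the class declares a constructor that takes no arguments. */
  static boolean hasNoArgConstructor(Class<?> type) {
    try {
      type.getDeclaredConstructor();
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  /** Hypothetical map key that holds a copy of the buffer contents as a plain byte[]. */
  static final class BytesKey {
    private byte[] bytes;

    // No-arg constructor so that a field-based serializer can instantiate the class.
    BytesKey() {
      this.bytes = new byte[0];
    }

    static BytesKey of(ByteBuffer buffer) {
      BytesKey key = new BytesKey();
      key.bytes = new byte[buffer.remaining()];
      // duplicate() leaves the caller's position and limit untouched.
      buffer.duplicate().get(key.bytes);
      return key;
    }

    ByteBuffer toByteBuffer() {
      return ByteBuffer.wrap(bytes);
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof BytesKey && Arrays.equals(bytes, ((BytesKey) other).bytes);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(bytes);
    }
  }

  public static void main(String[] args) {
    ByteBuffer encodedKey = ByteBuffer.wrap(new byte[] {1, 2, 3});
    System.out.println(encodedKey.getClass().getName()
        + " has a no-arg constructor: " + hasNoArgConstructor(encodedKey.getClass()));
    System.out.println("BytesKey has a no-arg constructor: "
        + hasNoArgConstructor(BytesKey.class));

    // Content-based equals/hashCode keeps map lookups working as they do with ByteBuffer keys.
    Map<BytesKey, String> timers = new HashMap<>();
    timers.put(BytesKey.of(encodedKey), "timer-1");
    System.out.println("lookup by equal key: "
        + timers.get(BytesKey.of(ByteBuffer.wrap(new byte[] {1, 2, 3}))));
  }
}
```

Registering a custom serializer for the buffer type would be another option;
the wrapper is shown here only because it needs nothing beyond the JDK.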