[ https://issues.apache.org/jira/browse/BEAM-7144?focusedWorklogId=260821&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-260821 ]

ASF GitHub Bot logged work on BEAM-7144:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/19 05:11
            Start Date: 15/Jun/19 05:11
    Worklog Time Spent: 10m 
      Work Description: JozoVilcek commented on pull request #8850: [BEAM-7144] Fix for rescaling problem on Flink >= 1.6
URL: https://github.com/apache/beam/pull/8850#discussion_r294035124
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
 ##########
 @@ -1233,4 +1234,121 @@ private void restoreWatermarkHoldsView() throws Exception {
       }
     }
   }
+
+  /** Eagerly create user state to work around https://jira.apache.org/jira/browse/FLINK-12653. */
+  public static class EarlyBinder implements StateBinder {
 
 Review comment:
   Thanks! 👍 
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 260821)
    Time Spent: 2.5h  (was: 2h 20m)

> Job re-scale fails on Flink 1.7
> -------------------------------
>
>                 Key: BEAM-7144
>                 URL: https://issues.apache.org/jira/browse/BEAM-7144
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.11.0
>            Reporter: Jozef Vilcek
>            Assignee: Maximilian Michels
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> I am unable to rescale a job after moving it to Flink runner 1.7. What I am
> doing is:
>  # Recompile the job code, only swapping the Flink runner version from 1.5 to 1.7
>  # Run the streaming job with parallelism 112 and maxParallelism 448
>  # Wait until a checkpoint is taken
>  # Stop the job
>  # Run the job again with parallelism 224 and the checkpoint path to restore from
>  # The job fails
> The same happens if I try to increase parallelism. This procedure works for
> the same job compiled with Flink runner 1.5 and run on Flink 1.5.0. It fails
> with runner 1.7 on Flink 1.7.2.
> The exception is:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowDoFnOperator_2b6af61dc418f10e82551367a7e7f78e_(83/224) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 5 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 101, Size: 0
> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> at java.util.ArrayList.get(ArrayList.java:429)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:73)
> at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
> ... 7 more{noformat}
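To make the rescale in the reported procedure more concrete (parallelism 112 to 224 with maxParallelism 448), the sketch below reimplements, as standalone arithmetic, the key-group range formula that Flink's KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex is assumed to use in Flink 1.7. It shows which key groups the failing subtask restores and which pre-rescale subtask wrote them. The class KeyGroupRanges and method rangeFor are hypothetical names, it assumes the "(83/224)" label in the trace is one-based (zero-based index 82), and it neither calls Flink nor reproduces the Kryo failure or the EarlyBinder fix.

```java
// Hypothetical standalone helper: mirrors (does not call) Flink's key-group range arithmetic.
public final class KeyGroupRanges {

  private KeyGroupRanges() {}

  /** Returns {first, last} key group (inclusive) owned by a zero-based subtask index. */
  public static int[] rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
    if (parallelism <= 0 || maxParallelism < parallelism) {
      throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
    }
    if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
      throw new IllegalArgumentException("subtaskIndex out of range");
    }
    // Key groups are split into contiguous, near-equal ranges across subtasks.
    int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    return new int[] {start, end};
  }

  public static void main(String[] args) {
    int maxParallelism = 448;
    // Assumption: the "(83/224)" label in the trace is one-based, i.e. index 82.
    int[] after = rangeFor(maxParallelism, 224, 82);
    // Find the pre-rescale subtask (parallelism 112) whose range contains those key groups.
    for (int old = 0; old < 112; old++) {
      int[] before = rangeFor(maxParallelism, 112, old);
      if (before[0] <= after[0] && after[1] <= before[1]) {
        System.out.printf(
            "new subtask 82 owns key groups %d-%d, previously held by old subtask %d (%d-%d)%n",
            after[0], after[1], old, before[0], before[1]);
      }
    }
  }
}
```

Running main prints that new subtask 82 owns key groups 164-165, which were previously held by old subtask 41 (164-167). After doubling the parallelism, each new subtask therefore reads only part of one old subtask's keyed state.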



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
