[ 
https://issues.apache.org/jira/browse/BEAM-7144?focusedWorklogId=260691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-260691
 ]

ASF GitHub Bot logged work on BEAM-7144:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jun/19 20:22
            Start Date: 14/Jun/19 20:22
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #8850: [BEAM-7144] Fix 
for rescaling problem on Flink >= 1.6
URL: https://github.com/apache/beam/pull/8850#discussion_r293962544
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
 ##########
 @@ -1233,4 +1234,121 @@ private void restoreWatermarkHoldsView() throws 
Exception {
       }
     }
   }
+
+  /** Eagerly create user state to work around 
https://jira.apache.org/jira/browse/FLINK-12653. */
+  public static class EarlyBinder implements StateBinder {
 
 Review comment:
   Feel free to ask anything :) `getPartitionedState` is a wrapper around 
`getOrCreateKeyedState`. The latter creates a keyed state (as the name 
suggests). To create a keyed state, you pass in a namespace serializer and a 
state descriptor. The state descriptor contains a state serializer and a state 
name. At that point the state is registered, but without an active namespace it 
cannot be accessed. You can cast the `State` you received to 
`InternalKvState`, which allows you to set the namespace. The alternative is to 
use `getPartitionedState`, which lets you specify an actual namespace directly.
   
   Perhaps this is even clearer when looking at the source code:
   
   ```java
   public <N, S extends State> S getPartitionedState(
       final N namespace,
       final TypeSerializer<N> namespaceSerializer,
       final StateDescriptor<S, ?> stateDescriptor) throws Exception {
     checkNotNull(namespace, "Namespace");
     /* ... */
     final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
     final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
     /* ... */
     kvState.setCurrentNamespace(namespace);

     return state;
   }
   ```  
   
   I've omitted some caching code for simplicity.
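
To make the distinction concrete, here is a rough, self-contained toy model of the two calls. It is not Flink code: `ToyBackend` and `ToyValueState` are hypothetical stand-ins, states are looked up by a plain name instead of a state descriptor, and the current key and all serializers are left out. It only illustrates that creating a state and selecting a namespace are separate steps.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (NOT Flink code) of keyed state that is scoped by a namespace. */
final class NamespaceSketch {

  /** Hypothetical stand-in for a state object that also exposes namespace control. */
  static final class ToyValueState<N, V> {
    private final Map<N, V> valuesByNamespace = new HashMap<>();
    private N currentNamespace; // stays null until a namespace is selected

    void setCurrentNamespace(N namespace) {
      this.currentNamespace = namespace;
    }

    V value() {
      requireNamespace();
      return valuesByNamespace.get(currentNamespace);
    }

    void update(V value) {
      requireNamespace();
      valuesByNamespace.put(currentNamespace, value);
    }

    private void requireNamespace() {
      if (currentNamespace == null) {
        throw new IllegalStateException("no namespace set");
      }
    }
  }

  /** Hypothetical stand-in for a keyed state backend; states are registered by name. */
  static final class ToyBackend<N> {
    private final Map<String, ToyValueState<N, ?>> statesByName = new HashMap<>();

    /** Registers the state if it does not exist yet, but selects no namespace. */
    @SuppressWarnings("unchecked")
    <V> ToyValueState<N, V> getOrCreateKeyedState(String stateName) {
      return (ToyValueState<N, V>)
          statesByName.computeIfAbsent(stateName, name -> new ToyValueState<N, V>());
    }

    /** Same lookup as above, then selects the given namespace on the state. */
    <V> ToyValueState<N, V> getPartitionedState(N namespace, String stateName) {
      ToyValueState<N, V> state = getOrCreateKeyedState(stateName);
      state.setCurrentNamespace(namespace);
      return state;
    }
  }

  public static void main(String[] args) {
    ToyBackend<String> backend = new ToyBackend<>();

    // Eager creation: the state exists, but no namespace is active yet.
    ToyValueState<String, Integer> count = backend.getOrCreateKeyedState("count");
    try {
      count.value();
    } catch (IllegalStateException e) {
      System.out.println("Before setting a namespace: " + e.getMessage());
    }

    // Selecting a namespace makes the same state object usable.
    backend.<Integer>getPartitionedState("window-1", "count").update(3);
    System.out.println(
        "window-1: " + backend.<Integer>getPartitionedState("window-1", "count").value());
    System.out.println(
        "window-2: " + backend.<Integer>getPartitionedState("window-2", "count").value());
  }
}
```

In this toy, both methods hand back the same state object for a given name. The only difference is whether a namespace has been selected on it, which mirrors the wrapper relationship described above.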
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 260691)
    Time Spent: 1h 50m  (was: 1h 40m)

> Job re-scale fails on Flink 1.7
> -------------------------------
>
>                 Key: BEAM-7144
>                 URL: https://issues.apache.org/jira/browse/BEAM-7144
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.11.0
>            Reporter: Jozef Vilcek
>            Assignee: Maximilian Michels
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> I am unable to rescale job after moving it to flink runner 1.7. What I am 
> doing is:
>  # Recompile job code just with swapped flink runner version 1.5 -> 1.7
>  # Run streaming job with parallelism 112 and maxParallelism 448
>  # Wait until checkpoint is taken
>  # Stop job
>  # Run job again with parallelism 224 and checkpoint path to restore from
>  # Job fails
> The same happens if I try to increase parallelism. This procedure works for 
> the same job compiled with flink runner 1.5 and run on 1.5.0. Fails with 
> runner 1.7 on flink 1.7.2
> Exception is:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> WindowDoFnOperator_2b6af61dc418f10e82551367a7e7f78e_(83/224) from any of the 
> 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 5 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 101, Size: 0
> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> at java.util.ArrayList.get(ArrayList.java:429)
> at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
> at 
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:73)
> at 
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
> at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
> at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
> at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
> at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
> ... 7 more{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
