[ 
https://issues.apache.org/jira/browse/FLINK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17065986#comment-17065986
 ] 

&res commented on FLINK-16571:
------------------------------

[~libenchao], I was hoping we could start the discussion about this issue.

While this issue isn't a big problem in itself, it took me a lot of time to 
debug, and I'd like to save other users some time.
My idea for a fix would be to throw an exception if the key group index is out 
of the key group range:
https://github.com/apache/flink/compare/master...arthurandres:FLINK-16571-throw-error-when-key-hash-is-out-of-range?expand=1
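
For reference, a minimal sketch of the check I have in mind (the field and 
method names follow InternalKeyContextImpl; the exact exception type and 
message are up for discussion):

{code:java}
@Override
public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
  if (!keyGroupRange.contains(currentKeyGroupIndex)) {
    // The key group was computed from the key's hashCode on the sender side;
    // if it falls outside this backend's range, the hash function is almost
    // certainly unstable across JVMs.
    throw new IllegalArgumentException(
        "Key group " + currentKeyGroupIndex + " is not in " + keyGroupRange
            + ". Unstable hash code of the key?");
  }
  this.currentKeyGroupIndex = currentKeyGroupIndex;
}
{code}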

Another solution would be to change the transport layer: send the hash code 
along with the serialized key over the network, and check that it matches the 
hash code of the deserialized key on the receiving side. 
But that would be a much bigger change, and sending the hash code over the 
network would not feel natural anyway.
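
To illustrate (purely hypothetical wire format, not an existing Flink API): 
the sender would attach the hash it used for routing, and the receiver would 
recompute and compare:

{code:java}
// Hypothetical receiver-side check; senderHash would be an extra field
// carried alongside the serialized record.
static <K> void verifyKeyHash(K deserializedKey, int senderHash) {
  int receiverHash = deserializedKey.hashCode();
  if (receiverHash != senderHash) {
    throw new IllegalStateException(
        "Key " + deserializedKey + " hashed to " + senderHash
            + " on the sender but to " + receiverHash
            + " on the receiver: hashCode is not stable across JVMs.");
  }
}
{code}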

What do you think?

> Throw exception when current key is out of range
> ------------------------------------------------
>
>                 Key: FLINK-16571
>                 URL: https://issues.apache.org/jira/browse/FLINK-16571
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core
>    Affects Versions: 1.9.2
>            Reporter: &res
>            Priority: Trivial
>
> * I've got a stream of records that are "keyed by" a class whose hashCode 
> isn't stable across JVM instances.
> * The records are then processed by a parallel operator running on several 
> task managers in the cluster.
> * A given task manager receives a record, calculates the hashCode of the 
> key, and sends it to another task manager instance according to its slot 
> allocation.
> * The other task manager instance receives the record for a given parallel 
> slot (allocated by the other instance), calculates the hash for the record 
> (which doesn't match the original hash), and then I get this error:
> {code}
> java.lang.NullPointerException
>       at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
>       at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
>       at org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:90)
> {code}
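> As far as I can tell from the 1.9 sources, the NPE happens because 
> StateTable silently returns null when the key group is outside the backend's 
> range, and put() then dereferences it. Paraphrased (not a verbatim copy):
> {code:java}
> // StateTable looks up the state map for the computed key group:
> private StateMap<K, N, S> getMapForKeyGroup(int keyGroupIndex) {
>   final int pos = keyGroupIndex - getKeyGroupOffset();
>   if (pos >= 0 && pos < keyGroupedStateMaps.length) {
>     return keyGroupedStateMaps[pos];
>   }
>   return null; // out-of-range key group: the caller NPEs on this
> }
> {code}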
> It took me a while to figure out what's going on, and while the issue is on 
> my side, it would have helped if InternalKeyContextImpl had complained up 
> front that the key group index was out of range.
> InternalKeyContextImpl#setCurrentKeyGroupIndex should fail when 
> currentKeyGroupIndex isn't in the keyGroupRange, suggesting that the hash 
> code function isn't stable.
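> For context, the key group is derived from the key's hashCode, so an 
> unstable hashCode makes each JVM route the same key differently. A sketch of 
> how both sides pick the key group (KeyGroupRangeAssignment is the real 
> class; the wrapper method is just for illustration):
> {code:java}
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
>
> // Sender and receiver both derive the key group from key.hashCode();
> // if hashCode differs per JVM, the two sides disagree on the key group.
> static int routeKeyGroup(Object key, int maxParallelism) {
>   return KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
> }
> {code}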
> A few notes:
> * The reason I'm getting the issue is that on the Oracle JDK, Enum hashCode 
> uses the identity hash code (effectively the memory address) of the enum 
> constant. I should key by something stable instead, such as the ordinal; see 
> the sketch below.
> * The issue is more likely to happen if cluster.evenly-spread-out-slots is 
> on (which forces the job to be distributed across different instances).
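> A minimal sketch of the stable-key workaround (reusing the src stream and 
> StatefulCounter from the example below):
> {code:java}
> // Key by the ordinal instead of the enum itself: ordinal() is stable
> // across JVMs, while Enum.hashCode() is identity-based on the Oracle JDK.
> src.keyBy(p -> p.f0.ordinal())
>     .map(new StatefulCounter());
> {code}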
> Here's an example that replicates the issue. It has to run on the Oracle JVM 
> on a cluster of a few nodes, with cluster.evenly-spread-out-slots enabled:
> {code:java}
> import java.lang.reflect.Field;
> import java.util.Random;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.KeyGroupRange;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class JobThatFails {
>
>   private static final Logger LOG = LoggerFactory.getLogger(JobThatFails.class);
>
>   /** Enum keys: Enum.hashCode() is identity-based, hence unstable across JVMs. */
>   private enum EnumKey {
>     ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN
>   }
>
>   void defineGraph(StreamExecutionEnvironment env) {
>     int parallelism = 4;
>     // Union ten sources so records are produced from several task slots.
>     DataStream<Tuple2<EnumKey, Integer>> src = null;
>     for (int i = 0; i < 10; ++i) {
>       DataStream<Tuple2<EnumKey, Integer>> eachSrc =
>           env.addSource(new IntegerSourceFunction());
>       src = (src == null) ? eachSrc : src.union(eachSrc);
>     }
>     src.keyBy(p -> p.f0) // keyed by the enum itself: unstable hashCode
>         .map(new StatefulCounter())
>         .name(StatefulCounter.class.getSimpleName())
>         .setParallelism(parallelism)
>         .addSink(
>             new SinkFunction<Integer>() {
>               @Override
>               public void invoke(Integer value, Context context) {}
>             });
>   }
>
>   /** Emits one record per enum key every second until cancelled. */
>   private static class IntegerSourceFunction
>       implements SourceFunction<Tuple2<EnumKey, Integer>> {
>
>     private volatile boolean running = true;
>
>     @Override
>     public void run(SourceContext<Tuple2<EnumKey, Integer>> ctx) throws Exception {
>       Random random = new Random();
>       while (running) {
>         for (EnumKey e : EnumKey.values()) {
>           ctx.collect(Tuple2.of(e, random.nextInt(10)));
>         }
>         Thread.sleep(1000);
>       }
>     }
>
>     @Override
>     public void cancel() {
>       running = false;
>     }
>   }
>
>   /** Keyed state access that eventually hits the NPE in StateTable.put. */
>   private static class StatefulCounter
>       extends RichMapFunction<Tuple2<EnumKey, Integer>, Integer> {
>
>     private transient ValueState<Integer> stateHandle;
>
>     @Override
>     public void open(Configuration configuration) {
>       stateHandle =
>           getRuntimeContext()
>               .getState(
>                   new ValueStateDescriptor<>(
>                       JobThatFails.class.getSimpleName(),
>                       TypeInformation.of(Integer.class)));
>     }
>
>     @Override
>     public Integer map(Tuple2<EnumKey, Integer> value) throws Exception {
>       LOG.info(getDebugMessage());
>       Integer state = stateHandle.value();
>       if (state == null) {
>         state = 0;
>       }
>       state = state + value.f1;
>       stateHandle.update(state);
>       return state;
>     }
>
>     // Uses reflection to dump the backend's key-group bookkeeping, so the
>     // mismatch between currentKeyGroupIndex and keyGroupRange is visible.
>     private String getDebugMessage() {
>       Object stateTable = readField(stateHandle, "stateTable");
>       Object keyContext = readField(stateTable, "keyContext");
>       Object currentNamespace = readField(stateHandle, "currentNamespace");
>       Object length =
>           ((Object[]) readField(stateTable, "keyGroupedStateMaps")).length;
>       Object keyGroupOffset = readField(stateTable, "keyGroupOffset");
>       KeyGroupRange keyGroupRange =
>           (KeyGroupRange) readField(keyContext, "keyGroupRange");
>       return String.format(
>           "numberOfKeyGroups=%s currentKey=%s currentKeyGroupIndex=%s "
>               + "keyGroupOffset=%s currentNamespace=%s length=%s "
>               + "startKeyGroup=%s endKeyGroup=%s",
>           readField(keyContext, "numberOfKeyGroups"),
>           readField(keyContext, "currentKey"),
>           readField(keyContext, "currentKeyGroupIndex"),
>           keyGroupOffset,
>           currentNamespace,
>           length,
>           keyGroupRange.getStartKeyGroup(),
>           keyGroupRange.getEndKeyGroup());
>     }
>   }
>
>   static Object readField(Object object, String name) {
>     try {
>       Field field = getField(name, object.getClass());
>       field.setAccessible(true);
>       return field.get(object);
>     } catch (IllegalAccessException | NoSuchFieldException | SecurityException e) {
>       throw new RuntimeException(
>           String.format(
>               "Cannot read %s from %s %s",
>               name, object.getClass().getSimpleName(), object),
>           e);
>     }
>   }
>
>   /** Walks up the class hierarchy so fields of superclasses can be read too. */
>   static Field getField(String name, Class<?> clazz) throws NoSuchFieldException {
>     try {
>       return clazz.getDeclaredField(name);
>     } catch (NoSuchFieldException e) {
>       if (clazz.getSuperclass() == null) {
>         throw e;
>       }
>       return getField(name, clazz.getSuperclass());
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     JobThatFails job = new JobThatFails();
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     job.defineGraph(env);
>     env.execute(JobThatFails.class.getSimpleName());
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
