[ 
https://issues.apache.org/jira/browse/FLINK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17066738#comment-17066738
 ] 

&res commented on FLINK-16571:
------------------------------

Are you talking about these:
{code}
A type cannot be a key if:
* it is a POJO type but does not override the hashCode() method and relies on 
the Object.hashCode() implementation.
* it is an array of any type.
{code}

It does make sense to me, these are similar constrains that would apply to key 
objects in a HashMap. But for flink the constraint are stricter as it expects 
the hashCode() value to be same on any JVM, since data get distributed across 
different task manager instances. 


> Throw exception when current key is out of range
> ------------------------------------------------
>
>                 Key: FLINK-16571
>                 URL: https://issues.apache.org/jira/browse/FLINK-16571
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core
>    Affects Versions: 1.9.2
>            Reporter: &res
>            Priority: Trivial
>
> * I've got a stream of records, that are "keyed by" using a class whose 
> hashCode isn't stable across jvm instances. 
> * The records are then processed by a parallel operator, which is running on 
> several task managers on the cluster.
> * A given task manager receive a record, calculates the hashCode of the key 
> and sends it to another task manager instance according to it's slot 
> allocation
> * The other task manager instance receives the record for a given parallel 
> slot (allocated by the other instance), calculate the hash for the record 
> (which doesn't match the original hash) and then I get this error:
> {code}
> java.lang.NullPointerException
>       at 
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
>       at 
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
>       at 
> org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:90)
> {code}
> It took me a while to figure out what's going on, and while the issue is on 
> my side, it would have helped if the InternalKeyContextImpl had complained in 
> the first place that the hashCode was out of range.
> InternalKeyContextImpl#setCurrentKeyGroupIndex should complain when 
> currentKeyGroupIndex isn't in the keyGroupRange, suggesting that the hash 
> code function isn't stable.
> A few notes:
> * The reason I'm getting the issue is because on oracle jdk, Enum hashcode 
> use the memory address of the enum. I should use ordinal when hashing to 
> ensure stability
> * The issue is more likely to happen is cluster.evenly-spread-out-slots is on 
> (which forces the job to be distributed to different instances)
> Here's an example that replicates the issue. It has to run using oracle jvm 
> on a cluster of a few nodes, using luster.evenly-spread-out-slots:
> {code:java}
> import java.lang.reflect.Field;
> import java.util.Random;
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.KeyGroupRange;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> public class JobThatFails {
>   private static final Logger LOG = 
> LoggerFactory.getLogger(JobThatFails.class);
>   private enum EnumKey {
>     ONE,
>     TWO,
>     THREE,
>     FOUR,
>     FIVE,
>     SIX,
>     SEVEN,
>     EIGHT,
>     NINE,
>     TEN
>   }
>   void defineGraph(StreamExecutionEnvironment env) {
>     int parallelism = 4;
>     DataStream<Tuple2<EnumKey, Integer>> src = null;
>     for (int i = 0; i < 10; ++i) {
>       DataStream<Tuple2<EnumKey, Integer>> eachSrc = env.addSource(new 
> IntegerSourceFunction());
>       if (src == null) {
>         src = eachSrc;
>       } else {
>         src = src.union(eachSrc);
>       }
>     }
>     src.keyBy(p -> p.f0)
>         .map(new StatefulCounter())
>         .name(StatefulCounter.class.getSimpleName())
>         .setParallelism(parallelism)
>         .addSink(
>             new SinkFunction<Integer>() {
>               @Override
>               public void invoke(Integer value, Context context) {}
>             });
>   }
>   private static class IntegerSourceFunction implements 
> SourceFunction<Tuple2<EnumKey, Integer>> {
>     volatile boolean running;
>     private IntegerSourceFunction() {}
>     @Override
>     public void run(SourceContext<Tuple2<EnumKey, Integer>> ctx) throws 
> Exception {
>       running = true;
>       Random random = new Random();
>       while (running) {
>         for (EnumKey e : EnumKey.values()) {
>           ctx.collect(Tuple2.of(e, random.nextInt(10)));
>         }
>         Thread.sleep(1000);
>       }
>     }
>     @Override
>     public void cancel() {
>       running = false;
>     }
>   }
>   class StatefulCounter extends RichMapFunction<Tuple2<EnumKey, Integer>, 
> Integer> {
>     private transient ValueState<Integer> stateHandle;
>     @Override
>     public Integer map(Tuple2<EnumKey, Integer> value) throws Exception {
>       LOG.info(getDebugMessage());
>       Integer state = stateHandle.value();
>       if (state == null) {
>         state = 0;
>       }
>       state = state + value.f1;
>       stateHandle.update(state);
>       return state;
>     }
>     @Override
>     public void open(Configuration configuration) {
>       stateHandle =
>           getRuntimeContext()
>               .getState(
>                   new ValueStateDescriptor<>(
>                       JobThatFails.class.getSimpleName(), 
> TypeInformation.of(Integer.class)));
>     }
>     private String getDebugMessage() {
>       Object stateTable = JobThatFails.readField(stateHandle, "stateTable");
>       Object keyContext = JobThatFails.readField(stateTable, "keyContext");
>       Object currentNamespace = JobThatFails.readField(stateHandle, 
> "currentNamespace");
>       Object length = ((Object[]) JobThatFails.readField(stateTable, 
> "keyGroupedStateMaps")).length;
>       Object keyGroupOffset = JobThatFails.readField(stateTable, 
> "keyGroupOffset");
>       JobThatFails.readField(keyContext, "currentKeyGroupIndex");
>       KeyGroupRange keyGroupRange =
>           (KeyGroupRange) JobThatFails.readField(keyContext, "keyGroupRange");
>       return String.format(
>           "numberOfKeyGroups=%s currentKey=%s currentKeyGroupIndex=%s 
> keyGroupOffset=%s currentNamespace=%s length=%s startKeyGroup=%s 
> endKeyGroup=%s",
>           JobThatFails.readField(keyContext, "numberOfKeyGroups"),
>           JobThatFails.readField(keyContext, "currentKey"),
>           JobThatFails.readField(keyContext, "currentKeyGroupIndex"),
>           keyGroupOffset,
>           currentNamespace,
>           length,
>           keyGroupRange.getStartKeyGroup(),
>           keyGroupRange.getEndKeyGroup());
>     }
>   }
>   static Object readField(Object object, String name) {
>     try {
>       Field field = getField(name, object.getClass());
>       field.setAccessible(true);
>       return field.get(object);
>     } catch (IllegalAccessException | NoSuchFieldException | 
> SecurityException e) {
>       throw new RuntimeException(
>           String.format(
>               "Cannot read %s from %s %s", name, 
> object.getClass().getSimpleName(), object),
>           e);
>     }
>   }
>   static Field getField(String name, Class<?> clazz) throws 
> NoSuchFieldException {
>     try {
>       return clazz.getDeclaredField(name);
>     } catch (NoSuchFieldException e) {
>       if (clazz.getSuperclass() == null) {
>         throw e;
>       } else {
>         return getField(name, clazz.getSuperclass());
>       }
>     }
>   }
>   public static void main(String[] args) throws Exception {
>     JobThatFails job = new JobThatFails();
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     job.defineGraph(env);
>     JobExecutionResult result = 
> env.execute(JobThatFails.class.getSimpleName());
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to