[ https://issues.apache.org/jira/browse/FLINK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17066738#comment-17066738 ]
&res commented on FLINK-16571:
------------------------------

Are you talking about these:
{code}
A type cannot be a key if:

* it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
* it is an array of any type.
{code}
That does make sense to me: these are constraints similar to the ones that apply to key objects in a HashMap. For Flink, though, the constraints are stricter, as it expects the hashCode() value to be the same on any JVM, since data gets distributed across different task manager instances.
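As a minimal sketch of the instability (my own illustration, not from the docs): an enum inherits Object's identity-based hashCode(), so its hash value typically differs between JVM processes, while its ordinal() is fixed by declaration order:
{code:java}
// Sketch only: contrasts the identity-based hashCode() an enum inherits
// from Object with the JVM-independent ordinal(). Two separate JVM runs
// will typically print different hashCode values but the same ordinal.
public class EnumHashDemo {
    enum EnumKey { ONE, TWO, THREE }

    public static void main(String[] args) {
        // Identity hash: stable within a single JVM process, not across JVMs.
        System.out.println("hashCode = " + EnumKey.TWO.hashCode());
        // Ordinal: defined by declaration order, identical on every JVM.
        System.out.println("ordinal  = " + EnumKey.TWO.ordinal());
    }
}
{code}
In the reproduction quoted below, keying by {{p.f0.ordinal()}} instead of {{p.f0}} would make the key group assignment consistent across task managers.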
> Throw exception when current key is out of range
> ------------------------------------------------
>
>                 Key: FLINK-16571
>                 URL: https://issues.apache.org/jira/browse/FLINK-16571
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core
>    Affects Versions: 1.9.2
>            Reporter: &res
>            Priority: Trivial
>
> * I've got a stream of records that are "keyed by" using a class whose hashCode isn't stable across JVM instances.
> * The records are then processed by a parallel operator, which is running on several task managers in the cluster.
> * A given task manager receives a record, calculates the hashCode of the key, and sends it to another task manager instance according to its slot allocation.
> * The other task manager instance receives the record for a given parallel slot (allocated by the first instance), calculates the hash for the record (which doesn't match the original hash), and then I get this error:
> {code}
> java.lang.NullPointerException
>     at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
>     at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
>     at org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:90)
> {code}
> It took me a while to figure out what was going on, and while the issue is on my side, it would have helped if InternalKeyContextImpl had complained in the first place that the hashCode was out of range.
> InternalKeyContextImpl#setCurrentKeyGroupIndex should complain when currentKeyGroupIndex isn't in the keyGroupRange, suggesting that the hash code function isn't stable.
> A few notes:
> * The reason I'm getting the issue is that on the Oracle JDK, an Enum's hashCode uses the memory address of the enum constant. I should use ordinal() when hashing to ensure stability.
> * The issue is more likely to happen if cluster.evenly-spread-out-slots is on (which forces the job to be distributed across different instances).
> Here's an example that replicates the issue. It has to run on the Oracle JVM, on a cluster of a few nodes, with cluster.evenly-spread-out-slots enabled:
> {code:java}
> import java.lang.reflect.Field;
> import java.util.Random;
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.KeyGroupRange;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class JobThatFails {
>
>     private static final Logger LOG = LoggerFactory.getLogger(JobThatFails.class);
>
>     // The problematic key type: Enum inherits Object's identity-based hashCode().
>     private enum EnumKey {
>         ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN
>     }
>
>     void defineGraph(StreamExecutionEnvironment env) {
>         int parallelism = 4;
>         DataStream<Tuple2<EnumKey, Integer>> src = null;
>         for (int i = 0; i < 10; ++i) {
>             DataStream<Tuple2<EnumKey, Integer>> eachSrc = env.addSource(new IntegerSourceFunction());
>             if (src == null) {
>                 src = eachSrc;
>             } else {
>                 src = src.union(eachSrc);
>             }
>         }
>         src.keyBy(p -> p.f0)
>                 .map(new StatefulCounter())
>                 .name(StatefulCounter.class.getSimpleName())
>                 .setParallelism(parallelism)
>                 .addSink(
>                         new SinkFunction<Integer>() {
>                             @Override
>                             public void invoke(Integer value, Context context) {}
>                         });
>     }
>
>     private static class IntegerSourceFunction implements SourceFunction<Tuple2<EnumKey, Integer>> {
>
>         volatile boolean running;
>
>         private IntegerSourceFunction() {}
>
>         @Override
>         public void run(SourceContext<Tuple2<EnumKey, Integer>> ctx) throws Exception {
>             running = true;
>             Random random = new Random();
>             while (running) {
>                 for (EnumKey e : EnumKey.values()) {
>                     ctx.collect(Tuple2.of(e, random.nextInt(10)));
>                 }
>                 Thread.sleep(1000);
>             }
>         }
>
>         @Override
>         public void cancel() {
>             running = false;
>         }
>     }
>
>     class StatefulCounter extends RichMapFunction<Tuple2<EnumKey, Integer>, Integer> {
>
>         private transient ValueState<Integer> stateHandle;
>
>         @Override
>         public Integer map(Tuple2<EnumKey, Integer> value) throws Exception {
>             LOG.info(getDebugMessage());
>             Integer state = stateHandle.value();
>             if (state == null) {
>                 state = 0;
>             }
>             state = state + value.f1;
>             stateHandle.update(state);
>             return state;
>         }
>
>         @Override
>         public void open(Configuration configuration) {
>             stateHandle =
>                     getRuntimeContext()
>                             .getState(
>                                     new ValueStateDescriptor<>(
>                                             JobThatFails.class.getSimpleName(),
>                                             TypeInformation.of(Integer.class)));
>         }
>
>         // Reads Flink-internal fields via reflection to log the key group bookkeeping.
>         private String getDebugMessage() {
>             Object stateTable = JobThatFails.readField(stateHandle, "stateTable");
>             Object keyContext = JobThatFails.readField(stateTable, "keyContext");
>             Object currentNamespace = JobThatFails.readField(stateHandle, "currentNamespace");
>             Object length = ((Object[]) JobThatFails.readField(stateTable, "keyGroupedStateMaps")).length;
>             Object keyGroupOffset = JobThatFails.readField(stateTable, "keyGroupOffset");
>             KeyGroupRange keyGroupRange =
>                     (KeyGroupRange) JobThatFails.readField(keyContext, "keyGroupRange");
>             return String.format(
>                     "numberOfKeyGroups=%s currentKey=%s currentKeyGroupIndex=%s keyGroupOffset=%s currentNamespace=%s length=%s startKeyGroup=%s endKeyGroup=%s",
>                     JobThatFails.readField(keyContext, "numberOfKeyGroups"),
>                     JobThatFails.readField(keyContext, "currentKey"),
>                     JobThatFails.readField(keyContext, "currentKeyGroupIndex"),
>                     keyGroupOffset,
>                     currentNamespace,
>                     length,
>                     keyGroupRange.getStartKeyGroup(),
>                     keyGroupRange.getEndKeyGroup());
>         }
>     }
>
>     static Object readField(Object object, String name) {
>         try {
>             Field field = getField(name, object.getClass());
>             field.setAccessible(true);
>             return field.get(object);
>         } catch (IllegalAccessException | NoSuchFieldException | SecurityException e) {
>             throw new RuntimeException(
>                     String.format(
>                             "Cannot read %s from %s %s",
>                             name, object.getClass().getSimpleName(), object),
>                     e);
>         }
>     }
>
>     static Field getField(String name, Class<?> clazz) throws NoSuchFieldException {
>         try {
>             return clazz.getDeclaredField(name);
>         } catch (NoSuchFieldException e) {
>             if (clazz.getSuperclass() == null) {
>                 throw e;
>             } else {
>                 return getField(name, clazz.getSuperclass());
>             }
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         JobThatFails job = new JobThatFails();
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         job.defineGraph(env);
>         JobExecutionResult result = env.execute(JobThatFails.class.getSimpleName());
>     }
> }
> {code}
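For reference, the guard being requested could look roughly like the sketch below. This is my own illustration against a simplified stand-in for InternalKeyContextImpl, not an actual patch; the real class holds more state than shown here:
{code:java}
import org.apache.flink.runtime.state.KeyGroupRange;

// Sketch only: a simplified stand-in for InternalKeyContextImpl showing
// the kind of range check the issue asks for.
final class GuardedKeyContext {

    private final KeyGroupRange keyGroupRange;
    private int currentKeyGroupIndex;

    GuardedKeyContext(KeyGroupRange keyGroupRange) {
        this.keyGroupRange = keyGroupRange;
    }

    public void setCurrentKeyGroupIndex(int keyGroupIndex) {
        // Fail fast with a hint about an unstable hashCode() instead of
        // letting StateTable.put() fail later with a NullPointerException.
        if (!keyGroupRange.contains(keyGroupIndex)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Key group %d is not in %s. This usually means the key's "
                                    + "hashCode() is not stable across JVMs.",
                            keyGroupIndex, keyGroupRange));
        }
        this.currentKeyGroupIndex = keyGroupIndex;
    }
}
{code}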
"keyGroupRange"); > return String.format( > "numberOfKeyGroups=%s currentKey=%s currentKeyGroupIndex=%s > keyGroupOffset=%s currentNamespace=%s length=%s startKeyGroup=%s > endKeyGroup=%s", > JobThatFails.readField(keyContext, "numberOfKeyGroups"), > JobThatFails.readField(keyContext, "currentKey"), > JobThatFails.readField(keyContext, "currentKeyGroupIndex"), > keyGroupOffset, > currentNamespace, > length, > keyGroupRange.getStartKeyGroup(), > keyGroupRange.getEndKeyGroup()); > } > } > static Object readField(Object object, String name) { > try { > Field field = getField(name, object.getClass()); > field.setAccessible(true); > return field.get(object); > } catch (IllegalAccessException | NoSuchFieldException | > SecurityException e) { > throw new RuntimeException( > String.format( > "Cannot read %s from %s %s", name, > object.getClass().getSimpleName(), object), > e); > } > } > static Field getField(String name, Class<?> clazz) throws > NoSuchFieldException { > try { > return clazz.getDeclaredField(name); > } catch (NoSuchFieldException e) { > if (clazz.getSuperclass() == null) { > throw e; > } else { > return getField(name, clazz.getSuperclass()); > } > } > } > public static void main(String[] args) throws Exception { > JobThatFails job = new JobThatFails(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > job.defineGraph(env); > JobExecutionResult result = > env.execute(JobThatFails.class.getSimpleName()); > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)