[
https://issues.apache.org/jira/browse/FLINK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17066286#comment-17066286
]
Benchao Li commented on FLINK-16571:
------------------------------------
[~0x26dres] Thanks for reporting this. IMHO, this is by design for now, and the
documentation already points this out; see the keyBy section in
[https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations]
> Throw exception when current key is out of range
> ------------------------------------------------
>
> Key: FLINK-16571
> URL: https://issues.apache.org/jira/browse/FLINK-16571
> Project: Flink
> Issue Type: Improvement
> Components: API / Core
> Affects Versions: 1.9.2
> Reporter: &res
> Priority: Trivial
>
> * I've got a stream of records, that are "keyed by" using a class whose
> hashCode isn't stable across jvm instances.
> * The records are then processed by a parallel operator, which is running on
> several task managers on the cluster.
> * A given task manager receives a record, calculates the hashCode of the key
> and sends it to another task manager instance according to its slot
> allocation
> * The other task manager instance receives the record for a given parallel
> slot (allocated by the other instance), calculates the hash for the record
> (which doesn't match the original hash) and then I get this error:
> {code}
> java.lang.NullPointerException
> at
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
> at
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
> at
> org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:90)
> {code}
> It took me a while to figure out what's going on, and while the issue is on
> my side, it would have helped if the InternalKeyContextImpl had complained in
> the first place that the hashCode was out of range.
> InternalKeyContextImpl#setCurrentKeyGroupIndex should complain when
> currentKeyGroupIndex isn't in the keyGroupRange, suggesting that the hash
> code function isn't stable.
> A few notes:
> * The reason I'm getting the issue is that on the Oracle JDK, Enum hashCode
> uses the memory address of the enum. I should use the ordinal when hashing to
> ensure stability
> * The issue is more likely to happen if cluster.evenly-spread-out-slots is on
> (which forces the job to be distributed to different instances)
> Here's an example that replicates the issue. It has to run using the Oracle JVM
> on a cluster of a few nodes, using cluster.evenly-spread-out-slots:
> {code:java}
> import java.lang.reflect.Field;
> import java.util.Random;
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.KeyGroupRange;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> public class JobThatFails {
> private static final Logger LOG =
> LoggerFactory.getLogger(JobThatFails.class);
> private enum EnumKey {
> ONE,
> TWO,
> THREE,
> FOUR,
> FIVE,
> SIX,
> SEVEN,
> EIGHT,
> NINE,
> TEN
> }
> void defineGraph(StreamExecutionEnvironment env) {
> int parallelism = 4;
> DataStream<Tuple2<EnumKey, Integer>> src = null;
> for (int i = 0; i < 10; ++i) {
> DataStream<Tuple2<EnumKey, Integer>> eachSrc = env.addSource(new
> IntegerSourceFunction());
> if (src == null) {
> src = eachSrc;
> } else {
> src = src.union(eachSrc);
> }
> }
> src.keyBy(p -> p.f0)
> .map(new StatefulCounter())
> .name(StatefulCounter.class.getSimpleName())
> .setParallelism(parallelism)
> .addSink(
> new SinkFunction<Integer>() {
> @Override
> public void invoke(Integer value, Context context) {}
> });
> }
> private static class IntegerSourceFunction implements
> SourceFunction<Tuple2<EnumKey, Integer>> {
> volatile boolean running;
> private IntegerSourceFunction() {}
> @Override
> public void run(SourceContext<Tuple2<EnumKey, Integer>> ctx) throws
> Exception {
> running = true;
> Random random = new Random();
> while (running) {
> for (EnumKey e : EnumKey.values()) {
> ctx.collect(Tuple2.of(e, random.nextInt(10)));
> }
> Thread.sleep(1000);
> }
> }
> @Override
> public void cancel() {
> running = false;
> }
> }
> class StatefulCounter extends RichMapFunction<Tuple2<EnumKey, Integer>,
> Integer> {
> private transient ValueState<Integer> stateHandle;
> @Override
> public Integer map(Tuple2<EnumKey, Integer> value) throws Exception {
> LOG.info(getDebugMessage());
> Integer state = stateHandle.value();
> if (state == null) {
> state = 0;
> }
> state = state + value.f1;
> stateHandle.update(state);
> return state;
> }
> @Override
> public void open(Configuration configuration) {
> stateHandle =
> getRuntimeContext()
> .getState(
> new ValueStateDescriptor<>(
> JobThatFails.class.getSimpleName(),
> TypeInformation.of(Integer.class)));
> }
> private String getDebugMessage() {
> Object stateTable = JobThatFails.readField(stateHandle, "stateTable");
> Object keyContext = JobThatFails.readField(stateTable, "keyContext");
> Object currentNamespace = JobThatFails.readField(stateHandle,
> "currentNamespace");
> Object length = ((Object[]) JobThatFails.readField(stateTable,
> "keyGroupedStateMaps")).length;
> Object keyGroupOffset = JobThatFails.readField(stateTable,
> "keyGroupOffset");
> JobThatFails.readField(keyContext, "currentKeyGroupIndex");
> KeyGroupRange keyGroupRange =
> (KeyGroupRange) JobThatFails.readField(keyContext, "keyGroupRange");
> return String.format(
> "numberOfKeyGroups=%s currentKey=%s currentKeyGroupIndex=%s
> keyGroupOffset=%s currentNamespace=%s length=%s startKeyGroup=%s
> endKeyGroup=%s",
> JobThatFails.readField(keyContext, "numberOfKeyGroups"),
> JobThatFails.readField(keyContext, "currentKey"),
> JobThatFails.readField(keyContext, "currentKeyGroupIndex"),
> keyGroupOffset,
> currentNamespace,
> length,
> keyGroupRange.getStartKeyGroup(),
> keyGroupRange.getEndKeyGroup());
> }
> }
> static Object readField(Object object, String name) {
> try {
> Field field = getField(name, object.getClass());
> field.setAccessible(true);
> return field.get(object);
> } catch (IllegalAccessException | NoSuchFieldException |
> SecurityException e) {
> throw new RuntimeException(
> String.format(
> "Cannot read %s from %s %s", name,
> object.getClass().getSimpleName(), object),
> e);
> }
> }
> static Field getField(String name, Class<?> clazz) throws
> NoSuchFieldException {
> try {
> return clazz.getDeclaredField(name);
> } catch (NoSuchFieldException e) {
> if (clazz.getSuperclass() == null) {
> throw e;
> } else {
> return getField(name, clazz.getSuperclass());
> }
> }
> }
> public static void main(String[] args) throws Exception {
> JobThatFails job = new JobThatFails();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> job.defineGraph(env);
> JobExecutionResult result =
> env.execute(JobThatFails.class.getSimpleName());
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)