[
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057506#comment-17057506
]
Felix Wollschläger edited comment on FLINK-11774 at 3/12/20, 1:14 AM:
----------------------------------------------------------------------
Maybe updating the `KeySelector` interface by adding a `buildHashCode` method
could help us use enum types inside tuples as keys:
{code}
package example;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;

@FunctionalInterface
@Public
public interface KeySelector<IN, KEY> extends Function, Serializable {

    KEY getKey(IN element) throws Exception;

    /**
     * Builds the hash code the runtime should use for the given key.
     * Defaults to {@link Object#hashCode()}, preserving today's behavior.
     */
    default int buildHashCode(KEY key) throws Exception {
        return key.hashCode();
    }
}
{code}
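For context, a simplified sketch of how the runtime could consume the new method (hypothetical `SelectorAwareKeyGroupAssignment` class, not actual Flink code; Flink's real logic lives in `KeyGroupRangeAssignment` and additionally applies a murmur hash before the modulo):
{code}
package example;

public final class SelectorAwareKeyGroupAssignment {

    private SelectorAwareKeyGroupAssignment() {}

    // Routes through the selector-provided hash instead of key.hashCode(),
    // so key-group assignment no longer depends on identity hash codes.
    public static <IN, KEY> int assignToKeyGroup(
            KeySelector<IN, KEY> selector, IN element, int maxParallelism) throws Exception {
        KEY key = selector.getKey(element);
        int hash = selector.buildHashCode(key); // was: key.hashCode()
        return Math.floorMod(hash, maxParallelism);
    }
}
{code}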
The hash computation itself could then be factored into a dedicated strategy interface, for example:
{code}
package example;

import org.apache.flink.annotation.Public;

@FunctionalInterface
@Public
public interface HashStrategy<T> {

    /** Called once before first use, e.g. from RichFunction#open. */
    default void initialize() throws Exception {}

    int buildHashCode(T key) throws Exception;
}
{code}
{code}
package example;

/**
 * Deterministic hash strategies for enums. Unlike Enum#hashCode(), which is
 * the identity hash and therefore differs between JVMs, both variants below
 * are stable across processes.
 */
public enum EnumHashStrategy implements HashStrategy<Enum<?>> {

    ORDINAL {
        @Override
        public int buildHashCode(Enum<?> value) {
            return 31 * value.ordinal();
        }
    },

    NAME {
        @Override
        public int buildHashCode(Enum<?> value) {
            return value.name().hashCode();
        }
    }
}
{code}
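Just to illustrate why this matters for this ticket (hypothetical `Demo` class, assuming the types above): `Enum.hashCode()` is the identity hash code, so it changes from JVM to JVM, while both strategies produce the same value on every run:
{code}
package example;

import java.time.DayOfWeek;

public class Demo {

    public static void main(String[] args) throws Exception {
        // Identity hash: varies between JVM runs, so key-group assignment
        // based on it is not reproducible across TaskManagers.
        System.out.println(DayOfWeek.MONDAY.hashCode());

        // Stable across runs: 31 * 0 = 0
        System.out.println(EnumHashStrategy.ORDINAL.buildHashCode(DayOfWeek.MONDAY));

        // Stable across runs: "MONDAY".hashCode()
        System.out.println(EnumHashStrategy.NAME.buildHashCode(DayOfWeek.MONDAY));
    }
}
{code}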
{code}
package example;

import org.apache.flink.api.java.tuple.Tuple2;

import java.time.DayOfWeek;

public class MyKeySelector
        implements KeySelector<Tuple2<Tuple2<DayOfWeek, Long>, String>, Tuple2<DayOfWeek, Long>> {

    @Override
    public Tuple2<DayOfWeek, Long> getKey(Tuple2<Tuple2<DayOfWeek, Long>, String> element) throws Exception {
        return element.f0;
    }

    @Override
    public int buildHashCode(Tuple2<DayOfWeek, Long> key) throws Exception {
        // Combine the deterministic enum hash with the Long's stable hash code.
        int result = EnumHashStrategy.ORDINAL.buildHashCode(key.f0);
        return result * 31 + key.f1.hashCode();
    }
}
{code}
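And a minimal usage sketch (hypothetical `MyKeySelectorExample`, assuming the proposed `buildHashCode` method were added to Flink's own `KeySelector` so that `MyKeySelector` implements the real interface):
{code}
package example;

import java.time.DayOfWeek;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyKeySelectorExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Both records share the same key; with a deterministic enum hash
        // they would land in the same key group on every JVM.
        env.fromElements(
                        Tuple2.of(Tuple2.of(DayOfWeek.MONDAY, 1L), "a"),
                        Tuple2.of(Tuple2.of(DayOfWeek.MONDAY, 1L), "b"))
                .keyBy(new MyKeySelector())
                .print();

        env.execute();
    }
}
{code}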
My initial thoughts actually went a bit further than this:
{code}
package example;

/**
 * Combines several strategies into one. Each composed strategy receives the
 * whole key and is expected to extract the field it is responsible for.
 */
public class ComposedHashStrategy<T> implements HashStrategy<T> {

    private final Iterable<HashStrategy<Object>> hashStrategies;

    public ComposedHashStrategy(Iterable<HashStrategy<Object>> hashStrategies) {
        this.hashStrategies = hashStrategies;
    }

    @Override
    public void initialize() throws Exception {
        for (HashStrategy<Object> hashStrategy : this.hashStrategies) {
            hashStrategy.initialize();
        }
    }

    @Override
    public int buildHashCode(T key) throws Exception {
        int result = 0;
        for (HashStrategy<Object> hashStrategy : this.hashStrategies) {
            result = 31 * result + hashStrategy.buildHashCode(key);
        }
        return result;
    }
}
{code}
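A quick usage sketch for this (hypothetical `ComposedExample`): composing per-field strategies for a `Tuple2<DayOfWeek, Long>` key, where each lambda pulls its own field out of the key:
{code}
package example;

import java.time.DayOfWeek;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;

public class ComposedExample {

    public static void main(String[] args) throws Exception {
        // One strategy per tuple field; each extracts its field from the whole key.
        List<HashStrategy<Object>> strategies = Arrays.asList(
                key -> EnumHashStrategy.ORDINAL.buildHashCode((Enum<?>) ((Tuple2<?, ?>) key).f0),
                key -> ((Tuple2<?, ?>) key).f1.hashCode());

        HashStrategy<Tuple2<DayOfWeek, Long>> composed = new ComposedHashStrategy<>(strategies);
        composed.initialize();

        System.out.println(composed.buildHashCode(Tuple2.of(DayOfWeek.MONDAY, 42L)));
    }
}
{code}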
{code}
package dev.codeflush.streaming;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Arrays;
import java.util.List;

import example.ComposedHashStrategy;
import example.EnumHashStrategy;
import example.HashStrategy;

public class TypeAwareHashStrategy<T> implements HashStrategy<T> {

    private final EnumHashStrategy enumHashStrategy;
    private final TypeInformation<T> typeInformation;

    private transient HashStrategy<Object> runtimeHashStrategy;

    public TypeAwareHashStrategy(TypeInformation<T> typeInformation, EnumHashStrategy enumHashStrategy) {
        this.enumHashStrategy = enumHashStrategy;
        this.typeInformation = typeInformation;
    }

    @Override
    public void initialize() {
        this.runtimeHashStrategy = buildHashStrategyForClass(this.typeInformation.getTypeClass());
    }

    private HashStrategy<Object> buildHashStrategyForClass(Class<?> clazz) {
        final HashStrategy<Object> result;
        if (clazz.isEnum()) {
            result = v -> this.enumHashStrategy.buildHashCode((Enum<?>) v);
        } else if (clazz.isArray()) {
            // Note: primitive arrays would need the Arrays.hashCode overloads.
            result = v -> Arrays.deepHashCode((Object[]) v);
        } else if (clazz.isPrimitive()) {
            result = Object::hashCode;
        } else {
            result = new ComposedHashStrategy<>(buildHashStrategiesForFieldsOfClass(clazz));
        }
        return result;
    }

    private List<HashStrategy<Object>> buildHashStrategiesForFieldsOfClass(Class<?> clazz) {
        // Build a list of HashStrategies: one HashStrategy for every member
        // field of the given class (recursively, until java.lang.Object is
        // reached), using the enum strategy if the member is an enum type,
        // some array strategy if it's an array, and so on.
        return null; // left as a stub for now
    }

    @Override
    public int buildHashCode(T key) throws Exception {
        return this.runtimeHashStrategy.buildHashCode(key);
    }
}
{code}
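For completeness, a reflection-based sketch of what the stubbed method could look like (my assumption only; a real implementation would probably go through Flink's `TypeInformation` instead of plain reflection):
{code}
private List<HashStrategy<Object>> buildHashStrategiesForFieldsOfClass(Class<?> clazz) {
    List<HashStrategy<Object>> result = new java.util.ArrayList<>();
    // Walk the class hierarchy up to (but not including) java.lang.Object.
    for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
        for (java.lang.reflect.Field field : c.getDeclaredFields()) {
            if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
                continue; // static fields are not part of the key
            }
            field.setAccessible(true);
            // Pick a strategy based on the declared field type, then adapt
            // it so it extracts this field's value from the enclosing key.
            HashStrategy<Object> fieldStrategy = buildHashStrategyForClass(field.getType());
            result.add(key -> fieldStrategy.buildHashCode(field.get(key)));
        }
    }
    return result;
}
{code}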
{code}
package dev.codeflush.streaming;

import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;

import example.EnumHashStrategy;
import example.HashStrategy;
import example.KeySelector;

public abstract class TypeAwareKeySelector<IN, KEY> implements KeySelector<IN, KEY>, RichFunction {

    private final HashStrategy<KEY> hashStrategy;

    public TypeAwareKeySelector(TypeInformation<KEY> keyTypeInformation, EnumHashStrategy hashStrategy) {
        this(new TypeAwareHashStrategy<>(keyTypeInformation, hashStrategy));
    }

    public TypeAwareKeySelector(HashStrategy<KEY> hashStrategy) {
        this.hashStrategy = hashStrategy;
    }

    @Override
    public void open(Configuration configuration) throws Exception {
        this.hashStrategy.initialize();
    }

    // Note: concrete subclasses still have to provide the remaining
    // RichFunction methods (close, get/setRuntimeContext, ...).

    @Override
    public int buildHashCode(KEY key) throws Exception {
        return this.hashStrategy.buildHashCode(key);
    }
}
{code}
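A concrete subclass could then look like this (hypothetical `DayOfWeekKeySelector`; the extra plumbing is needed because `TypeAwareKeySelector` implements `RichFunction` directly):
{code}
package dev.codeflush.streaming;

import java.time.DayOfWeek;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

import example.EnumHashStrategy;

public class DayOfWeekKeySelector
        extends TypeAwareKeySelector<Tuple2<Tuple2<DayOfWeek, Long>, String>, Tuple2<DayOfWeek, Long>> {

    private transient RuntimeContext runtimeContext;

    public DayOfWeekKeySelector(TypeInformation<Tuple2<DayOfWeek, Long>> keyType) {
        super(keyType, EnumHashStrategy.ORDINAL);
    }

    @Override
    public Tuple2<DayOfWeek, Long> getKey(Tuple2<Tuple2<DayOfWeek, Long>, String> element) {
        return element.f0;
    }

    // RichFunction plumbing required by the abstract parent.
    @Override
    public void close() {}

    @Override
    public RuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    @Override
    public IterationRuntimeContext getIterationRuntimeContext() {
        throw new IllegalStateException("Not called inside an iteration");
    }
}
{code}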
But when I looked at the clock, I decided I'd rather go to bed now :-)
> IllegalArgumentException in HeapPriorityQueueSet
> ------------------------------------------------
>
> Key: FLINK-11774
> URL: https://issues.apache.org/jira/browse/FLINK-11774
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.7.2
> Environment: Can reproduce on the following configurations:
>
> OS: macOS 10.14.3
> Java: 1.8.0_202
>
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
> Reporter: Kirill Vainer
> Priority: Blocker
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
> Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>     at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
>     at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
>     at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> public class App {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);
>         env.fromElements(1, 2)
>             .map(Aggregate::new)
>             .keyBy(Aggregate::getKey)
>             .timeWindow(Time.seconds(2))
>             .reduce(Aggregate::reduce)
>             .addSink(new CollectSink());
>         env.execute();
>     }
>
>     private static class Aggregate {
>
>         private Key key = new Key();
>
>         public Aggregate(long number) {
>         }
>
>         public static Aggregate reduce(Aggregate a, Aggregate b) {
>             return new Aggregate(0);
>         }
>
>         public Key getKey() {
>             return key;
>         }
>     }
>
>     public static class Key {
>     }
>
>     private static class CollectSink implements SinkFunction<Aggregate> {
>
>         private static final long serialVersionUID = 1;
>
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void invoke(Aggregate value, Context ctx) throws Exception {
>         }
>     }
> }
> {code}
> Attached is the project that can be executed with {{./gradlew run}} showing
> the problem, or you can run the attached {{flink-bug-dist.zip}} which is
> prepackaged with the dependencies.
> Thanks in advance