asfgit closed pull request #6413: [FLINK-8993] [tests] Let general purpose
DataStream job uses KryoSerializer via type extraction
URL: https://github.com/apache/flink/pull/6413
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request from a fork
has been merged, the diff is reproduced below for the sake of provenance:
diff --git
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 123807297ef..fb92960bb86 100644
---
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -22,6 +22,8 @@
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
@@ -382,13 +384,38 @@ static boolean isSimulateFailures(ParameterTool pt) {
static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT>
createArtificialKeyedStateMapper(
MapFunction<IN, OUT> mapFunction,
JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
- List<TypeSerializer<STATE>> stateSerializers) {
+ List<TypeSerializer<STATE>> stateSerializers,
+ List<Class<STATE>> stateClasses) {
List<ArtificialStateBuilder<IN>> artificialStateBuilders = new
ArrayList<>(stateSerializers.size());
for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
-
artificialStateBuilders.add(createValueStateBuilder(inputAndOldStateToNewState,
typeSerializer));
-
artificialStateBuilders.add(createListStateBuilder(inputAndOldStateToNewState,
typeSerializer));
+ artificialStateBuilders.add(createValueStateBuilder(
+ inputAndOldStateToNewState,
+ new ValueStateDescriptor<>(
+ "valueState-" +
typeSerializer.getClass().getSimpleName(),
+ typeSerializer)));
+
+ artificialStateBuilders.add(createListStateBuilder(
+ inputAndOldStateToNewState,
+ new ListStateDescriptor<>(
+ "listState-" +
typeSerializer.getClass().getSimpleName(),
+ typeSerializer)));
}
+
+ for (Class<STATE> stateClass : stateClasses) {
+ artificialStateBuilders.add(createValueStateBuilder(
+ inputAndOldStateToNewState,
+ new ValueStateDescriptor<>(
+ "valueState-" +
stateClass.getSimpleName(),
+ stateClass)));
+
+ artificialStateBuilders.add(createListStateBuilder(
+ inputAndOldStateToNewState,
+ new ListStateDescriptor<>(
+ "listState-" +
stateClass.getSimpleName(),
+ stateClass)));
+ }
+
return new ArtificialKeyedStateMapper<>(mapFunction,
artificialStateBuilders);
}
@@ -400,17 +427,17 @@ static boolean isSimulateFailures(ParameterTool pt) {
static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
- TypeSerializer<STATE> typeSerializer) {
+ ValueStateDescriptor<STATE> valueStateDescriptor) {
return new ArtificialValueStateBuilder<>(
- "valueState-" +
typeSerializer.getClass().getSimpleName(),
+ valueStateDescriptor.getName(),
inputAndOldStateToNewState,
- typeSerializer);
+ valueStateDescriptor);
}
static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
- TypeSerializer<STATE> typeSerializer) {
+ ListStateDescriptor<STATE> listStateDescriptor) {
JoinFunction<IN, Iterable<STATE>, List<STATE>>
listStateGenerator = (first, second) -> {
List<STATE> newState = new ArrayList<>();
@@ -421,9 +448,9 @@ static boolean isSimulateFailures(ParameterTool pt) {
};
return new ArtificialListStateBuilder<>(
- "listState-" +
typeSerializer.getClass().getSimpleName(),
+ listStateDescriptor.getName(),
listStateGenerator,
listStateGenerator,
- typeSerializer);
+ listStateDescriptor);
}
}
diff --git
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index ea90e655103..30c1c24ee3f 100644
---
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -86,7 +86,8 @@ public static void main(String[] args) throws Exception {
return new
ComplexPayload(first, KEYED_STATE_OPER_NAME);
},
Collections.singletonList(
- new
KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+ new
KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom
KryoSerializer
+
Collections.singletonList(ComplexPayload.class) // KryoSerializer via type
extraction
)
)
.name(KEYED_STATE_OPER_NAME)
diff --git
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
index a2c63877dd3..b29e535cfb2 100644
---
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
+++
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
@@ -21,8 +21,8 @@
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
import java.util.List;
@@ -35,7 +35,7 @@
private transient ListState<STATE> listOperatorState;
private transient ListState<STATE> listKeyedState;
- private final TypeSerializer<STATE> typeSerializer;
+ private final ListStateDescriptor<STATE> listStateDescriptor;
private final JoinFunction<IN, Iterable<STATE>, List<STATE>>
keyedStateGenerator;
private final JoinFunction<IN, Iterable<STATE>, List<STATE>>
operatorStateGenerator;
@@ -43,11 +43,11 @@ public ArtificialListStateBuilder(
String stateName,
JoinFunction<IN, Iterable<STATE>, List<STATE>>
keyedStateGenerator,
JoinFunction<IN, Iterable<STATE>, List<STATE>>
operatorStateGenerator,
- TypeSerializer<STATE> typeSerializer) {
+ ListStateDescriptor<STATE> listStateDescriptor) {
super(stateName);
- this.typeSerializer = typeSerializer;
- this.keyedStateGenerator = keyedStateGenerator;
- this.operatorStateGenerator = operatorStateGenerator;
+ this.listStateDescriptor =
Preconditions.checkNotNull(listStateDescriptor);
+ this.keyedStateGenerator =
Preconditions.checkNotNull(keyedStateGenerator);
+ this.operatorStateGenerator =
Preconditions.checkNotNull(operatorStateGenerator);
}
@Override
@@ -58,7 +58,6 @@ public void artificialStateForElement(IN event) throws
Exception {
@Override
public void initialize(FunctionInitializationContext
initializationContext) throws Exception {
- ListStateDescriptor<STATE> listStateDescriptor = new
ListStateDescriptor<>(stateName, typeSerializer);
listOperatorState =
initializationContext.getOperatorStateStore().getListState(listStateDescriptor);
listKeyedState =
initializationContext.getKeyedStateStore().getListState(listStateDescriptor);
}
diff --git
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
index 6d74e0964f5..421a682351d 100644
---
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
+++
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
@@ -21,8 +21,8 @@
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
/**
* An {@link ArtificialStateBuilder} for user {@link ValueState}s.
@@ -32,16 +32,16 @@
private static final long serialVersionUID = -1205814329756790916L;
private transient ValueState<STATE> valueState;
- private final TypeSerializer<STATE> typeSerializer;
+ private final ValueStateDescriptor<STATE> valueStateDescriptor;
private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
public ArtificialValueStateBuilder(
String stateName,
JoinFunction<IN, STATE, STATE> stateValueGenerator,
- TypeSerializer<STATE> typeSerializer) {
+ ValueStateDescriptor<STATE> valueStateDescriptor) {
super(stateName);
- this.typeSerializer = typeSerializer;
- this.stateValueGenerator = stateValueGenerator;
+ this.valueStateDescriptor =
Preconditions.checkNotNull(valueStateDescriptor);
+ this.stateValueGenerator =
Preconditions.checkNotNull(stateValueGenerator);
}
@Override
@@ -51,8 +51,6 @@ public void artificialStateForElement(IN event) throws
Exception {
@Override
public void initialize(FunctionInitializationContext
initializationContext) {
- ValueStateDescriptor<STATE> valueStateDescriptor =
- new ValueStateDescriptor<>(stateName, typeSerializer);
valueState =
initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
}
}
diff --git
a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index 0b3b5ed4b89..4f77f954d3e 100644
---
a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++
b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -84,8 +84,8 @@ public static void main(String[] args) throws Exception {
Collections.singletonList(new
KryoSerializer<>(ComplexPayload.class, env.getConfig()));
KeyedStream<Event, Integer> afterStatefulOperations =
isOriginalJobVariant(pt) ?
- applyOriginalStatefulOperations(source, stateSer) :
- applyUpgradedStatefulOperations(source, stateSer);
+ applyOriginalStatefulOperations(source, stateSer,
Collections.emptyList()) :
+ applyUpgradedStatefulOperations(source, stateSer,
Collections.emptyList());
afterStatefulOperations
.flatMap(createSemanticsCheckMapper(pt))
@@ -109,26 +109,29 @@ private static boolean isOriginalJobVariant(final
ParameterTool pt) {
private static KeyedStream<Event, Integer>
applyOriginalStatefulOperations(
KeyedStream<Event, Integer> source,
- List<TypeSerializer<ComplexPayload>> stateSer) {
- source = applyTestStatefulOperator("stateMap1",
simpleStateUpdate("stateMap1"), source, stateSer);
- return applyTestStatefulOperator("stateMap2",
lastStateUpdate("stateMap2"), source, stateSer);
+ List<TypeSerializer<ComplexPayload>> stateSer,
+ List<Class<ComplexPayload>> stateClass) {
+ source = applyTestStatefulOperator("stateMap1",
simpleStateUpdate("stateMap1"), source, stateSer, stateClass);
+ return applyTestStatefulOperator("stateMap2",
lastStateUpdate("stateMap2"), source, stateSer, stateClass);
}
private static KeyedStream<Event, Integer>
applyUpgradedStatefulOperations(
KeyedStream<Event, Integer> source,
- List<TypeSerializer<ComplexPayload>> stateSer) {
- source = applyTestStatefulOperator("stateMap2",
simpleStateUpdate("stateMap2"), source, stateSer);
- source = applyTestStatefulOperator("stateMap1",
lastStateUpdate("stateMap1"), source, stateSer);
- return applyTestStatefulOperator("stateMap3",
simpleStateUpdate("stateMap3"), source, stateSer);
+ List<TypeSerializer<ComplexPayload>> stateSer,
+ List<Class<ComplexPayload>> stateClass) {
+ source = applyTestStatefulOperator("stateMap2",
simpleStateUpdate("stateMap2"), source, stateSer, stateClass);
+ source = applyTestStatefulOperator("stateMap1",
lastStateUpdate("stateMap1"), source, stateSer, stateClass);
+ return applyTestStatefulOperator("stateMap3",
simpleStateUpdate("stateMap3"), source, stateSer, stateClass);
}
private static KeyedStream<Event, Integer> applyTestStatefulOperator(
String name,
JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
KeyedStream<Event, Integer> source,
- List<TypeSerializer<ComplexPayload>> stateSer) {
+ List<TypeSerializer<ComplexPayload>> stateSer,
+ List<Class<ComplexPayload>> stateClass) {
return source
- .map(createArtificialKeyedStateMapper(e -> e,
stateFunc, stateSer))
+ .map(createArtificialKeyedStateMapper(e -> e,
stateFunc, stateSer, stateClass))
.name(name)
.uid(name)
.returns(Event.class)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services