snuyanzin commented on code in PR #28561:
URL: https://github.com/apache/flink/pull/28561#discussion_r3488245966
##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java:
##########
@@ -112,89 +105,7 @@ void testMaxParallelismRespected(boolean asyncState)
throws Exception {
new KeyedStateReaderOperator<>(new ReaderFunction(),
Types.INT),
new ExecutionConfig());
KeyGroupRangeInputSplit[] splits = format.createInputSplits(129);
- Assert.assertEquals(
- "Failed to properly partition operator state into input
splits",
- 128,
- splits.length);
- }
-
- @ParameterizedTest(name = "Enable async state = {0}")
- @ValueSource(booleans = {false, true})
- void testExactKeyFilterPrunesInputSplits(boolean asyncState) throws
Exception {
- OperatorID operatorID = OperatorIDGenerator.fromUid("uid");
-
- OperatorSubtaskState state =
- createOperatorSubtaskState(createFlatMap(asyncState),
asyncState);
- OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 128);
- operatorState.putState(0, state);
-
- KeyedStateInputFormat<?, ?, ?> format =
- new KeyedStateInputFormat<>(
- operatorState,
- new HashMapStateBackend(),
- new Configuration(),
- new KeyedStateReaderOperator<>(new ReaderFunction(),
Types.INT),
- new ExecutionConfig(),
- SavepointKeyFilter.exact(5));
- KeyGroupRangeInputSplit[] splits = format.createInputSplits(10);
-
- assertThat(splits)
- .as("Single-key exact filter maps to exactly one key group
range")
- .hasSize(1);
- }
-
- @ParameterizedTest(name = "Enable async state = {0}")
- @ValueSource(booleans = {false, true})
- void testEmptyFilterProducesNoInputSplits(boolean asyncState) throws
Exception {
- OperatorID operatorID = OperatorIDGenerator.fromUid("uid");
-
- OperatorSubtaskState state =
- createOperatorSubtaskState(createFlatMap(asyncState),
asyncState);
- OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 128);
- operatorState.putState(0, state);
-
- KeyedStateInputFormat<?, ?, ?> format =
- new KeyedStateInputFormat<>(
- operatorState,
- new HashMapStateBackend(),
- new Configuration(),
- new KeyedStateReaderOperator<>(new ReaderFunction(),
Types.INT),
- new ExecutionConfig(),
- SavepointKeyFilter.empty());
- KeyGroupRangeInputSplit[] splits = format.createInputSplits(10);
-
- assertThat(splits).isEmpty();
- }
-
- @ParameterizedTest(name = "Enable async state = {0}")
- @ValueSource(booleans = {false, true})
- void testRangeFilterDoesNotPruneInputSplits(boolean asyncState) throws
Exception {
- OperatorID operatorID = OperatorIDGenerator.fromUid("uid");
-
- OperatorSubtaskState state =
- createOperatorSubtaskState(createFlatMap(asyncState),
asyncState);
- OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 128);
- operatorState.putState(0, state);
-
- KeyedStateInputFormat<?, ?, ?> formatRange =
- new KeyedStateInputFormat<>(
- operatorState,
- new HashMapStateBackend(),
- new Configuration(),
- new KeyedStateReaderOperator<>(new ReaderFunction(),
Types.INT),
- new ExecutionConfig(),
- SavepointKeyFilter.range(3, true, 7, true));
- KeyedStateInputFormat<?, ?, ?> formatNoFilter =
- new KeyedStateInputFormat<>(
- operatorState,
- new HashMapStateBackend(),
- new Configuration(),
- new KeyedStateReaderOperator<>(new ReaderFunction(),
Types.INT),
- new ExecutionConfig());
-
- assertThat(formatRange.createInputSplits(10))
- .as("Range filters cannot prune key-group splits")
- .hasSize(formatNoFilter.createInputSplits(10).length);
+ assertThat(splits.length).isEqualTo(128);
Review Comment:
Why not use `hasSize` here, i.e. `assertThat(splits).hasSize(128)`, instead of asserting on `splits.length`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]