pnowojski commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1447579407


##########
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java:
##########
@@ -58,42 +46,16 @@ final class StubStateBackend implements StateBackend {
 
     @Override
     public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
-            Environment env,
-            JobID jobID,
-            String operatorIdentifier,
-            TypeSerializer<K> keySerializer,
-            int numberOfKeyGroups,
-            KeyGroupRange keyGroupRange,
-            TaskKvStateRegistry kvStateRegistry,
-            TtlTimeProvider ttlTimeProvider,
-            MetricGroup metricGroup,
-            @Nonnull Collection<KeyedStateHandle> stateHandles,
-            CloseableRegistry cancelStreamRegistry)
-            throws Exception {
-
+            KeyedStateBackendParameters<K> parameters) throws Exception {
         return backend.createKeyedStateBackend(
-                env,
-                jobID,
-                operatorIdentifier,
-                keySerializer,
-                numberOfKeyGroups,
-                keyGroupRange,
-                kvStateRegistry,
-                this.ttlTimeProvider,
-                metricGroup,
-                stateHandles,
-                cancelStreamRegistry);
+                new KeyedStateBackendParametersImpl<>(parameters)
+                        .setTtlTimeProvider(ttlTimeProvider));

Review Comment:
   bug was fixed here



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogStateBackend.java:
##########
@@ -83,89 +77,25 @@ public abstract class AbstractChangelogStateBackend
 
     @Override
     public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
-            Environment env,
-            JobID jobID,
-            String operatorIdentifier,
-            TypeSerializer<K> keySerializer,
-            int numberOfKeyGroups,
-            KeyGroupRange keyGroupRange,
-            TaskKvStateRegistry kvStateRegistry,
-            TtlTimeProvider ttlTimeProvider,
-            MetricGroup metricGroup,
-            @Nonnull Collection<KeyedStateHandle> stateHandles,
-            CloseableRegistry cancelStreamRegistry)
-            throws Exception {
+            KeyedStateBackendParameters<K> parameters) throws Exception {
         return restore(
-                env,
-                operatorIdentifier,
-                keyGroupRange,
-                ttlTimeProvider,
-                metricGroup,
-                castHandles(stateHandles),
+                parameters.getEnv(),
+                parameters.getOperatorIdentifier(),
+                parameters.getKeyGroupRange(),
+                parameters.getTtlTimeProvider(),
+                parameters.getMetricGroup(),
+                castHandles(parameters.getStateHandles()),
                 baseHandles ->
                         (AbstractKeyedStateBackend<K>)
                                 delegatedStateBackend.createKeyedStateBackend(
-                                        env,
-                                        jobID,
-                                        operatorIdentifier,
-                                        keySerializer,
-                                        numberOfKeyGroups,
-                                        keyGroupRange,
-                                        kvStateRegistry,
-                                        ttlTimeProvider,
-                                        metricGroup,
-                                        baseHandles,
-                                        cancelStreamRegistry));
-    }
-
-    @Override
-    public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
-            Environment env,
-            JobID jobID,
-            String operatorIdentifier,
-            TypeSerializer<K> keySerializer,
-            int numberOfKeyGroups,
-            KeyGroupRange keyGroupRange,
-            TaskKvStateRegistry kvStateRegistry,
-            TtlTimeProvider ttlTimeProvider,
-            MetricGroup metricGroup,
-            @Nonnull Collection<KeyedStateHandle> stateHandles,
-            CloseableRegistry cancelStreamRegistry,
-            double managedMemoryFraction)
-            throws Exception {
-        return restore(
-                env,
-                operatorIdentifier,
-                keyGroupRange,
-                ttlTimeProvider,
-                metricGroup,
-                castHandles(stateHandles),
-                baseHandles ->
-                        (AbstractKeyedStateBackend<K>)
-                                delegatedStateBackend.createKeyedStateBackend(
-                                        env,
-                                        jobID,
-                                        operatorIdentifier,
-                                        keySerializer,
-                                        numberOfKeyGroups,
-                                        keyGroupRange,
-                                        kvStateRegistry,
-                                        ttlTimeProvider,
-                                        metricGroup,
-                                        baseHandles,
-                                        cancelStreamRegistry,
-                                        managedMemoryFraction));
+                                        new 
KeyedStateBackendParametersImpl(parameters)
+                                                
.setStateHandles(baseHandles)));

Review Comment:
   Bug fix is here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to