dawidwys commented on a change in pull request #15204:
URL: https://github.com/apache/flink/pull/15204#discussion_r595956973



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
##########
@@ -50,7 +51,7 @@
     private final TypeSerializer<K> keySerializer;
     private final int totalKeyGroups;
 
-    private HeapSnapshotResources(
+    protected HeapSnapshotResources(

Review comment:
       Why is it protected? The class is final...
   
   Could we use composition instead of inheritance? Inheritance is pretty much 
always a wrong choice if used for code deduplication, as it creates complex 
hierarchies that are later on loosely coupled. I think we can pass the method 
`snapshot(long, CheckpointOptions, StateSnapshotRestore)` instead of extending 
the class.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointableKeyedStateBackend.java
##########
@@ -45,5 +46,6 @@
      * write out a savepoint in the common/unified format.
      */
     @Nonnull
-    SavepointResources<K> savepoint() throws Exception;
+    SavepointResources<K> savepoint(long checkpointId, CheckpointOptions 
checkpointOptions)

Review comment:
       Is it really necessary to have the two arguments in this method? 
Especially the `CheckpointOptions`? Can't we build the SavepointResources in a 
way that we don't need these parameters?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
##########
@@ -18,53 +18,47 @@
 
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.StateSerializerProvider;
-import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.function.SupplierWithException;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.createDuplicatingStream;
 import static 
org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.createSimpleStream;
-import static 
org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult;
 
 /** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
-class HeapSnapshotStrategy<K>
+@Internal
+public class HeapSnapshotStrategy<K>
         implements SnapshotStrategy<KeyedStateHandle, 
HeapSnapshotResources<K>> {
 
-    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
-    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates;
-    private final StreamCompressionDecorator keyGroupCompressionDecorator;
-    private final LocalRecoveryConfig localRecoveryConfig;
-    private final KeyGroupRange keyGroupRange;
-    private final StateSerializerProvider<K> keySerializerProvider;
-    private final int totalKeyGroups;
+    protected final Map<String, StateTable<K, ?, ?>> registeredKVStates;

Review comment:
       Let's not do this. For me it's a clear indication we should not use 
inheritance here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to