This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7dc0c6f562a7c87e26aa5d0f1f824c14505928c3
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jun 2 22:40:11 2021 +0200

    [hotfix][state/changelog] Rename WriterFactory to Storage after adding 
Reader
---
 .../flink/runtime/state/changelog/SequenceNumber.java |  2 +-
 ...gWriterFactory.java => StateChangelogStorage.java} |  5 ++---
 ...ryLoader.java => StateChangelogStorageLoader.java} | 12 ++++++------
 ...actory.java => InMemoryStateChangelogStorage.java} |  8 ++++----
 ...ink.runtime.state.changelog.StateChangelogStorage} |  2 +-
 ...Test.java => StateChangelogStorageLoaderTest.java} | 19 +++++++++----------
 ...actoryTest.java => StateChangelogStorageTest.java} | 12 ++++++------
 .../flink/state/changelog/ChangelogStateBackend.java  |  8 +++-----
 8 files changed, 32 insertions(+), 36 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java
index c6556fa..413535f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java
@@ -27,7 +27,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * A logical timestamp to draw a boundary between the materialized and 
non-materialized changes.
  * Maintained by the state backend but implementations may choose to move its 
generation to {@link
- * StateChangelogWriterFactory} as an optimization.
+ * StateChangelogStorage} as an optimization.
  */
 @Internal
 public interface SequenceNumber extends Comparable<SequenceNumber> {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
similarity index 87%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
index 072cd15..699d585 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
@@ -23,11 +23,10 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 
 /**
  * A factory for {@link StateChangelogWriter} and {@link 
StateChangelogHandleReader}. Please use
- * {@link StateChangelogWriterFactoryLoader} to obtain an instance.
+ * {@link StateChangelogStorageLoader} to obtain an instance.
  */
 @Internal
-public interface StateChangelogWriterFactory<Handle extends 
StateChangelogHandle>
-        extends AutoCloseable {
+public interface StateChangelogStorage<Handle extends StateChangelogHandle> 
extends AutoCloseable {
 
     StateChangelogWriter<Handle> createWriter(String operatorID, KeyGroupRange 
keyGroupRange);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
similarity index 78%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
index 8e0bffa..c40ec57 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
@@ -25,19 +25,19 @@ import java.util.ServiceLoader;
 
 import static 
org.apache.flink.shaded.guava18.com.google.common.collect.Iterators.concat;
 
-/** A thin wrapper around {@link PluginManager} to load {@link 
StateChangelogWriterFactory}. */
+/** A thin wrapper around {@link PluginManager} to load {@link 
StateChangelogStorage}. */
 @Internal
-public class StateChangelogWriterFactoryLoader {
+public class StateChangelogStorageLoader {
     private final PluginManager pluginManager;
 
-    public StateChangelogWriterFactoryLoader(PluginManager pluginManager) {
+    public StateChangelogStorageLoader(PluginManager pluginManager) {
         this.pluginManager = pluginManager;
     }
 
     @SuppressWarnings({"rawtypes"})
-    public Iterator<StateChangelogWriterFactory> load() {
+    public Iterator<StateChangelogStorage> load() {
         return concat(
-                pluginManager.load(StateChangelogWriterFactory.class),
-                
ServiceLoader.load(StateChangelogWriterFactory.class).iterator());
+                pluginManager.load(StateChangelogStorage.class),
+                ServiceLoader.load(StateChangelogStorage.class).iterator());
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
similarity index 86%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
index 9383c26..2a751165 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
@@ -19,12 +19,12 @@ package org.apache.flink.runtime.state.changelog.inmemory;
 
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
-import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.util.CloseableIterator;
 
-/** An in-memory (non-production) implementation of {@link 
StateChangelogWriterFactory}. */
-public class InMemoryStateChangelogWriterFactory
-        implements StateChangelogWriterFactory<InMemoryStateChangelogHandle> {
+/** An in-memory (non-production) implementation of {@link 
StateChangelogStorage}. */
+public class InMemoryStateChangelogStorage
+        implements StateChangelogStorage<InMemoryStateChangelogHandle> {
 
     @Override
     public InMemoryStateChangelogWriter createWriter(
diff --git 
a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory
 
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogStorage
similarity index 97%
rename from 
flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory
rename to 
flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogStorage
index bf1a4d0..54f284c 100644
--- 
a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory
+++ 
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogStorage
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogWriterFactory
+org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
similarity index 72%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
index 88d6f03..f1e5740 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
@@ -18,8 +18,8 @@
 package org.apache.flink.runtime.state.changelog.inmemory;
 
 import org.apache.flink.core.plugin.PluginManager;
-import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
-import 
org.apache.flink.runtime.state.changelog.StateChangelogWriterFactoryLoader;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
 
 import org.junit.Test;
 
@@ -31,12 +31,12 @@ import static 
org.apache.flink.shaded.curator4.com.google.common.collect.Immutab
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertTrue;
 
-public class StateChangelogWriterFactoryLoaderTest {
+public class StateChangelogStorageLoaderTest {
 
     @Test
     public void testLoadSpiImplementation() {
         assertTrue(
-                new 
StateChangelogWriterFactoryLoader(getPluginManager(emptyIterator()))
+                new 
StateChangelogStorageLoader(getPluginManager(emptyIterator()))
                         .load()
                         .hasNext());
     }
@@ -44,20 +44,19 @@ public class StateChangelogWriterFactoryLoaderTest {
     @Test
     @SuppressWarnings("rawtypes")
     public void testLoadPluginImplementation() {
-        StateChangelogWriterFactory<?> impl = new 
InMemoryStateChangelogWriterFactory();
+        StateChangelogStorage<?> impl = new InMemoryStateChangelogStorage();
         PluginManager pluginManager = 
getPluginManager(singletonList(impl).iterator());
-        Iterator<StateChangelogWriterFactory> loaded =
-                new StateChangelogWriterFactoryLoader(pluginManager).load();
+        Iterator<StateChangelogStorage> loaded =
+                new StateChangelogStorageLoader(pluginManager).load();
         assertTrue(copyOf(loaded).contains(impl));
     }
 
-    private PluginManager getPluginManager(
-            Iterator<? extends StateChangelogWriterFactory<?>> iterator) {
+    private PluginManager getPluginManager(Iterator<? extends 
StateChangelogStorage<?>> iterator) {
         return new PluginManager() {
 
             @Override
             public <P> Iterator<P> load(Class<P> service) {
-                
checkArgument(service.equals(StateChangelogWriterFactory.class));
+                checkArgument(service.equals(StateChangelogStorage.class));
                 //noinspection unchecked
                 return (Iterator<P>) iterator;
             }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
similarity index 92%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index 01cc322..cb31c38 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -23,8 +23,8 @@ import 
org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
-import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
 import org.apache.flink.util.CloseableIterator;
 
 import org.junit.Rule;
@@ -48,8 +48,8 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
-/** {@link InMemoryStateChangelogWriterFactory} test. */
-public class StateChangelogWriterFactoryTest<T extends StateChangelogHandle> {
+/** {@link InMemoryStateChangelogStorage} test. */
+public class StateChangelogStorageTest<T extends StateChangelogHandle> {
 
     private final Random random = new Random();
 
@@ -68,7 +68,7 @@ public class StateChangelogWriterFactoryTest<T extends 
StateChangelogHandle> {
         KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
         Map<Integer, List<byte[]>> appendsByKeyGroup = 
generateAppends(kgRange, 10, 20);
 
-        try (StateChangelogWriterFactory<T> client = getFactory();
+        try (StateChangelogStorage<T> client = getFactory();
                 StateChangelogWriter<T> writer =
                         client.createWriter(new OperatorID().toString(), 
kgRange)) {
             SequenceNumber prev = writer.initialSequenceNumber();
@@ -133,7 +133,7 @@ public class StateChangelogWriterFactoryTest<T extends 
StateChangelogHandle> {
         return bytes;
     }
 
-    protected StateChangelogWriterFactory<T> getFactory() {
-        return (StateChangelogWriterFactory<T>) new 
InMemoryStateChangelogWriterFactory();
+    protected StateChangelogStorage<T> getFactory() {
+        return (StateChangelogStorage<T>) new InMemoryStateChangelogStorage();
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index a98ab5b..36a7a59 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
-import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogWriterFactory;
+import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
 import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.Preconditions;
@@ -108,8 +108,7 @@ public class ChangelogStateBackend implements 
DelegatingStateBackend, Configurab
                                 stateHandles,
                                 cancelStreamRegistry);
         // todo: FLINK-21804 get from Environment.getTaskStateManager
-        InMemoryStateChangelogWriterFactory changelogWriterFactory =
-                new InMemoryStateChangelogWriterFactory();
+        InMemoryStateChangelogStorage changelogWriterFactory = new 
InMemoryStateChangelogStorage();
         return new ChangelogKeyedStateBackend<>(
                 keyedStateBackend,
                 env.getExecutionConfig(),
@@ -150,8 +149,7 @@ public class ChangelogStateBackend implements 
DelegatingStateBackend, Configurab
                                 managedMemoryFraction);
 
         // todo: FLINK-21804 get from Environment.getTaskStateManager
-        InMemoryStateChangelogWriterFactory changelogWriterFactory =
-                new InMemoryStateChangelogWriterFactory();
+        InMemoryStateChangelogStorage changelogWriterFactory = new 
InMemoryStateChangelogStorage();
         return new ChangelogKeyedStateBackend<>(
                 keyedStateBackend,
                 env.getExecutionConfig(),

Reply via email to