This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7dc0c6f562a7c87e26aa5d0f1f824c14505928c3 Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Jun 2 22:40:11 2021 +0200 [hotfix][state/changelog] Rename WriterFactory to Storage after adding Reader --- .../flink/runtime/state/changelog/SequenceNumber.java | 2 +- ...gWriterFactory.java => StateChangelogStorage.java} | 5 ++--- ...ryLoader.java => StateChangelogStorageLoader.java} | 12 ++++++------ ...actory.java => InMemoryStateChangelogStorage.java} | 8 ++++---- ...ink.runtime.state.changelog.StateChangelogStorage} | 2 +- ...Test.java => StateChangelogStorageLoaderTest.java} | 19 +++++++++---------- ...actoryTest.java => StateChangelogStorageTest.java} | 12 ++++++------ .../flink/state/changelog/ChangelogStateBackend.java | 8 +++----- 8 files changed, 32 insertions(+), 36 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java index c6556fa..413535f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java @@ -27,7 +27,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * A logical timestamp to draw a boundary between the materialized and non-materialized changes. * Maintained by the state backend but implementations may choose to move its generation to {@link - * StateChangelogWriterFactory} as an optimization. + * StateChangelogStorage} as an optimization. */ @Internal public interface SequenceNumber extends Comparable<SequenceNumber> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java similarity index 87% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java index 072cd15..699d585 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java @@ -23,11 +23,10 @@ import org.apache.flink.runtime.state.KeyGroupRange; /** * A factory for {@link StateChangelogWriter} and {@link StateChangelogHandleReader}. Please use - * {@link StateChangelogWriterFactoryLoader} to obtain an instance. + * {@link StateChangelogStorageLoader} to obtain an instance. */ @Internal -public interface StateChangelogWriterFactory<Handle extends StateChangelogHandle> - extends AutoCloseable { +public interface StateChangelogStorage<Handle extends StateChangelogHandle> extends AutoCloseable { StateChangelogWriter<Handle> createWriter(String operatorID, KeyGroupRange keyGroupRange); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java similarity index 78% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java index 8e0bffa..c40ec57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java @@ -25,19 +25,19 @@ import java.util.ServiceLoader; import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterators.concat; -/** A thin wrapper around {@link PluginManager} to load {@link StateChangelogWriterFactory}. */ +/** A thin wrapper around {@link PluginManager} to load {@link StateChangelogStorage}. */ @Internal -public class StateChangelogWriterFactoryLoader { +public class StateChangelogStorageLoader { private final PluginManager pluginManager; - public StateChangelogWriterFactoryLoader(PluginManager pluginManager) { + public StateChangelogStorageLoader(PluginManager pluginManager) { this.pluginManager = pluginManager; } @SuppressWarnings({"rawtypes"}) - public Iterator<StateChangelogWriterFactory> load() { + public Iterator<StateChangelogStorage> load() { return concat( - pluginManager.load(StateChangelogWriterFactory.class), - ServiceLoader.load(StateChangelogWriterFactory.class).iterator()); + pluginManager.load(StateChangelogStorage.class), + ServiceLoader.load(StateChangelogStorage.class).iterator()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java similarity index 86% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java index 9383c26..2a751165 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.state.changelog.inmemory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader; -import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory; +import org.apache.flink.runtime.state.changelog.StateChangelogStorage; import org.apache.flink.util.CloseableIterator; -/** An in-memory (non-production) implementation of {@link StateChangelogWriterFactory}. */ -public class InMemoryStateChangelogWriterFactory - implements StateChangelogWriterFactory<InMemoryStateChangelogHandle> { +/** An in-memory (non-production) implementation of {@link StateChangelogStorage}. */ +public class InMemoryStateChangelogStorage + implements StateChangelogStorage<InMemoryStateChangelogHandle> { @Override public InMemoryStateChangelogWriter createWriter( diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogStorage similarity index 97% rename from flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory rename to flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogStorage index bf1a4d0..54f284c 100644 --- a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory +++ b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogStorage @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogWriterFactory +org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java similarity index 72% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java index 88d6f03..f1e5740 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.state.changelog.inmemory; import org.apache.flink.core.plugin.PluginManager; -import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory; -import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactoryLoader; +import org.apache.flink.runtime.state.changelog.StateChangelogStorage; +import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader; import org.junit.Test; @@ -31,12 +31,12 @@ import static org.apache.flink.shaded.curator4.com.google.common.collect.Immutab import static org.apache.flink.util.Preconditions.checkArgument; import static org.junit.Assert.assertTrue; -public class StateChangelogWriterFactoryLoaderTest { +public class StateChangelogStorageLoaderTest { @Test public void testLoadSpiImplementation() { assertTrue( - new StateChangelogWriterFactoryLoader(getPluginManager(emptyIterator())) + new StateChangelogStorageLoader(getPluginManager(emptyIterator())) .load() .hasNext()); } @@ -44,20 +44,19 @@ public class StateChangelogWriterFactoryLoaderTest { @Test @SuppressWarnings("rawtypes") public void testLoadPluginImplementation() { - StateChangelogWriterFactory<?> impl = new InMemoryStateChangelogWriterFactory(); + StateChangelogStorage<?> impl = new InMemoryStateChangelogStorage(); PluginManager pluginManager = getPluginManager(singletonList(impl).iterator()); - Iterator<StateChangelogWriterFactory> loaded = - new StateChangelogWriterFactoryLoader(pluginManager).load(); + Iterator<StateChangelogStorage> loaded = + new StateChangelogStorageLoader(pluginManager).load(); assertTrue(copyOf(loaded).contains(impl)); } - private PluginManager getPluginManager( - Iterator<? extends StateChangelogWriterFactory<?>> iterator) { + private PluginManager getPluginManager(Iterator<? extends StateChangelogStorage<?>> iterator) { return new PluginManager() { @Override public <P> Iterator<P> load(Class<P> service) { - checkArgument(service.equals(StateChangelogWriterFactory.class)); + checkArgument(service.equals(StateChangelogStorage.class)); //noinspection unchecked return (Iterator<P>) iterator; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java similarity index 92% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java index 01cc322..cb31c38 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java @@ -23,8 +23,8 @@ import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; import org.apache.flink.runtime.state.changelog.StateChangelogHandle; import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader; +import org.apache.flink.runtime.state.changelog.StateChangelogStorage; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; -import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory; import org.apache.flink.util.CloseableIterator; import org.junit.Rule; @@ -48,8 +48,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -/** {@link InMemoryStateChangelogWriterFactory} test. */ -public class StateChangelogWriterFactoryTest<T extends StateChangelogHandle> { +/** {@link InMemoryStateChangelogStorage} test. */ +public class StateChangelogStorageTest<T extends StateChangelogHandle> { private final Random random = new Random(); @@ -68,7 +68,7 @@ public class StateChangelogWriterFactoryTest<T extends StateChangelogHandle> { KeyGroupRange kgRange = KeyGroupRange.of(0, 5); Map<Integer, List<byte[]>> appendsByKeyGroup = generateAppends(kgRange, 10, 20); - try (StateChangelogWriterFactory<T> client = getFactory(); + try (StateChangelogStorage<T> client = getFactory(); StateChangelogWriter<T> writer = client.createWriter(new OperatorID().toString(), kgRange)) { SequenceNumber prev = writer.initialSequenceNumber(); @@ -133,7 +133,7 @@ public class StateChangelogWriterFactoryTest<T extends StateChangelogHandle> { return bytes; } - protected StateChangelogWriterFactory<T> getFactory() { - return (StateChangelogWriterFactory<T>) new InMemoryStateChangelogWriterFactory(); + protected StateChangelogStorage<T> getFactory() { + return (StateChangelogStorage<T>) new InMemoryStateChangelogStorage(); } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java index a98ab5b..36a7a59 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogWriterFactory; +import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage; import org.apache.flink.runtime.state.delegate.DelegatingStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.Preconditions; @@ -108,8 +108,7 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab stateHandles, cancelStreamRegistry); // todo: FLINK-21804 get from Environment.getTaskStateManager - InMemoryStateChangelogWriterFactory changelogWriterFactory = - new InMemoryStateChangelogWriterFactory(); + InMemoryStateChangelogStorage changelogWriterFactory = new InMemoryStateChangelogStorage(); return new ChangelogKeyedStateBackend<>( keyedStateBackend, env.getExecutionConfig(), @@ -150,8 +149,7 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab managedMemoryFraction); // todo: FLINK-21804 get from Environment.getTaskStateManager - InMemoryStateChangelogWriterFactory changelogWriterFactory = - new InMemoryStateChangelogWriterFactory(); + InMemoryStateChangelogStorage changelogWriterFactory = new InMemoryStateChangelogStorage(); return new ChangelogKeyedStateBackend<>( keyedStateBackend, env.getExecutionConfig(),
