Myasuka commented on a change in pull request #15420: URL: https://github.com/apache/flink/pull/15420#discussion_r656907721
########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; +import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader; +import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters; +import org.apache.flink.state.changelog.ChangelogKeyedStateBackend; +import org.apache.flink.state.changelog.ChangelogState; +import org.apache.flink.state.changelog.StateChangeOperation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE; +import static org.apache.flink.state.changelog.StateChangeOperation.METADATA; +import static org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateAggregateFunction; +import static org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateReduceFunction; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Applies {@link StateChange}'s to a {@link ChangelogKeyedStateBackend}. 
*/ Review comment: A typo: I think the correct javadoc should be `Applies {@link StateChange}s to a {@link ChangelogKeyedStateBackend}.` ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ########## @@ -369,7 +509,73 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { // Factory function interface private interface StateFactory { <K, N, SV, S extends State, IS extends S> IS create( - InternalKvState<K, N, SV> kvState, KvStateChangeLogger<SV, N> changeLogger) + InternalKvState<K, N, SV> kvState, + KvStateChangeLogger<SV, N> changeLogger, + InternalKeyContext<K> keyContext) throws Exception; } + + /** + * @param name state name + * @param type state type (the only supported type currently are: {@link + * BackendStateType#KEY_VALUE key value}, {@link BackendStateType#PRIORITY_QUEUE priority + * queue}) + * @return an existing state, i.e. the one that was already created + * @throws NoSuchElementException if the state wasn't created + * @throws UnsupportedOperationException if state type is not supported + */ + public ChangelogState getExistingState(String name, BackendStateType type) + throws NoSuchElementException, UnsupportedOperationException { + ChangelogState state; + switch (type) { + case KEY_VALUE: + state = (ChangelogState) keyValueStatesByName.get(name); Review comment: I think something is wrong here. `keyValueStatesByName`, like `AbstractKeyedStateBackend#keyValueStatesByName`, is only updated in `KeyedStateBackend#getOrCreateKeyedState`. However, that method is not called when we restore and build the keyed state backend, only on the first user access. For the RocksDB state backend, `kvStateInformation` is where restored state is actually stored, and for the heap keyed state backend it is `registeredKVStates`.
That is to say, we cannot get the existing state when trying to apply state changes during restore. Moreover, we also need to consider state migration: the state changes should be applied with the previous state descriptor (mainly the value serializer), not the current state descriptor, as they might not be the same. ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ########## @@ -116,16 +140,54 @@ /** For caching the last accessed partitioned state. */ private String lastName; + private final FunctionDelegationHelper functionDelegationHelper = + new FunctionDelegationHelper(); + + /** Updated initially on restore and later upon materialization (after FLINK-21356). */ + private final List<KeyedStateHandle> materialized = new ArrayList<>(); + + /** Updated initially on restore and later cleared upon materialization (after FLINK-21356). */ Review comment: What does `after FLINK-21356` mean? If I understand correctly, `restoredNonMaterialized` would be cleared upon materialization, which might be implemented in FLINK-21357, right? If so, why not change the doc to `in FLINK-21357`? The same comment applies to the `materialized` field. ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; +import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters; +import org.apache.flink.state.changelog.ChangelogKeyedStateBackend; +import org.apache.flink.state.changelog.ChangelogState; +import org.apache.flink.state.changelog.StateChangeOperation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE; +import static org.apache.flink.state.changelog.StateChangeOperation.METADATA; +import static org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateAggregateFunction; +import static org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateReduceFunction; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Applies {@link StateChange}'s to a {@link ChangelogKeyedStateBackend}. */ +@SuppressWarnings({"rawtypes", "unchecked"}) +class ChangelogBackendLogApplier { + private static final Logger LOG = LoggerFactory.getLogger(ChangelogBackendLogApplier.class); + + public static void apply( + StateChange stateChange, + ChangelogKeyedStateBackend<?> changelogBackend, + ClassLoader classLoader) + throws Exception { + DataInputViewStreamWrapper in = + new DataInputViewStreamWrapper(new ByteArrayInputStream(stateChange.getChange())); + applyOperation( + StateChangeOperation.byCode(in.readByte()), + stateChange.getKeyGroup(), + changelogBackend, + in, + classLoader, + new ChangelogApplierFactoryImpl()); Review comment: I think a singleton instance of `ChangelogApplierFactoryImpl` would be better here, rather than creating a new factory for every state change. ########## File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java ########## @@ -116,16 +140,54 @@ /** For caching the last accessed partitioned state. */ private String lastName; + private final FunctionDelegationHelper functionDelegationHelper = + new FunctionDelegationHelper(); + + /** Updated initially on restore and later upon materialization (after FLINK-21356). */ + private final List<KeyedStateHandle> materialized = new ArrayList<>(); + + /** Updated initially on restore and later cleared upon materialization (after FLINK-21356). 
*/ + private final List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>(); + + /** + * {@link SequenceNumber} denoting last upload range <b>start</b>, inclusive. Updated to {@link + * #materializedTo} when {@link #snapshot(long, long, CheckpointStreamFactory, + * CheckpointOptions) starting snapshot}. Used to notify {@link #stateChangelogWriter} about + * changelog ranges that were confirmed or aborted by JM. + */ + @Nullable private SequenceNumber lastUploadedFrom; + /** + * {@link SequenceNumber} denoting last upload range <b>end</b>, exclusive. Updated to {@link + * org.apache.flink.runtime.state.changelog.StateChangelogWriter#lastAppendedSequenceNumber} + * when {@link #snapshot(long, long, CheckpointStreamFactory, CheckpointOptions) starting + * snapshot}. Used to notify {@link #stateChangelogWriter} about changelog ranges that were + * confirmed or aborted by JM. + */ + @Nullable private SequenceNumber lastUploadedTo; + /** + * The {@link SequenceNumber} up to which the state is materialized, exclusive. The log should + * be truncated accordingly. + * + * <p>WARN: currently not updated - to be changed in FLINK-21356. Review comment: This PR targets FLINK-21356, yet the field `materializedTo` is still not updated, which really confuses me.
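To illustrate the earlier comment on `ChangelogApplierFactoryImpl`, here is a minimal sketch of the singleton idea. It assumes the factory holds no per-call state, which I have not verified against the PR. `ApplierFactory`, `ApplierFactoryImpl`, `describe`, and `apply` below are hypothetical stand-ins, not the actual Flink types or signatures.

```java
public class SingletonFactorySketch {

    /** Hypothetical stand-in for the applier factory interface. */
    interface ApplierFactory {
        String describe(String stateName);
    }

    /** Assumed to be stateless, so a single shared instance is enough. */
    static final class ApplierFactoryImpl implements ApplierFactory {
        static final ApplierFactoryImpl INSTANCE = new ApplierFactoryImpl();

        // Private constructor: callers must go through INSTANCE.
        private ApplierFactoryImpl() {}

        @Override
        public String describe(String stateName) {
            return "applier for " + stateName;
        }
    }

    /** Hypothetical stand-in for the per-change entry point. */
    static String apply(String stateName) {
        // Reuses the shared instance instead of allocating a factory per state change.
        return ApplierFactoryImpl.INSTANCE.describe(stateName);
    }

    public static void main(String[] args) {
        System.out.println(apply("counts"));
    }
}
```

If the real factory does carry state, a shared instance would not be safe and this suggestion would need to be revisited.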
