curcur commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r655950985
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
##########
@@ -53,7 +53,8 @@
public abstract class AbstractKeyedStateBackend<K>
implements CheckpointableKeyedStateBackend<K>,
CheckpointListener,
- TestableKeyedStateBackend<K> {
+ TestableKeyedStateBackend<K>,
+ InternalKeyContext<K> {
Review comment:
Would you remind me what is the implemented `InternalKeyContext<K>`
interface used for in cp and recovery?
It seems nothing wrong if I remove it. Would you please remove it if not
relevant?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
##########
@@ -394,4 +395,9 @@ public boolean
requiresLegacySynchronousTimerSnapshots(CheckpointType checkpoint
final StateDescriptor<S, ?> stateDescriptor)
throws Exception;
}
+
+ @Override
+ public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+ keyContext.setCurrentKeyGroupIndex(currentKeyGroupIndex);
+ }
Review comment:
same as above
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
##########
@@ -19,14 +19,14 @@
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
-import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.util.CloseableIterator;
import java.io.Serializable;
-/** An in-memory (non-production) implementation of {@link
StateChangelogWriterFactory}. */
-public class InMemoryStateChangelogWriterFactory
- implements StateChangelogWriterFactory<InMemoryStateChangelogHandle>,
Serializable {
+/** An in-memory (non-production) implementation of {@link
StateChangelogStorage}. */
+public class InMemoryStateChangelogStorage
+ implements StateChangelogStorage<InMemoryStateChangelogHandle>,
Serializable {
Review comment:
`StateChangelogStorage` already Serializable
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateStateChangeLogApplier.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+
+class MapStateStateChangeLogApplier<K, N, UK, UV> extends
KvStateStateChangeApplier<K, N> {
Review comment:
Why this includes two `State` and names differently than other applier?
=> `MapStateChangeApplier` ?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+class ListStateChangeApplier<K, N, T> extends KvStateStateChangeApplier<K, N> {
+ private final InternalListState<K, N, T> state;
+
+ protected ListStateChangeApplier(
+ InternalKeyContext<K> keyContext, InternalListState<K, N, T>
state) {
+ super(keyContext);
+ this.state = state;
+ }
+
+ @Override
+ protected void applyStateUpdate(DataInputView in) throws Exception {
+ state.update(state.getValueSerializer().deserialize(in));
+ }
+
+ @Override
+ protected void applyStateUpdateInternal(DataInputView in) throws Exception
{
+ state.updateInternal(state.getValueSerializer().deserialize(in));
+ }
+
+ @Override
+ protected void applyStateAdded(DataInputView in) throws Exception {
+ state.addAll(state.getValueSerializer().deserialize(in));
+ }
+
+ @Override
+ protected void applyStateElementAdded(DataInputView in) throws Exception {
+ TypeSerializer<List<T>> valueSerializer = state.getValueSerializer();
+ if (valueSerializer instanceof ListSerializer) {
+ state.add(((ListSerializer<T>)
valueSerializer).getElementSerializer().deserialize(in));
+ } else {
+ state.addAll(valueSerializer.deserialize(in));
+ }
+ }
+
+ @Override
+ protected void applyStateElementAddedOrUpdated(DataInputView in) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected InternalKvState<K, N, ?> getState() {
+ return state;
+ }
+
+ @Override
+ protected void applyNameSpaceMerge(DataInputView in) throws Exception {
+ N target = state.getNamespaceSerializer().deserialize(in);
+ int sourcesSize = in.readInt();
+ Collection<N> sources = new ArrayList<>(sourcesSize);
+ for (int i = 0; i < sourcesSize; i++) {
+ sources.add(state.getNamespaceSerializer().deserialize(in));
+ }
+ state.mergeNamespaces(target, sources);
+ }
Review comment:
It seems the implementation of `applyNameSpaceMerge` in all three
places is the same.
Extract them to the abstract class KvState....?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+class ListStateChangeApplier<K, N, T> extends KvStateStateChangeApplier<K, N> {
+ private final InternalListState<K, N, T> state;
+
+ protected ListStateChangeApplier(
+ InternalKeyContext<K> keyContext, InternalListState<K, N, T>
state) {
+ super(keyContext);
+ this.state = state;
+ }
+
+ @Override
+ protected void applyStateUpdate(DataInputView in) throws Exception {
+ state.update(state.getValueSerializer().deserialize(in));
+ }
+
+ @Override
+ protected void applyStateUpdateInternal(DataInputView in) throws Exception
{
+ state.updateInternal(state.getValueSerializer().deserialize(in));
+ }
+
+ @Override
+ protected void applyStateAdded(DataInputView in) throws Exception {
+ state.addAll(state.getValueSerializer().deserialize(in));
+ }
+
+ @Override
+ protected void applyStateElementAdded(DataInputView in) throws Exception {
+ TypeSerializer<List<T>> valueSerializer = state.getValueSerializer();
+ if (valueSerializer instanceof ListSerializer) {
+ state.add(((ListSerializer<T>)
valueSerializer).getElementSerializer().deserialize(in));
+ } else {
+ state.addAll(valueSerializer.deserialize(in));
+ }
+ }
Review comment:
`valueSerializer` is assumed to always be `ListSerializer<T>` in
`ChangelogListState.add()`. Why it is different here?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateStateChangeApplier.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.state.changelog.StateChangeOperation;
+
+abstract class KvStateStateChangeApplier<K, N> implements StateChangeApplier {
Review comment:
`KvStateStateChangeApplier` => `KvStateChangeApplier`?
it is strange two consecutive state together, LOL
BTW, I like how this class is structured.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
##########
@@ -151,10 +160,11 @@ private void logMetaIfNeeded() throws IOException {
throws IOException {
return serializeRaw(
wrapper -> {
- wrapper.writeByte(op.code);
+ wrapper.writeByte(op.getCode());
// todo: optimize in FLINK-22944 by either writing short
code or grouping and
// writing once (same for key, ns)
wrapper.writeUTF(metaInfo.getName());
+ wrapper.writeByte(stateType.getCode());
Review comment:
Is it necessary that each entry has to log the state type?
Can different state types (KV, PQ, OP...) write to the same log?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+
+class ValueStateChangeApplier<K, N, T> extends KvStateStateChangeApplier<K, N>
{
+ private final InternalValueState<K, N, T> state;
+
+ protected ValueStateChangeApplier(
+ InternalKeyContext<K> keyContext, InternalValueState<K, N, T>
state) {
+ super(keyContext);
+ this.state = state;
+ }
+
+ @Override
+ protected void applyStateUpdate(DataInputView in) throws Exception {
+ state.update(state.getValueSerializer().deserialize(in));
+ }
+
+ @Override
+ protected void applyStateUpdateInternal(DataInputView in) throws Exception
{
+ applyStateUpdate(in);
+ }
Review comment:
Same as `applyStateUpdate`, I remember you said in the last PR that
there are slight differences in between? Or I even do not find this method in
`ChangelogValueState`
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+
+class ValueStateChangeApplier<K, N, T> extends KvStateStateChangeApplier<K, N>
{
+ private final InternalValueState<K, N, T> state;
+
+ protected ValueStateChangeApplier(
+ InternalKeyContext<K> keyContext, InternalValueState<K, N, T>
state) {
+ super(keyContext);
+ this.state = state;
+ }
+
+ @Override
+ protected void applyStateUpdate(DataInputView in) throws Exception {
+ state.update(state.getValueSerializer().deserialize(in));
+ }
+
+ @Override
+ protected void applyStateUpdateInternal(DataInputView in) throws Exception
{
+ applyStateUpdate(in);
+ }
+
+ @Override
+ protected void applyStateAdded(DataInputView in) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void applyStateElementAdded(DataInputView in) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void applyStateElementAddedOrUpdated(DataInputView in) throws
Exception {
+ state.update(state.getValueSerializer().deserialize(in));
+ }
Review comment:
If I remember correctly, this is not used in ValueState?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]