Myasuka commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r646164327



##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.KEY_VALUE;
+import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE;
+import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
+import static org.apache.flink.state.changelog.StateChangeOperation.ADD;
+import static 
org.apache.flink.state.changelog.StateChangeOperation.CHANGE_ELEMENT;
+import static org.apache.flink.state.changelog.StateChangeOperation.CLEAR;
+import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
+import static 
org.apache.flink.state.changelog.StateChangeOperation.REMOVE_ELEMENT;
+import static org.apache.flink.state.changelog.StateChangeOperation.SET;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+abstract class AbstractStateChangeLogger<Key, State, Ns> implements 
StateChangeLogger<State, Ns> {
+    private static final int COMMON_KEY_GROUP = -1;
+    protected final StateChangelogWriter<?> stateChangelogWriter;
+    protected final InternalKeyContext<Key> keyContext;
+    protected final RegisteredStateMetaInfoBase metaInfo;
+    private final StateMetaInfoSnapshot.BackendStateType stateType;
+    private boolean metaDataWritten = false;
+
+    public AbstractStateChangeLogger(
+            StateChangelogWriter<?> stateChangelogWriter,
+            InternalKeyContext<Key> keyContext,
+            RegisteredStateMetaInfoBase metaInfo) {
+        this.stateChangelogWriter = checkNotNull(stateChangelogWriter);
+        this.keyContext = checkNotNull(keyContext);
+        this.metaInfo = checkNotNull(metaInfo);
+        if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
+            this.stateType = KEY_VALUE;
+        } else if (metaInfo instanceof 
RegisteredPriorityQueueStateBackendMetaInfo) {
+            this.stateType = PRIORITY_QUEUE;
+        } else {
+            throw new IllegalArgumentException("Unsupported state type: " + 
metaInfo);
+        }
+    }
+
+    @Override
+    public void stateUpdated(State newState, Ns ns) throws IOException {
+        if (newState == null) {
+            stateCleared(ns);
+        } else {
+            log(SET, out -> serializeState(newState, out), ns);
+        }
+    }
+
+    protected abstract void serializeState(State state, 
DataOutputViewStreamWrapper out)
+            throws IOException;
+
+    @Override
+    public void stateAdded(State addedState, Ns ns) throws IOException {
+        log(ADD, out -> serializeState(addedState, out), ns);
+    }
+
+    @Override
+    public void stateCleared(Ns ns) throws IOException {
+        log(CLEAR, out -> {}, ns);
+    }
+
+    @Override
+    public void stateElementChanged(
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Ns ns)
+            throws IOException {
+        log(CHANGE_ELEMENT, dataSerializer, ns);
+    }
+
+    @Override
+    public void stateElementRemoved(
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Ns ns)
+            throws IOException {
+        log(REMOVE_ELEMENT, dataSerializer, ns);
+    }
+
+    protected void log(
+            StateChangeOperation op,
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter,
+            Ns ns)
+            throws IOException {
+        logMetaIfNeeded();
+        stateChangelogWriter.append(
+                keyContext.getCurrentKeyGroupIndex(), serialize(op, ns, 
dataWriter));
+    }
+
+    private void logMetaIfNeeded() throws IOException {
+        if (metaDataWritten) {
+            return;
+        }
+        stateChangelogWriter.append(
+                COMMON_KEY_GROUP,
+                serializeRaw(
+                        out -> {
+                            out.writeByte(METADATA.getCode());
+                            
out.writeInt(CURRENT_STATE_META_INFO_SNAPSHOT_VERSION);
+                            StateMetaInfoSnapshotReadersWriters.getWriter()
+                                    
.writeStateMetaInfoSnapshot(metaInfo.snapshot(), out);
+                        }));
+        metaDataWritten = true;
+    }
+
+    private byte[] serialize(
+            StateChangeOperation op,
+            Ns ns,
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter)
+            throws IOException {
+        return serializeRaw(
+                wrapper -> {
+                    wrapper.writeByte(op.getCode());
+                    // todo: wrapper.writeShort(stateId); and/or sort and 
write once (same for key,
+                    // ns?)
+                    wrapper.writeUTF(metaInfo.getName());
+                    wrapper.writeByte(stateType.getCode());
+                    serializeScope(ns, wrapper);
+                    dataWriter.accept(wrapper);
+                });
+    }
+
+    protected abstract void serializeScope(Ns ns, DataOutputViewStreamWrapper 
out)
+            throws IOException;
+
+    private byte[] serializeRaw(
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter)
+            throws IOException {
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream();

Review comment:
       Maybe we could add a TODO here to improve performance by reusing the output 
stream, similar to what `SerializedCompositeKeyBuilder` does.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/FunctionDelegationHelper.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link DelegatingFunction Delegating functions} are used to create metadata 
on recovery when the
+ * actual function code is not known yet. Once the actual function is known, 
backend updates the
+ * delegate which starts receiving the calls.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+@Internal
+public class FunctionDelegationHelper {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FunctionDelegationHelper.class);
+
+    public static <T> ReduceFunction<T> delegateReduceFunction() {
+        return new DelegatingReduceFunction<>();
+    }
+
+    public static <IN, ACC, OUT> AggregateFunction<IN, ACC, OUT> 
delegateAggregateFunction() {
+        return new DelegatingAggregateFunction<>();
+    }
+
+    private interface DelegatingFunction<F> extends Function {
+        void delegateIfNeeded(F delegated);
+    }
+
+    private final Map<String, DelegatingFunction> delegatingFunctions = new 
HashMap<>();
+
+    public <T, S extends State, F> void addOrUpdate(StateDescriptor<S, T> 
stateDescriptor) {
+        F function = tryGetFunction(stateDescriptor);
+        String name = stateDescriptor.getName();
+        if (function instanceof DelegatingFunction) {
+            LOG.debug("add delegate: {}", name);
+            delegatingFunctions.putIfAbsent(name, (DelegatingFunction<?>) 
function);
+        } else {
+            DelegatingFunction<F> delegating = delegatingFunctions.get(name);
+            if (delegating != null) {
+                LOG.debug("update delegate: {}", name);
+                checkState(function != null, "unable to extract function for 
state " + name);
+                delegating.delegateIfNeeded(function);
+            }
+        }
+    }
+
+    @Nullable
+    private static <F extends Function> F tryGetFunction(StateDescriptor<?, ?> 
stateDescriptor) {
+        if (stateDescriptor instanceof ReducingStateDescriptor) {
+            return (F) ((ReducingStateDescriptor) 
stateDescriptor).getReduceFunction();
+        } else if (stateDescriptor instanceof AggregatingStateDescriptor) {
+            return (F) ((AggregatingStateDescriptor) 
stateDescriptor).getAggregateFunction();
+        } else {
+            return null;
+        }
+    }
+
+    static class DelegatingAggregateFunction<IN, ACC, OUT>
+            implements AggregateFunction<IN, ACC, OUT>,
+                    DelegatingFunction<AggregateFunction<IN, ACC, OUT>> {
+        @Nullable private AggregateFunction<IN, ACC, OUT> delegated;
+
+        @Override
+        public void delegateIfNeeded(AggregateFunction<IN, ACC, OUT> 
delegated) {
+            if (this.delegated == null) {
+                this.delegated = checkNotNull(delegated);
+            }
+        }
+
+        @Override
+        public ACC createAccumulator() {
+            checkNotNull(delegated);
+            return delegated.createAccumulator();
+        }
+
+        @Override
+        public ACC add(IN value, ACC accumulator) {
+            checkNotNull(delegated);
+            return delegated.add(value, accumulator);
+        }
+
+        @Override
+        public OUT getResult(ACC accumulator) {
+            checkNotNull(delegated);
+            return delegated.getResult(accumulator);
+        }
+
+        @Override
+        public ACC merge(ACC a, ACC b) {
+            checkNotNull(delegated);
+            return delegated.merge(a, b);
+        }
+    }
+
+    static class DelegatingReduceFunction<T>

Review comment:
       I think it would be better to add a `serialVersionUID` to this class.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A reader for {@link StateChangelogHandleStreamImpl} that iterates over its 
underlying {@link
+ * StreamStateHandle stream handles} and offsets. Starting from each offset, 
it enumerates the
+ * {@link StateChange state changes} using the provided {@link 
StateChangeIterator}. Different
+ * {@link StateChangelogStorage} implementations may have different 
<b>iterator</b> implementations.
+ * Using a different {@link StateChangelogHandle} (and reader) is problematic 
as it needs to be
+ * serialized.
+ */
+@Internal
+public class StateChangelogHandleStreamHandleReader
+        implements StateChangelogHandleReader<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateChangelogHandleStreamImpl.class);
+
+    /** Reads a stream of state changes starting from a specified offset. */
+    public interface StateChangeIterator {

Review comment:
       Since there is no `StateChangeIterator` implementation yet, why not add a TODO 
here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
##########
@@ -29,28 +30,45 @@
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** {@link StateChangelogHandle} implementation based on {@link 
StreamStateHandle}. */
 @Internal
 public final class StateChangelogHandleStreamImpl
         implements 
StateChangelogHandle<StateChangelogHandleStreamImpl.StateChangeStreamReader> {
     private static final long serialVersionUID = -8070326169926626355L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateChangelogHandleStreamImpl.class);
 
     private final KeyGroupRange keyGroupRange;
     /** NOTE: order is important as it reflects the order of changes. */
     private final List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets;
 
     private transient SharedStateRegistry stateRegistry;
+    private final long size;
 
     public StateChangelogHandleStreamImpl(
-            List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets, 
KeyGroupRange keyGroupRange) {
+            List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets,
+            KeyGroupRange keyGroupRange,
+            long size) {
         this.handlesAndOffsets = handlesAndOffsets;
         this.keyGroupRange = keyGroupRange;
+        this.size = size;
+    }
+
+    public StateChangelogHandleStreamImpl(

Review comment:
       I think this constructor is currently unused.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A reader for {@link StateChangelogHandleStreamImpl} that iterates over its 
underlying {@link
+ * StreamStateHandle stream handles} and offsets. Starting from each offset, 
it enumerates the
+ * {@link StateChange state changes} using the provided {@link 
StateChangeIterator}. Different
+ * {@link StateChangelogStorage} implementations may have different 
<b>iterator</b> implementations.
+ * Using a different {@link StateChangelogHandle} (and reader) is problematic 
as it needs to be
+ * serialized.
+ */
+@Internal
+public class StateChangelogHandleStreamHandleReader
+        implements StateChangelogHandleReader<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateChangelogHandleStreamImpl.class);

Review comment:
       Why not use `StateChangelogHandleStreamHandleReader.class` for the logger instead of `StateChangelogHandleStreamImpl.class`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
##########
@@ -29,28 +30,45 @@
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** {@link StateChangelogHandle} implementation based on {@link 
StreamStateHandle}. */
 @Internal
 public final class StateChangelogHandleStreamImpl

Review comment:
       Since we already have `InMemoryStateChangelogHandle`, why not rename this 
class to `FileStateChangelogHandle`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
##########
@@ -70,11 +72,26 @@ public KeyGroupRange getKeyGroupRange() {
     @Nullable
     @Override
     public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-        throw new UnsupportedOperationException();
+        return 
changes.stream().mapToInt(StateChange::getKeyGroup).anyMatch(keyGroupRange::contains)
+                ? this
+                : null;
     }
 
     @Override
     public void registerSharedStates(SharedStateRegistry stateRegistry) {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public String toString() {
+        return String.format("from %s to %s: %s", from, to, changes);
+    }
+
+    public long getFrom() {
+        return ((SequenceNumber.GenericSequenceNumber) from).number;

Review comment:
       Why not add a method such as `long number()` to the `SequenceNumber` 
interface, instead of casting to `SequenceNumber.GenericSequenceNumber`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to