curcur commented on a change in pull request #14838:
URL: https://github.com/apache/flink/pull/14838#discussion_r572613154



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateChange.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/** Change of state of a keyed operator. Used for generic incremental checkpoints. */
+@Internal
+public class StateChange {

Review comment:
       Maybe rename this to `KeyedStateChange`, since each change is tied to a 
key group?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateChangelogHandleStreamImpl.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/** {@link StateChangelogHandle} implementation based on {@link StreamStateHandle}. */
+@Internal
+public final class StateChangelogHandleStreamImpl
+        implements StateChangelogHandle<StateChangelogHandleStreamImpl.StateChangeStreamReader> {
+    private static final long serialVersionUID = -8070326169926626355L;
+
+    private final KeyGroupRange keyGroupRange;
+    /** NOTE: order is important as it reflects the order of changes. */
+    private final List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets;

Review comment:
       I guess `Tuple2<StreamStateHandle, Long>` represents the series of state 
changes persisted by previous StateChangelog checkpoints/snapshots.
   
   What is the second field? An offset into the file? 
   Would it be clearer to make this a separate class, with a descriptive name 
and a Javadoc comment for each field?
   
   The structure might also need to change if we want to use something else 
(e.g. a DB) instead of blobs on a DFS.
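
The suggestion to replace the tuple with a named class could look roughly like the sketch below. It is only an illustration: `StreamHandleStub` stands in for Flink's `StreamStateHandle` so that the snippet compiles on its own, `HandleAndOffset` and its accessors are hypothetical names, and the meaning given to `offset` is an assumption, since the PR does not document it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for Flink's StreamStateHandle; not the real interface. */
interface StreamHandleStub {
    String id();
}

/** Hypothetical named replacement for Tuple2<StreamStateHandle, Long>. */
final class HandleAndOffset {
    private final StreamHandleStub handle;

    /** Assumed meaning: position in the stream at which the relevant changes start. */
    private final long offset;

    HandleAndOffset(StreamHandleStub handle, long offset) {
        this.handle = Objects.requireNonNull(handle, "handle");
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be non-negative: " + offset);
        }
        this.offset = offset;
    }

    StreamHandleStub getHandle() {
        return handle;
    }

    long getOffset() {
        return offset;
    }
}

final class HandleAndOffsetDemo {
    /** Call sites read getHandle()/getOffset() instead of the opaque f0/f1. */
    static List<String> describe(List<HandleAndOffset> entries) {
        List<String> out = new ArrayList<>();
        for (HandleAndOffset entry : entries) {
            out.add(entry.getHandle().id() + "@" + entry.getOffset());
        }
        return out;
    }
}
```

With such a class, a different storage layer (for example a DB) could later change the fields without touching every `.f0`/`.f1` call site.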

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateChangelogHandleStreamImpl.java
##########
@@ -0,0 +1,144 @@
+
+    private transient SharedStateRegistry stateRegistry;
+
+    public StateChangelogHandleStreamImpl(
+            List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets, KeyGroupRange keyGroupRange) {
+        this.handlesAndOffsets = handlesAndOffsets;
+        this.keyGroupRange = keyGroupRange;
+    }
+
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry) {
+        this.stateRegistry = stateRegistry;
+        handlesAndOffsets.forEach(
+                handleAndOffset ->
+                        stateRegistry.registerReference(
+                                getKey(handleAndOffset.f0), handleAndOffset.f0));

Review comment:
       See, `.f0` is confusing here; same comment as above about turning the 
`Tuple2` into a class.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateChangelogHandleStreamImpl.java
##########
@@ -0,0 +1,144 @@
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyGroupRange;
+    }
+
+    @Nullable
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        // Intersect this handle's range (the field) with the requested range (the parameter).
+        KeyGroupRange offsets = this.keyGroupRange.getIntersection(keyGroupRange);
+        if (offsets.getNumberOfKeyGroups() == 0) {
+            return null;
+        }
+        return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets);
+    }
+
+    @Override
+    public CloseableIterator<StateChange> getChanges(StateChangeStreamReader reader) {
+        return new CloseableIterator<StateChange>() {
+            private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator =
+                    handlesAndOffsets.iterator();
+
+            private CloseableIterator<StateChange> current = CloseableIterator.empty();
+
+            @Override
+            public boolean hasNext() {
+                advance();
+                return current.hasNext();
+            }
+
+            @Override
+            public StateChange next() {
+                advance();
+                return current.next();
+            }
+
+            private void advance() {
+                while (!current.hasNext() && handleIterator.hasNext()) {
+                    Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
+                    try {
+                        current = reader.read(tuple2.f0, tuple2.f1);
+                    } catch (IOException e) {
+                        ExceptionUtils.rethrow(e);
+                    }
+                }
+            }
+
+            @Override
+            public void close() throws Exception {
+                current.close();
+            }
+        };
+    }
+
+    @Override
+    public void discardState() {
+        handlesAndOffsets.forEach(
+                handleAndOffset -> stateRegistry.unregisterReference(getKey(handleAndOffset.f0)));
+    }
+
+    @Override
+    public long getStateSize() {
+        return 0;
+    }
+
+    private static SharedStateRegistryKey getKey(StreamStateHandle stateHandle) {

Review comment:
       I am not very familiar with how sharing works exactly, so forgive me if 
these questions are obvious:
   
   1. Is sharing per state, per task, per state backend, or across multiple 
state backends? I understand it is at least per state changelog, and each state 
changelog maps to one state backend. So it is either per state backend or 
across multiple state backends.
   
   2. I understand that, at the very least, changes from the same state backend 
are shareable, right? So what is the problem with using the backend UUID? Or do 
multiple backends bundle their state changes together in one file?
   
   Once I understand the answers to these two questions, I think I will 
understand why the state backend UUID cannot be used as a key. Maybe the 
comment could then be rewritten along those lines?
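
To make question 2 concrete, here is a minimal sketch of the scenario it hypothesizes: several backends writing into the same file, with references counted per file rather than per backend. This is not Flink's `SharedStateRegistry`; the class name, the `String` keys, and the method names are all assumptions made for illustration, and the PR may share state differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified reference-counting registry; not Flink's SharedStateRegistry. */
final class RefCountingRegistrySketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Adds one reference under the given key and returns the new count. */
    int register(String key) {
        return refCounts.merge(key, 1, Integer::sum);
    }

    /**
     * Drops one reference. Returns true when the last reference is gone, i.e. when the
     * underlying file could safely be deleted.
     */
    boolean unregister(String key) {
        Integer count = refCounts.get(key);
        if (count == null) {
            throw new IllegalStateException("unknown key: " + key);
        }
        if (count == 1) {
            refCounts.remove(key);
            return true;
        }
        refCounts.put(key, count - 1);
        return false;
    }
}
```

If two backends register the same file key, the file survives until both have unregistered it. A key derived from a backend UUID could not express that, because each backend would count its references separately.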

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/changelog/SequenceNumber.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A logical timestamp to draw a boundary between the materialized and non-materialized changes.
+ * Maintained by the state backend but implementations may choose to move its generation to {@link

Review comment:
       "Maintained by the state backend", hmm. I was wondering whether this 
info (the last materialized sequence number) has to be maintained within the 
state backend. 
   
   If it is simply a boundary between materialized and non-materialized changes 
in the state changelog, why can't we keep it within the state changelog? Is it 
used anywhere else?
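
The alternative raised here, a changelog that owns the materialization boundary itself, might be sketched as follows. Everything in it is hypothetical: the class, the use of a plain `long` as the sequence number, and the in-memory list are simplifications for illustration, not the PR's design.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory changelog that tracks the materialization boundary itself. */
final class BoundaryTrackingLogSketch {
    private final List<String> changes = new ArrayList<>();

    /** Sequence number of the first change not yet covered by a materialized snapshot. */
    private long firstNonMaterialized = 0;

    /** Appends a change and returns its sequence number (its position in the log). */
    long append(String change) {
        changes.add(change);
        return changes.size() - 1;
    }

    /** Records that every change with a sequence number below upTo has been materialized. */
    void markMaterialized(long upTo) {
        if (upTo < firstNonMaterialized || upTo > changes.size()) {
            throw new IllegalArgumentException("invalid boundary: " + upTo);
        }
        firstNonMaterialized = upTo;
    }

    /** Returns a copy of the changes that still have to be replayed on recovery. */
    List<String> nonMaterialized() {
        return new ArrayList<>(changes.subList((int) firstNonMaterialized, changes.size()));
    }
}
```

In this arrangement the state backend only reports when materialization has finished, and the log decides which suffix is still needed.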
   

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/changelog/StateChangelogWriter.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StateChangelogHandle;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/** Allows to write data to the log. Scoped to a single writer (e.g. state backend). */
+@Internal
+public interface StateChangelogWriter<Handle extends StateChangelogHandle<?>>
+        extends AutoCloseable {
+
+    /**
+     * Get {@link SequenceNumber} of the last element added by {@link #append(int, byte[]) append}.
+     */
+    SequenceNumber lastAppendedSqn();
+
+    /** Appends the provided data to this log. No persistency guarantees. */
+    void append(int keyGroup, byte[] value);

Review comment:
       Is this the method the state backend calls when it updates a state?

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/changelog/StateChangelogWriter.java
##########
@@ -0,0 +1,77 @@
+
+    /**
+     * Durably persist previously {@link #append(int, byte[]) appended} data. After this call, one
+     * of {@link #confirm(SequenceNumber, SequenceNumber) confirm}, {@link #reset(SequenceNumber,
+     * SequenceNumber) reset}, or {@link #truncate(SequenceNumber) truncate} eventually must be
+     * called for the corresponding change set.
+     *
+     * @param from inclusive
+     */
+    CompletableFuture<Handle> persist(SequenceNumber from) throws IOException;

Review comment:
       Is this the method that is called when the state backend takes a 
snapshot?
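
One possible reading of how `append` and `persist` fit together is sketched below, assuming `append` is called on each state update and `persist` at snapshot time. That reading is exactly what the questions ask the author to confirm, so treat it as an assumption. The class is a hypothetical in-memory stand-in: it uses a plain `long` instead of `SequenceNumber` and returns the buffered bytes instead of a `StateChangelogHandle`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical in-memory writer mirroring the shape of StateChangelogWriter. */
final class InMemoryChangelogWriterSketch {
    private final List<byte[]> buffer = new ArrayList<>();

    /** Sequence number of the last appended element, or -1 if nothing has been appended. */
    long lastAppendedSqn() {
        return buffer.size() - 1;
    }

    /** Assumed to be called on every state update; buffers only, no durability. */
    void append(int keyGroup, byte[] value) {
        // The key group is ignored in this sketch; a real writer would record it.
        buffer.add(value.clone());
    }

    /** Assumed to be called at snapshot time; returns everything from 'from' (inclusive). */
    CompletableFuture<List<byte[]>> persist(long from) {
        if (from < 0 || from > buffer.size()) {
            throw new IllegalArgumentException("invalid sequence number: " + from);
        }
        List<byte[]> persisted = new ArrayList<>(buffer.subList((int) from, buffer.size()));
        return CompletableFuture.completedFuture(persisted);
    }
}
```

A caller would append on each update, remember the sequence number of the last materialization, and pass it to `persist` when a checkpoint starts.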




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

