pnowojski commented on a change in pull request #14839:
URL: https://github.com/apache/flink/pull/14839#discussion_r586375585



##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeSet.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Comparator.comparingInt;
+import static java.util.EnumSet.noneOf;
+import static java.util.EnumSet.of;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.CANCELLED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.CONFIRMED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.FAILED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.MATERIALIZED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.SCHEDULED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.SENT_TO_JM;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.UPLOADED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.UPLOADING;
+import static org.apache.flink.util.Preconditions.checkState;
+
+class StateChangeSet {
+    private final UUID logId;
+    private final AtomicReference<Status> status;

Review comment:
       `AtomicReference`? What's the threading model here? Is this class thread safe? Also, this class lacks an explanation of what it represents and how it is supposed to be used.
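   
   For example, a class-level comment along these lines would already help a lot (just a sketch of the kind of documentation I mean - the actual wording and the chosen guarantee are up to you; `@ThreadSafe`/`@NotThreadSafe` are the `javax.annotation.concurrent` annotations already used on `FsStateChangelogWriter`):
   
```java
import javax.annotation.concurrent.ThreadSafe;

/**
 * <What a StateChangeSet represents - e.g. the changes appended between two
 * sequence numbers of one changelog - and how callers are supposed to use it.>
 *
 * <p>Thread-safety: <which threads mutate the {@link Status}, which methods may
 * be called concurrently, and hence why an {@code AtomicReference} is needed.>
 */
@ThreadSafe // or @NotThreadSafe - whichever guarantee actually holds
class StateChangeSet {
    // ...
}
```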

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store.
+ */
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final BlockingQueue<StateChangeSet> scheduled;
+    private final AtomicBoolean drainScheduled;
+    private final RetryPolicy retryPolicy;
+    private volatile Throwable error;
+    private final int sizeThreshold;
+    private final RetryingExecutor retryingExecutor;
+    private final StateChangeStore delegate;
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            int sizeThreshold,
+            int requestQueueCapacity,
+            RetryPolicy retryPolicy,
+            StateChangeStore delegate) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true);
+        this.scheduler = SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG);

Review comment:
       This creates yet another thread for IO operations. Maybe that's good enough for the MVP, but shouldn't we have a single shared thread pool for IO operations?
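   
   For illustration (the variable names below are made up), I would expect the owner of the store to pass in a shared executor via the existing constructor that already accepts one, instead of ever hitting this code path:
   
```java
// Sketch: reuse one TaskManager-wide IO pool instead of spawning a new thread here.
StateChangeStore store =
        new BatchingStateChangeStore(
                persistDelayMs,
                sizeThreshold,
                requestQueueCapacity,
                retryPolicy,
                delegate,
                sharedIoScheduler, // a shared ScheduledExecutorService, owned elsewhere
                new RetryingExecutor());
```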

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store.
+ */
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final BlockingQueue<StateChangeSet> scheduled;
+    private final AtomicBoolean drainScheduled;
+    private final RetryPolicy retryPolicy;
+    private volatile Throwable error;
+    private final int sizeThreshold;
+    private final RetryingExecutor retryingExecutor;
+    private final StateChangeStore delegate;
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            int sizeThreshold,
+            int requestQueueCapacity,
+            RetryPolicy retryPolicy,
+            StateChangeStore delegate) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true);
+        this.scheduler = SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG);
+        this.drainScheduled = new AtomicBoolean(false);
+        this.retryPolicy = retryPolicy;
+        this.retryingExecutor = new RetryingExecutor();
+        this.sizeThreshold = sizeThreshold;
+        this.delegate = delegate;
+    }
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            int sizeThreshold,
+            int requestQueueCapacity,
+            RetryPolicy retryPolicy,
+            StateChangeStore delegate,
+            ScheduledExecutorService scheduler,
+            RetryingExecutor retryingExecutor) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true);
+        this.scheduler = scheduler;
+        this.drainScheduled = new AtomicBoolean(false);
+        this.retryPolicy = retryPolicy;
+        this.retryingExecutor = retryingExecutor;
+        this.sizeThreshold = sizeThreshold;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void save(Collection<StateChangeSet> changeSets) {
+        if (error != null) {
+            LOG.debug("don't persist {} changesets, already failed", changeSets.size());
+            changeSets.forEach(cs -> cs.setFailed(error));
+            return;
+        }
+        LOG.debug("persist {} changeSets", changeSets.size());
+        try {
+            for (StateChangeSet changeSet : changeSets) {
+                scheduled.put(changeSet); // blocks if no space in the queue
+            }
+            scheduleUploadIfNeeded();
+        } catch (Exception e) {
+            changeSets.forEach(cs -> cs.setFailed(e));
+            rethrow(e);
+        }
+    }
+
+    private void scheduleUploadIfNeeded() {
+        if (scheduleDelayMs == 0 || scheduled.size() >= sizeThreshold) {

Review comment:
       I think this size limit should be in bytes. This could be a follow-up ticket, but in that case I would just hardcode some number (`100`?) here and implement a proper size limit in a later ticket.
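   
   Roughly what I mean by a byte-based limit (field and getter names below are made up, and it assumes a `StateChangeSet` can report its payload size):
   
```java
// Sketch only: accumulate the payload size when enqueueing, e.g.
// scheduledBytes.addAndGet(changeSet.getSize()) in save(), and compare bytes here.
private final AtomicLong scheduledBytes = new AtomicLong();
private final long sizeThresholdBytes; // configured, e.g. a few MiBs

private void scheduleUploadIfNeeded() {
    if (scheduleDelayMs == 0 || scheduledBytes.get() >= sizeThresholdBytes) {
        drainAndSave(); // reset scheduledBytes when the queue is drained
    }
}
```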

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING;
+import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@NotThreadSafe

Review comment:
       I guess this annotation belongs on the base `StateChangelogWriter` interface.

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING;
+import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+
+    FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeStore store) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));

Review comment:
       Why don't we start writing the changes here? On the happy path we need to persist everything anyway, right? In `persist()` we would then just `flush()` the writes and return a future that completes when the writes are done.
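   
   Roughly the shape I have in mind (`uploadStream` and its methods are made up for illustration, error handling omitted):
   
```java
@Override
public void append(int keyGroup, byte[] value) {
    checkState(!closed, "%s is closed", logId);
    uploadStream.write(keyGroup, value); // start uploading right away on the happy path
}

@Override
public CompletableFuture<StateChangelogHandleStreamImpl> persist(SequenceNumber from)
        throws IOException {
    // nothing new to hand over here - just flush and return the completion
    // of everything appended since 'from'
    return uploadStream.flush(from);
}
```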

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeSet.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Comparator.comparingInt;
+import static java.util.EnumSet.noneOf;
+import static java.util.EnumSet.of;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.CANCELLED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.CONFIRMED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.FAILED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.MATERIALIZED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.SCHEDULED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.SENT_TO_JM;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.UPLOADED;
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.UPLOADING;
+import static org.apache.flink.util.Preconditions.checkState;
+
+class StateChangeSet {
+    private final UUID logId;
+    private final AtomicReference<Status> status;
+    private final List<StateChange> changes;
+    private final SequenceNumber sequenceNumber;
+    private final CompletableFuture<StoreResult> storeResultFuture;

Review comment:
       A `CompletableFuture<StoreResult>` inside `StateChangeSet` sounds strange and makes the threading boundaries hard to define (which methods of `StateChangeSet` are thread safe?).
   
   Why isn't the `CompletableFuture<StoreResult>` returned from the `save(Collection<StateChangeSet>)` method instead?
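   
   I.e. something like this instead (a sketch; `StoreResult` as already defined in this PR, or a `List<StoreResult>` with one entry per change set):
   
```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

interface StateChangeStore {
    /** The returned future completes once the given change sets are persisted. */
    CompletableFuture<StoreResult> save(Collection<StateChangeSet> changeSets);
}
```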

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store.
+ */
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final BlockingQueue<StateChangeSet> scheduled;
+    private final AtomicBoolean drainScheduled;
+    private final RetryPolicy retryPolicy;
+    private volatile Throwable error;

Review comment:
       Why complicate the synchronisation model with many small locks instead of one large one?
   
   It is already non-obvious to me whether there is a race condition between completing a previous `drainAndSave` and scheduling a new one via the `drainScheduled` check.
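   
   What I mean by one large lock, as a rough sketch (the actual upload would of course still run outside of the lock, on the executor):
   
```java
private final Object lock = new Object();

@Override
public void save(Collection<StateChangeSet> changeSets) {
    synchronized (lock) {
        // check 'error', enqueue the change sets, schedule a drain if none is pending
    }
}

private void drainAndSave() {
    Collection<StateChangeSet> toUpload = new ArrayList<>();
    synchronized (lock) {
        scheduled.drainTo(toUpload); // also clear the "drain scheduled" flag here
    }
    // hand 'toUpload' over to retryingExecutor/delegate outside of the lock
}
```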

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING;
+import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+
+    FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeStore store) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));
+        // size threshold could be added to call persist when reached. considerations:
+        // 0. can actually degrade performance by amplifying number of requests
+        // 1. which range to persist?
+        // 2. how to deal with retries/aborts?
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);

Review comment:
       The logged `lastAppendedSequenceNumber` is different from the returned value (it is logged before `rollover()`). I'm not sure if that's intentional.
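   
   I.e. if the post-rollover value is the interesting one, maybe simply:
   
```java
@Override
public SequenceNumber lastAppendedSequenceNumber() {
    rollover();
    LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);
    return lastAppendedSequenceNumber;
}
```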

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store.
+ */
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final BlockingQueue<StateChangeSet> scheduled;
+    private final AtomicBoolean drainScheduled;
+    private final RetryPolicy retryPolicy;
+    private volatile Throwable error;
+    private final int sizeThreshold;
+    private final RetryingExecutor retryingExecutor;
+    private final StateChangeStore delegate;
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            int sizeThreshold,
+            int requestQueueCapacity,
+            RetryPolicy retryPolicy,
+            StateChangeStore delegate) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true);
+        this.scheduler = SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG);
+        this.drainScheduled = new AtomicBoolean(false);
+        this.retryPolicy = retryPolicy;
+        this.retryingExecutor = new RetryingExecutor();
+        this.sizeThreshold = sizeThreshold;
+        this.delegate = delegate;
+    }
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            int sizeThreshold,
+            int requestQueueCapacity,
+            RetryPolicy retryPolicy,
+            StateChangeStore delegate,
+            ScheduledExecutorService scheduler,
+            RetryingExecutor retryingExecutor) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true);
+        this.scheduler = scheduler;
+        this.drainScheduled = new AtomicBoolean(false);
+        this.retryPolicy = retryPolicy;
+        this.retryingExecutor = retryingExecutor;
+        this.sizeThreshold = sizeThreshold;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void save(Collection<StateChangeSet> changeSets) {
+        if (error != null) {
+            LOG.debug("don't persist {} changesets, already failed", changeSets.size());
+            changeSets.forEach(cs -> cs.setFailed(error));
+            return;
+        }
+        LOG.debug("persist {} changeSets", changeSets.size());
+        try {
+            for (StateChangeSet changeSet : changeSets) {
+                scheduled.put(changeSet); // blocks if no space in the queue
+            }
+            scheduleUploadIfNeeded();
+        } catch (Exception e) {
+            changeSets.forEach(cs -> cs.setFailed(e));
+            rethrow(e);
+        }

Review comment:
       Why `rethrow` it as a `RuntimeException`?
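   
   For example (assuming `StateChangeStore#save` is allowed to declare `IOException`; the early return on `error != null` is omitted for brevity):
   
```java
@Override
public void save(Collection<StateChangeSet> changeSets) throws IOException {
    try {
        for (StateChangeSet changeSet : changeSets) {
            scheduled.put(changeSet); // blocks if no space in the queue
        }
        scheduleUploadIfNeeded();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        changeSets.forEach(cs -> cs.setFailed(e));
        throw new IOException("interrupted while enqueueing state changes", e);
    }
}
```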

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING;
+import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+
+    FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeStore store) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));
+        // size threshold could be added to call persist when reached. considerations:
+        // 0. can actually degrade performance by amplifying number of requests
+        // 1. which range to persist?
+        // 2. how to deal with retries/aborts?
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);
+        rollover();
+        return lastAppendedSequenceNumber;
+    }

Review comment:
       Why do we need to do a rollover in a getter?

##########
File path: 
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store.
+ */
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final BlockingQueue<StateChangeSet> scheduled;
+    private final AtomicBoolean drainScheduled;
+    private final RetryPolicy retryPolicy;
+    private volatile Throwable error;
+    private final int sizeThreshold;
+    private final RetryingExecutor retryingExecutor;
+    private final StateChangeStore delegate;
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            int sizeThreshold,
+            int requestQueueCapacity,
+            RetryPolicy retryPolicy,
+            StateChangeStore delegate) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true);
+        this.scheduler = SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG);
+        this.drainScheduled = new AtomicBoolean(false);
+        this.retryPolicy = retryPolicy;
+        this.retryingExecutor = new RetryingExecutor();
+        this.sizeThreshold = sizeThreshold;
+        this.delegate = delegate;
+    }
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            int sizeThreshold,
+            int requestQueueCapacity,
+            RetryPolicy retryPolicy,
+            StateChangeStore delegate,
+            ScheduledExecutorService scheduler,
+            RetryingExecutor retryingExecutor) {

Review comment:
       Can you explain why we need both of them? Why isn't a single executor enough?

##########
File path: 
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING;
+import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+
+    FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeStore store) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));
+        // size threshold could be added to call persist when reached. considerations:
+        // 0. can actually degrade performance by amplifying number of requests
+        // 1. which range to persist?
+        // 2. how to deal with retries/aborts?
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);
+        rollover();
+        return lastAppendedSequenceNumber;
+    }
+
+    @Override
+    public CompletableFuture<StateChangelogHandleStreamImpl> persist(SequenceNumber from)
+            throws IOException {
+        LOG.debug("persist {} from {}", logId, from);
+        checkNotNull(from);
+        checkArgument(
+                lastAppendedSequenceNumber.next().equals(from) || changeSets.containsKey(from),
+                "sequence number %s to persist from not in range (%s:%s/%s)",
+                from,
+                changeSets.isEmpty() ? null : changeSets.firstKey(),
+                changeSets.isEmpty() ? null : changeSets.lastKey(),
+                lastAppendedSequenceNumber.next());
+
+        rollover();
+        Collected collected = collect(from);
+        collected.toRetry.forEach(
+                changeSet -> changeSets.put(changeSet.getSequenceNumber(), changeSet));
+        LOG.debug("collected {}", collected);
+        store.save(collected.toUpload);
+        return asHandle(collected.toReturn);
+    }
+
+    private Collected collect(SequenceNumber from) {
+        Collected result = new Collected();
+        changeSets
+                .tailMap(from, true)
+                .values()
+                .forEach(
+                        changeSet -> {
+                            if (changeSet.isConfirmed()) {
+                                result.toReturn.add(changeSet);
+                            } else if (changeSet.setScheduled()) {
+                                result.toUpload.add(changeSet);
+                            } else {
+                                // we also re-upload any scheduled/uploading/uploaded changes
+                                // even if they were not sent to the JM yet because this can happen
+                                // in the meantime and then JM can decide to discard them
+                                result.toRetry.add(changeSet.forRetry());
+                            }
+                        });
+        result.toUpload.addAll(result.toRetry);
+        result.toReturn.addAll(result.toUpload);
+        return result;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("close {}", logId);
+        checkState(!closed);
+        closed = true;
+        activeChangeSet.clear();
+        changeSets.values().forEach(StateChangeSet::setCancelled);
+        // todo in MVP or later: cleanup if transition succeeded and had non-shared state
+        changeSets.clear();
+        // the store is closed from the owning FsStateChangelogClient
+    }
+
+    @Override
+    public void confirm(SequenceNumber from, SequenceNumber to) {
+        LOG.debug("confirm {} from {} to {}", logId, from, to);
+        changeSets.subMap(from, true, to, false).values().forEach(StateChangeSet::setConfirmed);
+    }
+
+    @Override
+    public void reset(SequenceNumber from, SequenceNumber to) {
+        LOG.debug("reset {} from {} to {}", logId, from, to);
+        changeSets.subMap(from, to).forEach((key, value) -> value.setAborted());
+        // todo in MVP or later: cleanup if change to aborted succeeded and had non-shared state
+        // For now, relying on manual cleanup.
+        // If a checkpoint that is aborted uses the changes uploaded for another checkpoint
+        // which was completed on JM but not confirmed to this TM
+        // then discarding those changes would invalidate that previously completed checkpoint.
+        // Solution:
+        // 1. pass last completed checkpoint id in barriers, trigger RPC, and abort RPC
+        // 2. confirm for the id above
+        // 3. make sure that at most 1 checkpoint in flight (CC config)
+    }
+
+    @Override
+    public void truncate(SequenceNumber to) {
+        LOG.debug("truncate {} to {}", logId, to);
+        if (to.compareTo(lastAppendedSequenceNumber) > 0) {
+            // can happen if client calls truncate(prevSqn.next())
+            rollover();
+        }
+        NavigableMap<SequenceNumber, StateChangeSet> headMap = changeSets.headMap(to, false);
+        headMap.values().forEach(StateChangeSet::setTruncated);
+        headMap.clear();
+    }

Review comment:
       Are we holding all state changes in memory until `truncate` is called?



