rkhachatryan commented on a change in pull request #15371:
URL: https://github.com/apache/flink/pull/15371#discussion_r602353460



##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.StateChangeStore.StoreTask;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory.ChangelogCallbackExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Basic implementation of {@link StateChangelogWriter}. Its main purpose is to keep track of which
+ * changes are required for a given {@link #persist(SequenceNumber)} call. In particular, it takes
+ * care of re-uploading changes from the previous {@link #persist(SequenceNumber)} call if those
+ * changes haven't yet been {@link #confirm(SequenceNumber, SequenceNumber)}'ed. This is crucial
+ * because until changes are confirmed, they can still be aborted and removed/deleted.
+ *
+ * <p>For example, checkpoint N-1 may fail and be disposed after checkpoint N has already started.
+ * In this case, when we are persisting {@link StateChangeSet}s for checkpoint N, we need to
+ * re-upload the {@link StateChangeSet}s that belonged to checkpoint N-1.
+ */
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> uploaded = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> confirmed = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+    private final ChangelogCallbackExecutor executor;
+
+    FsStateChangelogWriter(
+            UUID logId,
+            KeyGroupRange keyGroupRange,
+            StateChangeStore store,
+            ChangelogCallbackExecutor executor) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+        this.executor = checkNotNull(executor);
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));
+        // size threshold could be added to call persist when reached. considerations:
+        // 0. can actually degrade performance by amplifying number of requests
+        // 1. which range to persist?
+        // 2. how to deal with retries/aborts?
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        rollover();
+        LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);
+        return lastAppendedSequenceNumber;
+    }
+
+    @Override
+    public CompletableFuture<StateChangelogHandleStreamImpl> persist(SequenceNumber from)
+            throws IOException {
+        LOG.debug("persist {} from {}", logId, from);
+        checkNotNull(from);
+        // todo: check range
+
+        rollover();
+        Collection<StoreResult> readyToReturn = confirmed.tailMap(from, true).values();
+        Collection<StateChangeSet> toUpload = changeSets.tailMap(from, true).values();
+        LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn, toUpload);
+
+        StoreTask task = new StoreTask(toUpload);
+        // Handle the result in the supplied executor (mailbox) thread
+        CompletableFuture<StateChangelogHandleStreamImpl> resultFuture =
+                task.getResultFuture()
+                        .thenApplyAsync(
+                                (results) -> {
+                                    results.forEach(e -> uploaded.put(e.sequenceNumber, e));

Review comment:
       I think we need to check here whether the upload result is still relevant (or whether another upload was started in the meantime). For example, it could have been aborted, or another checkpoint (with an overlapping change range) could have been started.
   
   (this was one of my concerns when I published #15322)
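   
   A minimal sketch of what such a guard could look like inside the `thenApplyAsync` callback; the `uploadStillRelevant` helper is hypothetical, just to illustrate the shape:

```java
task.getResultFuture()
        .thenApplyAsync(
                (results) -> {
                    results.forEach(
                            e -> {
                                // Hypothetical check: record the upload only if no
                                // truncate/abort invalidated this sequence number and
                                // no newer upload replaced it in the meantime.
                                if (uploadStillRelevant(e.sequenceNumber)) {
                                    uploaded.put(e.sequenceNumber, e);
                                } else {
                                    LOG.debug("discarding stale upload result: {}", e);
                                }
                            });
                    return buildHandle(results, readyToReturn);
                },
                executor::execute);
```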

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.Thread.holdsLock;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store. The writes are executed asynchronously.
+ */
+@ThreadSafe
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final StateChangeStore delegate;
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final long sizeThresholdBytes;
+
+    @GuardedBy("scheduled")
+    private final Queue<StoreTask> scheduled;
+
+    @GuardedBy("scheduled")
+    private long scheduledSizeInBytes;
+
+    @GuardedBy("scheduled")
+    private Future<?> scheduledFuture = CompletableFuture.completedFuture(null);
+
+    private AtomicReference<Throwable> error = new AtomicReference<>(null);
+
+    BatchingStateChangeStore(
+            long persistDelayMs, long sizeThresholdBytes, StateChangeStore delegate) {
+        this(
+                persistDelayMs,
+                sizeThresholdBytes,
+                delegate,
+                SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG));
+    }
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            long sizeThresholdBytes,
+            StateChangeStore delegate,
+            ScheduledExecutorService scheduler) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new LinkedList<>();
+        this.scheduler = scheduler;
+        this.sizeThresholdBytes = sizeThresholdBytes;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void save(StoreTask storeTask) {
+        Collection<StateChangeSet> changeSets = storeTask.changeSets;
+        if (error.get() != null) {
+            LOG.debug("don't persist {} changesets, already failed", changeSets.size());
+            storeTask.fail(error.get());
+            return;
+        }
+        LOG.debug("persist {} changeSets", changeSets.size());
+        try {
+            synchronized (scheduled) {
+                scheduled.add(storeTask);
+                scheduledSizeInBytes += storeTask.getSize();
+                scheduleUploadIfNeeded();
+            }
+        } catch (Exception e) {
+            storeTask.fail(e);
+            throw e;
+        }
+    }
+
+    private void scheduleUploadIfNeeded() {
+        checkState(holdsLock(scheduled));
+        if (scheduledFuture.isDone() || isOverSizeThresholdAndCancellationSucceded()) {
+            scheduleUpload();

Review comment:
       I think there is a race condition here which could lead to an upload not being scheduled:
   1. A future is running in `drainAndSave()`, enters `scheduleUploadIfNeeded()` and sees no need to reschedule (no data added)
   2. It exits the `synchronized` section but not the task yet (so the future isn't finished yet)
   3. Another thread adds data and sees `future.isDone() == false`, so it doesn't schedule either
   
   WDYT?
   
   (the related issue below is about `scheduledFuture.cancel`, this one is about `scheduledFuture.isDone()`; I guess a fix will address both of them)
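   
   One possible direction, as a rough sketch (it ignores the size-threshold/cancellation path, which would need the same treatment): replace the `Future.isDone()` check with an explicit flag that `drainAndSave()` clears while holding the lock. A concurrent `save()` then either sees the flag cleared and schedules a new upload, or enqueues its task before the drain and is picked up by the current run:

```java
@GuardedBy("scheduled")
private boolean uploadScheduled; // hypothetical replacement for the Future.isDone() check

private void scheduleUploadIfNeeded() {
    checkState(holdsLock(scheduled));
    if (!uploadScheduled) {
        uploadScheduled = true;
        scheduleUpload();
    }
}

private void drainAndSave() {
    Collection<StoreTask> tasks;
    synchronized (scheduled) {
        tasks = new ArrayList<>(scheduled);
        scheduled.clear();
        scheduledSizeInBytes = 0;
        // Cleared under the lock: a save() running after this block either sees
        // uploadScheduled == false and schedules a new run, or was already drained.
        uploadScheduled = false;
    }
    // ... upload `tasks` as before ...
}
```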

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.Thread.holdsLock;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store. The writes are executed asynchronously.
+ */
+@ThreadSafe
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final StateChangeStore delegate;
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final long sizeThresholdBytes;
+
+    @GuardedBy("scheduled")
+    private final Queue<StoreTask> scheduled;
+
+    @GuardedBy("scheduled")
+    private long scheduledSizeInBytes;
+
+    @GuardedBy("scheduled")
+    private Future<?> scheduledFuture = CompletableFuture.completedFuture(null);
+
+    private AtomicReference<Throwable> error = new AtomicReference<>(null);
+
+    BatchingStateChangeStore(
+            long persistDelayMs, long sizeThresholdBytes, StateChangeStore delegate) {
+        this(
+                persistDelayMs,
+                sizeThresholdBytes,
+                delegate,
+                SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG));
+    }
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            long sizeThresholdBytes,
+            StateChangeStore delegate,
+            ScheduledExecutorService scheduler) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new LinkedList<>();
+        this.scheduler = scheduler;
+        this.sizeThresholdBytes = sizeThresholdBytes;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void save(StoreTask storeTask) {
+        Collection<StateChangeSet> changeSets = storeTask.changeSets;
+        if (error.get() != null) {
+            LOG.debug("don't persist {} changesets, already failed", changeSets.size());
+            storeTask.fail(error.get());
+            return;
+        }
+        LOG.debug("persist {} changeSets", changeSets.size());
+        try {
+            synchronized (scheduled) {
+                scheduled.add(storeTask);
+                scheduledSizeInBytes += storeTask.getSize();
+                scheduleUploadIfNeeded();
+            }
+        } catch (Exception e) {
+            storeTask.fail(e);
+            throw e;
+        }
+    }
+
+    private void scheduleUploadIfNeeded() {
+        checkState(holdsLock(scheduled));
+        if (scheduledFuture.isDone() || isOverSizeThresholdAndCancellationSucceded()) {
+            scheduleUpload();
+        }
+    }
+
+    private boolean isOverSizeThresholdAndCancellationSucceded() {
+        return scheduledSizeInBytes >= sizeThresholdBytes && scheduledFuture.cancel(false);
+    }

Review comment:
       I think the 2nd check may result in the threshold being exceeded without an upload being scheduled:
   1. T1: adds 1 byte, schedules future 1
   2. Executor: waits and eventually runs future 1, calls `scheduleUploadIfNeeded` and returns without scheduling; now the thread gets suspended by the OS (so future 1 isn't completed)
   3. T1: adds more bytes, exceeding the threshold; tries to cancel future 1 and fails => future 2 is not scheduled by T1
   4. Executor also doesn't schedule any future
   
   WDYT?
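   
   For illustration, the window exists because `Future.cancel(false)` returns false as soon as the task has started running, while `isDone()` is still false at that point, so neither branch of `scheduleUploadIfNeeded()` fires. A standalone demonstration (not Flink code, just plain JDK):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

public class CancelRaceDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        Future<?> future = scheduler.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(1000); // simulate the OS suspending the thread mid-task
            } catch (InterruptedException ignored) {
            }
        });
        started.await();
        // The task is running: cancel(false) fails AND isDone() is still false,
        // which is exactly the state in which no new upload gets scheduled.
        System.out.println("cancel: " + future.cancel(false) + ", isDone: " + future.isDone());
        scheduler.shutdown();
    }
}
```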

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -136,21 +133,23 @@ private void drainAndSave() {
             tasks = new ArrayList<>(scheduled);
             scheduled.clear();
             scheduledSizeInBytes = 0;
-            scheduledFuture = null;
         }
         try {
-            if (error != null) {
-                tasks.forEach(task -> task.fail(error));
+            if (error.get() != null) {
+                tasks.forEach(task -> task.fail(error.get()));
                 return;
             }
-            retryingExecutor.execute(retryPolicy, () -> delegate.save(tasks));
+            delegate.save(tasks);
+
+            synchronized (scheduled) {
+                scheduleUploadIfNeeded();
+            }
         } catch (Throwable t) {
             tasks.forEach(task -> task.fail(t));
             if (findThrowable(t, IOException.class).isPresent()) {
                 LOG.warn("Caught IO exception while uploading", t);
             } else {
-                error = t;
-                throw t;
+                error.compareAndSet(null, t);

Review comment:
       Could you explain the removal of `throw t;`?
   Without it:
   1. the error may not be logged at all (if no further upload is scheduled)
   2. or it may not surface if later upload errors are not logged
   3. or it may be logged only with a delay (which complicates debugging)
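   
   E.g. something along these lines would keep the error visible immediately (a sketch, assuming the enclosing method can still propagate `Throwable`, as it did before this change):

```java
} catch (Throwable t) {
    tasks.forEach(task -> task.fail(t));
    if (findThrowable(t, IOException.class).isPresent()) {
        LOG.warn("Caught IO exception while uploading", t);
    } else {
        // Record the error for subsequent save() calls, but still rethrow so
        // that it is surfaced and logged right away rather than with a delay.
        error.compareAndSet(null, t);
        throw t;
    }
}
```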

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -136,21 +133,23 @@ private void drainAndSave() {
             tasks = new ArrayList<>(scheduled);
             scheduled.clear();
             scheduledSizeInBytes = 0;
-            scheduledFuture = null;
         }
         try {
-            if (error != null) {
-                tasks.forEach(task -> task.fail(error));
+            if (error.get() != null) {
+                tasks.forEach(task -> task.fail(error.get()));
                 return;
             }
-            retryingExecutor.execute(retryPolicy, () -> delegate.save(tasks));
+            delegate.save(tasks);
+
+            synchronized (scheduled) {
+                scheduleUploadIfNeeded();
+            }

Review comment:
       This won't be executed in case of an exception (but I guess that will change if the concurrency issues are addressed).
   
   But more importantly, no future uploads will start, because `this.error` is set later, in the `catch` block. I guess the assumption is that a single failure leads to a failover of the whole job?
   
   That shouldn't be the case though, as tasks should tolerate checkpoint failures, and it's the JM that decides how to handle them.
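   
   One way to decouple the rescheduling from the upload outcome (a sketch; it assumes non-fatal failures should not block future uploads, and it is still subject to the `isDone()` race discussed above):

```java
try {
    if (error.get() != null) {
        tasks.forEach(task -> task.fail(error.get()));
        return;
    }
    delegate.save(tasks);
} catch (Throwable t) {
    tasks.forEach(task -> task.fail(t));
    if (findThrowable(t, IOException.class).isPresent()) {
        LOG.warn("Caught IO exception while uploading", t);
    } else {
        error.compareAndSet(null, t);
    }
} finally {
    // Always re-check: tasks enqueued while this drain was running must not
    // be left waiting, even if this upload attempt failed.
    synchronized (scheduled) {
        scheduleUploadIfNeeded();
    }
}
```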

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.Thread.holdsLock;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store. The writes are executed asynchronously.
+ */
+@ThreadSafe
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final StateChangeStore delegate;
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final long sizeThresholdBytes;
+
+    @GuardedBy("scheduled")
+    private final Queue<StoreTask> scheduled;
+
+    @GuardedBy("scheduled")
+    private long scheduledSizeInBytes;
+
+    @GuardedBy("scheduled")
+    private Future<?> scheduledFuture = CompletableFuture.completedFuture(null);
+
+    private AtomicReference<Throwable> error = new AtomicReference<>(null);
+
+    BatchingStateChangeStore(
+            long persistDelayMs, long sizeThresholdBytes, StateChangeStore delegate) {
+        this(
+                persistDelayMs,
+                sizeThresholdBytes,
+                delegate,
+                SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG));
+    }
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            long sizeThresholdBytes,
+            StateChangeStore delegate,
+            ScheduledExecutorService scheduler) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new LinkedList<>();
+        this.scheduler = scheduler;
+        this.sizeThresholdBytes = sizeThresholdBytes;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void save(StoreTask storeTask) {
+        Collection<StateChangeSet> changeSets = storeTask.changeSets;
+        if (error.get() != null) {
+            LOG.debug("don't persist {} changesets, already failed", changeSets.size());
+            storeTask.fail(error.get());
+            return;
+        }
+        LOG.debug("persist {} changeSets", changeSets.size());
+        try {
+            synchronized (scheduled) {
+                scheduled.add(storeTask);
+                scheduledSizeInBytes += storeTask.getSize();
+                scheduleUploadIfNeeded();
+            }
+        } catch (Exception e) {
+            storeTask.fail(e);
+            throw e;
+        }
+    }
+
+    private void scheduleUploadIfNeeded() {
+        checkState(holdsLock(scheduled));
+        if (scheduledFuture.isDone() || isOverSizeThresholdAndCancellationSucceded()) {
+            scheduleUpload();
+        }
+    }
+
+    private boolean isOverSizeThresholdAndCancellationSucceded() {
+        return scheduledSizeInBytes >= sizeThresholdBytes && scheduledFuture.cancel(false);
+    }
+
+    private void scheduleUpload() {
+        checkState(scheduledFuture.isDone());
+        long delay = scheduleDelayMs;
+        if (scheduleDelayMs == 0 || scheduledSizeInBytes >= sizeThresholdBytes) {
+            scheduledFuture = scheduler.submit(this::drainAndSave);
+        } else {
+            scheduledFuture = scheduler.schedule(this::drainAndSave, delay, MILLISECONDS);
+        }
+    }
+
+    private void drainAndSave() {
+        Collection<StoreTask> tasks;
+        synchronized (scheduled) {
+            tasks = new ArrayList<>(scheduled);
+            scheduled.clear();
+            scheduledSizeInBytes = 0;
+        }
+        try {
+            if (error.get() != null) {
+                tasks.forEach(task -> task.fail(error.get()));
+                return;
+            }
+            delegate.save(tasks);

Review comment:
       This is a single-threaded upload now; I think we need more threads, as discussed offline.
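   
   A rough sketch of what a multi-threaded variant could look like (the `uploadExecutor` field, its name, and its sizing are assumptions, not part of this PR):

```java
// Hypothetical dedicated pool for the actual uploads, so that a slow DFS write
// does not block the single-threaded batching scheduler.
private final ExecutorService uploadExecutor =
        Executors.newFixedThreadPool(4, r -> new Thread(r, "ChangelogUploader"));

private void drainAndSave() {
    Collection<StoreTask> tasks;
    synchronized (scheduled) {
        tasks = new ArrayList<>(scheduled);
        scheduled.clear();
        scheduledSizeInBytes = 0;
    }
    // Hand the batch off to the upload pool instead of writing inline.
    uploadExecutor.execute(
            () -> {
                try {
                    delegate.save(tasks);
                } catch (Throwable t) {
                    tasks.forEach(task -> task.fail(t));
                }
            });
}
```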

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.StateChangeStore.StoreTask;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory.ChangelogCallbackExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Basic implementation of {@link StateChangelogWriter}. Its main purpose is to keep track of which
+ * changes are required for a given {@link #persist(SequenceNumber)} call. In particular, it takes
+ * care of re-uploading changes from the previous {@link #persist(SequenceNumber)} call if those
+ * changes haven't yet been {@link #confirm(SequenceNumber, SequenceNumber)}'ed. This is crucial
+ * because until changes are confirmed, they can still be aborted and removed/deleted.
+ *
+ * <p>For example, checkpoint N-1 may fail and be disposed after checkpoint N has already started.
+ * In this case, when we are persisting {@link StateChangeSet}s for checkpoint N, we need to
+ * re-upload the {@link StateChangeSet}s that belonged to checkpoint N-1.
+ */
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> uploaded = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> confirmed = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+    private final ChangelogCallbackExecutor executor;
+
+    FsStateChangelogWriter(
+            UUID logId,
+            KeyGroupRange keyGroupRange,
+            StateChangeStore store,
+            ChangelogCallbackExecutor executor) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+        this.executor = checkNotNull(executor);
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));
+        // size threshold could be added to call persist when reached. considerations:
+        // 0. can actually degrade performance by amplifying number of requests
+        // 1. which range to persist?
+        // 2. how to deal with retries/aborts?
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        rollover();
+        LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);
+        return lastAppendedSequenceNumber;
+    }
+
+    @Override
+    public CompletableFuture<StateChangelogHandleStreamImpl> persist(SequenceNumber from)
+            throws IOException {
+        LOG.debug("persist {} from {}", logId, from);
+        checkNotNull(from);
+        // todo: check range
+
+        rollover();
+        Collection<StoreResult> readyToReturn = confirmed.tailMap(from, true).values();
+        Collection<StateChangeSet> toUpload = changeSets.tailMap(from, true).values();
+        LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn, toUpload);
+
+        StoreTask task = new StoreTask(toUpload);
+        // Handle the result in the supplied executor (mailbox) thread
+        CompletableFuture<StateChangelogHandleStreamImpl> resultFuture =
+                task.getResultFuture()
+                        .thenApplyAsync(
+                                (results) -> {
+                                    results.forEach(e -> uploaded.put(e.sequenceNumber, e));
+                                    return buildHandle(results, readyToReturn);
+                                },
+                                executor::execute);
+        store.save(task);
+        return resultFuture;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("close {}", logId);
+        checkState(!closed);
+        closed = true;
+        activeChangeSet.clear();
+        // todo in MVP or later: cleanup if transition succeeded and had non-shared state
+        changeSets.clear();
+        uploaded.clear();
+        confirmed.clear();
+        // the store is closed from the owning FsStateChangelogClient
+    }
+
+    @Override
+    public void confirm(SequenceNumber from, SequenceNumber to) {
+        LOG.debug("confirm {} from {} to {}", logId, from, to);
+        uploaded.subMap(from, true, to, false)
+                .forEach(
+                        (sequenceNumber, changeSetAndResult) -> {
+                            changeSets.remove(sequenceNumber);
+                            uploaded.remove(sequenceNumber);
+                            confirmed.put(sequenceNumber, changeSetAndResult);

Review comment:
       I think that we need to check whether this range is still relevant (or was truncated in the meantime).
   
   (this was one of my concerns when I published #15322)
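   
   As a rough illustration of the guard I have in mind (the `lowestNonTruncatedSqn` watermark is hypothetical and assumed to be advanced by `truncate()`; `SequenceNumber` is assumed comparable, as its use in `TreeMap` suggests):

```java
@Override
public void confirm(SequenceNumber from, SequenceNumber to) {
    LOG.debug("confirm {} from {} to {}", logId, from, to);
    // Hypothetical guard: a confirmation for an already-truncated range
    // becomes a no-op instead of resurrecting stale entries.
    if (to.compareTo(lowestNonTruncatedSqn) <= 0) {
        LOG.debug("range up to {} was truncated in the meantime, ignoring", to);
        return;
    }
    // ... move the [from, to) entries from `uploaded` to `confirmed` as before ...
}
```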

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.StateChangeStore.StoreTask;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory.ChangelogCallbackExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Basic implementation of {@link StateChangelogWriter}. Its main purpose is to keep track of which
+ * changes are required for a given {@link #persist(SequenceNumber)} call. In particular, it takes
+ * care of re-uploading changes from the previous {@link #persist(SequenceNumber)} call if those
+ * changes haven't yet been {@link #confirm(SequenceNumber, SequenceNumber)}'ed. This is crucial
+ * because until changes are confirmed, they can still be aborted and removed/deleted.
+ *
+ * <p>For example, checkpoint N-1 may fail and be disposed after checkpoint N has already started.
+ * In this case, when we are persisting {@link StateChangeSet}s for checkpoint N, we need to
+ * re-upload the {@link StateChangeSet}s that belonged to checkpoint N-1.
+ */
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> uploaded = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> confirmed = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+    private final ChangelogCallbackExecutor executor;
+
+    FsStateChangelogWriter(
+            UUID logId,
+            KeyGroupRange keyGroupRange,
+            StateChangeStore store,
+            ChangelogCallbackExecutor executor) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+        this.executor = checkNotNull(executor);
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));
+        // size threshold could be added to call persist when reached. considerations:
+        // 0. can actually degrade performance by amplifying number of requests
+        // 1. which range to persist?
+        // 2. how to deal with retries/aborts?
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        rollover();
+        LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);
+        return lastAppendedSequenceNumber;
+    }
+
+    @Override
+    public CompletableFuture<StateChangelogHandleStreamImpl> persist(SequenceNumber from)
+            throws IOException {
+        LOG.debug("persist {} from {}", logId, from);
+        checkNotNull(from);
+        // todo: check range
+
+        rollover();
+        Collection<StoreResult> readyToReturn = confirmed.tailMap(from, true).values();
+        Collection<StateChangeSet> toUpload = changeSets.tailMap(from, true).values();

Review comment:
       I'm thinking about how we could later implement pre-emptive upload (i.e. calling `persist()` from `append()`).
   
   In this version (#15371 or #15322, as opposed to #14839), we need to filter out already uploaded changes (`uploaded.subMap`) from `toUpload`.
   
   But we also don't want to include the changes from the previous, not yet confirmed nor aborted checkpoint, which will also reside in `uploaded`. Do you have any idea how to implement this?
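   
   To make the first part concrete, the filtering itself could look like the naive sketch below (it assumes a `java.util.Map` import and deliberately does not solve the in-flight-checkpoint problem above):

```java
// Naive filtering: skip change sets whose sequence numbers were already uploaded.
// This does NOT yet distinguish uploads belonging to a previous, still-in-flight
// (unconfirmed, unaborted) checkpoint, which also reside in `uploaded`.
Collection<StateChangeSet> toUpload =
        changeSets.tailMap(from, true).entrySet().stream()
                .filter(e -> !uploaded.containsKey(e.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
```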

##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.changelog.fs;
-
-import org.apache.flink.util.function.RunnableWithException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-/**
- * A {@link RunnableWithException} executor that schedules a next attempt upon timeout based on
- * {@link RetryPolicy}. Aimed to curb tail latencies
- */
-class RetryingExecutor implements AutoCloseable {
-    private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);

Review comment:
       I'd prefer to see this removal as a separate commit.
   I'm also not sure that it should be removed. Are you planning to resurrect it in a subsequent PR?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

