rkhachatryan commented on a change in pull request #15371: URL: https://github.com/apache/flink/pull/15371#discussion_r602353460
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java ##########

@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.StateChangeStore.StoreTask;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory.ChangelogCallbackExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Basic implementation of {@link StateChangelogWriter}. Its main purpose is to keep track of which
+ * changes are required for a given {@link #persist(SequenceNumber)} call. In particular, it takes
+ * care of re-uploading changes from the previous {@link #persist(SequenceNumber)} call if those
+ * changes haven't yet been {@link #confirm(SequenceNumber, SequenceNumber)}'ed. This is crucial
+ * because until changes are confirmed, they can still be aborted and removed/deleted.
+ *
+ * <p>For example, checkpoint N-1 may fail and be disposed after checkpoint N has already started.
+ * In this case, when we are persisting {@link StateChangeSet}s for checkpoint N, we need to
+ * re-upload the {@link StateChangeSet}s that belonged to checkpoint N-1.
+ */
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> uploaded = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> confirmed = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+    private final ChangelogCallbackExecutor executor;
+
+    FsStateChangelogWriter(
+            UUID logId,
+            KeyGroupRange keyGroupRange,
+            StateChangeStore store,
+            ChangelogCallbackExecutor executor) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+        this.executor = checkNotNull(executor);
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));
+        // size threshold could be added to call persist when reached. considerations:
+        // 0. can actually degrade performance by amplifying number of requests
+        // 1. which range to persist?
+        // 2. how to deal with retries/aborts?
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        rollover();
+        LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);
+        return lastAppendedSequenceNumber;
+    }
+
+    @Override
+    public CompletableFuture<StateChangelogHandleStreamImpl> persist(SequenceNumber from)
+            throws IOException {
+        LOG.debug("persist {} from {}", logId, from);
+        checkNotNull(from);
+        // todo: check range
+
+        rollover();
+        Collection<StoreResult> readyToReturn = confirmed.tailMap(from, true).values();
+        Collection<StateChangeSet> toUpload = changeSets.tailMap(from, true).values();
+        LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn, toUpload);
+
+        StoreTask task = new StoreTask(toUpload);
+        // Handle the result in the supplied executor (mailbox) thread
+        CompletableFuture<StateChangelogHandleStreamImpl> resultFuture =
+                task.getResultFuture()
+                        .thenApplyAsync(
+                                (results) -> {
+                                    results.forEach(e -> uploaded.put(e.sequenceNumber, e));

Review comment: I think we need to check here whether the upload result is still relevant (or whether another upload was started in the meantime). For example, it could have been aborted, or another checkpoint (with an overlapping change range) could have been started.

(this was one of the concerns when I published #15322)
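Roughly what I have in mind (just a sketch; `isStillRelevant` is a hypothetical predicate that would still need to be defined, e.g. against the latest truncation/abort watermark):

```java
task.getResultFuture()
        .thenApplyAsync(
                (results) -> {
                    for (StoreResult result : results) {
                        // hypothetical check: was this range aborted or
                        // superseded by another upload since this one started?
                        if (isStillRelevant(result.sequenceNumber)) {
                            uploaded.put(result.sequenceNumber, result);
                        } else {
                            LOG.debug("discarding stale upload result {}", result);
                        }
                    }
                    return buildHandle(results, readyToReturn);
                },
                executor::execute);
```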
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ##########

@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.Thread.holdsLock;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store. The writes are executed asynchronously.
+ */
+@ThreadSafe
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final StateChangeStore delegate;
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final long sizeThresholdBytes;
+
+    @GuardedBy("scheduled")
+    private final Queue<StoreTask> scheduled;
+
+    @GuardedBy("scheduled")
+    private long scheduledSizeInBytes;
+
+    @GuardedBy("scheduled")
+    private Future<?> scheduledFuture = CompletableFuture.completedFuture(null);
+
+    private AtomicReference<Throwable> error = new AtomicReference<>(null);
+
+    BatchingStateChangeStore(
+            long persistDelayMs, long sizeThresholdBytes, StateChangeStore delegate) {
+        this(
+                persistDelayMs,
+                sizeThresholdBytes,
+                delegate,
+                SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG));
+    }
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            long sizeThresholdBytes,
+            StateChangeStore delegate,
+            ScheduledExecutorService scheduler) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new LinkedList<>();
+        this.scheduler = scheduler;
+        this.sizeThresholdBytes = sizeThresholdBytes;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void save(StoreTask storeTask) {
+        Collection<StateChangeSet> changeSets = storeTask.changeSets;
+        if (error.get() != null) {
+            LOG.debug("don't persist {} changesets, already failed", changeSets.size());
+            storeTask.fail(error.get());
+            return;
+        }
+        LOG.debug("persist {} changeSets", changeSets.size());
+        try {
+            synchronized (scheduled) {
+                scheduled.add(storeTask);
+                scheduledSizeInBytes += storeTask.getSize();
+                scheduleUploadIfNeeded();
+            }
+        } catch (Exception e) {
+            storeTask.fail(e);
+            throw e;
+        }
+    }
+
+    private void scheduleUploadIfNeeded() {
+        checkState(holdsLock(scheduled));
+        if (scheduledFuture.isDone() || isOverSizeThresholdAndCancellationSucceded()) {
+            scheduleUpload();

Review comment: I think there is a race condition here which could lead to the upload not being scheduled:
1. The future is running in `drainAndSave()`, enters `scheduleUploadIfNeeded()` and sees no need to reschedule (no data was added).
2. It exits the `synchronized` section, but the task hasn't finished yet (so the future isn't done yet).
3. Another thread adds data and sees `future.isDone() == false`, so it doesn't schedule either.

WDYT?

(the related issue below is about `scheduledFuture.cancel`; this one is about `scheduledFuture.isDone()`; I guess the fix will address both of them)
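One possible fix (a sketch only, leaving the delay-vs-immediate handling aside; `uploadScheduled` is a hypothetical field): don't rely on `Future.isDone()` at all, but track the "upload pending" state explicitly, mutate it only under the `scheduled` lock, and clear it in `drainAndSave()` while draining:

```java
// Hypothetical flag, only accessed under the `scheduled` lock.
@GuardedBy("scheduled")
private boolean uploadScheduled = false;

private void scheduleUploadIfNeeded() {
    checkState(holdsLock(scheduled));
    if (!uploadScheduled) {
        uploadScheduled = true;
        scheduledFuture = scheduler.schedule(this::drainAndSave, scheduleDelayMs, MILLISECONDS);
    }
}

private void drainAndSave() {
    Collection<StoreTask> tasks;
    synchronized (scheduled) {
        tasks = new ArrayList<>(scheduled);
        scheduled.clear();
        scheduledSizeInBytes = 0;
        // reset before leaving the lock: any save() from now on schedules a new upload
        uploadScheduled = false;
    }
    // ... upload `tasks` as before ...
}
```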
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ##########

@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.Thread.holdsLock;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store. The writes are executed asynchronously.
+ */
+@ThreadSafe
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final StateChangeStore delegate;
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final long sizeThresholdBytes;
+
+    @GuardedBy("scheduled")
+    private final Queue<StoreTask> scheduled;
+
+    @GuardedBy("scheduled")
+    private long scheduledSizeInBytes;
+
+    @GuardedBy("scheduled")
+    private Future<?> scheduledFuture = CompletableFuture.completedFuture(null);
+
+    private AtomicReference<Throwable> error = new AtomicReference<>(null);
+
+    BatchingStateChangeStore(
+            long persistDelayMs, long sizeThresholdBytes, StateChangeStore delegate) {
+        this(
+                persistDelayMs,
+                sizeThresholdBytes,
+                delegate,
+                SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG));
+    }
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            long sizeThresholdBytes,
+            StateChangeStore delegate,
+            ScheduledExecutorService scheduler) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new LinkedList<>();
+        this.scheduler = scheduler;
+        this.sizeThresholdBytes = sizeThresholdBytes;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void save(StoreTask storeTask) {
+        Collection<StateChangeSet> changeSets = storeTask.changeSets;
+        if (error.get() != null) {
+            LOG.debug("don't persist {} changesets, already failed", changeSets.size());
+            storeTask.fail(error.get());
+            return;
+        }
+        LOG.debug("persist {} changeSets", changeSets.size());
+        try {
+            synchronized (scheduled) {
+                scheduled.add(storeTask);
+                scheduledSizeInBytes += storeTask.getSize();
+                scheduleUploadIfNeeded();
+            }
+        } catch (Exception e) {
+            storeTask.fail(e);
+            throw e;
+        }
+    }
+
+    private void scheduleUploadIfNeeded() {
+        checkState(holdsLock(scheduled));
+        if (scheduledFuture.isDone() || isOverSizeThresholdAndCancellationSucceded()) {
+            scheduleUpload();
+        }
+    }
+
+    private boolean isOverSizeThresholdAndCancellationSucceded() {
+        return scheduledSizeInBytes >= sizeThresholdBytes && scheduledFuture.cancel(false);
+    }

Review comment: I think the 2nd check may result in the threshold being exceeded without an upload being scheduled:
1. T1: adds 1 byte, schedules future 1.
2. Executor: waits and eventually runs future 1, calls `scheduleUploadIfNeeded` and returns without scheduling; now the thread gets suspended by the OS (so future 1 isn't completed).
3. T1: adds more bytes, exceeding the threshold; tries to cancel future 1 and fails => future 2 is not scheduled by T1.
4. The executor also doesn't schedule any future.

WDYT?
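A rough sketch of what I'd consider instead: when over the threshold, don't make scheduling depend on `cancel()` succeeding. I'm assuming here that `delegate.save()` of an empty collection is a no-op, so the worst case is a redundant drain of an empty queue (since `drainAndSave()` drains under the `scheduled` lock):

```java
private void scheduleUploadIfNeeded() {
    checkState(holdsLock(scheduled));
    if (scheduledSizeInBytes >= sizeThresholdBytes) {
        // best effort; it is fine if the pending future already ran or is running
        scheduledFuture.cancel(false);
        scheduledFuture = scheduler.submit(this::drainAndSave);
    } else if (scheduledFuture.isDone()) {
        scheduleUpload();
    }
}
```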
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ##########

@@ -136,21 +133,23 @@ private void drainAndSave() {
             tasks = new ArrayList<>(scheduled);
             scheduled.clear();
             scheduledSizeInBytes = 0;
-            scheduledFuture = null;
         }
         try {
-            if (error != null) {
-                tasks.forEach(task -> task.fail(error));
+            if (error.get() != null) {
+                tasks.forEach(task -> task.fail(error.get()));
                 return;
             }
-            retryingExecutor.execute(retryPolicy, () -> delegate.save(tasks));
+            delegate.save(tasks);
+
+            synchronized (scheduled) {
+                scheduleUploadIfNeeded();
+            }
         } catch (Throwable t) {
             tasks.forEach(task -> task.fail(t));
             if (findThrowable(t, IOException.class).isPresent()) {
                 LOG.warn("Caught IO exception while uploading", t);
             } else {
-                error = t;
-                throw t;
+                error.compareAndSet(null, t);

Review comment: Could you explain the removal of `throw t;`? Without it:
1. the error may not be logged at all, if no further upload is scheduled,
2. or if upload errors are not logged later,
3. it may be logged with a delay (which complicates debugging).

########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ##########

@@ -136,21 +133,23 @@ private void drainAndSave() {
             tasks = new ArrayList<>(scheduled);
             scheduled.clear();
             scheduledSizeInBytes = 0;
-            scheduledFuture = null;
         }
         try {
-            if (error != null) {
-                tasks.forEach(task -> task.fail(error));
+            if (error.get() != null) {
+                tasks.forEach(task -> task.fail(error.get()));
                 return;
             }
-            retryingExecutor.execute(retryPolicy, () -> delegate.save(tasks));
+            delegate.save(tasks);
+
+            synchronized (scheduled) {
+                scheduleUploadIfNeeded();
+            }

Review comment: This won't be executed in case of an exception (but I guess that will change once the concurrency issues are addressed). More importantly, no future uploads will start, because `this.error` is set later, in the `catch` block. I guess the assumption is that a single failure leads to a whole-job failover? But that shouldn't be the case: tasks should tolerate checkpoint failures, and it's the JM who decides how to handle checkpoint failures.

########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ##########

@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.Thread.holdsLock;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StateChangeStore} that waits for some configured amount of time before passing the
+ * accumulated state changes to the actual store. The writes are executed asynchronously.
+ */
+@ThreadSafe
+class BatchingStateChangeStore implements StateChangeStore {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class);
+
+    private final StateChangeStore delegate;
+    private final ScheduledExecutorService scheduler;
+    private final long scheduleDelayMs;
+    private final long sizeThresholdBytes;
+
+    @GuardedBy("scheduled")
+    private final Queue<StoreTask> scheduled;
+
+    @GuardedBy("scheduled")
+    private long scheduledSizeInBytes;
+
+    @GuardedBy("scheduled")
+    private Future<?> scheduledFuture = CompletableFuture.completedFuture(null);
+
+    private AtomicReference<Throwable> error = new AtomicReference<>(null);
+
+    BatchingStateChangeStore(
+            long persistDelayMs, long sizeThresholdBytes, StateChangeStore delegate) {
+        this(
+                persistDelayMs,
+                sizeThresholdBytes,
+                delegate,
+                SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG));
+    }
+
+    BatchingStateChangeStore(
+            long persistDelayMs,
+            long sizeThresholdBytes,
+            StateChangeStore delegate,
+            ScheduledExecutorService scheduler) {
+        this.scheduleDelayMs = persistDelayMs;
+        this.scheduled = new LinkedList<>();
+        this.scheduler = scheduler;
+        this.sizeThresholdBytes = sizeThresholdBytes;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void save(StoreTask storeTask) {
+        Collection<StateChangeSet> changeSets = storeTask.changeSets;
+        if (error.get() != null) {
+            LOG.debug("don't persist {} changesets, already failed", changeSets.size());
+            storeTask.fail(error.get());
+            return;
+        }
+        LOG.debug("persist {} changeSets", changeSets.size());
+        try {
+            synchronized (scheduled) {
+                scheduled.add(storeTask);
+                scheduledSizeInBytes += storeTask.getSize();
+                scheduleUploadIfNeeded();
+            }
+        } catch (Exception e) {
+            storeTask.fail(e);
+            throw e;
+        }
+    }
+
+    private void scheduleUploadIfNeeded() {
+        checkState(holdsLock(scheduled));
+        if (scheduledFuture.isDone() || isOverSizeThresholdAndCancellationSucceded()) {
+            scheduleUpload();
+        }
+    }
+
+    private boolean isOverSizeThresholdAndCancellationSucceded() {
+        return scheduledSizeInBytes >= sizeThresholdBytes && scheduledFuture.cancel(false);
+    }
+
+    private void scheduleUpload() {
+        checkState(scheduledFuture.isDone());
+        long delay = scheduleDelayMs;
+        if (scheduleDelayMs == 0 || scheduledSizeInBytes >= sizeThresholdBytes) {
+            scheduledFuture = scheduler.submit(this::drainAndSave);
+        } else {
+            scheduledFuture = scheduler.schedule(this::drainAndSave, delay, MILLISECONDS);
+        }
+    }
+
+    private void drainAndSave() {
+        Collection<StoreTask> tasks;
+        synchronized (scheduled) {
+            tasks = new ArrayList<>(scheduled);
+            scheduled.clear();
+            scheduledSizeInBytes = 0;
+        }
+        try {
+            if (error.get() != null) {
+                tasks.forEach(task -> task.fail(error.get()));
+                return;
+            }
+            delegate.save(tasks);

Review comment: This is a single-threaded upload now; I think we need more threads, as discussed offline.
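For illustration only (a sketch; `uploadExecutor` is a hypothetical pool whose sizing and shutdown are omitted here), the scheduler could keep doing the batching while the blocking writes are fanned out:

```java
// Hypothetical pool; the size would need to be configurable.
private final ExecutorService uploadExecutor = Executors.newFixedThreadPool(4);

private void drainAndSave() {
    Collection<StoreTask> tasks;
    synchronized (scheduled) {
        tasks = new ArrayList<>(scheduled);
        scheduled.clear();
        scheduledSizeInBytes = 0;
    }
    // hand the blocking write off the (single) scheduler thread
    uploadExecutor.execute(
            () -> {
                try {
                    delegate.save(tasks);
                } catch (Throwable t) {
                    tasks.forEach(task -> task.fail(t));
                }
            });
}
```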
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java ##########

@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.StateChangeStore.StoreTask;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory.ChangelogCallbackExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Basic implementation of {@link StateChangelogWriter}. Its main purpose is to keep track of which
+ * changes are required for a given {@link #persist(SequenceNumber)} call. In particular, it takes
+ * care of re-uploading changes from the previous {@link #persist(SequenceNumber)} call if those
+ * changes haven't yet been {@link #confirm(SequenceNumber, SequenceNumber)}'ed. This is crucial
+ * because until changes are confirmed, they can still be aborted and removed/deleted.
+ *
+ * <p>For example, checkpoint N-1 may fail and be disposed after checkpoint N has already started.
+ * In this case, when we are persisting {@link StateChangeSet}s for checkpoint N, we need to
+ * re-upload the {@link StateChangeSet}s that belonged to checkpoint N-1.
+ */
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> uploaded = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> confirmed = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+    private final ChangelogCallbackExecutor executor;
+
+    FsStateChangelogWriter(
+            UUID logId,
+            KeyGroupRange keyGroupRange,
+            StateChangeStore store,
+            ChangelogCallbackExecutor executor) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+        this.executor = checkNotNull(executor);
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));
+        // size threshold could be added to call persist when reached. considerations:
+        // 0. can actually degrade performance by amplifying number of requests
+        // 1. which range to persist?
+        // 2. how to deal with retries/aborts?
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        rollover();
+        LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);
+        return lastAppendedSequenceNumber;
+    }
+
+    @Override
+    public CompletableFuture<StateChangelogHandleStreamImpl> persist(SequenceNumber from)
+            throws IOException {
+        LOG.debug("persist {} from {}", logId, from);
+        checkNotNull(from);
+        // todo: check range
+
+        rollover();
+        Collection<StoreResult> readyToReturn = confirmed.tailMap(from, true).values();
+        Collection<StateChangeSet> toUpload = changeSets.tailMap(from, true).values();
+        LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn, toUpload);
+
+        StoreTask task = new StoreTask(toUpload);
+        // Handle the result in the supplied executor (mailbox) thread
+        CompletableFuture<StateChangelogHandleStreamImpl> resultFuture =
+                task.getResultFuture()
+                        .thenApplyAsync(
+                                (results) -> {
+                                    results.forEach(e -> uploaded.put(e.sequenceNumber, e));
+                                    return buildHandle(results, readyToReturn);
+                                },
+                                executor::execute);
+        store.save(task);
+        return resultFuture;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("close {}", logId);
+        checkState(!closed);
+        closed = true;
+        activeChangeSet.clear();
+        // todo in MVP or later: cleanup if transition succeeded and had non-shared state
+        changeSets.clear();
+        uploaded.clear();
+        confirmed.clear();
+        // the store is closed from the owning FsStateChangelogClient
+    }
+
+    @Override
+    public void confirm(SequenceNumber from, SequenceNumber to) {
+        LOG.debug("confirm {} from {} to {}", logId, from, to);
+        uploaded.subMap(from, true, to, false)
+                .forEach(
+                        (sequenceNumber, changeSetAndResult) -> {
+                            changeSets.remove(sequenceNumber);
+                            uploaded.remove(sequenceNumber);
+                            confirmed.put(sequenceNumber, changeSetAndResult);

Review comment: I think that we need to check whether this range is still relevant (or whether it was truncated in the meantime).
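E.g. something like this (a sketch; `truncatedTo` is a hypothetical field that `truncate()` would have to maintain, and I'm assuming `SequenceNumber` is `Comparable`, since it is already used as a `TreeMap` key):

```java
@Override
public void confirm(SequenceNumber from, SequenceNumber to) {
    // skip the part of the range that was truncated since the upload started
    SequenceNumber effectiveFrom =
            from.compareTo(truncatedTo) >= 0 ? from : truncatedTo;
    uploaded.subMap(effectiveFrom, true, to, false)
            .forEach(
                    (sequenceNumber, changeSetAndResult) -> {
                        changeSets.remove(sequenceNumber);
                        uploaded.remove(sequenceNumber);
                        confirmed.put(sequenceNumber, changeSetAndResult);
                    });
}
```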
(this was one of the concerns when I published #15322)

########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java ##########

@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.StateChangeStore.StoreTask;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory.ChangelogCallbackExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Basic implementation of {@link StateChangelogWriter}. Its main purpose is to keep track of which
+ * changes are required for a given {@link #persist(SequenceNumber)} call. In particular, it takes
+ * care of re-uploading changes from the previous {@link #persist(SequenceNumber)} call if those
+ * changes haven't yet been {@link #confirm(SequenceNumber, SequenceNumber)}'ed. This is crucial
+ * because until changes are confirmed, they can still be aborted and removed/deleted.
+ *
+ * <p>For example, checkpoint N-1 may fail and be disposed after checkpoint N has already started.
+ * In this case, when we are persisting {@link StateChangeSet}s for checkpoint N, we need to
+ * re-upload the {@link StateChangeSet}s that belonged to checkpoint N-1.
+ */
+@NotThreadSafe
+class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
+
+    private final UUID logId;
+    private final KeyGroupRange keyGroupRange;
+    private final StateChangeStore store;
+    private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> uploaded = new TreeMap<>();
+    private final NavigableMap<SequenceNumber, StoreResult> confirmed = new TreeMap<>();
+    private List<StateChange> activeChangeSet = new ArrayList<>();
+    private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L);
+    private boolean closed;
+    private final ChangelogCallbackExecutor executor;
+
+    FsStateChangelogWriter(
+            UUID logId,
+            KeyGroupRange keyGroupRange,
+            StateChangeStore store,
+            ChangelogCallbackExecutor executor) {
+        this.logId = logId;
+        this.keyGroupRange = keyGroupRange;
+        this.store = store;
+        this.executor = checkNotNull(executor);
+    }
+
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(keyGroup, value));
+        // size threshold could be added to call persist when reached. considerations:
+        // 0. can actually degrade performance by amplifying number of requests
+        // 1. which range to persist?
+        // 2. how to deal with retries/aborts?
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        rollover();
+        LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber);
+        return lastAppendedSequenceNumber;
+    }
+
+    @Override
+    public CompletableFuture<StateChangelogHandleStreamImpl> persist(SequenceNumber from)
+            throws IOException {
+        LOG.debug("persist {} from {}", logId, from);
+        checkNotNull(from);
+        // todo: check range
+
+        rollover();
+        Collection<StoreResult> readyToReturn = confirmed.tailMap(from, true).values();
+        Collection<StateChangeSet> toUpload = changeSets.tailMap(from, true).values();

Review comment: I'm thinking about how we could later implement pre-emptive upload (i.e. calling `persist()` from `append()`). In this version (#15371 or #15322, as opposed to #14839), we need to filter out the already uploaded changes (`uploaded.subMap`) from `toUpload`. But we also don't want to skip the changes from the previous, not yet confirmed nor aborted checkpoint, which will also reside in `uploaded`. Do you have any idea how to implement this?
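One half-baked idea, just to make the question concrete (`preUploaded` is a hypothetical map that nothing in this PR populates yet; a pre-emptive upload from `append()` would fill it):

```java
// Changes uploaded pre-emptively from append(), not yet owned by any checkpoint.
private final NavigableMap<SequenceNumber, StoreResult> preUploaded = new TreeMap<>();

// In persist(from): reuse pre-emptive uploads, re-upload everything else.
// Unconfirmed changes of the previous checkpoint stay in `uploaded`, not in
// `preUploaded`, so they are not filtered out here and get re-uploaded.
Collection<StoreResult> reusable = preUploaded.tailMap(from, true).values();
List<StateChangeSet> toUpload = new ArrayList<>();
for (Map.Entry<SequenceNumber, StateChangeSet> e : changeSets.tailMap(from, true).entrySet()) {
    if (!preUploaded.containsKey(e.getKey())) {
        toUpload.add(e.getValue());
    }
}
// `reusable` would then participate in buildHandle() alongside `readyToReturn`.
```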
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java ##########

@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.changelog.fs;
-
-import org.apache.flink.util.function.RunnableWithException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-/**
- * A {@link RunnableWithException} executor that schedules a next attempt upon timeout based on
- * {@link RetryPolicy}. Aimed to curb tail latencies
- */
-class RetryingExecutor implements AutoCloseable {
-    private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);

Review comment: I'd rather see this removal as a separate commit. I'm also not sure it should be removed at all. Are you planning to resurrect it in a subsequent PR?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
