pnowojski commented on a change in pull request #14839: URL: https://github.com/apache/flink/pull/14839#discussion_r586375585
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeSet.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChange; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Comparator.comparingInt; +import static java.util.EnumSet.noneOf; +import static java.util.EnumSet.of; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.CANCELLED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.CONFIRMED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.FAILED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.MATERIALIZED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.SCHEDULED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.SENT_TO_JM; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.UPLOADED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.UPLOADING; +import static org.apache.flink.util.Preconditions.checkState; + +class StateChangeSet { + private final UUID logId; + private final AtomicReference<Status> status; Review comment: `AtomicReference`? What's the threading model here? Is this class thread safe? Also this class lacks a bit of an explanation what does it represent and how is it supposed to be used. ########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.flink.util.ExceptionUtils.findThrowable; +import static org.apache.flink.util.ExceptionUtils.rethrow; + +/** + * A {@link StateChangeStore} that waits for some configured amount of time before passing the + * accumulated state changes to the actual store. + */ +class BatchingStateChangeStore implements StateChangeStore { + private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class); + + private final ScheduledExecutorService scheduler; + private final long scheduleDelayMs; + private final BlockingQueue<StateChangeSet> scheduled; + private final AtomicBoolean drainScheduled; + private final RetryPolicy retryPolicy; + private volatile Throwable error; + private final int sizeThreshold; + private final RetryingExecutor retryingExecutor; + private final StateChangeStore delegate; + + BatchingStateChangeStore( + long persistDelayMs, + int sizeThreshold, + int requestQueueCapacity, + RetryPolicy retryPolicy, + StateChangeStore delegate) { + this.scheduleDelayMs = persistDelayMs; + this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true); + this.scheduler = SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG); Review comment: This creates a yet another thread for IO operations. Maybe good enough for MVP, but we should probably have a single thread pool for IO operations? ########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.flink.util.ExceptionUtils.findThrowable; +import static org.apache.flink.util.ExceptionUtils.rethrow; + +/** + * A {@link StateChangeStore} that waits for some configured amount of time before passing the + * accumulated state changes to the actual store. + */ +class BatchingStateChangeStore implements StateChangeStore { + private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class); + + private final ScheduledExecutorService scheduler; + private final long scheduleDelayMs; + private final BlockingQueue<StateChangeSet> scheduled; + private final AtomicBoolean drainScheduled; + private final RetryPolicy retryPolicy; + private volatile Throwable error; + private final int sizeThreshold; + private final RetryingExecutor retryingExecutor; + private final StateChangeStore delegate; + + BatchingStateChangeStore( + long persistDelayMs, + int sizeThreshold, + int requestQueueCapacity, + RetryPolicy retryPolicy, + StateChangeStore delegate) { + this.scheduleDelayMs = persistDelayMs; + this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true); + this.scheduler = SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG); + this.drainScheduled = new AtomicBoolean(false); + this.retryPolicy = retryPolicy; + this.retryingExecutor = new RetryingExecutor(); + this.sizeThreshold = sizeThreshold; + this.delegate = delegate; + } + + BatchingStateChangeStore( + long persistDelayMs, + int sizeThreshold, + int requestQueueCapacity, + RetryPolicy retryPolicy, + StateChangeStore delegate, + ScheduledExecutorService scheduler, + RetryingExecutor retryingExecutor) { + this.scheduleDelayMs = persistDelayMs; + this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true); + this.scheduler = scheduler; + this.drainScheduled = new AtomicBoolean(false); + this.retryPolicy = retryPolicy; + this.retryingExecutor = retryingExecutor; + this.sizeThreshold = sizeThreshold; + this.delegate = delegate; + } + + @Override + public void save(Collection<StateChangeSet> changeSets) { + if (error != null) { + LOG.debug("don't persist {} changesets, already failed", changeSets.size()); + changeSets.forEach(cs -> cs.setFailed(error)); + return; + } + LOG.debug("persist {} changeSets", changeSets.size()); + try { + for (StateChangeSet changeSet : changeSets) { + scheduled.put(changeSet); // blocks if no space in the queue + } + scheduleUploadIfNeeded(); + } catch (Exception e) { + changeSets.forEach(cs -> cs.setFailed(e)); + rethrow(e); + } + } + + private void scheduleUploadIfNeeded() { + if (scheduleDelayMs == 0 || scheduled.size() >= sizeThreshold) { Review comment: I think this size limit should be in bytes Could be a follow up ticket, but in that case I would just hardcode some number (`100`?) here, and in a later ticket implement proper size limit. 
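For illustration only, a minimal sketch of how a byte-based threshold could replace the change-set count in `scheduleUploadIfNeeded`. The `queuedBytes` counter and a `StateChangeSet#getSize()`-style accessor are assumptions for this sketch, not part of the PR:

```java
import java.util.concurrent.atomic.AtomicLong;

class ByteThresholdSketch {
    // Hypothetical counter: total payload bytes currently waiting in the queue.
    private final AtomicLong queuedBytes = new AtomicLong();
    private final long sizeThresholdBytes;
    private final long scheduleDelayMs;

    ByteThresholdSketch(long sizeThresholdBytes, long scheduleDelayMs) {
        this.sizeThresholdBytes = sizeThresholdBytes;
        this.scheduleDelayMs = scheduleDelayMs;
    }

    // Called from save() for every enqueued change set; changeSetBytes would come from
    // a hypothetical StateChangeSet#getSize() rather than from counting change sets.
    void onEnqueued(long changeSetBytes) {
        long total = queuedBytes.addAndGet(changeSetBytes);
        if (scheduleDelayMs == 0 || total >= sizeThresholdBytes) {
            drainNow(); // enough bytes accumulated: upload immediately
        } else {
            scheduleDelayedDrain(); // otherwise wait up to scheduleDelayMs
        }
    }

    private void drainNow() {
        queuedBytes.set(0); // reset once the queued change sets are handed to the delegate
    }

    private void scheduleDelayedDrain() {
        // schedule drainNow() after scheduleDelayMs, if a drain is not already scheduled
    }
}
```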
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING; +import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +@NotThreadSafe Review comment: I guess this belongs to the base `StateChangelogWriter` interface ########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING; +import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +@NotThreadSafe +class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> { + private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class); + + private final UUID logId; + private final KeyGroupRange keyGroupRange; + private final StateChangeStore store; + private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>(); + private List<StateChange> activeChangeSet = new ArrayList<>(); + private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L); + private boolean closed; + + FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeStore store) { + this.logId = logId; + this.keyGroupRange = keyGroupRange; + this.store = store; + } + + @Override + public void append(int keyGroup, byte[] value) { + LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length); + checkState(!closed, "%s is closed", logId); + activeChangeSet.add(new StateChange(keyGroup, value)); Review comment: Why don't we start writing changes here, as on happy path we need to persist everything, right? In `persist()` we should just `flush()` the writes and return a future when the writes are completed. ########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeSet.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChange; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Comparator.comparingInt; +import static java.util.EnumSet.noneOf; +import static java.util.EnumSet.of; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.CANCELLED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.CONFIRMED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.FAILED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.MATERIALIZED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.SCHEDULED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.SENT_TO_JM; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.UPLOADED; +import static org.apache.flink.changelog.fs.StateChangeSet.Status.UPLOADING; +import static org.apache.flink.util.Preconditions.checkState; + +class StateChangeSet { + private final UUID logId; + private final AtomicReference<Status> status; + private final List<StateChange> changes; + private final SequenceNumber sequenceNumber; + private final CompletableFuture<StoreResult> storeResultFuture; Review comment: `CompletableFuture<StoreResult>` inside `StateChangeSet` sounds strange and makes threading boundaries hard to define (which methods of `StateChangeSet` are thread safe?). Why `CompletableFuture<StoreResult>` is not a returned from the `save(Collection< StateChangeSet>)` method? ########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.flink.util.ExceptionUtils.findThrowable; +import static org.apache.flink.util.ExceptionUtils.rethrow; + +/** + * A {@link StateChangeStore} that waits for some configured amount of time before passing the + * accumulated state changes to the actual store. + */ +class BatchingStateChangeStore implements StateChangeStore { + private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class); + + private final ScheduledExecutorService scheduler; + private final long scheduleDelayMs; + private final BlockingQueue<StateChangeSet> scheduled; + private final AtomicBoolean drainScheduled; + private final RetryPolicy retryPolicy; + private volatile Throwable error; Review comment: Why complicate the synchronisation model and use many small locks instead of one large one? It's already for me non obvious if there is or there is not race condition between completing previous `drainAndSave` and scheduling new one via checking `drainScheduled`. ########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING; +import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +@NotThreadSafe +class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> { + private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class); + + private final UUID logId; + private final KeyGroupRange keyGroupRange; + private final StateChangeStore store; + private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>(); + private List<StateChange> activeChangeSet = new ArrayList<>(); + private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L); + private boolean closed; + + FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeStore store) { + this.logId = logId; + this.keyGroupRange = keyGroupRange; + this.store = store; + } + + @Override + public void append(int keyGroup, byte[] value) { + LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length); + checkState(!closed, "%s is closed", logId); + activeChangeSet.add(new StateChange(keyGroup, value)); + // size threshold could be added to call persist when reached. considerations: + // 0. can actually degrade performance by amplifying number of requests + // 1. which range to persist? + // 2. how to deal with retries/aborts? + } + + @Override + public SequenceNumber lastAppendedSequenceNumber() { + LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber); Review comment: logged `lastAppendedSequenceNumber` is different compared to the returned value. I'm not sure if that's the intention. ########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.flink.util.ExceptionUtils.findThrowable; +import static org.apache.flink.util.ExceptionUtils.rethrow; + +/** + * A {@link StateChangeStore} that waits for some configured amount of time before passing the + * accumulated state changes to the actual store. + */ +class BatchingStateChangeStore implements StateChangeStore { + private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class); + + private final ScheduledExecutorService scheduler; + private final long scheduleDelayMs; + private final BlockingQueue<StateChangeSet> scheduled; + private final AtomicBoolean drainScheduled; + private final RetryPolicy retryPolicy; + private volatile Throwable error; + private final int sizeThreshold; + private final RetryingExecutor retryingExecutor; + private final StateChangeStore delegate; + + BatchingStateChangeStore( + long persistDelayMs, + int sizeThreshold, + int requestQueueCapacity, + RetryPolicy retryPolicy, + StateChangeStore delegate) { + this.scheduleDelayMs = persistDelayMs; + this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true); + this.scheduler = SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG); + this.drainScheduled = new AtomicBoolean(false); + this.retryPolicy = retryPolicy; + this.retryingExecutor = new RetryingExecutor(); + this.sizeThreshold = sizeThreshold; + this.delegate = delegate; + } + + BatchingStateChangeStore( + long persistDelayMs, + int sizeThreshold, + int requestQueueCapacity, + RetryPolicy retryPolicy, + StateChangeStore delegate, + ScheduledExecutorService scheduler, + RetryingExecutor retryingExecutor) { + this.scheduleDelayMs = persistDelayMs; + this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true); + this.scheduler = scheduler; + this.drainScheduled = new AtomicBoolean(false); + this.retryPolicy = retryPolicy; + this.retryingExecutor = retryingExecutor; + this.sizeThreshold = sizeThreshold; + this.delegate = delegate; + } + + @Override + public void save(Collection<StateChangeSet> changeSets) { + if (error != null) { + LOG.debug("don't persist {} changesets, already failed", changeSets.size()); + changeSets.forEach(cs -> cs.setFailed(error)); + return; + } + LOG.debug("persist {} changeSets", changeSets.size()); + try { + for (StateChangeSet changeSet : changeSets) { + scheduled.put(changeSet); // blocks if no space in the queue + } + scheduleUploadIfNeeded(); + } catch (Exception e) { + changeSets.forEach(cs -> cs.setFailed(e)); + rethrow(e); + } Review comment: Why `rethrow` as `RuntimeException`? 
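For context, the alternative this question points at is declaring the checked failure rather than wrapping it unchecked. A simplified sketch with stand-in types, not the PR's actual `StateChangeStore` signature:

```java
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;

class SaveSketch<T> {
    private final BlockingQueue<T> scheduled;

    SaveSketch(BlockingQueue<T> scheduled) {
        this.scheduled = scheduled;
    }

    // Declares the failure mode instead of rethrowing it as an unchecked exception,
    // and restores the interrupt flag if the blocking put() is interrupted.
    void save(Collection<T> changeSets) throws IOException {
        try {
            for (T changeSet : changeSets) {
                scheduled.put(changeSet); // blocks if there is no space in the queue
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while enqueueing state changes", e);
        }
    }
}
```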
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING; +import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +@NotThreadSafe +class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> { + private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class); + + private final UUID logId; + private final KeyGroupRange keyGroupRange; + private final StateChangeStore store; + private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>(); + private List<StateChange> activeChangeSet = new ArrayList<>(); + private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L); + private boolean closed; + + FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeStore store) { + this.logId = logId; + this.keyGroupRange = keyGroupRange; + this.store = store; + } + + @Override + public void append(int keyGroup, byte[] value) { + LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length); + checkState(!closed, "%s is closed", logId); + activeChangeSet.add(new StateChange(keyGroup, value)); + // size threshold could be added to call persist when reached. considerations: + // 0. can actually degrade performance by amplifying number of requests + // 1. which range to persist? + // 2. how to deal with retries/aborts? 
+ } + + @Override + public SequenceNumber lastAppendedSequenceNumber() { + LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber); + rollover(); + return lastAppendedSequenceNumber; + } Review comment: Why do we need to rollover for a getter? ########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeStore.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.flink.util.ExceptionUtils.findThrowable; +import static org.apache.flink.util.ExceptionUtils.rethrow; + +/** + * A {@link StateChangeStore} that waits for some configured amount of time before passing the + * accumulated state changes to the actual store. + */ +class BatchingStateChangeStore implements StateChangeStore { + private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeStore.class); + + private final ScheduledExecutorService scheduler; + private final long scheduleDelayMs; + private final BlockingQueue<StateChangeSet> scheduled; + private final AtomicBoolean drainScheduled; + private final RetryPolicy retryPolicy; + private volatile Throwable error; + private final int sizeThreshold; + private final RetryingExecutor retryingExecutor; + private final StateChangeStore delegate; + + BatchingStateChangeStore( + long persistDelayMs, + int sizeThreshold, + int requestQueueCapacity, + RetryPolicy retryPolicy, + StateChangeStore delegate) { + this.scheduleDelayMs = persistDelayMs; + this.scheduled = new ArrayBlockingQueue<>(requestQueueCapacity, true); + this.scheduler = SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG); + this.drainScheduled = new AtomicBoolean(false); + this.retryPolicy = retryPolicy; + this.retryingExecutor = new RetryingExecutor(); + this.sizeThreshold = sizeThreshold; + this.delegate = delegate; + } + + BatchingStateChangeStore( + long persistDelayMs, + int sizeThreshold, + int requestQueueCapacity, + RetryPolicy retryPolicy, + StateChangeStore delegate, + ScheduledExecutorService scheduler, + RetryingExecutor retryingExecutor) { Review comment: Can you explain why do we need both of them? Why isn't a single executor enough? 
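To make the single-executor question concrete, a hedged sketch where one `ScheduledExecutorService` handles both the delayed drain and the retry attempts. The fixed backoff and the `upload()` stub are illustrative assumptions, not taken from the PR:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SingleExecutorSketch {
    // One scheduled pool serves both the delayed drain and the retries.
    private final ScheduledExecutorService ioExecutor =
            Executors.newSingleThreadScheduledExecutor();

    void scheduleDrain(long scheduleDelayMs) {
        ioExecutor.schedule(this::drainAndSave, scheduleDelayMs, TimeUnit.MILLISECONDS);
    }

    private void drainAndSave() {
        try {
            upload();
        } catch (Exception e) {
            // Hypothetical fixed backoff; a real RetryPolicy would decide delay and attempts.
            ioExecutor.schedule(this::drainAndSave, 100, TimeUnit.MILLISECONDS);
        }
    }

    private void upload() throws Exception {
        // delegate.save(drainedChangeSets) would go here
    }
}
```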
########## File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl; +import org.apache.flink.runtime.state.changelog.StateChangelogWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.changelog.fs.StateChangeSet.Status.PENDING; +import static org.apache.flink.runtime.concurrent.FutureUtils.combineAll; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +@NotThreadSafe +class FsStateChangelogWriter implements StateChangelogWriter<StateChangelogHandleStreamImpl> { + private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class); + + private final UUID logId; + private final KeyGroupRange keyGroupRange; + private final StateChangeStore store; + private final NavigableMap<SequenceNumber, StateChangeSet> changeSets = new TreeMap<>(); + private List<StateChange> activeChangeSet = new ArrayList<>(); + private SequenceNumber lastAppendedSequenceNumber = SequenceNumber.of(0L); + private boolean closed; + + FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeStore store) { + this.logId = logId; + this.keyGroupRange = keyGroupRange; + this.store = store; + } + + @Override + public void append(int keyGroup, byte[] value) { + LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length); + checkState(!closed, "%s is closed", logId); + activeChangeSet.add(new StateChange(keyGroup, value)); + // size threshold could be added to call persist when reached. considerations: + // 0. can actually degrade performance by amplifying number of requests + // 1. which range to persist? + // 2. how to deal with retries/aborts? 
+ } + + @Override + public SequenceNumber lastAppendedSequenceNumber() { + LOG.trace("query {} sqn: {}", logId, lastAppendedSequenceNumber); + rollover(); + return lastAppendedSequenceNumber; + } + + @Override + public CompletableFuture<StateChangelogHandleStreamImpl> persist(SequenceNumber from) + throws IOException { + LOG.debug("persist {} from {}", logId, from); + checkNotNull(from); + checkArgument( + lastAppendedSequenceNumber.next().equals(from) || changeSets.containsKey(from), + "sequence number %s to persist from not in range (%s:%s/%s)", + from, + changeSets.isEmpty() ? null : changeSets.firstKey(), + changeSets.isEmpty() ? null : changeSets.lastKey(), + lastAppendedSequenceNumber.next()); + + rollover(); + Collected collected = collect(from); + collected.toRetry.forEach( + changeSet -> changeSets.put(changeSet.getSequenceNumber(), changeSet)); + LOG.debug("collected {}", collected); + store.save(collected.toUpload); + return asHandle(collected.toReturn); + } + + private Collected collect(SequenceNumber from) { + Collected result = new Collected(); + changeSets + .tailMap(from, true) + .values() + .forEach( + changeSet -> { + if (changeSet.isConfirmed()) { + result.toReturn.add(changeSet); + } else if (changeSet.setScheduled()) { + result.toUpload.add(changeSet); + } else { + // we also re-upload any scheduled/uploading/uploaded changes + // even if they were not sent to the JM yet because this can happen + // in the meantime and then JM can decide to discard them + result.toRetry.add(changeSet.forRetry()); + } + }); + result.toUpload.addAll(result.toRetry); + result.toReturn.addAll(result.toUpload); + return result; + } + + @Override + public void close() { + LOG.debug("close {}", logId); + checkState(!closed); + closed = true; + activeChangeSet.clear(); + changeSets.values().forEach(StateChangeSet::setCancelled); + // todo in MVP or later: cleanup if transition succeeded and had non-shared state + changeSets.clear(); + // the store is closed from the owning FsStateChangelogClient + } + + @Override + public void confirm(SequenceNumber from, SequenceNumber to) { + LOG.debug("confirm {} from {} to {}", logId, from, to); + changeSets.subMap(from, true, to, false).values().forEach(StateChangeSet::setConfirmed); + } + + @Override + public void reset(SequenceNumber from, SequenceNumber to) { + LOG.debug("reset {} from {} to {}", logId, from, to); + changeSets.subMap(from, to).forEach((key, value) -> value.setAborted()); + // todo in MVP or later: cleanup if change to aborted succeeded and had non-shared state + // For now, relying on manual cleanup. + // If a checkpoint that is aborted uses the changes uploaded for another checkpoint + // which was completed on JM but not confirmed to this TM + // then discarding those changes would invalidate that previously completed checkpoint. + // Solution: + // 1. pass last completed checkpoint id in barriers, trigger RPC, and abort RPC + // 2. confirm for the id above + // 3. make sure that at most 1 checkpoint in flight (CC config) + } + + @Override + public void truncate(SequenceNumber to) { + LOG.debug("truncate {} to {}", logId, to); + if (to.compareTo(lastAppendedSequenceNumber) > 0) { + // can happen if client calls truncate(prevSqn.next()) + rollover(); + } + NavigableMap<SequenceNumber, StateChangeSet> headMap = changeSets.headMap(to, false); + headMap.values().forEach(StateChangeSet::setTruncated); + headMap.clear(); + } Review comment: Are we holding in memory all state changes until we call truncate? 
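To make the memory question concrete, a hedged sketch of releasing the in-memory payload once a change set is uploaded and keeping only a small handle until `truncate` drops the entry. The `UploadedHandle` type and the `onUploaded` hook are assumptions for illustration, not the PR's design:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class RetentionSketch {
    // Illustrative stand-in for the uploaded-state handle (path/offset/size only).
    static final class UploadedHandle {
        final String path;
        final long offset;
        final long size;

        UploadedHandle(String path, long offset, long size) {
            this.path = path;
            this.offset = offset;
            this.size = size;
        }
    }

    private final NavigableMap<Long, UploadedHandle> uploaded = new TreeMap<>();

    // Hypothetical hook invoked once the upload for sequence number 'sqn' completes:
    // the raw byte[] changes could be released here, keeping only the small handle.
    void onUploaded(long sqn, UploadedHandle handle) {
        uploaded.put(sqn, handle);
    }

    // truncate(to) then discards only lightweight handles, not buffered payloads.
    void truncate(long to) {
        uploaded.headMap(to, false).clear();
    }
}
```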
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
