Myasuka commented on a change in pull request #18976:
URL: https://github.com/apache/flink/pull/18976#discussion_r821847901
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
##########
@@ -17,129 +17,16 @@
package org.apache.flink.changelog.fs;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.io.AvailabilityProvider;
-import org.apache.flink.runtime.state.changelog.SequenceNumber;
-
-import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-
-import static java.util.stream.Collectors.toList;
-import static org.apache.flink.changelog.fs.FsStateChangelogOptions.BASE_PATH;
-import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.COMPRESSION_ENABLED;
-import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT;
-import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.NUM_UPLOAD_THREADS;
-import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.PERSIST_DELAY;
-import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD;
-import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.UPLOAD_BUFFER_SIZE;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-// todo: consider using CheckpointStreamFactory / CheckpointStorageWorkerView
-// Considerations:
-// 0. need for checkpointId in the current API to resolve the location
-// option a: pass checkpointId (race condition?)
-// option b: pass location (race condition?)
-// option c: add FsCheckpointStorageAccess.createSharedStateStream
-// 1. different settings for materialized/changelog (e.g. timeouts)
-// 2. re-use closeAndGetHandle
-// 3. re-use in-memory handles (.metadata)
-// 4. handle in-memory handles duplication
/**
* The purpose of this interface is to abstract the different implementations
of uploading state
* changelog parts. It has a single {@link #upload} method with a single
{@link UploadTask} argument
Review comment:
I think the doc should be modified to say `a collection of {@link UploadTask}`
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.flink.changelog.fs.FsStateChangelogOptions.BASE_PATH;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.COMPRESSION_ENABLED;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.NUM_UPLOAD_THREADS;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.PERSIST_DELAY;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.UPLOAD_BUFFER_SIZE;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+// todo: consider using CheckpointStreamFactory / CheckpointStorageWorkerView
+// Considerations:
+// 0. need for checkpointId in the current API to resolve the location
+// option a: pass checkpointId (race condition?)
+// option b: pass location (race condition?)
+// option c: add FsCheckpointStorageAccess.createSharedStateStream
+// 1. different settings for materialized/changelog (e.g. timeouts)
+// 2. re-use closeAndGetHandle
+// 3. re-use in-memory handles (.metadata)
+// 4. handle in-memory handles duplication
+
+/**
+ * Schedules {@link UploadTask upload tasks} on a {@link StateChangeUploader}.
In the simplest form,
+ * directly calls {@link StateChangeUploader#upload(Collection)}. Other
implementations might batch
+ * the tasks for efficiency.
+ */
+interface StateChangeUploadScheduler extends AutoCloseable {
+
+ /**
+ * Schedule the upload and {@link UploadTask#complete(List) complete} or
{@link
+ * UploadTask#fail(Throwable) fail} the corresponding tasks.
+ */
+ void upload(UploadTask uploadTask) throws IOException;
+
+ static StateChangeUploadScheduler directScheduler(StateChangeUploader
uploader) {
+ return new StateChangeUploadScheduler() {
+ @Override
+ public void upload(UploadTask uploadTask) throws IOException {
+ uploader.upload(singletonList(uploadTask)).complete();
+ }
+
+ @Override
+ public void close() throws Exception {
+ uploader.close();
+ }
+ };
+ }
+
+ static StateChangeUploadScheduler fromConfig(
+ ReadableConfig config, ChangelogStorageMetricGroup metricGroup)
throws IOException {
+ Path basePath = new Path(config.get(BASE_PATH));
+ long bytes = config.get(UPLOAD_BUFFER_SIZE).getBytes();
+ checkArgument(bytes <= Integer.MAX_VALUE);
+ int bufferSize = (int) bytes;
+ StateChangeFsUploader store =
+ new StateChangeFsUploader(
+ basePath,
+ basePath.getFileSystem(),
+ config.get(COMPRESSION_ENABLED),
+ bufferSize,
+ metricGroup);
+ BatchingStateChangeUploadScheduler batchingStore =
+ new BatchingStateChangeUploadScheduler(
+ config.get(PERSIST_DELAY).toMillis(),
+ config.get(PERSIST_SIZE_THRESHOLD).getBytes(),
+ RetryPolicy.fromConfig(config),
+ store,
+ config.get(NUM_UPLOAD_THREADS),
+ config.get(IN_FLIGHT_DATA_LIMIT).getBytes(),
+ metricGroup);
+ return batchingStore;
+ }
+
+ default AvailabilityProvider getAvailabilityProvider() {
+ return () -> AvailabilityProvider.AVAILABLE;
+ }
+
+ @ThreadSafe
+ final class UploadTask {
+ final Collection<StateChangeSet> changeSets;
+ final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
+ final Consumer<List<UploadResult>> successCallback;
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ public UploadTask(
+ Collection<StateChangeSet> changeSets,
+ Consumer<List<UploadResult>> successCallback,
+ BiConsumer<List<SequenceNumber>, Throwable> failureCallback) {
+ this.changeSets = new ArrayList<>(changeSets);
+ this.failureCallback = failureCallback;
+ this.successCallback = successCallback;
+ }
+
+ public void complete(List<UploadResult> results) {
+ if (finished.compareAndSet(false, true)) {
+ successCallback.accept(results);
+ }
+ }
+
+ public void fail(Throwable error) {
+ if (finished.compareAndSet(false, true)) {
+ failureCallback.accept(
+ changeSets.stream()
+ .map(StateChangeSet::getSequenceNumber)
+ .collect(toList()),
+ error);
+ }
+ }
+
+ public long getSize() {
+ long size = 0;
+ for (StateChangeSet set : changeSets) {
+ size = set.getSize();
Review comment:
I think we could make the size of `changeSets` larger than 1, or avoid using
`singletonList` in the `List<StateChangeSet> getChanges(int size)` method
within `BatchingStateChangeUploadSchedulerTest`, so that this bug would be caught.
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
##########
@@ -96,19 +91,20 @@ public void upload(Collection<UploadTask> tasks) throws
IOException {
closer.register(() -> fileSystem.delete(path, true));
}
}
+ return null; // closer above throws an exception
Review comment:
Why not just throw the exception after the closer's try-with-resources block? If
so, we do not need to return a null here.
~~~java
} catch (IOException e) {
metrics.getUploadFailuresCounter().inc();
try (Closer closer = Closer.create()) {
tasks.forEach(cs -> closer.register(() -> cs.fail(e)));
closer.register(() -> fileSystem.delete(path, true));
}
throw e;
}
~~~
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.flink.changelog.fs.FsStateChangelogOptions.BASE_PATH;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.COMPRESSION_ENABLED;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.NUM_UPLOAD_THREADS;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.PERSIST_DELAY;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.UPLOAD_BUFFER_SIZE;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+// todo: consider using CheckpointStreamFactory / CheckpointStorageWorkerView
+// Considerations:
+// 0. need for checkpointId in the current API to resolve the location
+// option a: pass checkpointId (race condition?)
+// option b: pass location (race condition?)
+// option c: add FsCheckpointStorageAccess.createSharedStateStream
+// 1. different settings for materialized/changelog (e.g. timeouts)
+// 2. re-use closeAndGetHandle
+// 3. re-use in-memory handles (.metadata)
+// 4. handle in-memory handles duplication
+
+/**
+ * Schedules {@link UploadTask upload tasks} on a {@link StateChangeUploader}.
In the simplest form,
+ * directly calls {@link StateChangeUploader#upload(Collection)}. Other
implementations might batch
+ * the tasks for efficiency.
+ */
+interface StateChangeUploadScheduler extends AutoCloseable {
+
+ /**
+ * Schedule the upload and {@link UploadTask#complete(List) complete} or
{@link
+ * UploadTask#fail(Throwable) fail} the corresponding tasks.
+ */
+ void upload(UploadTask uploadTask) throws IOException;
+
+ static StateChangeUploadScheduler directScheduler(StateChangeUploader
uploader) {
+ return new StateChangeUploadScheduler() {
+ @Override
+ public void upload(UploadTask uploadTask) throws IOException {
+ uploader.upload(singletonList(uploadTask)).complete();
+ }
+
+ @Override
+ public void close() throws Exception {
+ uploader.close();
+ }
+ };
+ }
+
+ static StateChangeUploadScheduler fromConfig(
+ ReadableConfig config, ChangelogStorageMetricGroup metricGroup)
throws IOException {
+ Path basePath = new Path(config.get(BASE_PATH));
+ long bytes = config.get(UPLOAD_BUFFER_SIZE).getBytes();
+ checkArgument(bytes <= Integer.MAX_VALUE);
+ int bufferSize = (int) bytes;
+ StateChangeFsUploader store =
+ new StateChangeFsUploader(
+ basePath,
+ basePath.getFileSystem(),
+ config.get(COMPRESSION_ENABLED),
+ bufferSize,
+ metricGroup);
+ BatchingStateChangeUploadScheduler batchingStore =
+ new BatchingStateChangeUploadScheduler(
+ config.get(PERSIST_DELAY).toMillis(),
+ config.get(PERSIST_SIZE_THRESHOLD).getBytes(),
+ RetryPolicy.fromConfig(config),
+ store,
+ config.get(NUM_UPLOAD_THREADS),
+ config.get(IN_FLIGHT_DATA_LIMIT).getBytes(),
+ metricGroup);
+ return batchingStore;
+ }
+
+ default AvailabilityProvider getAvailabilityProvider() {
+ return () -> AvailabilityProvider.AVAILABLE;
+ }
+
+ @ThreadSafe
+ final class UploadTask {
+ final Collection<StateChangeSet> changeSets;
+ final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
+ final Consumer<List<UploadResult>> successCallback;
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ public UploadTask(
+ Collection<StateChangeSet> changeSets,
+ Consumer<List<UploadResult>> successCallback,
+ BiConsumer<List<SequenceNumber>, Throwable> failureCallback) {
+ this.changeSets = new ArrayList<>(changeSets);
+ this.failureCallback = failureCallback;
+ this.successCallback = successCallback;
+ }
+
+ public void complete(List<UploadResult> results) {
+ if (finished.compareAndSet(false, true)) {
+ successCallback.accept(results);
+ }
+ }
+
+ public void fail(Throwable error) {
+ if (finished.compareAndSet(false, true)) {
+ failureCallback.accept(
+ changeSets.stream()
+ .map(StateChangeSet::getSequenceNumber)
+ .collect(toList()),
+ error);
+ }
+ }
+
+ public long getSize() {
+ long size = 0;
+ for (StateChangeSet set : changeSets) {
+ size = set.getSize();
Review comment:
It should be `size += set.getSize();`
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.flink.changelog.fs.FsStateChangelogOptions.BASE_PATH;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.COMPRESSION_ENABLED;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.NUM_UPLOAD_THREADS;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.PERSIST_DELAY;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD;
+import static
org.apache.flink.changelog.fs.FsStateChangelogOptions.UPLOAD_BUFFER_SIZE;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+// todo: consider using CheckpointStreamFactory / CheckpointStorageWorkerView
+// Considerations:
+// 0. need for checkpointId in the current API to resolve the location
+// option a: pass checkpointId (race condition?)
+// option b: pass location (race condition?)
+// option c: add FsCheckpointStorageAccess.createSharedStateStream
+// 1. different settings for materialized/changelog (e.g. timeouts)
+// 2. re-use closeAndGetHandle
+// 3. re-use in-memory handles (.metadata)
+// 4. handle in-memory handles duplication
+
+/**
+ * Schedules {@link UploadTask upload tasks} on a {@link StateChangeUploader}.
In the simplest form,
+ * directly calls {@link StateChangeUploader#upload(Collection)}. Other
implementations might batch
+ * the tasks for efficiency.
+ */
+interface StateChangeUploadScheduler extends AutoCloseable {
+
+ /**
+ * Schedule the upload and {@link UploadTask#complete(List) complete} or
{@link
+ * UploadTask#fail(Throwable) fail} the corresponding tasks.
+ */
+ void upload(UploadTask uploadTask) throws IOException;
+
+ static StateChangeUploadScheduler directScheduler(StateChangeUploader
uploader) {
+ return new StateChangeUploadScheduler() {
+ @Override
+ public void upload(UploadTask uploadTask) throws IOException {
+ uploader.upload(singletonList(uploadTask)).complete();
+ }
+
+ @Override
+ public void close() throws Exception {
+ uploader.close();
+ }
+ };
+ }
+
+ static StateChangeUploadScheduler fromConfig(
+ ReadableConfig config, ChangelogStorageMetricGroup metricGroup)
throws IOException {
+ Path basePath = new Path(config.get(BASE_PATH));
+ long bytes = config.get(UPLOAD_BUFFER_SIZE).getBytes();
+ checkArgument(bytes <= Integer.MAX_VALUE);
+ int bufferSize = (int) bytes;
+ StateChangeFsUploader store =
+ new StateChangeFsUploader(
+ basePath,
+ basePath.getFileSystem(),
+ config.get(COMPRESSION_ENABLED),
+ bufferSize,
+ metricGroup);
+ BatchingStateChangeUploadScheduler batchingStore =
+ new BatchingStateChangeUploadScheduler(
+ config.get(PERSIST_DELAY).toMillis(),
+ config.get(PERSIST_SIZE_THRESHOLD).getBytes(),
+ RetryPolicy.fromConfig(config),
+ store,
+ config.get(NUM_UPLOAD_THREADS),
+ config.get(IN_FLIGHT_DATA_LIMIT).getBytes(),
+ metricGroup);
+ return batchingStore;
+ }
+
+ default AvailabilityProvider getAvailabilityProvider() {
+ return () -> AvailabilityProvider.AVAILABLE;
+ }
+
+ @ThreadSafe
+ final class UploadTask {
+ final Collection<StateChangeSet> changeSets;
+ final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
+ final Consumer<List<UploadResult>> successCallback;
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ public UploadTask(
+ Collection<StateChangeSet> changeSets,
+ Consumer<List<UploadResult>> successCallback,
+ BiConsumer<List<SequenceNumber>, Throwable> failureCallback) {
+ this.changeSets = new ArrayList<>(changeSets);
+ this.failureCallback = failureCallback;
+ this.successCallback = successCallback;
Review comment:
A minor suggestion: we could swap the positions of `failureCallback`
and `successCallback` so that the assignment order matches the order of the constructor arguments.
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -170,16 +189,24 @@ private RetriableTask(
@Override
public void run() {
- LOG.debug("starting attempt {}", current);
+ LOG.debug("starting attempt {}", attemptNumber);
if (!actionCompleted.get()) {
Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
try {
- runnable.run();
+ Result result = action.tryExecute();
if (actionCompleted.compareAndSet(false, true)) {
- LOG.debug("succeeded with {} attempts", current);
- attemptsPerTaskHistogram.update(current);
+ LOG.debug("succeeded with {} attempts", attemptNumber);
+ action.completeWithResult(result);
+ attemptsPerTaskHistogram.update(attemptNumber);
+ } else {
+ LOG.debug(
+ "discard unnecessarily uploaded state, attempt
{}", attemptNumber);
+ try {
+ action.discardResult(result);
+ } catch (Exception e) {
+ LOG.warn("unable to discard execution attempt
result", e);
+ }
}
- attemptCompleted.set(true);
} catch (Exception e) {
handleError(e);
Review comment:
Just a question: how will the error be handled if
`actionCompleted.compareAndSet(false, true)` succeeds but an unchecked exception
is then thrown within the success branch?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]