pnowojski commented on a change in pull request #17229:
URL: https://github.com/apache/flink/pull/17229#discussion_r750169518
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
##########
@@ -206,16 +251,27 @@ private synchronized void setErrorSafe(Throwable t) {
}
private static UploadTask wrapWithSizeUpdate(
- UploadTask uploadTask, long preComputedTaskSize, LongAdder inflightSize) {
+ UploadTask uploadTask, Runnable releaseCapacityCallback) {
return new UploadTask(
uploadTask.changeSets,
result -> {
- inflightSize.add(-preComputedTaskSize);
- uploadTask.successCallback.accept(result);
+ try {
+ releaseCapacityCallback.run();
Review comment:
Why the indirect `Runnable`? Can't we call `releaseCapacity(size)`
directly here?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -303,6 +304,8 @@
@GuardedBy("shouldInterruptOnCancelLock")
private boolean shouldInterruptOnCancel = true;
+ private final AvailabilityProvider changelogWriterAvailabilityProvider;
Review comment:
nit: Re the `null` version: I think I would sleep better with it here.
Maybe the change is barely visible (1% degradation?), but even if it's not
visible at all, it would be one less potential issue for someone to wonder
about, investigate, or optimise in the future.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -474,7 +484,8 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
DataInputStatus status = inputProcessor.processInput();
switch (status) {
case MORE_AVAILABLE:
- if (recordWriter.isAvailable()) {
+ if (recordWriter.isAvailable()
+ && changelogWriterAvailabilityProvider.isApproximatelyAvailable()) {
Review comment:
Why `isApproximatelyAvailable` and not `isAvailable`? Isn't this
actually an anti-optimisation? If
`changelogWriterAvailabilityProvider.getAvailableFuture() != AVAILABLE`, it
should actually be cheaper to check
`changelogWriterAvailabilityProvider.getAvailableFuture().isDone()` rather than
suspend the mailbox and wake it up via a `CompletableFuture#thenRun` callback.
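To make the difference concrete, here is a minimal sketch of the two checks
as I understand them. `SketchAvailabilityProvider` and
`AvailabilityCheckSketch` are hypothetical stand-ins written for this
comment, not the real Flink classes, and the method bodies are an assumption
about the semantics rather than a copy of the actual implementation.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an availability provider; not the real Flink interface. */
interface SketchAvailabilityProvider {
    /** Shared constant meaning "available right now". */
    CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    CompletableFuture<?> getAvailableFuture();

    /** Exact check: also true once a non-constant future has completed. */
    default boolean isAvailable() {
        CompletableFuture<?> future = getAvailableFuture();
        return future == AVAILABLE || future.isDone();
    }

    /** Cheap check: reference comparison only, so it can report false negatives. */
    default boolean isApproximatelyAvailable() {
        return getAvailableFuture() == AVAILABLE;
    }
}

final class AvailabilityCheckSketch {
    /** True when the approximate check would suspend although the future is already done. */
    static boolean suspendsNeedlessly(SketchAvailabilityProvider provider) {
        return !provider.isApproximatelyAvailable() && provider.isAvailable();
    }
}
```

If the provider returns a future that is already completed but is not the
`AVAILABLE` constant, `suspendsNeedlessly` returns `true`: the approximate
check would send the task through the suspend/wake-up path even though one
extra `isDone()` call would have let it continue.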
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
##########
@@ -206,16 +251,27 @@ private synchronized void setErrorSafe(Throwable t) {
}
private static UploadTask wrapWithSizeUpdate(
- UploadTask uploadTask, long preComputedTaskSize, LongAdder inflightSize) {
+ UploadTask uploadTask, Runnable releaseCapacityCallback) {
return new UploadTask(
uploadTask.changeSets,
result -> {
- inflightSize.add(-preComputedTaskSize);
- uploadTask.successCallback.accept(result);
+ try {
+ releaseCapacityCallback.run();
+ } finally {
+ uploadTask.successCallback.accept(result);
+ }
},
(result, error) -> {
- inflightSize.add(-preComputedTaskSize);
- uploadTask.failureCallback.accept(result, error);
+ try {
+ releaseCapacityCallback.run();
+ } finally {
+ uploadTask.failureCallback.accept(result, error);
+ }
});
}
+
+ @Override
+ public AvailabilityProvider getAvailabilityProvider() {
+ return availabilityHelper;
+ }
Review comment:
Can you include a comment here explaining why we think it's fine to have
this unsynchronised, even though other task threads might actually make this
unavailable?