pnowojski commented on a change in pull request #17229:
URL: https://github.com/apache/flink/pull/17229#discussion_r738313038
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -303,6 +304,8 @@
@GuardedBy("shouldInterruptOnCancelLock")
private boolean shouldInterruptOnCancel = true;
+ private final AvailabilityProvider changelogWriterAvailabilityProvider;
Review comment:
Making `changelogWriterAvailabilityProvider` `@Nullable` would be a safer option performance-wise.
Even with `@Nullable` there is a high chance of a performance regression, so
before merging I would like to see a couple of benchmark request runs.
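A minimal sketch of one reading of the `@Nullable` suggestion. It assumes the concern is the extra interface call on the per-record path when no changelog storage is configured. `AvailabilityProviderSketch` and `TaskSketch` are hypothetical stand-ins, not Flink's `AvailabilityProvider` or `StreamTask`. The field is null when there is no changelog storage, so in that case the hot path costs only a null check.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an availability provider, reduced to what this sketch needs. */
interface AvailabilityProviderSketch {
    CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    CompletableFuture<?> getAvailableFuture();

    default boolean isAvailable() {
        return getAvailableFuture() == AVAILABLE;
    }
}

/** Hypothetical task fragment that keeps the provider in a nullable field. */
class TaskSketch {
    // @Nullable (annotation omitted to keep the sketch dependency-free):
    // null when no changelog storage is configured.
    private final AvailabilityProviderSketch changelogWriterAvailabilityProvider;

    TaskSketch(AvailabilityProviderSketch providerOrNull) {
        this.changelogWriterAvailabilityProvider = providerOrNull;
    }

    /** Per-record check: a null test instead of an interface call on a constant lambda. */
    boolean changelogWriterAvailable() {
        return changelogWriterAvailabilityProvider == null
                || changelogWriterAvailabilityProvider.isAvailable();
    }
}
```

Whether this actually avoids a regression is exactly what the requested benchmark runs would have to show.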
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -387,6 +390,13 @@ protected StreamTask(
this.stateBackend = createStateBackend();
this.checkpointStorage = createCheckpointStorage(stateBackend);
+ this.changelogWriterAvailabilityProvider =
+ environment.getTaskStateManager().getStateChangelogStorage() == null
+ ? () -> AvailabilityProvider.AVAILABLE // todo: benchmark
+ : environment
+ .getTaskStateManager()
+ .getStateChangelogStorage()
+ .getAvailabilityProvider();
Review comment:
I'm not sure I like hardcoding a concept like `changelog` at this level.
Why not something like
`getTaskStateManager().getStateBackendAvailabilityProvider()`? That would
basically allow every state backend to hook into such a mechanism.
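A rough sketch of the suggested generalization. It assumes each state backend can optionally expose an availability provider and that `StreamTask` only asks the task state manager for it. Apart from `getStateBackendAvailabilityProvider()`, which comes from the comment, every type and method here is hypothetical. Returning null for backends that never back-pressure keeps this compatible with the `@Nullable` field idea.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an availability provider. */
interface AvailabilitySketch {
    CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    CompletableFuture<?> getAvailableFuture();
}

/** Hypothetical hook any state backend could implement; null means "never back-pressures". */
interface BackendAvailabilityHook {
    default AvailabilitySketch getAvailabilityProvider() {
        return null;
    }
}

/** A backend without any back-pressure concept keeps the default. */
class PlainBackendSketch implements BackendAvailabilityHook {}

/** A changelog-based backend forwards its writer's availability through the same hook. */
class ChangelogBackendSketch implements BackendAvailabilityHook {
    private final AvailabilitySketch writerAvailability;

    ChangelogBackendSketch(AvailabilitySketch writerAvailability) {
        this.writerAvailability = writerAvailability;
    }

    @Override
    public AvailabilitySketch getAvailabilityProvider() {
        return writerAvailability;
    }
}

/** Task state manager fragment: the task calls this and never refers to "changelog". */
class TaskStateManagerSketch {
    private final BackendAvailabilityHook backend;

    TaskStateManagerSketch(BackendAvailabilityHook backend) {
        this.backend = backend;
    }

    AvailabilitySketch getStateBackendAvailabilityProvider() {
        return backend.getAvailabilityProvider();
    }
}
```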
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -523,11 +534,15 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
if (!recordWriter.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getBackPressuredTimePerSecond());
resumeFuture = recordWriter.getAvailableFuture();
- } else {
+ } else if (!inputProcessor.isAvailable()) {
timer =
new ThroughputPeriodTimer(
ioMetrics.getIdleTimeMsPerSecond(),
throughputCalculator);
resumeFuture = inputProcessor.getAvailableFuture();
+ } else {
+ // todo: add new metrics (FLINK-23486)
+ timer = new GaugePeriodTimer(ioMetrics.getBackPressuredTimePerSecond());
+ resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
Review comment:
Do we want to have:
```
100% = idle + busy + networkBackPressured + changelogBackPressured
```
or
```
100% = idle + busy + backPressured
```
where `stateBackend/changelogBackPressured` is a subset of `backPressured`?
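A worked comparison of the two accounting options, using made-up numbers in milliseconds per second. The diff reuses `getBackPressuredTimePerSecond()` for the changelog wait, which already places changelog time inside `backPressured` as in the second option, while the dedicated metric is still a TODO (FLINK-23486). `TimeBreakdown` is a hypothetical helper, not a Flink metric class.

```java
/** Hypothetical per-second time breakdown (ms) for comparing the two accounting options. */
final class TimeBreakdown {
    final long idleMs;
    final long busyMs;
    final long networkBackPressuredMs;
    final long changelogBackPressuredMs;

    TimeBreakdown(long idleMs, long busyMs, long networkBackPressuredMs, long changelogBackPressuredMs) {
        this.idleMs = idleMs;
        this.busyMs = busyMs;
        this.networkBackPressuredMs = networkBackPressuredMs;
        this.changelogBackPressuredMs = changelogBackPressuredMs;
    }

    /** Option 1: four disjoint buckets that must add up to 1000 ms. */
    long option1TotalMs() {
        return idleMs + busyMs + networkBackPressuredMs + changelogBackPressuredMs;
    }

    /** Option 2: a single backPressured bucket that already contains the changelog wait. */
    long option2BackPressuredMs() {
        return networkBackPressuredMs + changelogBackPressuredMs;
    }

    /** Option 2 total: changelog time is counted once, inside backPressured. */
    long option2TotalMs() {
        return idleMs + busyMs + option2BackPressuredMs();
    }
}
```

With `new TimeBreakdown(200, 500, 100, 200)`, option 1 exposes four gauges (200 + 500 + 100 + 200 = 1000). Option 2 exposes `backPressured = 300` plus a changelog sub-metric of 200 (200 + 500 + 300 = 1000). Both account for the full second. The difference is whether changelog back pressure is a peer of network back pressure or is reported inside it.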
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadThrottle.java
##########
@@ -60,7 +70,22 @@ public void seizeCapacity(long bytes) throws InterruptedException {
public void releaseCapacity(long bytes) {
synchronized (lock) {
inFlightBytesCounter -= bytes;
+ if (hasCapacity() && !isAvailable()) {
+ availabilityFuture.complete(null);
+ availabilityFuture = AVAILABLE;
+ }
lock.notifyAll();
}
}
+
+ private boolean hasCapacity() {
+ return inFlightBytesCounter < maxBytesInFlight;
+ }
+
+ @Override
+ public CompletableFuture<?> getAvailableFuture() {
+ synchronized (lock) {
+ return availabilityFuture;
+ }
+ }
Review comment:
This will have a very significant performance impact, as it will be
invoked for every record.
Why does this class have to be thread-safe, and why do you need a `lock` here? Why
can't it work like `RecordWriter`?
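One possible way to cut the per-record cost. It assumes `releaseCapacity` really is called from an uploader thread, so some synchronization is unavoidable. The `lock` stays for state changes, but the future becomes a `volatile` field, so the per-record `getAvailableFuture()` is a plain volatile read. This swaps in a technique of its own and makes no claim about how `RecordWriter` is implemented. `VolatileUploadThrottle` is hypothetical, and the blocking wait of the original `seizeCapacity` is left out.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical throttle: state changes take the lock, the per-record read does not. */
final class VolatileUploadThrottle {
    static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    private final Object lock = new Object();
    private final long maxBytesInFlight;
    private long inFlightBytesCounter; // guarded by lock

    // Written only while holding lock; volatile so unlocked readers see the latest future.
    private volatile CompletableFuture<?> availabilityFuture = AVAILABLE;

    VolatileUploadThrottle(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    /** Reserves capacity and switches to "unavailable" once the in-flight limit is reached. */
    void seizeCapacity(long bytes) {
        synchronized (lock) {
            inFlightBytesCounter += bytes;
            if (!hasCapacity() && availabilityFuture == AVAILABLE) {
                availabilityFuture = new CompletableFuture<>();
            }
        }
    }

    /** Releases capacity (e.g. from an uploader thread) and signals availability if regained. */
    void releaseCapacity(long bytes) {
        CompletableFuture<?> toComplete = null;
        synchronized (lock) {
            inFlightBytesCounter -= bytes;
            if (hasCapacity() && availabilityFuture != AVAILABLE) {
                toComplete = availabilityFuture;
                availabilityFuture = AVAILABLE;
            }
        }
        if (toComplete != null) {
            toComplete.complete(null); // callbacks run outside the lock
        }
    }

    private boolean hasCapacity() {
        return inFlightBytesCounter < maxBytesInFlight;
    }

    /** Per-record path: a single volatile read, no monitor acquisition. */
    CompletableFuture<?> getAvailableFuture() {
        return availabilityFuture;
    }

    boolean isAvailable() {
        return availabilityFuture == AVAILABLE;
    }
}
```

Completing the future outside the lock means callbacks registered by the task thread do not run while `lock` is held. Whether a volatile read is cheap enough per record would still need to be confirmed by benchmarks.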