curcur edited a comment on pull request #17229: URL: https://github.com/apache/flink/pull/17229#issuecomment-958666854
Hey folks, I've spent some time reading through `FsStateChangelogWriter` and the related DFS uploading implementation. I now understand it a bit better, but I also have more questions (we can discuss offline if necessary). Before posting them, let me briefly describe my understanding, since the questions build on it. Please correct me if I have misunderstood anything:

1. `StateChangeUploader` is per TM and shared among all tasks of the same job on that TM.
2. Each `ChangelogKeyedStateBackend` has its own writer (`StateChangelogWriter`), which maintains an in-memory `activeChangeSet` that it writes to.
3. Persisting (`persistInternal`) happens in two cases: (1) enough data has accumulated in memory (`preEmptivePersistThresholdInBytes`); (2) checkpointing.
4. Persisting puts an `UploadTask` into the `StateChangeUploader`'s upload queue, where it waits to be scheduled for upload.
5. All of the above happens in the task thread.
6. The scheduling thread (`SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG)`) then schedules the uploads, taking `UploadTask`s out of the `StateChangeUploader`'s upload queue.
7. The IO thread (`retryingExecutor.execute(retryPolicy, () -> delegate.upload(tasks))`) executes the upload tasks.

=====

**Questions (about the availability provider)**

1. The `UploadThrottle` (availability provider) is shared among all tasks of the same job, and every availability check has to grab its lock. Could this be a performance issue? As far as I can see, the current `RecordWriter` and `InputProcessor` are per task and lock-free. Should we consider implementing the changelog availability provider per task as well?
2. In `changelogWriterAvailabilityProvider.isApproximatelyAvailable()`, the comparison `getAvailableFuture() == AVAILABLE` happens outside the lock. So can the available future be completed (or replaced) by another thread while the check is running?
3. Even if the available future is completed, that only means some task can proceed with updating state, not necessarily all tasks (other tasks might still be blocked waiting).

**Questions (about the locks)**

I found three locks in three different classes (there may be more): `UploadThrottle`, `BatchingStateChangeUploader` and `FsStateChangelogWriter`.

But according to the description above (the "Please correct me if I have misunderstood" part), there is only one critical section: the `StateChangeUploader`'s upload queue.

1. Several writers need to put upload tasks into the queue.
2. The scheduler needs to take the tasks from the queue and schedule them.
3. The availability provider needs to check the availability of the queue.

So, as far as I can tell, could these three locks be merged into one? Is there a reason to keep them separate? As discussed, more locks make deadlocks easier to introduce and the code harder to reason about.

**Questions (about the backpressure metric)**

They are not mutually exclusive, but we can postpone this discussion until we design the metrics.
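To check that I read the threading model correctly, here is a rough, self-contained sketch of points 4 to 7 of my understanding. It is not Flink code. `UploadPipeline`, `persist`, `drainAndUpload` and the fixed-attempt retry loop are hypothetical stand-ins for `persistInternal`, the `ChangelogUploadScheduler` thread, and `retryingExecutor` with its retry policy. The batching period and pool size are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the upload threading model (not Flink code):
 * task threads only enqueue, one scheduler thread drains the queue
 * periodically, and an IO pool performs the (retried) uploads.
 */
final class UploadPipeline implements AutoCloseable {
    private final BlockingQueue<String> uploadQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(
                    r -> new Thread(r, "ChangelogUploadScheduler"));
    private final ExecutorService ioPool = Executors.newFixedThreadPool(2);
    private final Consumer<List<String>> uploader;
    private final int maxAttempts;

    UploadPipeline(Consumer<List<String>> uploader, long periodMillis, int maxAttempts) {
        this.uploader = uploader;
        this.maxAttempts = maxAttempts;
        scheduler.scheduleWithFixedDelay(
                this::drainAndUpload, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Task thread (points 4 and 5): persisting only enqueues and returns. */
    void persist(String changeSet) {
        uploadQueue.add(changeSet);
    }

    /** Scheduler thread (point 6): take everything queued so far as one batch. */
    private void drainAndUpload() {
        List<String> batch = new ArrayList<>();
        uploadQueue.drainTo(batch);
        if (!batch.isEmpty()) {
            ioPool.execute(() -> uploadWithRetry(batch)); // point 7
        }
    }

    /** IO thread (point 7): a fixed number of attempts stands in for a retry policy. */
    private void uploadWithRetry(List<String> batch) {
        for (int attempt = 1; ; attempt++) {
            try {
                uploader.accept(batch);
                return;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }

    /** Stops scheduling, flushes what is still queued, then waits for the uploads. */
    @Override
    public void close() throws InterruptedException {
        scheduler.shutdown();
        scheduler.awaitTermination(10, TimeUnit.SECONDS);
        drainAndUpload(); // final flush on the calling thread
        ioPool.shutdown();
        ioPool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

In this shape the task thread never waits on IO. The only synchronized step on its path is the hand-over into the queue, which is guarded here by `LinkedBlockingQueue`'s internal lock.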
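To make the single-lock question concrete, here is a minimal sketch. It assumes the only shared state is the upload queue plus an in-flight byte budget. All names (`SingleLockUploadQueue`, `enqueue`, `drain`, `release`, `maxBytesInFlight`) are hypothetical and do not reflect the current `UploadThrottle`, `BatchingStateChangeUploader` or `FsStateChangelogWriter` code. Writers, the scheduler and the availability future all go through one lock. `isApproximatelyAvailable()` reads a volatile field without the lock, which also relates to availability question 2: the result is only a snapshot, and another thread may swap the future right after the check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch (not Flink code): a single lock guards the upload
 * queue, the in-flight byte budget and the availability future together.
 */
final class SingleLockUploadQueue {

    /** A pending upload; only its size matters here. */
    static final class UploadTask {
        final long sizeInBytes;

        UploadTask(long sizeInBytes) {
            this.sizeInBytes = sizeInBytes;
        }
    }

    /** Shared, already-completed future meaning "available". */
    private static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final Object lock = new Object();
    private final long maxBytesInFlight;
    private final Queue<UploadTask> pending = new ArrayDeque<>(); // guarded by lock
    private long bytesInFlight; // guarded by lock
    // Written only under lock; volatile so the lock-free check sees the latest swap.
    private volatile CompletableFuture<Void> availability = AVAILABLE;

    SingleLockUploadQueue(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    /** Writer (task thread): hand over a task; become unavailable if over budget. */
    void enqueue(UploadTask task) {
        synchronized (lock) {
            pending.add(task);
            bytesInFlight += task.sizeInBytes;
            if (bytesInFlight >= maxBytesInFlight && availability == AVAILABLE) {
                availability = new CompletableFuture<>();
            }
        }
    }

    /** Scheduler thread: take all queued tasks as one batch. */
    List<UploadTask> drain() {
        synchronized (lock) {
            List<UploadTask> batch = new ArrayList<>(pending);
            pending.clear();
            return batch;
        }
    }

    /** IO thread: an upload finished; free its budget and maybe unblock writers. */
    void release(long sizeInBytes) {
        CompletableFuture<Void> toComplete = null;
        synchronized (lock) {
            bytesInFlight -= sizeInBytes;
            if (bytesInFlight < maxBytesInFlight && availability != AVAILABLE) {
                toComplete = availability;
                availability = AVAILABLE;
            }
        }
        if (toComplete != null) {
            toComplete.complete(null); // outside the lock: waiter callbacks never run under it
        }
    }

    CompletableFuture<Void> getAvailableFuture() {
        return availability;
    }

    /** Lock-free snapshot: may already be stale when the caller acts on it. */
    boolean isApproximatelyAvailable() {
        return availability == AVAILABLE;
    }
}
```

Completing the future after releasing the lock is deliberate. Any code chained on the availability future then runs without the lock held, which keeps a single lock from turning into a deadlock source itself.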