arunpandianp commented on code in PR #38073:
URL: https://github.com/apache/beam/pull/38073#discussion_r3042333590
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -118,36 +118,43 @@ static StreamingCommitFinalizer
create(BoundedQueueExecutor workExecutor) {
* has been successfully committed to the backing state store.
*/
public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>>
callbacks) {
+ List<FinalizationInfo> finalizeInfos = new ArrayList<>();
for (Map.Entry<Long, Pair<Instant, Runnable>> entry :
callbacks.entrySet()) {
- Long finalizeId = entry.getKey();
- final FinalizationInfo info =
+ finalizeInfos.add(
FinalizationInfo.create(
- finalizeId, entry.getValue().getLeft(),
entry.getValue().getRight());
-
+ entry.getKey(), entry.getValue().getLeft(),
entry.getValue().getRight()));
+ }
+ if (!finalizeInfos.isEmpty()) {
+ boolean shouldStartCleanupThread = false;
lock.lock();
try {
- FinalizationInfo existingInfo =
commitFinalizationCallbacks.put(finalizeId, info);
- if (existingInfo != null) {
- throw new IllegalStateException(
- "Expected to not have any past callbacks for bundle "
- + finalizeId
- + " but had "
- + existingInfo);
+ for (FinalizationInfo info : finalizeInfos) {
+ FinalizationInfo existingInfo =
commitFinalizationCallbacks.put(info.getId(), info);
+ if (existingInfo != null) {
+ throw new IllegalStateException(
+ "Expected to not have any past callbacks for bundle "
+ + info.getId()
+ + " but had "
+ + existingInfo);
+ }
+ cleanUpQueue.add(info);
+ @SuppressWarnings("ReferenceEquality")
+ boolean newMin = cleanUpQueue.peek() == info;
+ if (newMin) {
+ queueMinChanged.signal();
+ }
}
if (!cleanUpThreadStarted) {
// Start the cleanup thread lazily for pipelines that don't use
finalization callbacks
- // and some tests.
+ // and some tests. Run the thread without the lock held.
cleanUpThreadStarted = true;
- finalizationExecutor.execute(this::cleanupThreadBody, 0);
- }
- cleanUpQueue.add(info);
- @SuppressWarnings("ReferenceEquality")
- boolean newMin = cleanUpQueue.peek() == info;
- if (newMin) {
- queueMinChanged.signal();
+ shouldStartCleanupThread = true;
}
} finally {
lock.unlock();
+ if (shouldStartCleanupThread) {
+ finalizationExecutor.forceExecute(this::cleanupThreadBody, 0);
Review Comment:
cleanupThreadBody won't ever terminate unless the worker is shut down, right?
If so, running cleanupThreadBody in finalizationExecutor will permanently occupy a
harness thread, and the worker will deadlock when run with harnessThreads set to 1.
Can we create a separate cleanup thread or thread pool for it instead?
We could use a ScheduledThreadPoolExecutor and its schedule methods, and remove
the custom expiry logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]