arunpandianp commented on code in PR #38073:
URL: https://github.com/apache/beam/pull/38073#discussion_r3048367151
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java:
##########
@@ -60,20 +63,28 @@ public void setUp() {
.setDaemon(true)
.build(),
/*useFairMonitor=*/ false);
- finalizer = StreamingCommitFinalizer.create(executor);
+
+ cleanupExecutor =
+ Executors.newScheduledThreadPool(
+ 10,
Review Comment:
`1` — a single-thread pool should be enough here instead of `10`.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -842,6 +854,13 @@ static StreamingDataflowWorker forTesting(
WindmillStubFactoryFactory stubFactory) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
+ ScheduledExecutorService commitFinalizerCleanupExecutor =
+ Executors.newScheduledThreadPool(
+ 10,
Review Comment:
`1` — a single-thread pool should be enough here instead of `10`.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -616,6 +620,13 @@ public static StreamingDataflowWorker
fromOptions(DataflowWorkerHarnessOptions o
StreamingCounters streamingCounters = StreamingCounters.create();
WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options,
LOG);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
+ ScheduledExecutorService commitFinalizerCleanupExecutor =
+ Executors.newScheduledThreadPool(
+ 10,
Review Comment:
Since the cleanup callbacks are quick, shouldn't a single thread be enough here?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -119,13 +89,32 @@ static StreamingCommitFinalizer
create(BoundedQueueExecutor workExecutor) {
*/
public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>>
callbacks) {
List<FinalizationInfo> finalizeInfos = new ArrayList<>();
+ Instant now = Instant.now();
for (Map.Entry<Long, Pair<Instant, Runnable>> entry :
callbacks.entrySet()) {
- finalizeInfos.add(
- FinalizationInfo.create(
- entry.getKey(), entry.getValue().getLeft(),
entry.getValue().getRight()));
+ Instant cleanupTime = entry.getValue().getLeft();
+ // Ignore finalizers that have already expired.
+ if (cleanupTime.isAfter(now)) {
+ ScheduledFuture<?> cleanupFuture =
+ cleanupExecutor.schedule(
+ () -> {
+ lock.lock();
+ try {
+ commitFinalizationCallbacks.remove(entry.getKey());
Review Comment:
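There is a race here: this cleanup task can execute before the
`commitFinalizationCallbacks.put` call below, so the entry would never be removed.
We can fix the race by moving `cleanupExecutor.schedule` inside the `lock.lock()` section.
Since `cleanupExecutor`'s work queue is unbounded, `schedule` does not block, so it is safe to call it while holding the lock.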
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]