tejasiyer-dev commented on code in PR #38609:
URL: https://github.com/apache/beam/pull/38609#discussion_r3291639713


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java:
##########
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps a dofn and converts it from one which processes elements 
synchronously to one
+ * which processes them asynchronously.
+ *
+ * <p>For synchronous dofns the default settings mean that many (100s of) 
elements will be processed
+ * in parallel and that processing an element will block all other work on 
that key. In addition
+ * runners are optimized for latencies less than a few seconds and longer 
operations can result in
+ * high retry rates. Async should be considered when the default parallelism 
is not correct and/or
+ * items are expected to take longer than a few seconds to process.
+ */
+public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, 
OutputT> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class);
+
+  private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10;
+  private static final int DEFAULT_TIMEOUT_SEC = 1;
+  private static final int DEFAULT_MAX_WAIT_TIME_MS = 500;
+  private static final int TEARDOWN_AWAIT_SEC = 5;
+  private static final int INITIAL_BACKOFF_SLEEP_MS = 10;
+  private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000;
+
+  @StateId("to_process")
+  private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec;
+
+  @TimerId("timer")
+  private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+  private final DoFn<InputT, OutputT> syncFn;
+  private final int parallelism;
+  private final Duration timerFrequency;
+  private final int maxItemsToBuffer;
+  private final Duration timeout;
+  private final Duration maxWaitTime;
+  private final SerializableFunction<InputT, Object> idFn;
+  private final boolean useThreadPool;
+  private final String uuid;
+
+  private transient @Nullable PipelineOptions pipelineOptions;
+
+  // Shared JVM-Wide States (Static Registries)
+  // Map-backed registry holding shared resources across serialized worker 
instances. Since runners
+  // clone DoFn instances on the same worker node, static maps ensure safe 
JVM-wide resource reuse.
+  private static final ConcurrentHashMap<String, ExecutorService> pool = new 
ConcurrentHashMap<>();
+  // activeElements (processingElements) is global JVM memory (all keys)
+  private static final ConcurrentHashMap<
+          String, ConcurrentHashMap<Object, InFlightElement<?, ?, ?>>>
+      processingElements = new ConcurrentHashMap<>();
+  private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer =
+      new ConcurrentHashMap<>();
+
+  private static final ReentrantLock lock = new ReentrantLock();

Review Comment:
   @AMOOOMA 
   
   I kept a single static class lock because it matches the exact 
synchronization model used by Python's AsyncWrapper (_lock = RLock()). 
   If you prefer, we can change this to a per-transform lock or leverage 
ConcurrentHashMap to reduce global thread contention, but I wanted to keep it 
aligned with the Python SDK first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to