This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f251133  Revert "[BEAM-9651] Prevent StreamPool and stream 
initialization livelock"
     new e8fc585  Merge pull request #11367 from 
scwhittle/revert-11364-contention
f251133 is described below

commit f25113352d619828ae2c28661dcccf205bc0847d
Author: scwhittle <[email protected]>
AuthorDate: Thu Apr 9 15:58:32 2020 -0700

    Revert "[BEAM-9651] Prevent StreamPool and stream initialization livelock"
---
 .../worker/windmill/DirectStreamObserver.java      |  15 +--
 .../windmill/ForwardingClientResponseObserver.java |   4 +-
 .../worker/windmill/WindmillServerStub.java        | 108 ++++++---------------
 .../worker/windmill/GrpcWindmillServerTest.java    |  89 ++++-------------
 4 files changed, 55 insertions(+), 161 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
index 1be3c95..7565ba2 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.dataflow.worker.windmill;
 
 import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.CallStreamObserver;
 import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
@@ -45,20 +43,13 @@ public final class DirectStreamObserver<T> implements 
StreamObserver<T> {
 
   @Override
   public void onNext(T value) {
-    int phase = phaser.getPhase(); // A negative phase indicates it has been 
terminated.
-    // The registered onReady may be blocked, so we periodically poll the 
observer directly.
-    // Additionally to avoid becoming permanently stuck due to synchronization 
we fallback
-    // to queuing in the outbound observer after 1 minute, see BEAM-9651 for 
more context.
-    for (int waitLoops = 0;
-        phase >= 0 && !outboundObserver.isReady() && waitLoops < 600;
-        ++waitLoops) {
+    int phase = phaser.getPhase();
+    if (!outboundObserver.isReady()) {
       try {
-        phase = phaser.awaitAdvanceInterruptibly(phase, 100, 
TimeUnit.MILLISECONDS);
+        phaser.awaitAdvanceInterruptibly(phase);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
-      } catch (TimeoutException e) {
-        // Polling isReady in case the callback is delayed
       }
     }
     synchronized (outboundObserver) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
index c4e47d3..d7eba1f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
@@ -60,7 +60,7 @@ final class ForwardingClientResponseObserver<ReqT, RespT>
   }
 
   @Override
-  public void beforeStart(ClientCallStreamObserver<RespT> requestStream) {
-    requestStream.setOnReadyHandler(onReadyHandler);
+  public void beforeStart(ClientCallStreamObserver<RespT> stream) {
+    stream.setOnReadyHandler(onReadyHandler);
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index e390ba4..31c5114 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -20,22 +20,20 @@ package org.apache.beam.runners.dataflow.worker.windmill;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -156,113 +154,65 @@ public abstract class WindmillServerStub implements 
StatusDataProvider {
   public static class StreamPool<S extends WindmillStream> {
 
     private final Duration streamTimeout;
-    private final Supplier<S> supplier;
 
     private final class StreamData {
-      final Supplier<S> lazyStream = Suppliers.memoize(supplier);
-      Instant streamCreated = Instant.now();
-      AtomicInteger holds = new AtomicInteger(1);
+      final S stream = supplier.get();
+      int holds = 1;
     };
 
     private final List<StreamData> streams;
-    private final ConcurrentHashMap<S, StreamData> holds;
+    private final Supplier<S> supplier;
+    private final HashMap<S, StreamData> holds;
 
     public StreamPool(int numStreams, Duration streamTimeout, Supplier<S> 
supplier) {
-      this.streamTimeout = streamTimeout;
-      this.supplier = supplier;
       this.streams = new ArrayList<>(numStreams);
       for (int i = 0; i < numStreams; i++) {
         streams.add(null);
       }
-      this.holds = new ConcurrentHashMap<>();
+      this.streamTimeout = streamTimeout;
+      this.supplier = supplier;
+      this.holds = new HashMap<>();
     }
 
     // Returns a stream for use that may be cached from a previous call.  Each 
call of getStream
     // must be matched with a call of releaseStream.
     public S getStream() {
       int index = ThreadLocalRandom.current().nextInt(streams.size());
-      Instant timeoutThreshold = Instant.now().minus(streamTimeout);
-      StreamData streamData = null;
-      StreamData closeStream = null;
+      S result;
+      S closeStream = null;
       synchronized (this) {
-        streamData = streams.get(index);
-        if (streamData != null) {
-          if (streamData.streamCreated.isBefore(timeoutThreshold)) {
-            if (streamData.holds.decrementAndGet() <= 0) {
-              closeStream = streamData;
-            }
-            streamData = null; // Fall through to create a new stream
+        StreamData streamData = streams.get(index);
+        if (streamData == null
+            || 
streamData.stream.startTime().isBefore(Instant.now().minus(streamTimeout))) {
+          if (streamData != null && --streamData.holds == 0) {
+            holds.remove(streamData.stream);
+            closeStream = streamData.stream;
           }
-        }
-        if (streamData == null) {
           streamData = new StreamData();
           streams.set(index, streamData);
+          holds.put(streamData.stream, streamData);
         }
-        // The hold is decremented by releaseStream.
-        streamData.holds.incrementAndGet();
+        streamData.holds++;
+        result = streamData.stream;
       }
-      // Close the previous stream if it was retired and there were no other 
holds.
       if (closeStream != null) {
-        assert (closeStream.holds.intValue() == 0);
-        S stream = closeStream.lazyStream.get();
-        StreamData removed = holds.remove(stream);
-        assert (removed == closeStream);
-        stream.close();
+        closeStream.close();
       }
-      // Initialize the stream outside the synchronized section so that slow 
initialization does
-      // not block other streams.
-      S stream = streamData.lazyStream.get();
-      holds.put(stream, streamData);
-      return stream;
+      return result;
     }
 
-    // Releases a stream that was obtained with getStream. If the stream was 
retired and this was
-    // the final hold it is closed.
+    // Releases a stream that was obtained with getStream.
     public void releaseStream(S stream) {
-      StreamData streamData = holds.get(stream);
-      if (streamData.holds.decrementAndGet() <= 0) {
-        StreamData removed = holds.remove(stream);
-        assert (removed == streamData);
-        stream.close();
-      }
-    }
-
-    // Closes and awaits termination for all streams that do not have an 
active external hold,
-    // returning true if all streams were closed.
-    public boolean closeIdle(int duration, TimeUnit unit) throws 
InterruptedException {
-      boolean removedAll = true;
-      ArrayList<StreamData> streamsCopy = null;
+      boolean closeStream = false;
       synchronized (this) {
-        streamsCopy = new ArrayList<>(streams.size());
-        for (int i = 0; i < streams.size(); ++i) {
-          StreamData streamData = streams.get(i);
-          streams.set(i, null);
-          streamsCopy.add(streamData);
+        if (--holds.get(stream).holds == 0) {
+          closeStream = true;
+          holds.remove(stream);
         }
       }
-      for (int i = 0; i < streamsCopy.size(); ++i) {
-        StreamData streamData = streamsCopy.get(i);
-        if (streamData == null) {
-          continue;
-        }
-
-        if (streamData.holds.decrementAndGet() <= 0) {
-          S stream = streamData.lazyStream.get();
-          StreamData removed = holds.remove(stream);
-          assert (removed == streamData);
-          stream.close();
-        } else {
-          removedAll = false;
-          streamsCopy.set(i, null);
-        }
-      }
-      for (StreamData streamData : streamsCopy) {
-        if (streamData == null) {
-          continue;
-        }
-        streamData.lazyStream.get().awaitTermination(duration, unit);
+      if (closeStream) {
+        stream.close();
       }
-      return removedAll;
     }
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
index 6d5574a..b889138 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
@@ -60,7 +59,6 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitR
 import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.CommitWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetDataStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.StreamPool;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server;
 import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
@@ -69,7 +67,6 @@ import 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.inprocess.InProcessServerBuil
 import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
 import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.util.MutableHandlerRegistry;
 import org.hamcrest.Matchers;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.Before;
@@ -271,11 +268,14 @@ public class GrpcWindmillServerTest {
               assertEquals(workItem.getKey(), 
ByteString.copyFromUtf8("somewhat_long_key"));
             });
     assertTrue(latch.await(30, TimeUnit.SECONDS));
+
     stream.close();
     assertTrue(stream.awaitTermination(30, TimeUnit.SECONDS));
   }
 
-  private void addGetDataService() {
+  @Test
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public void testStreamingGetData() throws Exception {
     // This server responds to GetDataRequests with responses that mirror the 
requests.
     serviceRegistry.addService(
         new CloudWindmillServiceV1Alpha1ImplBase() {
@@ -406,83 +406,36 @@ public class GrpcWindmillServerTest {
             };
           }
         });
-  }
 
-  @Test
-  public void testStreamingGetData() throws Exception {
-    addGetDataService();
     GetDataStream stream = client.getDataStream();
+
     // Make requests of varying sizes to test chunking, and verify the 
responses.
     ExecutorService executor = Executors.newFixedThreadPool(50);
-    List<Future> futures = new ArrayList<>(200);
-
+    final CountDownLatch done = new CountDownLatch(200);
     for (int i = 0; i < 100; ++i) {
       final String key = "key" + i;
       final String s = i % 5 == 0 ? largeString(i) : "tag";
-      futures.add(
-          executor.submit(
-              () -> {
-                errorCollector.checkThat(
-                    stream.requestKeyedData("computation", 
makeGetDataRequest(key, s)),
-                    Matchers.equalTo(makeGetDataResponse(key, s)));
-              }));
-      futures.add(
-          executor.submit(
-              () -> {
-                errorCollector.checkThat(
-                    stream.requestGlobalData(makeGlobalDataRequest(key)),
-                    Matchers.equalTo(makeGlobalDataResponse(key)));
-              }));
-      Thread.sleep((i * 17) % 50);
-    }
-    for (Future f : futures) {
-      f.get();
+      executor.submit(
+          () -> {
+            errorCollector.checkThat(
+                stream.requestKeyedData("computation", makeGetDataRequest(key, 
s)),
+                Matchers.equalTo(makeGetDataResponse(key, s)));
+            done.countDown();
+          });
+      executor.execute(
+          () -> {
+            errorCollector.checkThat(
+                stream.requestGlobalData(makeGlobalDataRequest(key)),
+                Matchers.equalTo(makeGlobalDataResponse(key)));
+            done.countDown();
+          });
     }
+    done.await();
     stream.close();
     assertTrue(stream.awaitTermination(60, TimeUnit.SECONDS));
     executor.shutdown();
   }
 
-  @Test
-  public void testStreamingGetDataWithPool() throws Exception {
-    addGetDataService();
-
-    final StreamPool<GetDataStream> streamPool =
-        new StreamPool<GetDataStream>(4, Duration.standardSeconds(1), () -> 
client.getDataStream());
-
-    // Make requests of varying sizes to test chunking, and verify the 
responses.
-    ExecutorService executor = Executors.newFixedThreadPool(50);
-    List<Future> futures = new ArrayList<>(200);
-    for (int i = 0; i < 100; ++i) {
-      final String key = "key" + i;
-      final String s = i % 5 == 0 ? largeString(i) : "tag";
-      futures.add(
-          executor.submit(
-              () -> {
-                GetDataStream stream = streamPool.getStream();
-                errorCollector.checkThat(
-                    stream.requestKeyedData("computation", 
makeGetDataRequest(key, s)),
-                    Matchers.equalTo(makeGetDataResponse(key, s)));
-                streamPool.releaseStream(stream);
-              }));
-      futures.add(
-          executor.submit(
-              () -> {
-                GetDataStream stream = streamPool.getStream();
-                errorCollector.checkThat(
-                    stream.requestGlobalData(makeGlobalDataRequest(key)),
-                    Matchers.equalTo(makeGlobalDataResponse(key)));
-                streamPool.releaseStream(stream);
-              }));
-      Thread.sleep((i * 17) % 50);
-    }
-    for (Future f : futures) {
-      f.get();
-    }
-    assertTrue(streamPool.closeIdle(60, TimeUnit.SECONDS));
-    executor.shutdown();
-  }
-
   private String largeString(int length) {
     return String.join("", Collections.nCopies(length, "."));
   }

Reply via email to