This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 93821dd Got rid of the WindmillServiceUseStreamingRpcs option from StreamingD… (#7631) 93821dd is described below commit 93821dd2da461d3cf9238888881e182151dc06ca Author: drieber <drie...@google.com> AuthorDate: Tue Jan 29 18:10:55 2019 -0800 Got rid of the WindmillServiceUseStreamingRpcs option from StreamingD… (#7631) * Got rid of the WindmillServiceUseStreamingRpcs option from StreamingDataflowWorkerOptions. Streaming engine now always uses streaming RPCs. Appliance never uses streaming RPCs. * Initial implementation of FakeWindmillServer getWorkStream and commitWorkStream. * Run ./gradlew spotlessApply * Fixed tests by implementing FakeWindmillServer.getWorkStream and FakeWindmillServer.commitWorkStream. It was necessary to add a call to dispatchThread.interrupt from SDW.stop(), otherwise the CommitWorkStream would be stuck forever in closeAfterDefaultTimeout. --- .../dataflow/worker/StreamingDataflowWorker.java | 13 ++- .../options/StreamingDataflowWorkerOptions.java | 19 ----- .../worker/windmill/WindmillServerStub.java | 1 + .../dataflow/worker/FakeWindmillServer.java | 98 ++++++++++++++++++++-- 4 files changed, 98 insertions(+), 33 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index fb0e8bb..ab31b9b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -398,8 +398,6 @@ public class StreamingDataflowWorker { private final MemoryMonitor memoryMonitor; private final Thread memoryMonitorThread; - private final boolean useStreamingRpcs; - private 
final WorkerStatusPages statusPages; // Periodic sender of debug information to the debug capture service. private DebugCapture.Manager debugCaptureManager = null; @@ -585,7 +583,7 @@ public class StreamingDataflowWorker { @Override public void run() { LOG.info("Dispatch starting"); - if (useStreamingRpcs) { + if (windmillServiceEnabled) { streamingDispatchLoop(); } else { dispatchLoop(); @@ -601,7 +599,7 @@ public class StreamingDataflowWorker { new Runnable() { @Override public void run() { - if (useStreamingRpcs) { + if (windmillServiceEnabled) { streamingCommitLoop(); } else { commitLoop(); @@ -611,11 +609,10 @@ public class StreamingDataflowWorker { commitThread.setPriority(Thread.MAX_PRIORITY); commitThread.setName("CommitThread"); - this.useStreamingRpcs = options.getWindmillServiceUseStreamingRpcs(); this.publishCounters = publishCounters; this.windmillServer = options.getWindmillServerStub(); this.metricTrackingWindmillServer = - new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, useStreamingRpcs); + new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, windmillServiceEnabled); this.stateFetcher = new StateFetcher(metricTrackingWindmillServer); this.clientId = new Random().nextLong(); @@ -660,7 +657,6 @@ public class StreamingDataflowWorker { } LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled); - LOG.debug("useStreamingRpcs: {}", useStreamingRpcs); LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint()); LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort()); LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); @@ -812,6 +808,7 @@ public class StreamingDataflowWorker { debugCaptureManager.stop(); } running.set(false); + dispatchThread.interrupt(); dispatchThread.join(); // We need to interrupt the commitThread in case it is blocking on pulling // from the commitQueue. 
@@ -1391,7 +1388,7 @@ public class StreamingDataflowWorker { // Batch commits as long as there are more and we can fit them in the current request. CommitWorkStream commitStream = streamPool.getStream(); int commits = 0; - while (true) { + while (running.get()) { // There may be a commit left over from the previous iteration but if not, pull one. if (commit == null) { try { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java index 20ccc35..a136d7e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java @@ -88,12 +88,6 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt void setPeriodicStatusPageOutputDirectory(String directory); - @Description("If true, will use streaming RPCs with windmill service") - @Default.InstanceFactory(WindmillServiceUseStreamingRpcsFactory.class) - boolean getWindmillServiceUseStreamingRpcs(); - - void setWindmillServiceUseStreamingRpcs(boolean value); - @Description("Streaming requests will be batched into messages up to this limit.") @Default.InstanceFactory(WindmillServiceStreamingRpcBatchLimitFactory.class) int getWindmillServiceStreamingRpcBatchLimit(); @@ -199,19 +193,6 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt } } - /** Factory for setting value of WindmillServiceUseStreamingRpcs based on environment. 
*/ - public static class WindmillServiceUseStreamingRpcsFactory - implements DefaultValueFactory<Boolean> { - @Override - public Boolean create(PipelineOptions options) { - StreamingDataflowWorkerOptions streamingOptions = - options.as(StreamingDataflowWorkerOptions.class); - return streamingEngineEnabled(streamingOptions) - && hasExperiment(streamingOptions, "windmill_service_streaming_rpcs") - && !hasExperiment(streamingOptions, "windmill_service_disable_streaming_rpcs"); - } - } - /** Factory for setting value of WindmillServiceStreamingRpcBatchLimit based on environment. */ public static class WindmillServiceStreamingRpcBatchLimitFactory implements DefaultValueFactory<Integer> { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java index ce5a058..f14451c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java @@ -153,6 +153,7 @@ public abstract class WindmillServerStub implements StatusDataProvider { /** Flushes any pending work items to the wire. */ void flush(); } + /** * Pool of homogeneous streams to Windmill. 
* diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java index 51327b0..bb46b78 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java @@ -31,11 +31,9 @@ import java.util.ArrayList; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Function; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse; @@ -48,6 +46,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitR import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; import org.apache.beam.vendor.guava.v20_0.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles; +import org.joda.time.Instant; import org.junit.rules.ErrorCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +57,7 @@ class FakeWindmillServer extends WindmillServerStub { private final Queue<Windmill.GetWorkResponse> workToOffer; private final Queue<Function<GetDataRequest, GetDataResponse>> dataToOffer; + // Keys are work tokens. 
private final Map<Long, WorkItemCommitRequest> commitsReceived; private final ArrayList<Windmill.ReportStatsRequest> statsReceived; private final LinkedBlockingQueue<Windmill.Exception> exceptions; @@ -180,7 +180,55 @@ class FakeWindmillServer extends WindmillServerStub { @Override public GetWorkStream getWorkStream(Windmill.GetWorkRequest request, WorkItemReceiver receiver) { - throw new UnsupportedOperationException(); + LOG.debug("getWorkStream: {}", request.toString()); + Instant startTime = Instant.now(); + final CountDownLatch done = new CountDownLatch(1); + return new GetWorkStream() { + @Override + public void closeAfterDefaultTimeout() { + while (done.getCount() > 0) { + Windmill.GetWorkResponse response = workToOffer.poll(); + if (response == null) { + try { + sleepMillis(500); + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); + } + continue; + } + for (Windmill.ComputationWorkItems computationWork : response.getWorkList()) { + Instant inputDataWatermark = + WindmillTimeUtils.windmillToHarnessWatermark( + computationWork.getInputDataWatermark()); + for (Windmill.WorkItem workItem : computationWork.getWorkList()) { + receiver.receiveWork( + computationWork.getComputationId(), inputDataWatermark, Instant.now(), workItem); + } + } + } + } + + @Override + public void close() { + done.countDown(); + } + + @Override + public void awaitTermination() throws InterruptedException { + done.await(); + } + + @Override + public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException { + return done.await(time, unit); + } + + @Override + public Instant startTime() { + return startTime; + } + }; } @Override @@ -190,7 +238,45 @@ class FakeWindmillServer extends WindmillServerStub { @Override public CommitWorkStream commitWorkStream() { - throw new UnsupportedOperationException(); + Instant startTime = Instant.now(); + return new CommitWorkStream() { + @Override + public boolean commitWorkItem( + String computation, 
+ WorkItemCommitRequest request, + Consumer<Windmill.CommitStatus> onDone) { + LOG.debug("commitWorkStream::commitWorkItem: {}", request); + errorCollector.checkThat(request.hasWorkToken(), equalTo(true)); + errorCollector.checkThat( + request.getShardingKey(), allOf(greaterThan(0L), lessThan(Long.MAX_VALUE))); + errorCollector.checkThat(request.getCacheToken(), not(equalTo(0L))); + commitsReceived.put(request.getWorkToken(), request); + onDone.accept(Windmill.CommitStatus.OK); + return true; // The request was accepted. + } + + @Override + public void flush() {} + + @Override + public void close() {} + + @Override + public void awaitTermination() {} + + @Override + public boolean awaitTermination(int time, TimeUnit unit) { + return true; + } + + @Override + public void closeAfterDefaultTimeout() {} + + @Override + public Instant startTime() { + return startTime; + } + }; } public void waitForEmptyWorkQueue() {