gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3242303499


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/streaming/StreamingDispatchObserver.java:
##########
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.streaming;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Broker-side gRPC client {@code StreamObserver} for the {@code 
SubmitWithStream} bidi RPC. Routes inbound
+ * {@link Worker.ServerToBroker} messages to either the {@code submit_ack} 
callback (first message) or the
+ * {@link StreamingQuerySession} (subsequent {@code OpChainComplete}s), and 
adapts the outbound side of the stream
+ * (the {@link StreamObserver} we use to send {@code BrokerToServer} messages) 
into a
+ * {@link StreamingServerHandle} so the session can fan out cancel.
+ *
+ * <p>Lifecycle — created when the broker opens a {@code SubmitWithStream} 
call to one server, then registered with
+ * the session via {@link StreamingQuerySession#registerStream}. Receives:
+ * <ol>
+ *   <li>Exactly one {@code submit_ack} (always the first server→broker 
message).</li>
+ *   <li>Zero or more {@code OpChainComplete} messages (one per opchain that 
ran on this server).</li>
+ *   <li>Exactly one {@code ServerDone} after the last opchain has 
reported.</li>
+ * </ol>
+ *
+ * <p>On {@code onError} or unexpected message order, drains the latch by 
{@code remainingExpected} via the session
+ * so {@link StreamingQuerySession#awaitCompletion} can still finalize.
+ */
+public class StreamingDispatchObserver
+    implements StreamObserver<Worker.ServerToBroker>, StreamingServerHandle {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamingDispatchObserver.class);
+
+  private final QueryServerInstance _server;
+  private final StreamingQuerySession _session;
+  /// Receives the submit-ack response or a failure throwable. Called exactly 
once per call (either with response on
+  /// the first ServerToBroker.submit_ack, or with an error if the stream 
breaks before the ack arrives).
+  private final BiConsumer<Worker.QueryResponse, Throwable> _ackCallback;
+  private final int _expectedOpChainsForThisServer;
+  private final AtomicBoolean _ackReceived = new AtomicBoolean(false);
+
+  /**
+   * Counts how many opchains we've already drained from the session for this 
server, so an onError doesn't
+   * double-drain after some opchains already reported successfully.
+   *
+   * <p>Accessed only from gRPC inbound callbacks ({@code onNext}, {@code 
onError}, {@code onCompleted}), which gRPC
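+   * serializes per stream — no additional synchronization is needed.
+   *
+   * <p>Threading note: {@code cancel()} on this same instance is invoked from a different thread (the session's
+   * cancel fan-out over its open streams). It must only touch the volatile {@code _outbound} and must never read
+   * or write this counter; otherwise this field would need to become an atomic.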
+   */
+  private int _opChainsReportedForThisServer = 0;
+
+  /**
+   * The outbound side of the bidi stream — used to send {@code submit} 
(initial) and {@code cancel} (fan-out) from
+   * the broker. Set once via {@link #attachOutboundStream} after the gRPC 
stub is asked to start the stream; lives
+   * for the duration of the call.
+   */
+  private volatile StreamObserver<Worker.BrokerToServer> _outbound;
+
+  public StreamingDispatchObserver(QueryServerInstance server, 
StreamingQuerySession session,
+      int expectedOpChainsForThisServer, BiConsumer<Worker.QueryResponse, 
Throwable> ackCallback) {
+    _server = server;
+    _session = session;
+    _expectedOpChainsForThisServer = expectedOpChainsForThisServer;
+    _ackCallback = ackCallback;
+  }
+
+  public QueryServerInstance getServer() {
+    return _server;
+  }
+
+  /** Wires the outbound side of the bidi stream once the gRPC stub returns 
it. */
+  public void attachOutboundStream(StreamObserver<Worker.BrokerToServer> 
outbound) {
+    _outbound = outbound;
+  }
+
+  /** Sends the initial {@code BrokerToServer.submit} message on the outbound 
side. */
+  public void sendSubmit(Worker.QueryRequest request) {
+    StreamObserver<Worker.BrokerToServer> outbound = _outbound;
+    if (outbound == null) {
+      throw new IllegalStateException("attachOutboundStream must be called 
before sendSubmit");
+    }
+    
outbound.onNext(Worker.BrokerToServer.newBuilder().setSubmit(request).build());
+  }
+
+  @Override
+  public void onNext(Worker.ServerToBroker message) {
+    switch (message.getPayloadCase()) {
+      case SUBMIT_ACK:
+        if (_ackReceived.compareAndSet(false, true)) {
+          _ackCallback.accept(message.getSubmitAck(), null);
+        } else {
+          LOGGER.warn("Ignoring duplicate submit_ack from {}", _server);
+        }
+        break;
+      case OPCHAIN:
+        _session.recordOpChainComplete(message.getOpchain());

Review Comment:
   > Aside: _opChainsReportedForThisServer is a plain int. The class Javadoc 
correctly notes gRPC serializes inbound callbacks per stream, but cancel() on 
this same instance is called from a different I/O thread (via fanOutCancel's 
iteration of _openStreams). Today that's safe because cancel() only touches 
_outbound (which is volatile), but the threading boundary is subtle — a short 
note on what runs on which thread would help future maintainers.
   
   I'll improve the Javadoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to