m-trieu commented on code in PR #28016:
URL: https://github.com/apache/beam/pull/28016#discussion_r1302302025
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java:
##########
@@ -480,30 +428,26 @@ public CommitWorkStream commitWorkStream() {
@Override
public GetConfigResponse getConfig(GetConfigRequest request) {
- if (syncApplianceStub == null) {
- throw new RpcException(
- new UnsupportedOperationException("GetConfig not supported with
windmill service."));
- } else {
- return callWithBackoff(
- () ->
- syncApplianceStub
- .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
- .getConfig(request));
+ if (syncApplianceStub != null) {
+ return callWithBackoff(() -> syncApplianceStub.getConfig(request));
}
+
+ throw new RpcException(
+ new UnsupportedOperationException("GetConfig not supported with
windmill service."));
}
@Override
public ReportStatsResponse reportStats(ReportStatsRequest request) {
- if (syncApplianceStub == null) {
- throw new RpcException(
- new UnsupportedOperationException("ReportStats not supported with
windmill service."));
- } else {
+ if (syncApplianceStub != null) {
return callWithBackoff(
() ->
syncApplianceStub
.withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
Review Comment:
done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java:
##########
@@ -242,17 +239,17 @@ private synchronized void initializeLocalHost(int port) {
Channel channel = localhostChannel(port);
if (options.isEnableStreamingEngine()) {
this.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(channel));
-
this.syncStubList.add(CloudWindmillServiceV1Alpha1Grpc.newBlockingStub(channel));
} else {
- this.syncApplianceStub = WindmillApplianceGrpc.newBlockingStub(channel);
+ this.syncApplianceStub =
+ WindmillApplianceGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS);
Review Comment:
done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java:
##########
@@ -360,69 +343,34 @@ private <ResponseT> ResponseT
callWithBackoff(Supplier<ResponseT> function) {
@Override
public GetWorkResponse getWork(GetWorkRequest request) {
- if (syncApplianceStub == null) {
- return callWithBackoff(
- () ->
- syncStub()
- .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
- .getWork(
- request
- .toBuilder()
- .setJobId(options.getJobId())
- .setProjectId(options.getProject())
- .setWorkerId(options.getWorkerId())
- .build()));
- } else {
- return callWithBackoff(
- () ->
- syncApplianceStub
- .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
- .getWork(request));
+ if (syncApplianceStub != null) {
+ return callWithBackoff(() -> syncApplianceStub.getWork(request));
}
+
+ throw new RpcException(
+ new UnsupportedOperationException(
+ "Unary GetWork calls are not supported with Windmill service."));
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]