This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 0840c60e54 For HTTP/1 unary mode, use Content-Length instead of chunk
(#13979)
0840c60e54 is described below
commit 0840c60e54d752ab53bf1517adf178ff260240fb
Author: TomlongTK <[email protected]>
AuthorDate: Sun Apr 7 13:50:02 2024 +0800
For HTTP/1 unary mode, use Content-Length instead of chunk (#13979)
* For HTTP/1 unary mode, use Content-Length instead of chunk
* Fix format issue
* Http1 unary
* Fix unit test
* refine
* refine
* Fix status
---------
Co-authored-by: TomlongTK <[email protected]>
Co-authored-by: Sean Yang <[email protected]>
---
.../http12/AbstractServerHttpChannelObserver.java | 174 ++++++++++++---------
.../http12/h1/Http1ServerChannelObserver.java | 2 -
.../h1/Http1ServerStreamChannelObserver.java | 10 ++
.../http12/h1/Http1ServerUnaryChannelObserver.java | 58 +++++++
.../DefaultHttp11ServerTransportListener.java | 3 +-
5 files changed, 169 insertions(+), 78 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index a4086cc08f..515afb2d50 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -21,33 +21,27 @@ import
org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
-import java.util.List;
-import java.util.Map;
-
public abstract class AbstractServerHttpChannelObserver implements
CustomizableHttpChannelObserver<Object> {
+ private final HttpChannel httpChannel;
+
private HeadersCustomizer headersCustomizer = HeadersCustomizer.NO_OP;
private TrailersCustomizer trailersCustomizer = TrailersCustomizer.NO_OP;
private ErrorResponseCustomizer errorResponseCustomizer =
ErrorResponseCustomizer.NO_OP;
- private final HttpChannel httpChannel;
+ private HttpMessageEncoder responseEncoder;
private boolean headerSent;
- private HttpMessageEncoder responseEncoder;
-
- public AbstractServerHttpChannelObserver(HttpChannel httpChannel) {
+ protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) {
this.httpChannel = httpChannel;
}
- public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
- this.responseEncoder = responseEncoder;
- }
-
- public HttpMessageEncoder getResponseEncoder() {
- return responseEncoder;
+ @Override
+ public HttpChannel getHttpChannel() {
+ return httpChannel;
}
@Override
@@ -65,71 +59,38 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
this.errorResponseCustomizer = errorResponseCustomizer;
}
- protected HeadersCustomizer getHeadersCustomizer() {
- return headersCustomizer;
+ public HttpMessageEncoder getResponseEncoder() {
+ return responseEncoder;
}
- protected TrailersCustomizer getTrailersCustomizer() {
- return trailersCustomizer;
+ public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
+ this.responseEncoder = responseEncoder;
}
@Override
- public void onNext(Object data) {
+ public final void onNext(Object data) {
try {
- if (data instanceof HttpResult) {
- HttpResult<?> result = (HttpResult<?>) data;
- if (!headerSent) {
- doSendHeaders(String.valueOf(result.getStatus()),
result.getHeaders());
- }
- data = result.getBody();
- } else if (!headerSent) {
- doSendHeaders(HttpStatus.OK.getStatusString(), null);
- }
- HttpOutputMessage outputMessage = encodeHttpOutputMessage(data);
- preOutputMessage(outputMessage);
- responseEncoder.encode(outputMessage.getBody(), data);
- getHttpChannel().writeMessage(outputMessage);
- postOutputMessage(outputMessage);
+ doOnNext(data);
} catch (Throwable e) {
onError(e);
}
}
- protected void preOutputMessage(HttpOutputMessage outputMessage) throws
Throwable {}
-
- protected void postOutputMessage(HttpOutputMessage outputMessage) throws
Throwable {}
-
- protected abstract HttpMetadata encodeHttpMetadata();
-
- protected HttpOutputMessage encodeHttpOutputMessage(Object data) {
- return getHttpChannel().newOutputMessage();
- }
-
- protected HttpMetadata encodeTrailers(Throwable throwable) {
- return null;
+ protected void doOnNext(Object data) throws Throwable {
+ if (!headerSent) {
+ sendHeader(buildMetadata(resolveStatusCode(data), data, null));
+ }
+ sendMessage(buildMessage(data));
}
@Override
- public void onError(Throwable throwable) {
+ public final void onError(Throwable throwable) {
if (throwable instanceof HttpResultPayloadException) {
onNext(((HttpResultPayloadException) throwable).getResult());
return;
}
- int httpStatusCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode();
- if (throwable instanceof HttpStatusException) {
- httpStatusCode = ((HttpStatusException) throwable).getStatusCode();
- }
- if (!headerSent) {
- doSendHeaders(String.valueOf(httpStatusCode), null);
- }
try {
- ErrorResponse errorResponse = new ErrorResponse();
- errorResponse.setStatus(String.valueOf(httpStatusCode));
- errorResponse.setMessage(throwable.getMessage());
- errorResponseCustomizer.accept(errorResponse, throwable);
- HttpOutputMessage httpOutputMessage =
encodeHttpOutputMessage(errorResponse);
- responseEncoder.encode(httpOutputMessage.getBody(), errorResponse);
- getHttpChannel().writeMessage(httpOutputMessage);
+ doOnError(throwable);
} catch (Throwable ex) {
throwable = new EncodeException(ex);
} finally {
@@ -137,35 +98,98 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
}
}
+ protected void doOnError(Throwable throwable) throws Throwable {
+ String statusCode = resolveStatusCode(throwable);
+ Object data = buildErrorResponse(statusCode, throwable);
+ if (!headerSent) {
+ sendHeader(buildMetadata(statusCode, data, null));
+ }
+ sendMessage(buildMessage(data));
+ }
+
@Override
- public void onCompleted() {
+ public final void onCompleted() {
doOnCompleted(null);
}
- @Override
- public HttpChannel getHttpChannel() {
- return httpChannel;
+ protected void doOnCompleted(Throwable throwable) {
+ HttpMetadata httpMetadata = encodeTrailers(throwable);
+ if (httpMetadata == null) {
+ return;
+ }
+ trailersCustomizer.accept(httpMetadata.headers(), throwable);
+ getHttpChannel().writeHeader(httpMetadata);
+ }
+
+ protected HttpMetadata encodeTrailers(Throwable throwable) {
+ return null;
}
- private void doSendHeaders(String statusCode, Map<String, List<String>>
additionalHeaders) {
+ protected HttpOutputMessage encodeHttpOutputMessage(Object data) {
+ return getHttpChannel().newOutputMessage();
+ }
+
+ protected abstract HttpMetadata encodeHttpMetadata();
+
+ protected void preOutputMessage(HttpOutputMessage outputMessage) throws
Throwable {}
+
+ protected void postOutputMessage(HttpOutputMessage outputMessage) throws
Throwable {}
+
+ protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage
outputMessage) {}
+
+ protected final String resolveStatusCode(Object data) {
+ return data instanceof HttpResult
+ ? String.valueOf(((HttpResult<?>) data).getStatus())
+ : HttpStatus.OK.getStatusString();
+ }
+
+ protected final String resolveStatusCode(Throwable throwable) {
+ return throwable instanceof HttpStatusException
+ ? String.valueOf(((HttpStatusException)
throwable).getStatusCode())
+ : HttpStatus.INTERNAL_SERVER_ERROR.getStatusString();
+ }
+
+ protected final ErrorResponse buildErrorResponse(String statusCode,
Throwable throwable) {
+ ErrorResponse errorResponse = new ErrorResponse();
+ errorResponse.setStatus(statusCode);
+ errorResponse.setMessage(throwable.getMessage());
+ errorResponseCustomizer.accept(errorResponse, throwable);
+ return errorResponse;
+ }
+
+ protected final HttpOutputMessage buildMessage(Object data) throws
Throwable {
+ if (data instanceof HttpResult) {
+ data = ((HttpResult<?>) data).getBody();
+ }
+ HttpOutputMessage outputMessage = encodeHttpOutputMessage(data);
+ preOutputMessage(outputMessage);
+ responseEncoder.encode(outputMessage.getBody(), data);
+ return outputMessage;
+ }
+
+ protected final void sendMessage(HttpOutputMessage outputMessage) throws
Throwable {
+ getHttpChannel().writeMessage(outputMessage);
+ postOutputMessage(outputMessage);
+ }
+
+ protected final HttpMetadata buildMetadata(String statusCode, Object data,
HttpOutputMessage httpOutputMessage) {
HttpMetadata httpMetadata = encodeHttpMetadata();
HttpHeaders headers = httpMetadata.headers();
headers.set(HttpHeaderNames.STATUS.getName(), statusCode);
headers.set(HttpHeaderNames.CONTENT_TYPE.getName(),
responseEncoder.contentType());
- headersCustomizer.accept(headers);
- if (additionalHeaders != null) {
- headers.putAll(additionalHeaders);
+ if (data instanceof HttpResult) {
+ HttpResult<?> result = (HttpResult<?>) data;
+ if (result.getHeaders() != null) {
+ headers.putAll(result.getHeaders());
+ }
}
- getHttpChannel().writeHeader(httpMetadata);
- headerSent = true;
+ preMetadata(httpMetadata, httpOutputMessage);
+ headersCustomizer.accept(headers);
+ return httpMetadata;
}
- protected void doOnCompleted(Throwable throwable) {
- HttpMetadata httpMetadata = encodeTrailers(throwable);
- if (httpMetadata == null) {
- return;
- }
- trailersCustomizer.accept(httpMetadata.headers(), throwable);
+ protected final void sendHeader(HttpMetadata httpMetadata) {
getHttpChannel().writeHeader(httpMetadata);
+ headerSent = true;
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java
index 6d92b86554..e62fbeb804 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.remoting.http12.h1;
import org.apache.dubbo.remoting.http12.AbstractServerHttpChannelObserver;
import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpChannelObserver;
-import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
@@ -34,7 +33,6 @@ public class Http1ServerChannelObserver extends
AbstractServerHttpChannelObserve
@Override
protected HttpMetadata encodeHttpMetadata() {
HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(),
"chunked");
return new Http1Metadata(httpHeaders);
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
index e5ee7aff44..8c3af9d684 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
@@ -17,6 +17,9 @@
package org.apache.dubbo.remoting.http12.h1;
import org.apache.dubbo.remoting.http12.HttpChannel;
+import org.apache.dubbo.remoting.http12.HttpHeaderNames;
+import org.apache.dubbo.remoting.http12.HttpHeaders;
+import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
import java.io.IOException;
@@ -33,6 +36,13 @@ public class Http1ServerStreamChannelObserver extends
Http1ServerChannelObserver
super(httpChannel);
}
+ @Override
+ protected HttpMetadata encodeHttpMetadata() {
+ HttpHeaders httpHeaders = new HttpHeaders();
+ httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(),
"chunked");
+ return new Http1Metadata(httpHeaders);
+ }
+
@Override
protected void preOutputMessage(HttpOutputMessage httpMessage) throws
IOException {
HttpOutputMessage httpOutputMessage =
this.getHttpChannel().newOutputMessage();
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java
new file mode 100644
index 0000000000..8cb2a36edd
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.h1;
+
+import org.apache.dubbo.remoting.http12.HttpChannel;
+import org.apache.dubbo.remoting.http12.HttpHeaderNames;
+import org.apache.dubbo.remoting.http12.HttpMetadata;
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+
+import java.io.OutputStream;
+
+import io.netty.buffer.ByteBufOutputStream;
+
+public class Http1ServerUnaryChannelObserver extends
Http1ServerChannelObserver {
+
+ public Http1ServerUnaryChannelObserver(HttpChannel httpChannel) {
+ super(httpChannel);
+ }
+
+ @Override
+ protected void doOnNext(Object data) throws Throwable {
+ HttpOutputMessage httpOutputMessage = buildMessage(data);
+ sendHeader(buildMetadata(resolveStatusCode(data), data,
httpOutputMessage));
+ sendMessage(httpOutputMessage);
+ }
+
+ @Override
+ protected void doOnError(Throwable throwable) throws Throwable {
+ String statusCode = resolveStatusCode(throwable);
+ Object data = buildErrorResponse(statusCode, throwable);
+ HttpOutputMessage httpOutputMessage = buildMessage(data);
+ sendHeader(buildMetadata(statusCode, data, httpOutputMessage));
+ sendMessage(httpOutputMessage);
+ }
+
+ @Override
+ protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage
outputMessage) {
+ OutputStream body = outputMessage.getBody();
+ if (body instanceof ByteBufOutputStream) {
+ int contentLength = ((ByteBufOutputStream) body).writtenBytes();
+
httpMetadata.headers().set(HttpHeaderNames.CONTENT_LENGTH.getName(),
String.valueOf(contentLength));
+ }
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index 11f3012c22..950a2d9350 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.remoting.http12.RequestMetadata;
import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerStreamChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
+import org.apache.dubbo.remoting.http12.h1.Http1ServerUnaryChannelObserver;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MediaType;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
@@ -58,7 +59,7 @@ public class DefaultHttp11ServerTransportListener
executorSupport =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel())
.getExecutorSupport(url);
this.httpChannel = httpChannel;
- serverChannelObserver = new Http1ServerChannelObserver(httpChannel);
+ serverChannelObserver = new
Http1ServerUnaryChannelObserver(httpChannel);
serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE);
}