This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
The following commit(s) were added to refs/heads/master by this push:
new daf95a8 Vendors in AbstractUnsafeUnaryGrpcService temporarily to
remove dependency on armeria release. (#2593)
daf95a8 is described below
commit daf95a8962ac7e7a6cbcc9c073a80eade2f71ef4
Author: Anuraag Agrawal <[email protected]>
AuthorDate: Wed May 15 20:42:46 2019 +0900
Vendors in AbstractUnsafeUnaryGrpcService temporarily to remove dependency
on armeria release. (#2593)
---
.../internal/AbstractUnsafeUnaryGrpcService.java | 129 +++++++++++++++++++++
.../server/internal/ZipkinGrpcCollector.java | 30 +++--
2 files changed, 148 insertions(+), 11 deletions(-)
diff --git
a/zipkin-server/src/main/java/zipkin2/server/internal/AbstractUnsafeUnaryGrpcService.java
b/zipkin-server/src/main/java/zipkin2/server/internal/AbstractUnsafeUnaryGrpcService.java
new file mode 100644
index 0000000..051ee7e
--- /dev/null
+++
b/zipkin-server/src/main/java/zipkin2/server/internal/AbstractUnsafeUnaryGrpcService.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright 2019 LINE Corporation
+ *
+ * LINE Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package zipkin2.server.internal;
+
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpHeaderNames;
+import com.linecorp.armeria.common.HttpHeadersBuilder;
+import com.linecorp.armeria.common.HttpRequest;
+import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.common.HttpStatus;
+import com.linecorp.armeria.common.ResponseHeaders;
+import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageDeframer;
+import
com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageDeframer.ByteBufOrStream;
+import
com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageDeframer.Listener;
+import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageFramer;
+import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException;
+import com.linecorp.armeria.common.grpc.protocol.GrpcHeaderNames;
+import com.linecorp.armeria.common.grpc.protocol.GrpcTrailersUtil;
+import com.linecorp.armeria.server.AbstractHttpService;
+import com.linecorp.armeria.server.ServiceRequestContext;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for a unary gRPC endpoint that handles request and response messages as raw
+ * {@link ByteBuf}s.
+ *
+ * <p>Temporarily copied from Armeria to allow using before it's released.
+ *
+ * <p>A POST body is aggregated, deframed into a single message and handed to
+ * {@link #handleMessage(ByteBuf)}. The buffer it returns is framed and sent back together with
+ * gRPC status trailers. Compressed messages are not supported.
+ */
+abstract class AbstractUnsafeUnaryGrpcService extends AbstractHttpService {
+
+  /** Headers sent ahead of every successful response: HTTP 200, proto payload, identity encoding. */
+  private static final ResponseHeaders RESPONSE_HEADERS =
+      ResponseHeaders.of(HttpStatus.OK,
+          HttpHeaderNames.CONTENT_TYPE, "application/grpc+proto",
+          GrpcHeaderNames.GRPC_ENCODING, "identity");
+
+  /**
+   * Returns an unframed response message to return to the client, given an unframed request
+   * message. It is expected that the implementation has the logic to know how to parse the
+   * request and serialize a response into {@link ByteBuf}. The returned {@link ByteBuf} will be
+   * framed and returned to the client.
+   *
+   * <p>NOTE(review): the request is aggregated with pooled objects, so implementations are
+   * presumably responsible for releasing {@code message} - confirm against the Armeria contract.
+   *
+   * @param message the unframed request payload; an empty buffer when the request carried no
+   *     message
+   * @return a future of the unframed response payload. Completing it exceptionally with an
+   *     {@link ArmeriaStatusException} reports that exception's code and message to the client;
+   *     any other failure is reported as INTERNAL.
+   */
+  protected abstract CompletableFuture<ByteBuf> handleMessage(ByteBuf message);
+
+  /**
+   * Aggregates the request, deframes it, delegates to {@link #handleMessage(ByteBuf)} and frames
+   * the result. Failures at any stage are converted into a trailers-only gRPC error response.
+   */
+  @Override
+  protected final HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) {
+    final CompletableFuture<HttpResponse> responseFuture =
+        req.aggregateWithPooledObjects(ctx.contextAwareEventLoop(), ctx.alloc())
+            .thenCompose(msg -> deframeMessage(msg.content(), ctx.alloc()))
+            .thenCompose(this::handleMessage)
+            .thenApply(responseMessage -> {
+              final ArmeriaMessageFramer framer = new ArmeriaMessageFramer(
+                  ctx.alloc(), Integer.MAX_VALUE);
+              final HttpData framed = framer.writePayload(responseMessage);
+              return HttpResponse.of(
+                  RESPONSE_HEADERS,
+                  framed,
+                  GrpcTrailersUtil.statusToTrailers(/* OK */ 0, null, true).build());
+            })
+            .exceptionally(t -> {
+              // A failure raised in an earlier stage is delivered here wrapped in a
+              // CompletionException. Peel it first, otherwise the ArmeriaStatusException branch
+              // can never match and every error is reported as INTERNAL with the wrapper's
+              // message.
+              final Throwable cause = unwrapCompletionException(t);
+              final HttpHeadersBuilder trailers;
+              if (cause instanceof ArmeriaStatusException) {
+                final ArmeriaStatusException statusException = (ArmeriaStatusException) cause;
+                trailers = GrpcTrailersUtil.statusToTrailers(
+                    statusException.getCode(), statusException.getMessage(), false);
+              } else {
+                trailers = GrpcTrailersUtil.statusToTrailers(
+                    /* INTERNAL */ 13, cause.getMessage(), false);
+              }
+              return HttpResponse.of(trailers.build());
+            });
+
+    return HttpResponse.from(responseFuture);
+  }
+
+  /**
+   * Returns the first throwable in the cause chain of {@code t} that is not a
+   * {@link java.util.concurrent.CompletionException}, or the innermost wrapper when it has no
+   * cause.
+   */
+  private static Throwable unwrapCompletionException(Throwable t) {
+    Throwable cause = t;
+    while (cause instanceof java.util.concurrent.CompletionException
+        && cause.getCause() != null) {
+      cause = cause.getCause();
+    }
+    return cause;
+  }
+
+  /**
+   * Extracts the single message contained in {@code framed}.
+   *
+   * @param framed the aggregated, still-framed request body
+   * @param alloc allocator handed to the deframer
+   * @return a future completed with the first deframed message, or with
+   *     {@link Unpooled#EMPTY_BUFFER} when the stream ends without one
+   */
+  private CompletableFuture<ByteBuf> deframeMessage(HttpData framed, ByteBufAllocator alloc) {
+    final CompletableFuture<ByteBuf> deframed = new CompletableFuture<>();
+    try (ArmeriaMessageDeframer deframer = new ArmeriaMessageDeframer(
+        new Listener() {
+          @Override
+          public void messageRead(ByteBufOrStream message) {
+            // Compression not supported.
+            assert message.buf() != null;
+            deframed.complete(message.buf());
+          }
+
+          @Override
+          public void endOfStream() {
+            // Only fall back to the empty buffer when no message was read.
+            if (!deframed.isDone()) {
+              deframed.complete(Unpooled.EMPTY_BUFFER);
+            }
+          }
+        },
+        Integer.MAX_VALUE,
+        alloc)) {
+      // Unary call: ask for exactly one message, and mark the data as the end of the stream.
+      deframer.request(1);
+      deframer.deframe(framed, true);
+    }
+    return deframed;
+  }
+}
diff --git
a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
index 9ae9a94..a0918ce 100644
---
a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
+++
b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
@@ -16,12 +16,15 @@
*/
package zipkin2.server.internal;
-import com.linecorp.armeria.common.grpc.protocol.AbstractUnaryGrpcService;
import com.linecorp.armeria.spring.ArmeriaServerConfigurator;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import zipkin2.Callback;
+import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorMetrics;
@@ -31,7 +34,6 @@ import zipkin2.storage.StorageComponent;
/** Collector for receiving spans on a gRPC endpoint. */
@ConditionalOnProperty(name = "zipkin.collector.grpc.enabled") // disabled by
default
final class ZipkinGrpcCollector {
- static final byte[] EMPTY = new byte[0];
@Bean ArmeriaServerConfigurator grpcCollectorConfigurator(StorageComponent
storage,
CollectorSampler sampler, CollectorMetrics metrics) {
@@ -46,7 +48,7 @@ final class ZipkinGrpcCollector {
sb.service("/zipkin.proto3.SpanService/Report", new
SpanService(collector, grpcMetrics));
}
- static final class SpanService extends AbstractUnaryGrpcService {
+ static final class SpanService extends AbstractUnsafeUnaryGrpcService {
final Collector collector;
final CollectorMetrics metrics;
@@ -56,24 +58,30 @@ final class ZipkinGrpcCollector {
this.metrics = metrics;
}
- @Override protected CompletableFuture<byte[]> handleMessage(byte[] bytes) {
+ @Override protected CompletableFuture<ByteBuf> handleMessage(ByteBuf
bytes) {
metrics.incrementMessages();
- metrics.incrementBytes(bytes.length);
+ metrics.incrementBytes(bytes.readableBytes());
- if (bytes.length == 0) {
+ if (!bytes.isReadable()) {
return CompletableFuture.completedFuture(bytes); // lenient on empty
messages
}
- CompletableFutureCallback result = new CompletableFutureCallback();
- collector.acceptSpans(bytes, SpanBytesDecoder.PROTO3, result);
- return result;
+
+ try {
+ CompletableFutureCallback result = new CompletableFutureCallback();
+ List<Span> spans =
SpanBytesDecoder.PROTO3.decodeList(bytes.nioBuffer());
+ collector.accept(spans, result);
+ return result;
+ } finally {
+ bytes.release();
+ }
}
}
- static final class CompletableFutureCallback extends
CompletableFuture<byte[]>
+ static final class CompletableFutureCallback extends
CompletableFuture<ByteBuf>
implements Callback<Void> {
@Override public void onSuccess(Void value) {
- complete(EMPTY);
+ complete(Unpooled.EMPTY_BUFFER);
}
@Override public void onError(Throwable t) {