This is an automated email from the ASF dual-hosted git repository. ndimiduk pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit b910b117314eb5f7c1dae8767883fb0a6637adb1 Author: Nick Dimiduk <[email protected]> AuthorDate: Wed Feb 16 14:03:55 2022 +0100 HBASE-26531 Trace coprocessor exec endpoints Trace table ExecService invocations as table operations. Ensure span relationships for both table and master invocations. Signed-off-by: Andrew Purtell <[email protected]> --- .../apache/hadoop/hbase/client/AsyncTableImpl.java | 13 +- .../org/apache/hadoop/hbase/client/HTable.java | 230 ++++----- .../hadoop/hbase/client/RawAsyncTableImpl.java | 69 ++- .../hbase/client/RegionCoprocessorRpcChannel.java | 15 +- .../client/RegionCoprocessorRpcChannelImpl.java | 48 +- .../hbase/client/trace/StringTraceRenderer.java | 139 ++++++ .../client/trace/hamcrest/SpanDataMatchers.java | 18 + hbase-endpoint/pom.xml | 23 + .../TestCoprocessorEndpointTracing.java | 549 +++++++++++++++++++++ .../hadoop/hbase/trace/OpenTelemetryClassRule.java | 127 +++++ .../hadoop/hbase/trace/OpenTelemetryTestRule.java | 39 ++ 11 files changed, 1103 insertions(+), 167 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index f2a8e18..0ef6edf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; - import com.google.protobuf.RpcChannel; +import io.opentelemetry.context.Context; import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -279,26 +279,27 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { + final Context context = Context.current(); CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() { @Override public void onRegionComplete(RegionInfo region, R resp) { - pool.execute(() -> callback.onRegionComplete(region, resp)); + pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp))); } @Override public void onRegionError(RegionInfo region, Throwable error) { - pool.execute(() -> callback.onRegionError(region, error)); + pool.execute(context.wrap(() -> callback.onRegionError(region, error))); } @Override public void onComplete() { - pool.execute(() -> callback.onComplete()); + pool.execute(context.wrap(callback::onComplete)); } @Override public void onError(Throwable error) { - pool.execute(() -> callback.onError(error)); + pool.execute(context.wrap(() -> callback.onError(error))); } }; CoprocessorServiceBuilder<S, R> builder = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 856dbc6..f6221fd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -31,6 +30,7 @@ import com.google.protobuf.ServiceException; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import java.io.IOException; import java.io.InterruptedIOException; @@ -38,8 +38,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -1168,13 +1168,10 @@ public class HTable implements Table { byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable) throws ServiceException, Throwable { final Map<byte[],R> results = Collections.synchronizedMap( - new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR)); - coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() { - @Override - public void update(byte[] region, byte[] row, R value) { - if (region != null) { - results.put(region, value); - } + new TreeMap<>(Bytes.BYTES_COMPARATOR)); + coprocessorService(service, startKey, endKey, callable, (region, row, value) -> { + if (region != null) { + results.put(region, value); } }); return results; @@ -1184,39 +1181,43 @@ public class HTable implements Table { public <T extends Service, R> void coprocessorService(final Class<T> service, byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable, final Batch.Callback<R> callback) throws ServiceException, Throwable { - // get regions covered by the row range - List<byte[]> keys = getStartKeysInRange(startKey, endKey); - Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (final byte[] r : keys) { - final RegionCoprocessorRpcChannel channel = + final 
Supplier<Span> supplier = new TableOperationSpanBuilder(connection) + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC); + TraceUtil.trace(() -> { + final Context context = Context.current(); + final ExecutorService wrappedPool = context.wrap(pool); + // get regions covered by the row range + List<byte[]> keys = getStartKeysInRange(startKey, endKey); + Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (final byte[] r : keys) { + final RegionCoprocessorRpcChannel channel = new RegionCoprocessorRpcChannel(connection, tableName, r); - Future<R> future = pool.submit(new Callable<R>() { - @Override - public R call() throws Exception { + Future<R> future = wrappedPool.submit(() -> { T instance = - org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel); + org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel); R result = callable.call(instance); byte[] region = channel.getLastRegion(); if (callback != null) { callback.update(region, r, result); } return result; - } - }); - futures.put(r, future); - } - for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) { - try { - e.getValue().get(); - } catch (ExecutionException ee) { - LOG.warn("Error calling coprocessor service " + service.getName() + " for row " - + Bytes.toStringBinary(e.getKey()), ee); - throw ee.getCause(); - } catch (InterruptedException ie) { - throw new InterruptedIOException("Interrupted calling coprocessor service " + }); + futures.put(r, future); + } + for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) { + try { + e.getValue().get(); + } catch (ExecutionException ee) { + LOG.warn("Error calling coprocessor service {} for row {}", service.getName(), + Bytes.toStringBinary(e.getKey()), ee); + throw ee.getCause(); + } catch (InterruptedException ie) { + throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName() + " for row " + 
Bytes.toStringBinary(e.getKey())).initCause(ie); + } } - } + }, supplier); } private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) @@ -1308,17 +1309,14 @@ public class HTable implements Table { public <R extends Message> Map<byte[], R> batchCoprocessorService( Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { - final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>( - Bytes.BYTES_COMPARATOR)); + final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<>( + Bytes.BYTES_COMPARATOR)); batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, - new Callback<R>() { - @Override - public void update(byte[] region, byte[] row, R result) { + (region, row, result) -> { if (region != null) { results.put(region, result); } - } - }); + }); return results; } @@ -1327,88 +1325,92 @@ public class HTable implements Table { final Descriptors.MethodDescriptor methodDescriptor, final Message request, byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback) throws ServiceException, Throwable { + final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC); + TraceUtil.trace(() -> { + final Context context = Context.current(); + final byte[] sanitizedStartKey = Optional.ofNullable(startKey) + .orElse(HConstants.EMPTY_START_ROW); + final byte[] sanitizedEndKey = Optional.ofNullable(endKey) + .orElse(HConstants.EMPTY_END_ROW); + + // get regions covered by the row range + Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions = + getKeysAndRegionsInRange(sanitizedStartKey, sanitizedEndKey, true); + List<byte[]> keys = keysAndRegions.getFirst(); + List<HRegionLocation> regions = keysAndRegions.getSecond(); + + // check if we have any calls to make + if (keys.isEmpty()) { + 
LOG.info("No regions were selected by key range start={}, end={}", + Bytes.toStringBinary(sanitizedStartKey), Bytes.toStringBinary(sanitizedEndKey)); + return; + } - if (startKey == null) { - startKey = HConstants.EMPTY_START_ROW; - } - if (endKey == null) { - endKey = HConstants.EMPTY_END_ROW; - } - // get regions covered by the row range - Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions = - getKeysAndRegionsInRange(startKey, endKey, true); - List<byte[]> keys = keysAndRegions.getFirst(); - List<HRegionLocation> regions = keysAndRegions.getSecond(); - - // check if we have any calls to make - if (keys.isEmpty()) { - LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) + - ", end=" + Bytes.toStringBinary(endKey)); - return; - } - - List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size()); - final Map<byte[], RegionCoprocessorServiceExec> execsByRow = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (int i = 0; i < keys.size(); i++) { - final byte[] rowKey = keys.get(i); - final byte[] region = regions.get(i).getRegionInfo().getRegionName(); - RegionCoprocessorServiceExec exec = + List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size()); + final Map<byte[], RegionCoprocessorServiceExec> execsByRow = + new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (int i = 0; i < keys.size(); i++) { + final byte[] rowKey = keys.get(i); + final byte[] region = regions.get(i).getRegionInfo().getRegionName(); + RegionCoprocessorServiceExec exec = new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request); - execs.add(exec); - execsByRow.put(rowKey, exec); - } + execs.add(exec); + execsByRow.put(rowKey, exec); + } - // tracking for any possible deserialization errors on success callback - // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here - final List<Throwable> callbackErrorExceptions = new ArrayList<>(); - final List<Row> callbackErrorActions = new ArrayList<>(); - final 
List<String> callbackErrorServers = new ArrayList<>(); - Object[] results = new Object[execs.size()]; + // tracking for any possible deserialization errors on success callback + // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here + final List<Throwable> callbackErrorExceptions = new ArrayList<>(); + final List<Row> callbackErrorActions = new ArrayList<>(); + final List<String> callbackErrorServers = new ArrayList<>(); + Object[] results = new Object[execs.size()]; - AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, + AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), RpcControllerFactory.instantiate(configuration)); - Callback<ClientProtos.CoprocessorServiceResult> resultsCallback - = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { - if (LOG.isTraceEnabled()) { - LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + - ": region=" + Bytes.toStringBinary(region) + - ", row=" + Bytes.toStringBinary(row) + - ", value=" + serviceResult.getValue().getValue()); - } - try { - Message.Builder builder = responsePrototype.newBuilderForType(); - org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, - serviceResult.getValue().getValue().toByteArray()); - callback.update(region, row, (R) builder.build()); - } catch (IOException e) { - LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(), - e); - callbackErrorExceptions.add(e); - callbackErrorActions.add(execsByRow.get(row)); - callbackErrorServers.add("null"); - } - }; - AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = + Callback<ClientProtos.CoprocessorServiceResult> resultsCallback = + (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { + if (LOG.isTraceEnabled()) { + LOG.trace("Received result for endpoint {}: region={}, 
row={}, value={}", + methodDescriptor.getFullName(), Bytes.toStringBinary(region), + Bytes.toStringBinary(row), serviceResult.getValue().getValue()); + } + try { + Message.Builder builder = responsePrototype.newBuilderForType(); + org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, + serviceResult.getValue().getValue().toByteArray()); + callback.update(region, row, (R) builder.build()); + } catch (IOException e) { + LOG.error("Unexpected response type from endpoint {}", methodDescriptor.getFullName(), + e); + callbackErrorExceptions.add(e); + callbackErrorActions.add(execsByRow.get(row)); + callbackErrorServers.add("null"); + } + }; + AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = AsyncProcessTask.newBuilder(resultsCallback) - .setPool(pool) - .setTableName(tableName) - .setRowAccess(execs) - .setResults(results) - .setRpcTimeout(readRpcTimeoutMs) - .setOperationTimeout(operationTimeoutMs) - .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) - .build(); - AsyncRequestFuture future = asyncProcess.submit(task); - future.waitUntilDone(); - - if (future.hasError()) { - throw future.getErrors(); - } else if (!callbackErrorExceptions.isEmpty()) { - throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions, - callbackErrorServers); - } + .setPool(context.wrap(pool)) + .setTableName(tableName) + .setRowAccess(execs) + .setResults(results) + .setRpcTimeout(readRpcTimeoutMs) + .setOperationTimeout(operationTimeoutMs) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture future = asyncProcess.submit(task); + future.waitUntilDone(); + + if (future.hasError()) { + throw future.getErrors(); + } else if (!callbackErrorExceptions.isEmpty()) { + throw new RetriesExhaustedWithDetailsException( + callbackErrorExceptions, callbackErrorActions, callbackErrorServers); + } + }, supplier); } @Override diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 437f12f..12ddbd8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -26,9 +26,10 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMu import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; - import com.google.protobuf.RpcChannel; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -51,16 +52,15 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.io.netty.util.Timer; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -793,14 +793,22 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { RegionCoprocessorRpcChannelImpl channel 
= new RegionCoprocessorRpcChannelImpl(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs); + final Span span = Span.current(); S stub = stubMaker.apply(channel); CompletableFuture<R> future = new CompletableFuture<>(); ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); callable.call(stub, controller, resp -> { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - } else { - future.complete(resp); + try (Scope ignored = span.makeCurrent()) { + if (controller.failed()) { + final Throwable failure = controller.getFailed(); + future.completeExceptionally(failure); + TraceUtil.setError(span, failure); + } else { + future.complete(resp); + span.setStatus(StatusCode.OK); + } + } finally { + span.end(); } }); return future; @@ -833,8 +841,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { + final Span span = Span.current(); if (error != null) { callback.onError(error); + TraceUtil.setError(span, error); + span.end(); return; } unfinishedRequest.incrementAndGet(); @@ -845,17 +856,23 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { addListener( conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT, operationTimeoutNs), - (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, - locateFinished, unfinishedRequest, l, e)); + (l, e) -> { + try (Scope ignored = span.makeCurrent()) { + onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, + locateFinished, unfinishedRequest, l, e); + } + }); } addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { - if (e != null) { - 
callback.onRegionError(region, e); - } else { - callback.onRegionComplete(region, r); - } - if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { - callback.onComplete(); + try (Scope ignored = span.makeCurrent()) { + if (e != null) { + callback.onRegionError(region, e); + } else { + callback.onRegionComplete(region, r); + } + if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { + callback.onComplete(); + } } }); } @@ -906,10 +923,22 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { @Override public void execute() { - addListener(conn.getLocator().getRegionLocation(tableName, startKey, - startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs), - (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, - endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); + final Span span = newTableOperationSpanBuilder() + .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC) + .build(); + try (Scope ignored = span.makeCurrent()) { + final RegionLocateType regionLocateType = startKeyInclusive + ? 
RegionLocateType.CURRENT + : RegionLocateType.AFTER; + final CompletableFuture<HRegionLocation> future = conn.getLocator() + .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs); + addListener(future, (loc, error) -> { + try (Scope ignored1 = span.makeCurrent()) { + onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, + endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error); + } + }); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java index 448302c..de8926b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.IOException; import org.apache.hadoop.hbase.HConstants; @@ -76,15 +78,18 @@ class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { if (row == null) { throw new NullPointerException("Can't be null!"); } + final Context context = Context.current(); ClientServiceCallable<CoprocessorServiceResponse> callable = - new ClientServiceCallable<CoprocessorServiceResponse>(this.conn, - this.table, this.row, this.conn.getRpcControllerFactory().newController(), HConstants.PRIORITY_UNSET) { + new ClientServiceCallable<CoprocessorServiceResponse>(this.conn, this.table, this.row, + this.conn.getRpcControllerFactory().newController(), HConstants.PRIORITY_UNSET) { @Override protected CoprocessorServiceResponse rpcCall() throws Exception { - byte [] regionName = getLocation().getRegionInfo().getRegionName(); - CoprocessorServiceRequest csr = + try (Scope ignored = context.makeCurrent()) { + byte[] regionName = 
getLocation().getRegionInfo().getRegionName(); + CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request, row, regionName); - return getStub().execService(getRpcController(), csr); + return getStub().execService(getRpcController(), csr); + } } }; CoprocessorServiceResponse result = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java index 94e7d9a..009936a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,12 +18,13 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; - import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -34,10 +35,8 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; -import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; /** * The implementation of a region based coprocessor rpc channel. @@ -70,6 +69,7 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel { private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request, Message responsePrototype, HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub) { + final Context context = Context.current(); CompletableFuture<Message> future = new CompletableFuture<>(); if (region != null && !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())) { @@ -80,38 +80,42 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel { } CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request, row, loc.getRegionInfo().getRegionName()); - stub.execService(controller, csr, - new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() { - - @Override - public void run(CoprocessorServiceResponse resp) { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - } else { - try { - future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype)); - } catch (IOException e) { - future.completeExceptionally(e); - } + stub.execService(controller, csr, resp -> { + try (Scope ignored = context.makeCurrent()) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype)); + } catch (IOException e) { + future.completeExceptionally(e); } } - }); + } + }); return future; } @Override public void callMethod(MethodDescriptor method, RpcController controller, Message request, Message responsePrototype, RpcCallback<Message> done) { + final Context context = Context.current(); addListener( conn.callerFactory.<Message> single().table(tableName).row(row) 
.locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(), + .action((c, l, s) -> { + try (Scope ignored = context.makeCurrent()) { + return rpcCall(method, request, responsePrototype, c, l, s); + } + }).call(), (r, e) -> { - if (e != null) { - ((ClientCoprocessorRpcController) controller).setFailed(e); + try (Scope ignored = context.makeCurrent()) { + if (e != null) { + ((ClientCoprocessorRpcController) controller).setFailed(e); + } + done.run(r); } - done.run(r); }); } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java new file mode 100644 index 0000000..2c70612 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client.trace; + +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Rudimentary tool for visualizing a hierarchy of spans. Given a collection of spans, indexes + * them from parents to children and prints them out one per line, indented. + */ [email protected] +public class StringTraceRenderer { + private static final Logger logger = LoggerFactory.getLogger(StringTraceRenderer.class); + + private final List<Node> graphs; + + public StringTraceRenderer(final Collection<SpanData> spans) { + final Map<String, Node> spansById = indexSpansById(spans); + populateChildren(spansById); + graphs = findRoots(spansById); + } + + private static Map<String, Node> indexSpansById(final Collection<SpanData> spans) { + final Map<String, Node> spansById = new HashMap<>(spans.size()); + spans.forEach(span -> spansById.put(span.getSpanId(), new Node(span))); + return spansById; + } + + private static void populateChildren(final Map<String, Node> spansById) { + spansById.forEach((spanId, node) -> { + final SpanData spanData = node.spanData; + final String parentSpanId = spanData.getParentSpanId(); + if (Objects.equals(parentSpanId, SpanId.getInvalid())) { + return; + } + final Node parentNode = spansById.get(parentSpanId); + if (parentNode == null) { + logger.warn("Span {} has parent {} that is not found in index, {}", spanId, parentSpanId, + spanData); + return; + } + 
parentNode.children.put(spanId, node); + }); + } + + private static List<Node> findRoots(final Map<String, Node> spansById) { + return spansById.values() + .stream() + .filter(node -> Objects.equals(node.spanData.getParentSpanId(), SpanId.getInvalid())) + .collect(Collectors.toList()); + } + + public void render(final Consumer<String> writer) { + for (ListIterator<Node> iter = graphs.listIterator(); iter.hasNext(); ) { + final int idx = iter.nextIndex(); + final Node node = iter.next(); + render(writer, node, 0, idx == 0); + } + } + + private static void render( + final Consumer<String> writer, + final Node node, + final int indent, + final boolean isFirst + ) { + writer.accept(render(node.spanData, indent, isFirst)); + final List<Node> children = new ArrayList<>(node.children.values()); + for (ListIterator<Node> iter = children.listIterator(); iter.hasNext(); ) { + final int idx = iter.nextIndex(); + final Node child = iter.next(); + render(writer, child, indent + 2, idx == 0); + } + } + + private static String render( + final SpanData spanData, + final int indent, + final boolean isFirst + ) { + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < indent; i++) { + sb.append(' '); + } + + return sb.append(isFirst ? 
"└─ " : "├─ ") + .append(render(spanData)) + .toString(); + } + + private static String render(final SpanData spanData) { + return new ToStringBuilder(spanData, ToStringStyle.NO_CLASS_NAME_STYLE) + .append("spanId", spanData.getSpanId()) + .append("name", spanData.getName()) + .append("hasEnded", spanData.hasEnded()) + .toString(); + } + + private static class Node { + final SpanData spanData; + final LinkedHashMap<String, Node> children; + + Node(final SpanData spanData) { + this.spanData = spanData; + this.children = new LinkedHashMap<>(); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java index a9473da..01aa618 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java @@ -99,6 +99,24 @@ public final class SpanDataMatchers { }; } + public static Matcher<SpanData> hasParentSpanId(String parentSpanId) { + return hasParentSpanId(equalTo(parentSpanId)); + } + + public static Matcher<SpanData> hasParentSpanId(SpanData parent) { + return hasParentSpanId(parent.getSpanId()); + } + + public static Matcher<SpanData> hasParentSpanId(Matcher<String> matcher) { + return new FeatureMatcher<SpanData, String>(matcher, "SpanKind with a parentSpanId that", + "parentSpanId" + ) { + @Override protected String featureValueOf(SpanData item) { + return item.getParentSpanId(); + } + }; + } + public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) { final Matcher<StatusCode> matcher = is(equalTo(statusCode)); return new TypeSafeMatcher<SpanData>() { diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml index 3709329..83ae850 100644 --- a/hbase-endpoint/pom.xml +++ b/hbase-endpoint/pom.xml @@ -143,6 +143,12 @@ <groupId>org.apache.hbase</groupId> 
<artifactId>hbase-client</artifactId> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> <!--Some of the CPEPs use hbase server-side internals; they shouldn't!--> <dependency> <groupId>org.apache.hbase</groupId> @@ -247,6 +253,23 @@ <artifactId>log4j-1.2-api</artifactId> <scope>test</scope> </dependency> + <!-- when depending on a test jar, maven does not pull in the transitive dependencies. require + them manually. --> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-trace</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-testing</artifactId> + <scope>test</scope> + </dependency> </dependencies> <profiles> <!-- Skip the tests in this module --> diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java new file mode 100644 index 0000000..80959b8 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpointTracing.java @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.everyItem; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasProperty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import com.google.protobuf.Descriptors; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.hbase.ConnectionRule; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MatcherPredicate; +import org.apache.hadoop.hbase.MiniClusterRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.AsyncAdmin; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.ServiceCaller; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; +import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Matcher; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import 
org.junit.rules.ExternalResource; +import org.junit.rules.RuleChain; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; + +/** + * Test cases to verify tracing coprocessor Endpoint execution + */ +@Category({ CoprocessorTests.class, MediumTests.class}) +public class TestCoprocessorEndpointTracing { + private static final Logger logger = + LoggerFactory.getLogger(TestCoprocessorEndpointTracing.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCoprocessorEndpointTracing.class); + + private static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create(); + private static final MiniClusterRule miniclusterRule = MiniClusterRule.newBuilder() + .setConfiguration(() -> { + final Configuration conf = new Configuration(); + conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + return conf; + }) + .build(); + private static final ConnectionRule connectionRule = ConnectionRule.createConnectionRule( + miniclusterRule::createConnection, miniclusterRule::createAsyncConnection); + + private static final class Setup extends ExternalResource { + @Override + protected void before() throws Throwable { + final HBaseTestingUtility util = miniclusterRule.getTestingUtility(); + final AsyncConnection connection = connectionRule.getAsyncConnection(); + final AsyncAdmin admin = connection.getAdmin(); + final TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TEST_TABLE) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)).build(); + admin.createTable(tableDescriptor).get(); + 
util.waitUntilAllRegionsAssigned(TEST_TABLE); + } + } + + @ClassRule + public static final TestRule testRule = RuleChain.outerRule(otelClassRule) + .around(miniclusterRule) + .around(connectionRule) + .around(new Setup()); + + private static final TableName TEST_TABLE = + TableName.valueOf(TestCoprocessorEndpointTracing.class.getSimpleName()); + private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); + + @Rule + public OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule); + + @Rule + public TestName testName = new TestName(); + + @Test + public void traceAsyncTableEndpoint() { + final AsyncConnection connection = connectionRule.getAsyncConnection(); + final AsyncTable<?> table = connection.getTable(TEST_TABLE); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final CompletableFuture<Map<byte[], String>> future = new CompletableFuture<>(); + final AsyncTable.CoprocessorCallback<EchoResponseProto> callback = + new AsyncTable.CoprocessorCallback<EchoResponseProto>() { + final ConcurrentMap<byte[], String> results = new ConcurrentHashMap<>(); + + @Override + public void onRegionComplete(RegionInfo region, EchoResponseProto resp) { + if (!future.isDone()) { + results.put(region.getRegionName(), resp.getMessage()); + } + } + + @Override + public void onRegionError(RegionInfo region, Throwable error) { + if (!future.isDone()) { + future.completeExceptionally(error); + } + } + + @Override + public void onComplete() { + if (!future.isDone()) { + future.complete(results); + } + } + + @Override + public void onError(Throwable error) { + if (!future.isDone()) { + future.completeExceptionally(error); + } + } + }; + + final Map<byte[], String> results = TraceUtil.trace(() -> { + table.coprocessorService(TestProtobufRpcProto::newStub, + (stub, controller, cb) -> stub.echo(controller, request, cb), callback) + .execute(); + try { + return future.get(); + } catch (InterruptedException | 
ExecutionException e) { + throw new RuntimeException(e); + } + }, testName.getMethodName()); + assertNotNull(results); + assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results)); + assertThat(results.values(), everyItem(allOf( + notNullValue(), + equalTo("hello")))); + + final Matcher<SpanData> parentMatcher = allOf(hasName(testName.getMethodName()), hasEnded()); + waitForAndLog(parentMatcher); + final List<SpanData> spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> rpcMatcher = allOf( + hasName("hbase.pb.ClientService/ExecService"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncTableEndpointCall() throws Exception { + final Connection connection = connectionRule.getConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final RpcController controller = new ServerRpcController(); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final CoprocessorRpcUtils.BlockingRpcCallback<EchoResponseProto> callback = + new CoprocessorRpcUtils.BlockingRpcCallback<>(); + final Map<byte[], EchoResponseProto> results = TraceUtil.trace(() -> { + try { + return table.coprocessorService(TestProtobufRpcProto.class, null, null, + t -> { + t.echo(controller, request, callback); + return callback.get(); + }); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + assertNotNull(results); + 
assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results)); + assertThat(results.values(), everyItem(allOf( + notNullValue(), + hasProperty("message", equalTo("hello"))))); + } + + final Matcher<SpanData> parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List<SpanData> spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> rpcMatcher = allOf( + hasName("hbase.pb.ClientService/ExecService"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncTableEndpointCallAndCallback() throws Exception { + final Connection connection = connectionRule.getConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final RpcController controller = new ServerRpcController(); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final CoprocessorRpcUtils.BlockingRpcCallback<EchoResponseProto> callback = + new CoprocessorRpcUtils.BlockingRpcCallback<>(); + final ConcurrentMap<byte[], EchoResponseProto> results = new ConcurrentHashMap<>(); + TraceUtil.trace(() -> { + try { + table.coprocessorService(TestProtobufRpcProto.class, null, null, + t -> { + t.echo(controller, request, callback); + return callback.get(); + }, + (region, row, result) -> { + results.put(region, result); + }); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + 
assertNotNull(results); + assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results)); + assertThat(results.values(), everyItem(allOf( + notNullValue(), + hasProperty("message", equalTo("hello"))))); + } + + final Matcher<SpanData> parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List<SpanData> spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> rpcMatcher = allOf( + hasName("hbase.pb.ClientService/ExecService"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncTableRegionCoprocessorRpcChannel() throws Exception { + final Connection connection = connectionRule.getConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final EchoResponseProto response = TraceUtil.trace(() -> { + try { + final CoprocessorRpcChannel channel = table.coprocessorService(new byte[] {}); + final TestProtobufRpcProto.BlockingInterface service = + TestProtobufRpcProto.newBlockingStub(channel); + return service.echo(null, request); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + assertNotNull(response); + assertEquals("hello", response.getMessage()); + } + + final Matcher<SpanData> parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + 
waitForAndLog(parentMatcher); + final List<SpanData> spans = otelClassRule.getSpans(); + + /* + * This interface is really low level: it returns a Channel and expects the caller to invoke it. + * The Table instance isn't issuing a command here, it's not a table operation, so don't expect + * there to be a span like `COPROC_EXEC table`. + */ + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan)); + assertThat(spans, not(hasItem(tableOpMatcher))); + } + + @Test + public void traceSyncTableBatchEndpoint() throws Exception { + final Connection connection = connectionRule.getConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final Descriptors.MethodDescriptor descriptor = + TestProtobufRpcProto.getDescriptor().findMethodByName("echo"); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final Map<byte[], EchoResponseProto> response = TraceUtil.trace(() -> { + try { + return table.batchCoprocessorService( + descriptor, request, null, null, EchoResponseProto.getDefaultInstance()); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + assertNotNull(response); + assertThat(response.values(), everyItem(allOf( + notNullValue(), + hasProperty("message", equalTo("hello"))))); + } + + final Matcher<SpanData> parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List<SpanData> spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, 
hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> rpcMatcher = allOf( + hasName("hbase.pb.ClientService/Multi"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncTableBatchEndpointCallback() throws Exception { + final Connection connection = connectionRule.getConnection(); + try (final Table table = connection.getTable(TEST_TABLE)) { + final Descriptors.MethodDescriptor descriptor = + TestProtobufRpcProto.getDescriptor().findMethodByName("echo"); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final ConcurrentMap<byte[], EchoResponseProto> results = new ConcurrentHashMap<>(); + TraceUtil.trace(() -> { + try { + table.batchCoprocessorService(descriptor, request, null, null, + EchoResponseProto.getDefaultInstance(), (region, row, res) -> results.put(region, res)); + } catch (Throwable t) { + throw new RuntimeException(t); + } + }, testName.getMethodName()); + assertNotNull(results); + assertTrue("coprocessor call returned no results.", MapUtils.isNotEmpty(results)); + assertThat(results.values(), everyItem(allOf( + notNullValue(), + hasProperty("message", equalTo("hello"))))); + } + + final Matcher<SpanData> parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List<SpanData> spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> tableOpMatcher = allOf( + hasName(containsString("COPROC_EXEC")), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(tableOpMatcher)); + final SpanData tableOpSpan = spans.stream() + .filter(tableOpMatcher::matches) + 
.findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> rpcMatcher = allOf( + hasName("hbase.pb.ClientService/Multi"), + hasParentSpanId(tableOpSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceAsyncAdminEndpoint() throws Exception { + final AsyncConnection connection = connectionRule.getAsyncConnection(); + final AsyncAdmin admin = connection.getAdmin(); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final ServiceCaller<TestProtobufRpcProto, EchoResponseProto> callback = + (stub, controller, cb) -> stub.echo(controller, request, cb); + + final String response = TraceUtil.tracedFuture( + () -> admin.coprocessorService(TestProtobufRpcProto::newStub, callback), + testName.getMethodName()) + .get() + .getMessage(); + assertEquals("hello", response); + + final Matcher<SpanData> parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List<SpanData> spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> rpcMatcher = allOf( + hasName("hbase.pb.MasterService/ExecMasterService"), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + @Test + public void traceSyncAdminEndpoint() throws Exception { + final Connection connection = connectionRule.getConnection(); + try (final Admin admin = connection.getAdmin()) { + final TestProtobufRpcProto.BlockingInterface service = + TestProtobufRpcProto.newBlockingStub(admin.coprocessorService()); + final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build(); + final String response = TraceUtil.trace(() -> { + try { + return service.echo(null, request).getMessage(); + } catch (ServiceException e) { + throw new 
RuntimeException(e); + } + }, testName.getMethodName()); + assertEquals("hello", response); + } + + final Matcher<SpanData> parentMatcher = allOf( + hasName(testName.getMethodName()), + hasEnded()); + waitForAndLog(parentMatcher); + final List<SpanData> spans = otelClassRule.getSpans(); + + final SpanData testSpan = spans.stream() + .filter(parentMatcher::matches) + .findFirst() + .orElseThrow(AssertionError::new); + final Matcher<SpanData> rpcMatcher = allOf( + hasName("hbase.pb.MasterService/ExecMasterService"), + hasParentSpanId(testSpan), + hasStatusWithCode(StatusCode.OK)); + assertThat(spans, hasItem(rpcMatcher)); + } + + private void waitForAndLog(Matcher<SpanData> spanMatcher) { + final Configuration conf = connectionRule.getAsyncConnection().getConfiguration(); + Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( + otelClassRule::getSpans, hasItem(spanMatcher))); + final List<SpanData> spans = otelClassRule.getSpans(); + if (logger.isDebugEnabled()) { + StringTraceRenderer renderer = new StringTraceRenderer(spans); + renderer.render(logger::debug); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java new file mode 100644 index 0000000..3bbf2d4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.trace; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.List; +import org.junit.rules.ExternalResource; + +/** + * <p>Like {@link OpenTelemetryRule}, except modeled after the junit5 implementation + * {@code OpenTelemetryExtension}. Use this class when you need to make asserts on {@link SpanData} + * created on a MiniCluster. 
Make sure this rule is initialized before the MiniCluster so that it can + register its instance of {@link OpenTelemetry} as the global instance before any server-side + component can call {@link TraceUtil#getGlobalTracer()}.</p> + * <p>For example:</p> + * <pre>{@code + * public class TestMyClass { + * private static final OpenTelemetryClassRule otelClassRule = + * OpenTelemetryClassRule.create(); + * private static final MiniClusterRule miniClusterRule = + * MiniClusterRule.newBuilder().build(); + * protected static final ConnectionRule connectionRule = + * new ConnectionRule(miniClusterRule::createConnection); + * + * @ClassRule + * public static final TestRule classRule = RuleChain.outerRule(otelClassRule) + * .around(miniClusterRule) + * .around(connectionRule); + * + * @Rule + * public final OpenTelemetryTestRule otelTestRule = + * new OpenTelemetryTestRule(otelClassRule); + * + * @Test + * public void myTest() { + * // ... + * // do something that makes spans + * final List<SpanData> spans = otelClassRule.getSpans(); + * // make assertions on them + * } + * } + * }</pre> + * + * @see <a href="https://github.com/open-telemetry/opentelemetry-java/blob/9a330d0/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/junit5/OpenTelemetryExtension.java">junit5/OpenTelemetryExtension.java</a> + */ +public final class OpenTelemetryClassRule extends ExternalResource { + + public static OpenTelemetryClassRule create() { + InMemorySpanExporter spanExporter = InMemorySpanExporter.create(); + + SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(tracerProvider) + .build(); + + return new OpenTelemetryClassRule(openTelemetry, spanExporter); + } + + private final OpenTelemetrySdk openTelemetry; + private final 
InMemorySpanExporter spanExporter; + + private OpenTelemetryClassRule( + final OpenTelemetrySdk openTelemetry, + final InMemorySpanExporter spanExporter + ) { + this.openTelemetry = openTelemetry; + this.spanExporter = spanExporter; + } + + /** Returns the {@link OpenTelemetry} created by this Rule. */ + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; + } + + /** Returns all the exported {@link SpanData} so far. */ + public List<SpanData> getSpans() { + return spanExporter.getFinishedSpanItems(); + } + + /** + * Clears the collected exported {@link SpanData}. + */ + public void clearSpans() { + spanExporter.reset(); + } + + @Override + protected void before() throws Throwable { + GlobalOpenTelemetry.resetForTest(); + GlobalOpenTelemetry.set(openTelemetry); + } + + @Override + protected void after() { + GlobalOpenTelemetry.resetForTest(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java new file mode 100644 index 0000000..a6b50ff --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.trace; + +import org.junit.rules.ExternalResource; + +/** + * Used alongside {@link OpenTelemetryClassRule}. See that class's javadoc for details on when to + * use these classes instead of {@link io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule} and + * an example of how to use these classes together. + */ +public final class OpenTelemetryTestRule extends ExternalResource { + + private final OpenTelemetryClassRule classRuleSupplier; + + public OpenTelemetryTestRule(final OpenTelemetryClassRule classRule) { + this.classRuleSupplier = classRule; + } + + @Override + protected void before() throws Throwable { + classRuleSupplier.clearSpans(); + } +}
