This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cedb4596a3 HDDS-10268. [hsync] Add OpenTracing traces to client side
read path (#6262)
cedb4596a3 is described below
commit cedb4596a328d6f4c5e5fd336fe5a8b55b37d4ae
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Wed Apr 10 09:38:20 2024 -0700
HDDS-10268. [hsync] Add OpenTracing traces to client side read path (#6262)
---
.../hadoop/hdds/scm/ContainerClientMetrics.java | 79 +++++++++++++++++++++-
.../hdds/scm/storage/ContainerProtocolCalls.java | 76 +++++++++++++++++++--
.../apache/hadoop/hdds/tracing/TracingUtil.java | 10 +++
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 2 +
.../fs/ozone/BasicRootedOzoneFileSystem.java | 65 ++++++++++++++----
.../apache/hadoop/fs/ozone/OzoneFSInputStream.java | 42 +++++++++---
.../hadoop/fs/ozone/OzoneFSOutputStream.java | 19 ++++--
.../hadoop/fs/ozone/RootedOzoneFileSystem.java | 13 ++++
8 files changed, 271 insertions(+), 35 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
index d51dfa4163..422943fff0 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.ozone.OzoneConsts;
import java.util.Map;
@@ -51,6 +52,11 @@ public final class ContainerClientMetrics {
private MutableCounterLong totalWriteChunkCalls;
@Metric
private MutableCounterLong totalWriteChunkBytes;
+ private MutableQuantiles[] listBlockLatency;
+ private MutableQuantiles[] getBlockLatency;
+ private MutableQuantiles[] getCommittedBlockLengthLatency;
+ private MutableQuantiles[] readChunkLatency;
+ private MutableQuantiles[] getSmallFileLatency;
private final Map<PipelineID, MutableCounterLong> writeChunkCallsByPipeline;
private final Map<PipelineID, MutableCounterLong> writeChunkBytesByPipeline;
private final Map<UUID, MutableCounterLong> writeChunksCallsByLeaders;
@@ -84,6 +90,36 @@ public final class ContainerClientMetrics {
writeChunkCallsByPipeline = new ConcurrentHashMap<>();
writeChunkBytesByPipeline = new ConcurrentHashMap<>();
writeChunksCallsByLeaders = new ConcurrentHashMap<>();
+
+ listBlockLatency = new MutableQuantiles[3];
+ getBlockLatency = new MutableQuantiles[3];
+ getCommittedBlockLengthLatency = new MutableQuantiles[3];
+ readChunkLatency = new MutableQuantiles[3];
+ getSmallFileLatency = new MutableQuantiles[3];
+ int[] intervals = {60, 300, 900};
+ for (int i = 0; i < intervals.length; i++) {
+ int interval = intervals[i];
+ listBlockLatency[i] = registry
+ .newQuantiles("listBlockLatency" + interval
+ + "s", "ListBlock latency in microseconds", "ops",
+ "latency", interval);
+ getBlockLatency[i] = registry
+ .newQuantiles("getBlockLatency" + interval
+ + "s", "GetBlock latency in microseconds", "ops",
+ "latency", interval);
+ getCommittedBlockLengthLatency[i] = registry
+ .newQuantiles("getCommittedBlockLengthLatency" + interval
+ + "s", "GetCommittedBlockLength latency in microseconds",
+ "ops", "latency", interval);
+ readChunkLatency[i] = registry
+ .newQuantiles("readChunkLatency" + interval
+ + "s", "ReadChunk latency in microseconds", "ops",
+ "latency", interval);
+ getSmallFileLatency[i] = registry
+ .newQuantiles("getSmallFileLatency" + interval
+ + "s", "GetSmallFile latency in microseconds", "ops",
+ "latency", interval);
+ }
}
public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
@@ -111,7 +147,48 @@ public final class ContainerClientMetrics {
totalWriteChunkBytes.incr(chunkSizeBytes);
}
- MutableCounterLong getTotalWriteChunkBytes() {
+ public void addListBlockLatency(long latency) {
+ for (MutableQuantiles q : listBlockLatency) {
+ if (q != null) {
+ q.add(latency);
+ }
+ }
+ }
+
+ public void addGetBlockLatency(long latency) {
+ for (MutableQuantiles q : getBlockLatency) {
+ if (q != null) {
+ q.add(latency);
+ }
+ }
+ }
+
+ public void addGetCommittedBlockLengthLatency(long latency) {
+ for (MutableQuantiles q : getCommittedBlockLengthLatency) {
+ if (q != null) {
+ q.add(latency);
+ }
+ }
+ }
+
+ public void addReadChunkLatency(long latency) {
+ for (MutableQuantiles q : readChunkLatency) {
+ if (q != null) {
+ q.add(latency);
+ }
+ }
+ }
+
+ public void addGetSmallFileLatency(long latency) {
+ for (MutableQuantiles q : getSmallFileLatency) {
+ if (q != null) {
+ q.add(latency);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public MutableCounterLong getTotalWriteChunkBytes() {
return totalWriteChunkBytes;
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 03b7844cc9..72754d1f1c 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -29,6 +29,9 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -64,6 +67,7 @@ import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenExcep
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.security.token.Token;
@@ -128,6 +132,10 @@ public final class ContainerProtocolCalls {
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ builder.setTraceID(traceId);
+ }
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
@@ -146,14 +154,17 @@ public final class ContainerProtocolCalls {
try {
return op.apply(d);
} catch (IOException e) {
+ Span span = GlobalTracer.get().activeSpan();
if (e instanceof StorageContainerException) {
StorageContainerException sce = (StorageContainerException)e;
// Block token expired. There's no point retrying other DN.
// Throw the exception to request a new block token right away.
if (sce.getResult() == BLOCK_TOKEN_VERIFICATION_FAILED) {
+ span.log("block token verification failed at DN " + d);
throw e;
}
}
+ span.log("failed to connect to DN " + d);
excluded.add(d);
if (excluded.size() < pipeline.size()) {
LOG.warn(toErrorMessage.apply(d)
@@ -211,6 +222,10 @@ public final class ContainerProtocolCalls {
List<Validator> validators,
ContainerCommandRequestProto.Builder builder,
DatanodeDetails datanode) throws IOException {
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ builder.setTraceID(traceId);
+ }
final ContainerCommandRequestProto request = builder
.setDatanodeUuid(datanode.getUuidString()).build();
ContainerCommandResponseProto response =
@@ -246,6 +261,10 @@ public final class ContainerProtocolCalls {
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ builder.setTraceID(traceId);
+ }
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
xceiverClient.sendCommand(request, getValidatorList());
@@ -319,10 +338,19 @@ public final class ContainerProtocolCalls {
builder.setEncodedToken(token.encodeToUrlString());
}
- return tryEachDatanode(xceiverClient.getPipeline(),
- d -> readChunk(xceiverClient, chunk, blockID,
- validators, builder, d),
- d -> toErrorMessage(chunk, blockID, d));
+ Span span = GlobalTracer.get()
+ .buildSpan("readChunk").start();
+ try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
+ span.setTag("offset", chunk.getOffset())
+ .setTag("length", chunk.getLen())
+ .setTag("block", blockID.toString());
+ return tryEachDatanode(xceiverClient.getPipeline(),
+ d -> readChunk(xceiverClient, chunk, blockID,
+ validators, builder, d),
+ d -> toErrorMessage(chunk, blockID, d));
+ } finally {
+ span.finish();
+ }
}
private static ContainerProtos.ReadChunkResponseProto readChunk(
@@ -330,10 +358,15 @@ public final class ContainerProtocolCalls {
List<Validator> validators,
ContainerCommandRequestProto.Builder builder,
DatanodeDetails d) throws IOException {
- final ContainerCommandRequestProto request = builder
- .setDatanodeUuid(d.getUuidString()).build();
+ ContainerCommandRequestProto.Builder requestBuilder = builder
+ .setDatanodeUuid(d.getUuidString());
+ Span span = GlobalTracer.get().activeSpan();
+ String traceId = TracingUtil.exportSpan(span);
+ if (traceId != null) {
+ requestBuilder = requestBuilder.setTraceID(traceId);
+ }
ContainerCommandResponseProto reply =
- xceiverClient.sendCommand(request, validators);
+ xceiverClient.sendCommand(requestBuilder.build(), validators);
final ReadChunkResponseProto response = reply.getReadChunk();
final long readLen = getLen(response);
if (readLen != chunk.getLen()) {
@@ -515,6 +548,11 @@ public final class ContainerProtocolCalls {
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ request.setTraceID(traceId);
+ }
+
request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setContainerID(containerID);
request.setCreateContainer(createRequest.build());
@@ -544,6 +582,10 @@ public final class ContainerProtocolCalls {
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ request.setTraceID(traceId);
+ }
client.sendCommand(request.build(), getValidatorList());
}
@@ -566,6 +608,10 @@ public final class ContainerProtocolCalls {
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ request.setTraceID(traceId);
+ }
client.sendCommand(request.build(), getValidatorList());
}
@@ -589,6 +635,10 @@ public final class ContainerProtocolCalls {
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ request.setTraceID(traceId);
+ }
ContainerCommandResponseProto response =
client.sendCommand(request.build(), getValidatorList());
@@ -624,6 +674,10 @@ public final class ContainerProtocolCalls {
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ builder.setTraceID(traceId);
+ }
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
client.sendCommand(request, getValidatorList());
@@ -694,6 +748,10 @@ public final class ContainerProtocolCalls {
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ builder.setTraceID(traceId);
+ }
ContainerCommandRequestProto request = builder.build();
Map<DatanodeDetails, ContainerCommandResponseProto> responses =
xceiverClient.sendCommandOnAllNodes(request);
@@ -719,6 +777,10 @@ public final class ContainerProtocolCalls {
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ request.setTraceID(traceId);
+ }
Map<DatanodeDetails, ContainerCommandResponseProto> responses =
client.sendCommandOnAllNodes(request.build());
for (Map.Entry<DatanodeDetails, ContainerCommandResponseProto> entry :
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
index b968d40723..29bd847319 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
@@ -139,6 +139,16 @@ public final class TracingUtil {
ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT);
}
+ /**
+ * Execute {@code runnable} inside an activated new span.
+ */
+ public static <E extends Exception> void executeInNewSpan(String spanName,
+ CheckedRunnable<E> runnable) throws E {
+ Span span = GlobalTracer.get()
+ .buildSpan(spanName).start();
+ executeInSpan(span, runnable);
+ }
+
/**
* Execute {@code supplier} inside an activated new span.
*/
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c990d25735..0806ffb847 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -321,6 +321,8 @@ public class RpcClient implements ClientProtocol {
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, ecReconstructExecutor);
this.clientMetrics = ContainerClientMetrics.acquire();
+
+ TracingUtil.initTracing("client", conf);
}
public XceiverClientFactory getXceiverClientManager() {
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 1fcb1554b6..3ba291ae0f 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.ozone;
import com.google.common.base.Preconditions;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.ozone.OFSPath;
@@ -239,7 +242,12 @@ public class BasicRootedOzoneFileSystem extends FileSystem
{
statistics.incrementReadOps(1);
LOG.trace("open() path: {}", path);
final String key = pathToKey(path);
- return new FSDataInputStream(createFSInputStream(adapter.readFile(key)));
+ return TracingUtil.executeInNewSpan("ofs open",
+ () -> {
+ Span span = GlobalTracer.get().activeSpan();
+ span.setTag("path", key);
+ return new
FSDataInputStream(createFSInputStream(adapter.readFile(key)));
+ });
}
protected InputStream createFSInputStream(InputStream inputStream) {
@@ -263,7 +271,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
incrementCounter(Statistic.INVOCATION_CREATE, 1);
statistics.incrementWriteOps(1);
final String key = pathToKey(f);
- return createOutputStream(key, replication, overwrite, true);
+ return TracingUtil.executeInNewSpan("ofs create",
+ () -> createOutputStream(key, replication, overwrite, true));
}
@Override
@@ -277,8 +286,10 @@ public class BasicRootedOzoneFileSystem extends FileSystem
{
incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1);
statistics.incrementWriteOps(1);
final String key = pathToKey(path);
- return createOutputStream(key,
- replication, flags.contains(CreateFlag.OVERWRITE), false);
+ return TracingUtil.executeInNewSpan("ofs createNonRecursive",
+ () ->
+ createOutputStream(key,
+ replication, flags.contains(CreateFlag.OVERWRITE), false));
}
private OutputStream selectOutputStream(String key, short replication,
@@ -374,6 +385,14 @@ public class BasicRootedOzoneFileSystem extends FileSystem
{
*/
@Override
public boolean rename(Path src, Path dst) throws IOException {
+ return TracingUtil.executeInNewSpan("ofs rename",
+ () -> renameInSpan(src, dst));
+ }
+
+ private boolean renameInSpan(Path src, Path dst) throws IOException {
+ Span span = GlobalTracer.get().activeSpan();
+ span.setTag("src", src.toString())
+ .setTag("dst", dst.toString());
incrementCounter(Statistic.INVOCATION_RENAME, 1);
statistics.incrementWriteOps(1);
if (src.equals(dst)) {
@@ -526,8 +545,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
@Override
public Path createSnapshot(Path path, String snapshotName)
throws IOException {
- String snapshot = getAdapter()
- .createSnapshot(pathToKey(path), snapshotName);
+ String snapshot = TracingUtil.executeInNewSpan("ofs createSnapshot",
+ () -> getAdapter().createSnapshot(pathToKey(path), snapshotName));
return new Path(OzoneFSUtils.trimPathToDepth(path, PATH_DEPTH_TO_BUCKET),
OM_SNAPSHOT_INDICATOR + OZONE_URI_DELIMITER + snapshot);
}
@@ -541,7 +560,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
@Override
public void deleteSnapshot(Path path, String snapshotName)
throws IOException {
- adapter.deleteSnapshot(pathToKey(path), snapshotName);
+ TracingUtil.executeInNewSpan("ofs deleteSnapshot",
+ () -> adapter.deleteSnapshot(pathToKey(path), snapshotName));
}
private class DeleteIterator extends OzoneListingIterator {
@@ -672,6 +692,11 @@ public class BasicRootedOzoneFileSystem extends FileSystem
{
*/
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
+ return TracingUtil.executeInNewSpan("ofs delete",
+ () -> deleteInSpan(f, recursive));
+ }
+
+ private boolean deleteInSpan(Path f, boolean recursive) throws IOException {
incrementCounter(Statistic.INVOCATION_DELETE, 1);
statistics.incrementWriteOps(1);
LOG.debug("Delete path {} - recursive {}", f, recursive);
@@ -889,7 +914,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
@Override
public FileStatus[] listStatus(Path f) throws IOException {
- return convertFileStatusArr(listStatusAdapter(f));
+ return TracingUtil.executeInNewSpan("ofs listStatus",
+ () -> convertFileStatusArr(listStatusAdapter(f)));
}
private FileStatus[] convertFileStatusArr(
@@ -946,7 +972,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
@Override
public Token<?> getDelegationToken(String renewer) throws IOException {
- return adapter.getDelegationToken(renewer);
+ return TracingUtil.executeInNewSpan("ofs getDelegationToken",
+ () -> adapter.getDelegationToken(renewer));
}
/**
@@ -1014,7 +1041,8 @@ public class BasicRootedOzoneFileSystem extends
FileSystem {
if (isEmpty(key)) {
return false;
}
- return mkdir(f);
+ return TracingUtil.executeInNewSpan("ofs mkdirs",
+ () -> mkdir(f));
}
@Override
@@ -1025,7 +1053,8 @@ public class BasicRootedOzoneFileSystem extends
FileSystem {
@Override
public FileStatus getFileStatus(Path f) throws IOException {
- return convertFileStatus(getFileStatusAdapter(f));
+ return TracingUtil.executeInNewSpan("ofs getFileStatus",
+ () -> convertFileStatus(getFileStatusAdapter(f)));
}
public FileStatusAdapter getFileStatusAdapter(Path f) throws IOException {
@@ -1096,7 +1125,8 @@ public class BasicRootedOzoneFileSystem extends
FileSystem {
public FileChecksum getFileChecksum(Path f, long length) throws IOException {
incrementCounter(Statistic.INVOCATION_GET_FILE_CHECKSUM);
String key = pathToKey(f);
- return adapter.getFileChecksum(key, length);
+ return TracingUtil.executeInNewSpan("ofs getFileChecksum",
+ () -> adapter.getFileChecksum(key, length));
}
@Override
@@ -1508,6 +1538,11 @@ public class BasicRootedOzoneFileSystem extends
FileSystem {
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
+ return TracingUtil.executeInNewSpan("ofs getContentSummary",
+ () -> getContentSummaryInSpan(f));
+ }
+
+ private ContentSummary getContentSummaryInSpan(Path f) throws IOException {
FileStatusAdapter status = getFileStatusAdapter(f);
if (status.isFile()) {
@@ -1583,7 +1618,8 @@ public class BasicRootedOzoneFileSystem extends
FileSystem {
if (key.equals("NONE")) {
throw new FileNotFoundException("File not found. path /NONE.");
}
- adapter.setTimes(key, mtime, atime);
+ TracingUtil.executeInNewSpan("ofs setTimes",
+ () -> adapter.setTimes(key, mtime, atime));
}
protected boolean setSafeModeUtil(SafeModeAction action,
@@ -1595,6 +1631,7 @@ public class BasicRootedOzoneFileSystem extends
FileSystem {
statistics.incrementWriteOps(1);
}
LOG.trace("setSafeMode() action:{}", action);
- return getAdapter().setSafeMode(action, isChecked);
+ return TracingUtil.executeInNewSpan("ofs setSafeMode",
+ () -> getAdapter().setSafeMode(action, isChecked));
}
}
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
index 918640799c..35ee20d56c 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
@@ -23,6 +23,9 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
@@ -30,6 +33,7 @@ import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
/**
* The input stream for Ozone file system.
@@ -52,25 +56,40 @@ public class OzoneFSInputStream extends FSInputStream
@Override
public int read() throws IOException {
- int byteRead = inputStream.read();
- if (statistics != null && byteRead >= 0) {
- statistics.incrementBytesRead(1);
+ Span span = GlobalTracer.get()
+ .buildSpan("OzoneFSInputStream.read").start();
+ try (Scope scope = GlobalTracer.get().activateSpan(span)) {
+ int byteRead = inputStream.read();
+ if (statistics != null && byteRead >= 0) {
+ statistics.incrementBytesRead(1);
+ }
+ return byteRead;
+ } finally {
+ span.finish();
}
- return byteRead;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
- int bytesRead = inputStream.read(b, off, len);
- if (statistics != null && bytesRead >= 0) {
- statistics.incrementBytesRead(bytesRead);
+ Span span = GlobalTracer.get()
+ .buildSpan("OzoneFSInputStream.read").start();
+ try (Scope scope = GlobalTracer.get().activateSpan(span)) {
+ span.setTag("offset", off)
+ .setTag("length", len);
+ int bytesRead = inputStream.read(b, off, len);
+ if (statistics != null && bytesRead >= 0) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+ return bytesRead;
+ } finally {
+ span.finish();
}
- return bytesRead;
}
@Override
public synchronized void close() throws IOException {
- inputStream.close();
+ TracingUtil.executeInNewSpan("OzoneFSInputStream.close",
+ inputStream::close);
}
@Override
@@ -101,6 +120,11 @@ public class OzoneFSInputStream extends FSInputStream
*/
@Override
public int read(ByteBuffer buf) throws IOException {
+ return TracingUtil.executeInNewSpan("OzoneFSInputStream.read(ByteBuffer)",
+ () -> readInTrace(buf));
+ }
+
+ private int readInTrace(ByteBuffer buf) throws IOException {
if (buf.isReadOnly()) {
throw new ReadOnlyBufferException();
}
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
index 141a404694..c5f62d6f68 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
@@ -18,7 +18,10 @@
package org.apache.hadoop.fs.ozone;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import java.io.IOException;
@@ -42,17 +45,24 @@ public class OzoneFSOutputStream extends OutputStream
@Override
public void write(int b) throws IOException {
- outputStream.write(b);
+ TracingUtil.executeInNewSpan("OzoneFSOutputStream.write",
+ () -> outputStream.write(b));
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
- outputStream.write(b, off, len);
+ TracingUtil.executeInNewSpan("OzoneFSOutputStream.write",
+ () -> {
+ Span span = GlobalTracer.get().activeSpan();
+ span.setTag("length", len);
+ outputStream.write(b, off, len);
+ });
}
@Override
public synchronized void flush() throws IOException {
- outputStream.flush();
+ TracingUtil.executeInNewSpan("OzoneFSOutputStream.flush",
+ outputStream::flush);
}
@Override
@@ -67,7 +77,8 @@ public class OzoneFSOutputStream extends OutputStream
@Override
public void hsync() throws IOException {
- outputStream.hsync();
+ TracingUtil.executeInNewSpan("OzoneFSOutputStream.hsync",
+ outputStream::hsync);
}
protected OzoneOutputStream getWrappedOutputStream() {
diff --git
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 7561e20a87..c377128d29 100644
---
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.ozone;
+import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.fs.LeaseRecoverable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeMode;
@@ -29,6 +30,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
@@ -124,6 +126,11 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
@Override
public boolean recoverLease(final Path f) throws IOException {
+ return TracingUtil.executeInNewSpan("ofs recoverLease",
+ () -> recoverLeaseTraced(f));
+ }
+ private boolean recoverLeaseTraced(final Path f) throws IOException {
+ GlobalTracer.get().activeSpan().setTag("path", f.toString());
statistics.incrementWriteOps(1);
LOG.trace("recoverLease() path:{}", f);
Path qualifiedPath = makeQualified(f);
@@ -133,6 +140,12 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
@Override
public boolean isFileClosed(Path f) throws IOException {
+ return TracingUtil.executeInNewSpan("ofs isFileClosed",
+ () -> isFileClosedTraced(f));
+ }
+
+ private boolean isFileClosedTraced(Path f) throws IOException {
+ GlobalTracer.get().activeSpan().setTag("path", f.toString());
statistics.incrementWriteOps(1);
LOG.trace("isFileClosed() path:{}", f);
Path qualifiedPath = makeQualified(f);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]