This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cedb4596a3 HDDS-10268. [hsync] Add OpenTracing traces to client side read path (#6262)
cedb4596a3 is described below

commit cedb4596a328d6f4c5e5fd336fe5a8b55b37d4ae
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Wed Apr 10 09:38:20 2024 -0700

    HDDS-10268. [hsync] Add OpenTracing traces to client side read path (#6262)
---
 .../hadoop/hdds/scm/ContainerClientMetrics.java    | 79 +++++++++++++++++++++-
 .../hdds/scm/storage/ContainerProtocolCalls.java   | 76 +++++++++++++++++++--
 .../apache/hadoop/hdds/tracing/TracingUtil.java    | 10 +++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  2 +
 .../fs/ozone/BasicRootedOzoneFileSystem.java       | 65 ++++++++++++++----
 .../apache/hadoop/fs/ozone/OzoneFSInputStream.java | 42 +++++++++---
 .../hadoop/fs/ozone/OzoneFSOutputStream.java       | 19 ++++--
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     | 13 ++++
 8 files changed, 271 insertions(+), 35 deletions(-)
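
The change applies one recurring OpenTracing idiom across the client read
path: start a span, activate it in a try-with-resources scope, tag it with
call metadata, and finish it in a finally block. A minimal, hedged sketch
of that idiom follows; the class name and doRead() are illustrative
stand-ins, not code from this commit:

    import java.io.IOException;

    import io.opentracing.Scope;
    import io.opentracing.Span;
    import io.opentracing.util.GlobalTracer;

    public final class SpanIdiomSketch {
      private SpanIdiomSketch() { }

      // Hypothetical stand-in for the real chunk read.
      private static int doRead() throws IOException {
        return 0;
      }

      static int tracedRead() throws IOException {
        Span span = GlobalTracer.get().buildSpan("readChunk").start();
        try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
          // Tag the span with call metadata, as readChunk() does below.
          span.setTag("offset", 0L).setTag("length", 4096L);
          return doRead();
        } finally {
          span.finish();
        }
      }
    }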

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
index d51dfa4163..422943fff0 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.ozone.OzoneConsts;
 
 import java.util.Map;
@@ -51,6 +52,11 @@ public final class ContainerClientMetrics {
   private MutableCounterLong totalWriteChunkCalls;
   @Metric
   private MutableCounterLong totalWriteChunkBytes;
+  private MutableQuantiles[] listBlockLatency;
+  private MutableQuantiles[] getBlockLatency;
+  private MutableQuantiles[] getCommittedBlockLengthLatency;
+  private MutableQuantiles[] readChunkLatency;
+  private MutableQuantiles[] getSmallFileLatency;
   private final Map<PipelineID, MutableCounterLong> writeChunkCallsByPipeline;
   private final Map<PipelineID, MutableCounterLong> writeChunkBytesByPipeline;
   private final Map<UUID, MutableCounterLong> writeChunksCallsByLeaders;
@@ -84,6 +90,36 @@ public final class ContainerClientMetrics {
     writeChunkCallsByPipeline = new ConcurrentHashMap<>();
     writeChunkBytesByPipeline = new ConcurrentHashMap<>();
     writeChunksCallsByLeaders = new ConcurrentHashMap<>();
+
+    listBlockLatency = new MutableQuantiles[3];
+    getBlockLatency = new MutableQuantiles[3];
+    getCommittedBlockLengthLatency = new MutableQuantiles[3];
+    readChunkLatency = new MutableQuantiles[3];
+    getSmallFileLatency = new MutableQuantiles[3];
+    int[] intervals = {60, 300, 900};
+    for (int i = 0; i < intervals.length; i++) {
+      int interval = intervals[i];
+      listBlockLatency[i] = registry
+          .newQuantiles("listBlockLatency" + interval
+                  + "s", "ListBlock latency in microseconds", "ops",
+              "latency", interval);
+      getBlockLatency[i] = registry
+          .newQuantiles("getBlockLatency" + interval
+                  + "s", "GetBlock latency in microseconds", "ops",
+              "latency", interval);
+      getCommittedBlockLengthLatency[i] = registry
+          .newQuantiles("getCommittedBlockLengthLatency" + interval
+                  + "s", "GetCommittedBlockLength latency in microseconds",
+              "ops", "latency", interval);
+      readChunkLatency[i] = registry
+          .newQuantiles("readChunkLatency" + interval
+                  + "s", "ReadChunk latency in microseconds", "ops",
+              "latency", interval);
+      getSmallFileLatency[i] = registry
+          .newQuantiles("getSmallFileLatency" + interval
+                  + "s", "GetSmallFile latency in microseconds", "ops",
+              "latency", interval);
+    }
   }
 
   public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
@@ -111,7 +147,48 @@ public final class ContainerClientMetrics {
     totalWriteChunkBytes.incr(chunkSizeBytes);
   }
 
-  MutableCounterLong getTotalWriteChunkBytes() {
+  public void addListBlockLatency(long latency) {
+    for (MutableQuantiles q : listBlockLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  public void addGetBlockLatency(long latency) {
+    for (MutableQuantiles q : getBlockLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  public void addGetCommittedBlockLengthLatency(long latency) {
+    for (MutableQuantiles q : getCommittedBlockLengthLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  public void addReadChunkLatency(long latency) {
+    for (MutableQuantiles q : readChunkLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  public void addGetSmallFileLatency(long latency) {
+    for (MutableQuantiles q : getSmallFileLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTotalWriteChunkBytes() {
     return totalWriteChunkBytes;
   }
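
The quantile recorders added above are documented in microseconds. A
hedged sketch of how a call site could feed them, assuming the caller
times the RPC itself (the class and the Runnable parameter are
illustrative, not part of this commit):

    import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
    import org.apache.hadoop.util.Time;

    final class GetBlockTimingSketch {
      static void timedGetBlock(Runnable rpc) {
        ContainerClientMetrics metrics = ContainerClientMetrics.acquire();
        long startNs = Time.monotonicNowNanos();
        try {
          rpc.run();  // stands in for the real GetBlock call
        } finally {
          // The help strings above say microseconds, so convert from ns.
          metrics.addGetBlockLatency(
              (Time.monotonicNowNanos() - startNs) / 1000);
        }
      }
    }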
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 03b7844cc9..72754d1f1c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -29,6 +29,9 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -64,6 +67,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenExcep
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.security.token.Token;
@@ -128,6 +132,10 @@ public final class ContainerProtocolCalls  {
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
 
     ContainerCommandRequestProto request = builder.build();
     ContainerCommandResponseProto response =
@@ -146,14 +154,17 @@ public final class ContainerProtocolCalls  {
       try {
         return op.apply(d);
       } catch (IOException e) {
+        Span span = GlobalTracer.get().activeSpan();
         if (e instanceof StorageContainerException) {
           StorageContainerException sce = (StorageContainerException)e;
           // Block token expired. There's no point retrying other DN.
           // Throw the exception to request a new block token right away.
           if (sce.getResult() == BLOCK_TOKEN_VERIFICATION_FAILED) {
+            span.log("block token verification failed at DN " + d);
             throw e;
           }
         }
+        span.log("failed to connect to DN " + d);
         excluded.add(d);
         if (excluded.size() < pipeline.size()) {
           LOG.warn(toErrorMessage.apply(d)
@@ -211,6 +222,10 @@ public final class ContainerProtocolCalls  {
       List<Validator> validators,
       ContainerCommandRequestProto.Builder builder,
       DatanodeDetails datanode) throws IOException {
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
     final ContainerCommandRequestProto request = builder
         .setDatanodeUuid(datanode.getUuidString()).build();
     ContainerCommandResponseProto response =
@@ -246,6 +261,10 @@ public final class ContainerProtocolCalls  {
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
     ContainerCommandRequestProto request = builder.build();
     ContainerCommandResponseProto response =
         xceiverClient.sendCommand(request, getValidatorList());
@@ -319,10 +338,19 @@ public final class ContainerProtocolCalls  {
       builder.setEncodedToken(token.encodeToUrlString());
     }
 
-    return tryEachDatanode(xceiverClient.getPipeline(),
-        d -> readChunk(xceiverClient, chunk, blockID,
-            validators, builder, d),
-        d -> toErrorMessage(chunk, blockID, d));
+    Span span = GlobalTracer.get()
+        .buildSpan("readChunk").start();
+    try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
+      span.setTag("offset", chunk.getOffset())
+          .setTag("length", chunk.getLen())
+          .setTag("block", blockID.toString());
+      return tryEachDatanode(xceiverClient.getPipeline(),
+          d -> readChunk(xceiverClient, chunk, blockID,
+              validators, builder, d),
+          d -> toErrorMessage(chunk, blockID, d));
+    } finally {
+      span.finish();
+    }
   }
 
   private static ContainerProtos.ReadChunkResponseProto readChunk(
@@ -330,10 +358,15 @@ public final class ContainerProtocolCalls  {
       List<Validator> validators,
       ContainerCommandRequestProto.Builder builder,
       DatanodeDetails d) throws IOException {
-    final ContainerCommandRequestProto request = builder
-        .setDatanodeUuid(d.getUuidString()).build();
+    ContainerCommandRequestProto.Builder requestBuilder = builder
+        .setDatanodeUuid(d.getUuidString());
+    Span span = GlobalTracer.get().activeSpan();
+    String traceId = TracingUtil.exportSpan(span);
+    if (traceId != null) {
+      requestBuilder = requestBuilder.setTraceID(traceId);
+    }
     ContainerCommandResponseProto reply =
-        xceiverClient.sendCommand(request, validators);
+        xceiverClient.sendCommand(requestBuilder.build(), validators);
     final ReadChunkResponseProto response = reply.getReadChunk();
     final long readLen = getLen(response);
     if (readLen != chunk.getLen()) {
@@ -515,6 +548,11 @@ public final class ContainerProtocolCalls  {
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
+
     request.setCmdType(ContainerProtos.Type.CreateContainer);
     request.setContainerID(containerID);
     request.setCreateContainer(createRequest.build());
@@ -544,6 +582,10 @@ public final class ContainerProtocolCalls  {
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
     client.sendCommand(request.build(), getValidatorList());
   }
 
@@ -566,6 +608,10 @@ public final class ContainerProtocolCalls  {
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
     client.sendCommand(request.build(), getValidatorList());
   }
 
@@ -589,6 +635,10 @@ public final class ContainerProtocolCalls  {
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
     ContainerCommandResponseProto response =
         client.sendCommand(request.build(), getValidatorList());
 
@@ -624,6 +674,10 @@ public final class ContainerProtocolCalls  {
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
     ContainerCommandRequestProto request = builder.build();
     ContainerCommandResponseProto response =
         client.sendCommand(request, getValidatorList());
@@ -694,6 +748,10 @@ public final class ContainerProtocolCalls  {
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
     ContainerCommandRequestProto request = builder.build();
     Map<DatanodeDetails, ContainerCommandResponseProto> responses =
             xceiverClient.sendCommandOnAllNodes(request);
@@ -719,6 +777,10 @@ public final class ContainerProtocolCalls  {
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
     Map<DatanodeDetails, ContainerCommandResponseProto> responses =
         client.sendCommandOnAllNodes(request.build());
     for (Map.Entry<DatanodeDetails, ContainerCommandResponseProto> entry :
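
The same three-line trace-ID stanza (exportCurrentSpan, null check,
setTraceID) now repeats in each request builder above. A hypothetical
helper, not part of this commit and assuming the same proto import
ContainerProtocolCalls uses, could factor it out:

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
    import org.apache.hadoop.hdds.tracing.TracingUtil;

    final class TraceIdSketch {
      // Attach the current span's exported context to an outgoing
      // request when a span is active; no-op otherwise.
      static void setTraceIdIfActive(
          ContainerProtos.ContainerCommandRequestProto.Builder builder) {
        String traceId = TracingUtil.exportCurrentSpan();
        if (traceId != null) {
          builder.setTraceID(traceId);
        }
      }
    }
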
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
index b968d40723..29bd847319 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
@@ -139,6 +139,16 @@ public final class TracingUtil {
         ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT);
   }
 
+  /**
+   * Execute {@code runnable} inside an activated new span.
+   */
+  public static <E extends Exception> void executeInNewSpan(String spanName,
+      CheckedRunnable<E> runnable) throws E {
+    Span span = GlobalTracer.get()
+        .buildSpan(spanName).start();
+    executeInSpan(span, runnable);
+  }
+
   /**
    * Execute {@code supplier} inside an activated new span.
    */
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c990d25735..0806ffb847 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -321,6 +321,8 @@ public class RpcClient implements ClientProtocol {
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
         .getInstance(byteBufferPool, ecReconstructExecutor);
     this.clientMetrics = ContainerClientMetrics.acquire();
+
+    TracingUtil.initTracing("client", conf);
   }
 
   public XceiverClientFactory getXceiverClientManager() {
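
RpcClient now initializes the tracer at construction, but spans are only
reported when tracing is enabled in configuration. A sketch of opting in
programmatically; the literal key mirrors ScmConfigKeys.HDDS_TRACING_ENABLED
referenced in TracingUtil above and should be treated as an assumption:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.tracing.TracingUtil;

    final class TracingBootstrapSketch {
      static void enableClientTracing() {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Assumed key behind ScmConfigKeys.HDDS_TRACING_ENABLED;
        // tracing is off by default.
        conf.setBoolean("hdds.tracing.enabled", true);
        TracingUtil.initTracing("client", conf);
      }
    }
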
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 1fcb1554b6..3ba291ae0f 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.fs.ozone;
 
 import com.google.common.base.Preconditions;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.ozone.OFSPath;
@@ -239,7 +242,12 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
     statistics.incrementReadOps(1);
     LOG.trace("open() path: {}", path);
     final String key = pathToKey(path);
-    return new FSDataInputStream(createFSInputStream(adapter.readFile(key)));
+    return TracingUtil.executeInNewSpan("ofs open",
+        () -> {
+          Span span = GlobalTracer.get().activeSpan();
+          span.setTag("path", key);
+          return new FSDataInputStream(createFSInputStream(adapter.readFile(key)));
+        });
   }
 
   protected InputStream createFSInputStream(InputStream inputStream) {
@@ -263,7 +271,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
     incrementCounter(Statistic.INVOCATION_CREATE, 1);
     statistics.incrementWriteOps(1);
     final String key = pathToKey(f);
-    return createOutputStream(key, replication, overwrite, true);
+    return TracingUtil.executeInNewSpan("ofs create",
+        () -> createOutputStream(key, replication, overwrite, true));
   }
 
   @Override
@@ -277,8 +286,10 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
     incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1);
     statistics.incrementWriteOps(1);
     final String key = pathToKey(path);
-    return createOutputStream(key,
-        replication, flags.contains(CreateFlag.OVERWRITE), false);
+    return TracingUtil.executeInNewSpan("ofs createNonRecursive",
+        () ->
+            createOutputStream(key,
+                replication, flags.contains(CreateFlag.OVERWRITE), false));
   }
 
   private OutputStream selectOutputStream(String key, short replication,
@@ -374,6 +385,14 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
    */
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs rename",
+        () -> renameInSpan(src, dst));
+  }
+
+  private boolean renameInSpan(Path src, Path dst) throws IOException {
+    Span span = GlobalTracer.get().activeSpan();
+    span.setTag("src", src.toString())
+        .setTag("dst", dst.toString());
     incrementCounter(Statistic.INVOCATION_RENAME, 1);
     statistics.incrementWriteOps(1);
     if (src.equals(dst)) {
@@ -526,8 +545,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
   @Override
   public Path createSnapshot(Path path, String snapshotName)
           throws IOException {
-    String snapshot = getAdapter()
-        .createSnapshot(pathToKey(path), snapshotName);
+    String snapshot = TracingUtil.executeInNewSpan("ofs createSnapshot",
+        () -> getAdapter().createSnapshot(pathToKey(path), snapshotName));
     return new Path(OzoneFSUtils.trimPathToDepth(path, PATH_DEPTH_TO_BUCKET),
         OM_SNAPSHOT_INDICATOR + OZONE_URI_DELIMITER + snapshot);
   }
@@ -541,7 +560,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
   @Override
   public void deleteSnapshot(Path path, String snapshotName)
       throws IOException {
-    adapter.deleteSnapshot(pathToKey(path), snapshotName);
+    TracingUtil.executeInNewSpan("ofs deleteSnapshot",
+        () -> adapter.deleteSnapshot(pathToKey(path), snapshotName));
   }
 
   private class DeleteIterator extends OzoneListingIterator {
@@ -672,6 +692,11 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
    */
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs delete",
+        () -> deleteInSpan(f, recursive));
+  }
+
+  private boolean deleteInSpan(Path f, boolean recursive) throws IOException {
     incrementCounter(Statistic.INVOCATION_DELETE, 1);
     statistics.incrementWriteOps(1);
     LOG.debug("Delete path {} - recursive {}", f, recursive);
@@ -889,7 +914,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
 
   @Override
   public FileStatus[] listStatus(Path f) throws IOException {
-    return convertFileStatusArr(listStatusAdapter(f));
+    return TracingUtil.executeInNewSpan("ofs listStatus",
+        () -> convertFileStatusArr(listStatusAdapter(f)));
   }
 
   private FileStatus[] convertFileStatusArr(
@@ -946,7 +972,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
 
   @Override
   public Token<?> getDelegationToken(String renewer) throws IOException {
-    return adapter.getDelegationToken(renewer);
+    return TracingUtil.executeInNewSpan("ofs getDelegationToken",
+        () -> adapter.getDelegationToken(renewer));
   }
 
   /**
@@ -1014,7 +1041,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
     if (isEmpty(key)) {
       return false;
     }
-    return mkdir(f);
+    return TracingUtil.executeInNewSpan("ofs mkdirs",
+        () -> mkdir(f));
   }
 
   @Override
@@ -1025,7 +1053,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {    
-    return convertFileStatus(getFileStatusAdapter(f));
+    return TracingUtil.executeInNewSpan("ofs getFileStatus",
+        () -> convertFileStatus(getFileStatusAdapter(f)));
   }
 
   public FileStatusAdapter getFileStatusAdapter(Path f) throws IOException {
@@ -1096,7 +1125,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
   public FileChecksum getFileChecksum(Path f, long length) throws IOException {
     incrementCounter(Statistic.INVOCATION_GET_FILE_CHECKSUM);
     String key = pathToKey(f);
-    return adapter.getFileChecksum(key, length);
+    return TracingUtil.executeInNewSpan("ofs getFileChecksum",
+        () -> adapter.getFileChecksum(key, length));
   }
 
   @Override
@@ -1508,6 +1538,11 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
 
   @Override
   public ContentSummary getContentSummary(Path f) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs getContentSummary",
+        () -> getContentSummaryInSpan(f));
+  }
+
+  private ContentSummary getContentSummaryInSpan(Path f) throws IOException {
     FileStatusAdapter status = getFileStatusAdapter(f);
 
     if (status.isFile()) {
@@ -1583,7 +1618,8 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
     if (key.equals("NONE")) {
       throw new FileNotFoundException("File not found. path /NONE.");
     }
-    adapter.setTimes(key, mtime, atime);
+    TracingUtil.executeInNewSpan("ofs setTimes",
+        () -> adapter.setTimes(key, mtime, atime));
   }
 
   protected boolean setSafeModeUtil(SafeModeAction action,
@@ -1595,6 +1631,7 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
       statistics.incrementWriteOps(1);
     }
     LOG.trace("setSafeMode() action:{}", action);
-    return getAdapter().setSafeMode(action, isChecked);
+    return TracingUtil.executeInNewSpan("ofs setSafeMode",
+        () -> getAdapter().setSafeMode(action, isChecked));
   }
 }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
index 918640799c..35ee20d56c 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
@@ -23,6 +23,9 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ReadOnlyBufferException;
 
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
@@ -30,6 +33,7 @@ import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 
 /**
  * The input stream for Ozone file system.
@@ -52,25 +56,40 @@ public class OzoneFSInputStream extends FSInputStream
 
   @Override
   public int read() throws IOException {
-    int byteRead = inputStream.read();
-    if (statistics != null && byteRead >= 0) {
-      statistics.incrementBytesRead(1);
+    Span span = GlobalTracer.get()
+        .buildSpan("OzoneFSInputStream.read").start();
+    try (Scope scope = GlobalTracer.get().activateSpan(span)) {
+      int byteRead = inputStream.read();
+      if (statistics != null && byteRead >= 0) {
+        statistics.incrementBytesRead(1);
+      }
+      return byteRead;
+    } finally {
+      span.finish();
     }
-    return byteRead;
   }
 
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
-    int bytesRead = inputStream.read(b, off, len);
-    if (statistics != null && bytesRead >= 0) {
-      statistics.incrementBytesRead(bytesRead);
+    Span span = GlobalTracer.get()
+        .buildSpan("OzoneFSInputStream.read").start();
+    try (Scope scope = GlobalTracer.get().activateSpan(span)) {
+      span.setTag("offset", off)
+          .setTag("length", len);
+      int bytesRead = inputStream.read(b, off, len);
+      if (statistics != null && bytesRead >= 0) {
+        statistics.incrementBytesRead(bytesRead);
+      }
+      return bytesRead;
+    } finally {
+      span.finish();
     }
-    return bytesRead;
   }
 
   @Override
   public synchronized void close() throws IOException {
-    inputStream.close();
+    TracingUtil.executeInNewSpan("OzoneFSInputStream.close",
+        inputStream::close);
   }
 
   @Override
@@ -101,6 +120,11 @@ public class OzoneFSInputStream extends FSInputStream
    */
   @Override
   public int read(ByteBuffer buf) throws IOException {
+    return TracingUtil.executeInNewSpan("OzoneFSInputStream.read(ByteBuffer)",
+        () -> readInTrace(buf));
+  }
+
+  private int readInTrace(ByteBuffer buf) throws IOException {
     if (buf.isReadOnly()) {
       throw new ReadOnlyBufferException();
     }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
index 141a404694..c5f62d6f68 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 
 import java.io.IOException;
@@ -42,17 +45,24 @@ public class OzoneFSOutputStream extends OutputStream
 
   @Override
   public void write(int b) throws IOException {
-    outputStream.write(b);
+    TracingUtil.executeInNewSpan("OzoneFSOutputStream.write",
+        () -> outputStream.write(b));
   }
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
-    outputStream.write(b, off, len);
+    TracingUtil.executeInNewSpan("OzoneFSOutputStream.write",
+        () -> {
+          Span span = GlobalTracer.get().activeSpan();
+          span.setTag("length", len);
+          outputStream.write(b, off, len);
+        });
   }
 
   @Override
   public synchronized void flush() throws IOException {
-    outputStream.flush();
+    TracingUtil.executeInNewSpan("OzoneFSOutputStream.flush",
+        outputStream::flush);
   }
 
   @Override
@@ -67,7 +77,8 @@ public class OzoneFSOutputStream extends OutputStream
 
   @Override
   public void hsync() throws IOException {
-    outputStream.hsync();
+    TracingUtil.executeInNewSpan("OzoneFSOutputStream.hsync",
+        outputStream::hsync);
   }
 
   protected OzoneOutputStream getWrappedOutputStream() {
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 7561e20a87..c377128d29 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.fs.LeaseRecoverable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeMode;
@@ -29,6 +30,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
@@ -124,6 +126,11 @@ public class RootedOzoneFileSystem extends BasicRootedOzoneFileSystem
 
   @Override
   public boolean recoverLease(final Path f) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs recoverLease",
+        () -> recoverLeaseTraced(f));
+  }
+  private boolean recoverLeaseTraced(final Path f) throws IOException {
+    GlobalTracer.get().activeSpan().setTag("path", f.toString());
     statistics.incrementWriteOps(1);
     LOG.trace("recoverLease() path:{}", f);
     Path qualifiedPath = makeQualified(f);
@@ -133,6 +140,12 @@ public class RootedOzoneFileSystem extends BasicRootedOzoneFileSystem
 
   @Override
   public boolean isFileClosed(Path f) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs isFileClosed",
+        () -> isFileClosedTraced(f));
+  }
+
+  private boolean isFileClosedTraced(Path f) throws IOException {
+    GlobalTracer.get().activeSpan().setTag("path", f.toString());
     statistics.incrementWriteOps(1);
     LOG.trace("isFileClosed() path:{}", f);
     Path qualifiedPath = makeQualified(f);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
