steveloughran commented on a change in pull request #2353:
URL: https://github.com/apache/hadoop/pull/2353#discussion_r504849729



##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -279,7 +289,15 @@ int readRemote(long position, byte[] b, int offset, int 
length) throws IOExcepti
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", 
"read")) {
       LOG.trace("Trigger client.read for path={} position={} offset={} 
length={}", path, position, offset, length);
-      op = client.read(path, position, b, offset, length, tolerateOobAppends ? 
"*" : eTag, cachedSasToken.get());
+      if (ioStatistics != null) {

Review comment:
       the methods in IOStatisticsBinding now all accept a null 
DurationTrackerFactory, so you no longer need the two branches here

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
##########
@@ -73,7 +73,36 @@
   READ_THROTTLES("read_throttles",
       "Total number of times a read operation is throttled."),
   WRITE_THROTTLES("write_throttles",
-      "Total number of times a write operation is throttled.");
+      "Total number of times a write operation is throttled."),
+
+  //OutputStream statistics.
+  BYTES_TO_UPLOAD("bytes_upload",

Review comment:
       these should all be in hadoop-common *where possible*, so that we have 
consistent names everywhere for aggregation

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -73,6 +78,8 @@
   private long bytesFromReadAhead; // bytes read from readAhead; for testing
   private long bytesFromRemoteRead; // bytes read remotely; for testing
 
+  private IOStatistics ioStatistics;

Review comment:
       final?

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
##########
@@ -90,9 +107,7 @@ public void seek(long seekTo, long currentPos) {
    */
   @Override
   public void bytesRead(long bytes) {
-    if (bytes > 0) {
-      bytesRead += bytes;
-    }
+    ioStatisticsStore.incrementCounter(STREAM_READ_BYTES, bytes);

Review comment:
       One thing to consider here is the cost of the map lookup on every IOP. 
You can ask the IOStatisticsStore for a reference to the atomic counter, and 
use that directly. I'm doing that for the output stream; reviewing this patch 
makes me realise I should be doing it for read as well — at least for the read 
byte counters, which are incremented on every read.
   
         bytesUploaded = store.getCounterReference(
             STREAM_WRITE_TOTAL_DATA.getSymbol());
         bytesWritten = store.getCounterReference(
             StreamStatisticNames.STREAM_WRITE_BYTES);
             
           

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -376,27 +384,31 @@ private synchronized void writeCurrentBufferToService() 
throws IOException {
     position += bytesLength;
 
     if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
-      long start = System.currentTimeMillis();
-      waitForTaskToComplete();
-      outputStreamStatistics.timeSpentTaskWait(start, 
System.currentTimeMillis());
+      //Tracking time spent on waiting for task to complete.
+      try (DurationTracker ignored = 
outputStreamStatistics.timeSpentTaskWait()) {
+        waitForTaskToComplete();
+      }
     }
 
-    final Future<Void> job = completionService.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        AbfsPerfTracker tracker = client.getAbfsPerfTracker();
-        try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
-                "writeCurrentBufferToService", "append")) {
-          AbfsRestOperation op = client.append(path, offset, bytes, 0,
-                  bytesLength, cachedSasToken.get(), false);
-          cachedSasToken.update(op.getSasToken());
-          perfInfo.registerResult(op.getResult());
-          byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
-          perfInfo.registerSuccess(true);
-          return null;
-        }
-      }
-    });
+    final Future<Void> job =
+        completionService.submit(IOStatisticsBinding
+            .trackDurationOfCallable((IOStatisticsStore) ioStatistics,
+                AbfsStatistic.TIME_SPENT_ON_PUT_REQUEST.getStatName(),
+                () -> {
+                  AbfsPerfTracker tracker = client.getAbfsPerfTracker();

Review comment:
       given we are wrapping callables with callables, maybe the Abfs Perf 
tracker could join in. Not needed for this patch, but later...

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
##########
@@ -160,16 +188,7 @@ public long getWriteCurrentBufferOperations() {
   @Override public String toString() {
     final StringBuilder outputStreamStats = new StringBuilder(
         "OutputStream Statistics{");
-    outputStreamStats.append(", bytes_upload=").append(bytesToUpload);
-    outputStreamStats.append(", bytes_upload_successfully=")
-        .append(bytesUploadSuccessful);
-    outputStreamStats.append(", bytes_upload_failed=")
-        .append(bytesUploadFailed);
-    outputStreamStats.append(", time_spent_task_wait=")
-        .append(timeSpentOnTaskWait);
-    outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps);
-    outputStreamStats.append(", write_current_buffer_ops=")
-        .append(writeCurrentBufferOperations);
+    
outputStreamStats.append(IOStatisticsLogging.ioStatisticsSourceToString(ioStatisticsStore));

Review comment:
       ioStatisticsStore.toString() does that too; what you get through the 
logging API is resilience to failures

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
##########
@@ -161,55 +170,92 @@ public void remoteBytesRead(long bytes) {
    */
   @Override
   public void remoteReadOperation() {
-    remoteReadOperations++;
+    ioStatisticsStore.incrementCounter(getStatName(REMOTE_READ_OP));
   }
 
+  /**
+   * Getter for IOStatistics instance used.
+   * @return IOStatisticsStore instance which extends IOStatistics.
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return ioStatisticsStore;
+  }
+
+  @VisibleForTesting
   public long getSeekOperations() {
-    return seekOperations;
+    return ioStatisticsStore.counters().get(STREAM_READ_SEEK_OPERATIONS);
   }
 
+  @VisibleForTesting
   public long getForwardSeekOperations() {
-    return forwardSeekOperations;
+    return 
ioStatisticsStore.counters().get(STREAM_READ_SEEK_FORWARD_OPERATIONS);
   }
 
+  @VisibleForTesting
   public long getBackwardSeekOperations() {
-    return backwardSeekOperations;
+    return 
ioStatisticsStore.counters().get(STREAM_READ_SEEK_BACKWARD_OPERATIONS);
   }
 
+  @VisibleForTesting
   public long getBytesRead() {
-    return bytesRead;
+    return ioStatisticsStore.counters().get(STREAM_READ_BYTES);
   }
 
+  @VisibleForTesting
   public long getBytesSkippedOnSeek() {
-    return bytesSkippedOnSeek;
+    return ioStatisticsStore.counters().get(STREAM_READ_SEEK_BYTES_SKIPPED);
   }
 
+  @VisibleForTesting
   public long getBytesBackwardsOnSeek() {
-    return bytesBackwardsOnSeek;
+    return ioStatisticsStore.counters().get(STREAM_READ_SEEK_BYTES_BACKWARDS);
   }
 
+  @VisibleForTesting
   public long getSeekInBuffer() {
-    return seekInBuffer;
+    return ioStatisticsStore.counters().get(getStatName(SEEK_IN_BUFFER));
+
   }
 
+  @VisibleForTesting
   public long getReadOperations() {
-    return readOperations;
+    return ioStatisticsStore.counters().get(STREAM_READ_OPERATIONS);
   }
 
+  @VisibleForTesting
   public long getBytesReadFromBuffer() {
-    return bytesReadFromBuffer;
+    return ioStatisticsStore.counters().get(getStatName(BYTES_READ_BUFFER));
   }
 
+  @VisibleForTesting
   public long getRemoteReadOperations() {
-    return remoteReadOperations;
+    return ioStatisticsStore.counters().get(getStatName(REMOTE_READ_OP));
   }
 
+  @VisibleForTesting
   public long getReadAheadBytesRead() {
-    return readAheadBytesRead;
+    return 
ioStatisticsStore.counters().get(getStatName(READ_AHEAD_BYTES_READ));
   }
 
+  @VisibleForTesting
   public long getRemoteBytesRead() {
-    return remoteBytesRead;
+    return ioStatisticsStore.counters().get(getStatName(REMOTE_BYTES_READ));
+  }
+
+  @VisibleForTesting
+  public double getActionHttpGetRequest() {

Review comment:
       even for testing, add a comment. This is actually the mean value, isn't 
it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to