steveloughran commented on a change in pull request #2353:
URL: https://github.com/apache/hadoop/pull/2353#discussion_r504849729
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -279,7 +289,15 @@ int readRemote(long position, byte[] b, int offset, int
length) throws IOExcepti
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote",
"read")) {
LOG.trace("Trigger client.read for path={} position={} offset={}
length={}", path, position, offset, length);
- op = client.read(path, position, b, offset, length, tolerateOobAppends ?
"*" : eTag, cachedSasToken.get());
+ if (ioStatistics != null) {
Review comment:
The methods in IOStatisticsBinding now all accept a null
DurationTrackerFactory, so you don't need the two branches here any more.
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
##########
@@ -73,7 +73,36 @@
READ_THROTTLES("read_throttles",
"Total number of times a read operation is throttled."),
WRITE_THROTTLES("write_throttles",
- "Total number of times a write operation is throttled.");
+ "Total number of times a write operation is throttled."),
+
+ //OutputStream statistics.
+ BYTES_TO_UPLOAD("bytes_upload",
Review comment:
These should all be in hadoop-common *where possible*, so that we have
consistent names everywhere for aggregation.
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -73,6 +78,8 @@
private long bytesFromReadAhead; // bytes read from readAhead; for testing
private long bytesFromRemoteRead; // bytes read remotely; for testing
+ private IOStatistics ioStatistics;
Review comment:
Can this field be `final`?
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
##########
@@ -90,9 +107,7 @@ public void seek(long seekTo, long currentPos) {
*/
@Override
public void bytesRead(long bytes) {
- if (bytes > 0) {
- bytesRead += bytes;
- }
+ ioStatisticsStore.incrementCounter(STREAM_READ_BYTES, bytes);
Review comment:
One thing to consider here is the cost of the map lookup on every IOP.
You can ask the IOStatisticsStore for a reference to the atomic counter, and
use that directly. I'm doing that for the output stream; reviewing this patch
makes me realise I should be doing it for read as well — at least for the read
byte counters, which are incremented on every read. For example:
bytesUploaded = store.getCounterReference(
STREAM_WRITE_TOTAL_DATA.getSymbol());
bytesWritten = store.getCounterReference(
StreamStatisticNames.STREAM_WRITE_BYTES);
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -376,27 +384,31 @@ private synchronized void writeCurrentBufferToService()
throws IOException {
position += bytesLength;
if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
- long start = System.currentTimeMillis();
- waitForTaskToComplete();
- outputStreamStatistics.timeSpentTaskWait(start,
System.currentTimeMillis());
+ //Tracking time spent on waiting for task to complete.
+ try (DurationTracker ignored =
outputStreamStatistics.timeSpentTaskWait()) {
+ waitForTaskToComplete();
+ }
}
- final Future<Void> job = completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- AbfsPerfTracker tracker = client.getAbfsPerfTracker();
- try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
- "writeCurrentBufferToService", "append")) {
- AbfsRestOperation op = client.append(path, offset, bytes, 0,
- bytesLength, cachedSasToken.get(), false);
- cachedSasToken.update(op.getSasToken());
- perfInfo.registerResult(op.getResult());
- byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
- perfInfo.registerSuccess(true);
- return null;
- }
- }
- });
+ final Future<Void> job =
+ completionService.submit(IOStatisticsBinding
+ .trackDurationOfCallable((IOStatisticsStore) ioStatistics,
+ AbfsStatistic.TIME_SPENT_ON_PUT_REQUEST.getStatName(),
+ () -> {
+ AbfsPerfTracker tracker = client.getAbfsPerfTracker();
Review comment:
Given that we are wrapping callables with callables, maybe the AbfsPerfTracker
could join in. Not needed for this patch, but something to consider later...
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
##########
@@ -160,16 +188,7 @@ public long getWriteCurrentBufferOperations() {
@Override public String toString() {
final StringBuilder outputStreamStats = new StringBuilder(
"OutputStream Statistics{");
- outputStreamStats.append(", bytes_upload=").append(bytesToUpload);
- outputStreamStats.append(", bytes_upload_successfully=")
- .append(bytesUploadSuccessful);
- outputStreamStats.append(", bytes_upload_failed=")
- .append(bytesUploadFailed);
- outputStreamStats.append(", time_spent_task_wait=")
- .append(timeSpentOnTaskWait);
- outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps);
- outputStreamStats.append(", write_current_buffer_ops=")
- .append(writeCurrentBufferOperations);
+
outputStreamStats.append(IOStatisticsLogging.ioStatisticsSourceToString(ioStatisticsStore));
Review comment:
ioStatisticsStore.toString() does that too; what the logging API adds is
resilience to failures.
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
##########
@@ -161,55 +170,92 @@ public void remoteBytesRead(long bytes) {
*/
@Override
public void remoteReadOperation() {
- remoteReadOperations++;
+ ioStatisticsStore.incrementCounter(getStatName(REMOTE_READ_OP));
}
+ /**
+ * Getter for IOStatistics instance used.
+ * @return IOStatisticsStore instance which extends IOStatistics.
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return ioStatisticsStore;
+ }
+
+ @VisibleForTesting
public long getSeekOperations() {
- return seekOperations;
+ return ioStatisticsStore.counters().get(STREAM_READ_SEEK_OPERATIONS);
}
+ @VisibleForTesting
public long getForwardSeekOperations() {
- return forwardSeekOperations;
+ return
ioStatisticsStore.counters().get(STREAM_READ_SEEK_FORWARD_OPERATIONS);
}
+ @VisibleForTesting
public long getBackwardSeekOperations() {
- return backwardSeekOperations;
+ return
ioStatisticsStore.counters().get(STREAM_READ_SEEK_BACKWARD_OPERATIONS);
}
+ @VisibleForTesting
public long getBytesRead() {
- return bytesRead;
+ return ioStatisticsStore.counters().get(STREAM_READ_BYTES);
}
+ @VisibleForTesting
public long getBytesSkippedOnSeek() {
- return bytesSkippedOnSeek;
+ return ioStatisticsStore.counters().get(STREAM_READ_SEEK_BYTES_SKIPPED);
}
+ @VisibleForTesting
public long getBytesBackwardsOnSeek() {
- return bytesBackwardsOnSeek;
+ return ioStatisticsStore.counters().get(STREAM_READ_SEEK_BYTES_BACKWARDS);
}
+ @VisibleForTesting
public long getSeekInBuffer() {
- return seekInBuffer;
+ return ioStatisticsStore.counters().get(getStatName(SEEK_IN_BUFFER));
+
}
+ @VisibleForTesting
public long getReadOperations() {
- return readOperations;
+ return ioStatisticsStore.counters().get(STREAM_READ_OPERATIONS);
}
+ @VisibleForTesting
public long getBytesReadFromBuffer() {
- return bytesReadFromBuffer;
+ return ioStatisticsStore.counters().get(getStatName(BYTES_READ_BUFFER));
}
+ @VisibleForTesting
public long getRemoteReadOperations() {
- return remoteReadOperations;
+ return ioStatisticsStore.counters().get(getStatName(REMOTE_READ_OP));
}
+ @VisibleForTesting
public long getReadAheadBytesRead() {
- return readAheadBytesRead;
+ return
ioStatisticsStore.counters().get(getStatName(READ_AHEAD_BYTES_READ));
}
+ @VisibleForTesting
public long getRemoteBytesRead() {
- return remoteBytesRead;
+ return ioStatisticsStore.counters().get(getStatName(REMOTE_BYTES_READ));
+ }
+
+ @VisibleForTesting
+ public double getActionHttpGetRequest() {
Review comment:
Even for testing, add a comment. This is actually the mean value, isn't
it?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]