[ 
https://issues.apache.org/jira/browse/HADOOP-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746534#comment-17746534
 ] 

ASF GitHub Bot commented on HADOOP-18184:
-----------------------------------------

ahmarsuhail commented on code in PR #5832:
URL: https://github.com/apache/hadoop/pull/5832#discussion_r1270694111


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java:
##########
@@ -409,27 +455,54 @@ protected String getOffsetStr(long offset) {
     return String.format("%d:%d", blockNumber, offset);
   }
 
+  @Override
+  public synchronized void unbuffer() {
+    LOG.debug("{}: unbuffered", getName());
+    if (closeStream(true)) {
+      getS3AStreamStatistics().unbuffered();
+    }
+  }
+
+  /**
+   * Close the stream in close() or unbuffer().
+   * @param unbuffer is this an unbuffer operation?
+   * @return true if the stream was closed; false means it was already closed.
+   */
+  protected boolean closeStream(final boolean unbuffer) {
+
+    if (underlyingResourcesClosed.getAndSet(true)) {
+      return false;
+    }
+
+    if (unbuffer) {

Review Comment:
   why don't we just do blockData = null? Since on 
`initializeUnderlyingResources` we create a new BlockData obj 



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java:
##########
@@ -298,16 +306,26 @@ public void requestPrefetch(int blockNumber) {
 
   /**
    * Requests cancellation of any previously issued prefetch requests.
+   * If the reason was switching to random IO, any active prefetched blocks
+   * are still cached.
+   * @param reason why?
    */
   @Override
-  public void cancelPrefetches() {
+  public void cancelPrefetches(final CancelReason reason) {
+    LOG.debug("Cancelling prefetches {}", reason);
     BlockOperations.Operation op = ops.cancelPrefetches();
 
-    for (BufferData data : bufferPool.getAll()) {
-      // We add blocks being prefetched to the local cache so that the 
prefetch is not wasted.
-      if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, 
BufferData.State.READY)) {
-        requestCaching(data);
+    if (reason == CancelReason.RandomIO) {
+      for (BufferData data : bufferPool.getAll()) {
+        // We add blocks being prefetched to the local cache so that the 
prefetch is not wasted.
+        // this only done if the reason is random IO-related, not due to 
close/unbuffer
+        if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, 
BufferData.State.READY)) {
+          requestCaching(data);
+        }
       }
+    } else {
+      // free the buffers
+      bufferPool.getAll().forEach(BufferData::setDone);

Review Comment:
   done buffers will get released on the next prefetch..but wondering if we can 
just release here instead. 



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java:
##########
@@ -494,19 +546,25 @@ private void addToCacheAndRelease(BufferData data, 
Future<Void> blockFuture,
     prefetchingStatistics.executorAcquired(
         Duration.between(taskQueuedStartTime, Instant.now()));
 
-    if (closed) {
+    if (isClosed()) {
       return;
     }
 
-    if (cachingDisabled.get()) {
+    final int blockNumber = data.getBlockNumber();
+    LOG.debug("Block {}: Preparing to cache block", blockNumber);
+
+    if (isCachingDisabled()) {

Review Comment:
   I'm confused about why we're doing this twice, here and on line 577. as far 
as I can tell, in between these two, nothing is changing the 
`isCachingDisabled` state



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java:
##########
@@ -494,19 +546,25 @@ private void addToCacheAndRelease(BufferData data, 
Future<Void> blockFuture,
     prefetchingStatistics.executorAcquired(
         Duration.between(taskQueuedStartTime, Instant.now()));
 
-    if (closed) {
+    if (isClosed()) {
       return;
     }
 
-    if (cachingDisabled.get()) {
+    final int blockNumber = data.getBlockNumber();
+    LOG.debug("Block {}: Preparing to cache block", blockNumber);
+
+    if (isCachingDisabled()) {
+      LOG.debug("Block {}: Preparing caching disabled, not prefetching", 
blockNumber);

Review Comment:
   I think this should be `caching disabled, not caching` or something. 
prefetching may or not be happening here (it could already be done prefetching 
by the time it gets here)



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java:
##########
@@ -392,16 +413,41 @@ private void readBlock(BufferData data, boolean 
isPrefetch, BufferData.State...
           ops.end(op);
         }
 
-        if (isPrefetch) {
-          prefetchingStatistics.prefetchOperationCompleted();
-          if (tracker != null) {
-            tracker.close();
-          }
+        // update the statistics
+        prefetchingStatistics.fetchOperationCompleted(isPrefetch, 
bytesFetched);
+        if (tracker != null) {
+          tracker.close();
+          LOG.debug("fetch completed: {}", tracker);
         }
       }
     }
   }
 
+  /**
+   * True if the manager has been closed.
+   */
+  private boolean isClosed() {
+    return closed.get();
+  }
+
+  /**
+   * Disable caching; updates stream statistics and logs exactly once
+   * at info
+   * @param endOp operation which measured the duration of the write.
+   */
+  private void disableCaching(final BlockOperations.End endOp) {
+    if (!cachingDisabled.getAndSet(true)) {
+      String message = String.format(
+          "Caching disabled because of slow operation (%.1f sec)", 
endOp.duration());
+      LOG_CACHING_DISABLED.info(message);
+      prefetchingStatistics.setPrefetchCachingState(false);

Review Comment:
   nit: naming here can get confusing, it's not immediately clear if 
prefetchCachingState refers to just caching blocks in memory via prefetching, 
or caching them to disk. It would make things clearer if this was instead  
something like `setPrefetchDiskCachingState`. if we do rename, also renaming 
the caching methods would make things clearer



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java:
##########
@@ -526,18 +585,21 @@ private void addToCacheAndRelease(BufferData data, 
Future<Void> blockFuture,
     synchronized (data) {
       try {
         if (data.stateEqualsOneOf(BufferData.State.DONE)) {
+          LOG.debug("Block {}: Block already in cache; not adding", 
blockNumber);

Review Comment:
   this should be something like "block no longer in use, not adding"



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java:
##########
@@ -76,36 +79,75 @@ public S3ACachingInputStream(
       S3AInputStreamStatistics streamStatistics,
       Configuration conf,
       LocalDirAllocator localDirAllocator) {
-    super(context, s3Attributes, client, streamStatistics);
 
-    this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
-    int bufferPoolSize = this.numBlocksToPrefetch + 1;
-    this.blockManager = this.createBlockManager(
-        this.getContext().getFuturePool(),
-        this.getReader(),
-        this.getBlockData(),
-        bufferPoolSize,
-        conf,
-        localDirAllocator);
+    super(context, s3Attributes, client, streamStatistics);
+    this.conf = conf;
+    this.localDirAllocator = localDirAllocator;
+    this.numBlocksToPrefetch = getContext().getPrefetchBlockCount();
+    demandCreateBlockManager();
     int fileSize = (int) s3Attributes.getLen();
     LOG.debug("Created caching input stream for {} (size = {})", 
this.getName(),
         fileSize);
+    streamStatistics.setPrefetchState(numBlocksToPrefetch > 0,

Review Comment:
   we could also update this statistic in S3AInMemoryInputStream?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java:
##########
@@ -339,49 +367,67 @@ public int read(byte[] buffer, int offset, int len) 
throws IOException {
       numBytesRemaining -= bytesToRead;
       numBytesRead += bytesToRead;
     }
-
     return numBytesRead;
   }
 
-  protected S3ARemoteObject getFile() {
+  /**
+   * Forward to superclass after updating the read fully IOStatistics
+   * {@inheritDoc}
+   */
+  @Override
+  public void readFully(final long position,
+      final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    throwIfClosed();
+    validatePositionedReadArgs(position, buffer, offset, length);
+    streamStatistics.readFullyOperationStarted(position, length);
+    super.readFully(position, buffer, offset, length);
+  }
+
+  protected final S3ARemoteObject getRemoteObject() {
     return remoteObject;
   }
 
-  protected S3ARemoteObjectReader getReader() {
+  protected final S3ARemoteObjectReader getReader() {
     return reader;
   }
 
-  protected S3ObjectAttributes getS3ObjectAttributes() {
+  protected final S3ObjectAttributes getS3ObjectAttributes() {
     return s3Attributes;
   }
 
-  protected FilePosition getFilePosition() {
+  protected final FilePosition getFilePosition() {
     return fpos;
   }
 
-  protected String getName() {
+  protected final String getName() {
     return name;
   }
 
-  protected boolean isClosed() {
-    return closed;
+  protected final boolean isClosed() {
+    return closed.get();
+  }
+
+  protected final boolean underlyingResourcesClosed() {
+    return underlyingResourcesClosed.get();
   }
 
-  protected long getNextReadPos() {
+  protected final long getNextReadPos() {
     return nextReadPos;
   }
 
-  protected BlockData getBlockData() {
+  protected final BlockData getBlockData() {
     return blockData;
   }
 
-  protected S3AReadOpContext getContext() {
+  protected final S3AReadOpContext getContext() {
     return context;
   }
 
   private void incrementBytesRead(int bytesRead) {
     if (bytesRead > 0) {
-      streamStatistics.bytesRead(bytesRead);
+      streamStatistics.bytesReadFromBuffer(bytesRead);

Review Comment:
   what's the difference between this and the one on line 432?



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java:
##########
@@ -24,14 +24,24 @@
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
-public interface PrefetchingStatistics extends IOStatisticsSource {
+import static 
org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;
+
+ public interface PrefetchingStatistics extends IOStatisticsSource {
 
   /**
    * A prefetch operation has started.
    * @return duration tracker
    */
   DurationTracker prefetchOperationStarted();
 
+  /**
+   * A block fetch operation has started.
+   * @return duration tracker
+   */
+  default DurationTracker blockFetchOperationStarted() {
+    return stubDurationTracker();

Review Comment:
   curious, why is the stubDurationTracker required? If it's because this 
interface maybe implemented somewhere that doesn't support duration tracking, 
then we should add it to the `prefetchOperationStarted()` method too





> s3a prefetching stream to support unbuffer()
> --------------------------------------------
>
>                 Key: HADOOP-18184
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18184
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Minor
>              Labels: pull-request-available
>
> Apache Impala uses unbuffer() to free up all client side resources held by a 
> stream, so allowing it to have a map of available (path -> stream) objects, 
> retained across queries.
> This saves on having to reopen the files, with the cost of HEAD checks etc. 
> S3AInputStream just closes its http connection. here there is a lot more 
> state to discard, but all memory and file storage must be freed.
> until this done, ITestS3AContractUnbuffer must skip when the prefetch stream 
> is used.
> its notable that the other tests don't fail, even though the stream doesn't 
> implement the interface; the graceful degradation handles that. it should 
> fail if the test xml resource says the stream does it, but that the stream 
> capabilities say it doesn't.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to