This is an automated email from the ASF dual-hosted git repository. ahmar pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 9aba17e1114 HADOOP-19542. S3A: Close AAL factory on service stop. (#7616) (#7658) 9aba17e1114 is described below commit 9aba17e1114508a3d517de58811fdafdc881be37 Author: ahmarsuhail <ahma...@amazon.co.uk> AuthorDate: Tue Apr 29 20:22:36 2025 +0100 HADOOP-19542. S3A: Close AAL factory on service stop. (#7616) (#7658) Contributed by: Ahmar Suhail. --- .../org/apache/hadoop/fs/statistics/StreamStatisticNames.java | 7 +++++++ .../src/main/java/org/apache/hadoop/fs/s3a/Statistic.java | 4 ++++ .../src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java | 6 ++++++ .../apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java | 8 ++++++++ .../hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java | 3 +++ .../hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java | 3 +++ .../apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java | 6 ++++++ 7 files changed, 37 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index ab4838618da..09c19ad071a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -104,6 +104,13 @@ public final class StreamStatisticNames { */ public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened"; + /** + * Total count of times object stream factory was closed. + * + * Value: {@value}. + */ + public static final String ANALYTICS_STREAM_FACTORY_CLOSED = "analytics_stream_factory_closed"; + /** * Count of exceptions raised during input stream reads. * Value: {@value}. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index ee98693e696..6389742167d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -352,6 +352,10 @@ public enum Statistic { StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS, "Total count of times an attempt to close an input stream was made", TYPE_COUNTER), + ANALYTICS_STREAM_FACTORY_CLOSED( + "analytics_stream_factory_closed", + "Count of times the analytics stream factory was closed", + TYPE_COUNTER), STREAM_READ_EXCEPTIONS( StreamStatisticNames.STREAM_READ_EXCEPTIONS, "Count of exceptions raised during input stream reads", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java index 1a7868dd044..526c9b15bc3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java @@ -996,6 +996,12 @@ public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOE LOG.debug("Stream factory requested async client"); return clientManager().getOrCreateAsyncClient(); } + + @Override + public void incrementFactoryStatistic(Statistic statistic) { + incrementStatistic(statistic); + } + } /* diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java index e03a38ad2c0..102d31a4f12 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java @@ -32,6 +32,7 @@ import org.apache.hadoop.util.functional.LazyAutoCloseableReference; import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; +import static org.apache.hadoop.fs.s3a.Statistic.ANALYTICS_STREAM_FACTORY_CLOSED; import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext; /** @@ -95,6 +96,13 @@ public StreamFactoryRequirements factoryRequirements() { StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests); } + @Override + protected void serviceStop() throws Exception { + this.s3SeekableInputStreamFactory.close(); + callbacks().incrementFactoryStatistic(ANALYTICS_STREAM_FACTORY_CLOSED); + super.serviceStop(); + } + private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory() throws IOException { return s3SeekableInputStreamFactory.eval(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java index ce168bea883..9b2b54c48e5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java @@ -22,6 +22,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; +import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.service.Service; @@ -85,6 +86,8 @@ interface StreamFactoryCallbacks { * @throws IOException failure to create the client. */ S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException; + + void incrementFactoryStatistic(Statistic statistic); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index 816ec90646f..dff171bbdd8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -48,6 +48,7 @@ import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -106,6 +107,8 @@ public void testConnectorFrameWorkIntegration() throws Throwable { } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + fs.close(); + verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1); } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java index c8929b52b2b..05f15d46238 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import org.apache.hadoop.fs.s3a.Statistic; import org.assertj.core.api.Assertions; import org.junit.Test; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -334,6 +335,11 @@ private static final class Callbacks implements ObjectInputStreamFactory.StreamF public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException { throw new UnsupportedOperationException("not implemented"); } + + @Override + public void incrementFactoryStatistic(Statistic statistic) { + throw new UnsupportedOperationException("not implemented"); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org