[
https://issues.apache.org/jira/browse/HADOOP-19348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931411#comment-17931411
]
ASF GitHub Bot commented on HADOOP-19348:
-----------------------------------------
ahmarsuhail commented on code in PR #7433:
URL: https://github.com/apache/hadoop/pull/7433#discussion_r1975142487
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.assertj.core.api.Assertions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests integration of the
+ * <a href="https://github.com/awslabs/analytics-accelerator-s3">analytics accelerator library</a>
+ *
+ * Certain tests in this class rely on reading local parquet files stored in resources.
+ * These files are copied from local to S3 and then read via the analytics stream.
+ * This is done to ensure AAL can read the parquet format, and handles exceptions from malformed
+ * parquet files.
+ *
+ */
+public class ITestS3AAnalyticsAcceleratorStreamReading extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+ private Path externalTestFile;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ externalTestFile = getExternalData(getConfiguration());
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ enableAnalyticsAccelerator(configuration);
+ return configuration;
+ }
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws Throwable {
+ describe("Verify S3 connector framework integration");
+
+ S3AFileSystem fs =
+ (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream =
+ fs.openFile(externalTestFile)
+ .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+ .build().get()) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+
+ final InputStream wrappedStream = inputStream.getWrappedStream();
+ ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+
+ Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
+ Assertions.assertThat(objectInputStream.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Sequential);
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+ verifyStatisticCounterValue(getFileSystem().getIOStatistics(),
Review Comment:
OK, I think it's to do with the test base class, maybe: the statistics in these
tests don't aggregate to the FS. I added an assertion in `ITestS3AOpenCost`, which
extends `AbstractS3ACostTest`, and can see that the counter gets propagated there.
```
s3a.AbstractS3ATestBase (AbstractS3ATestBase.java:dumpFileSystemIOStatistics(130)) - Aggregate FileSystem Statistics counters=((action_file_opened=1)
(action_http_head_request=3)
(audit_request_execution=13)
....
(stream_read_analytics_opened=1)
....
```
I think this is enough for now, so I'll remove this assertion and merge as
is. Once we support statistics properly, we can add tests in
`ITestS3AOpenCost`.
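To make the distinction between stream-level and filesystem-level counters concrete, here is a minimal toy model in plain Java. It is only a sketch: `StatsAggregationSketch`, `Counters`, and `Stream` are hypothetical names and not part of Hadoop's `IOStatistics` API, and the `mergeOnClose` flag is an assumption used to show both outcomes, not a description of why the base class behaves as it does.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model only: hypothetical classes, not Hadoop's IOStatistics API. */
final class StatsAggregationSketch {

  /** Named counters, standing in for per-stream or per-filesystem statistics. */
  static final class Counters {
    private final Map<String, Long> values = new TreeMap<>();

    void increment(String key) {
      values.merge(key, 1L, Long::sum);
    }

    /** Adds every counter of {@code other} into this instance. */
    void aggregate(Counters other) {
      other.values.forEach((k, v) -> values.merge(k, v, Long::sum));
    }

    long get(String key) {
      return values.getOrDefault(key, 0L);
    }
  }

  /** A stream that counts its own open and may merge into a parent on close. */
  static final class Stream implements AutoCloseable {
    final Counters stats = new Counters();
    private final Counters parent;
    private final boolean mergeOnClose; // assumption: models whether aggregation happens

    Stream(Counters parent, boolean mergeOnClose) {
      this.parent = parent;
      this.mergeOnClose = mergeOnClose;
      stats.increment("stream_read_analytics_opened");
    }

    @Override
    public void close() {
      if (mergeOnClose) {
        parent.aggregate(stats);
      }
    }
  }

  /** Opens and closes one stream, then returns the filesystem-level counter. */
  static long fsCounterAfterOneStream(boolean mergeOnClose) {
    Counters fsStats = new Counters();
    try (Stream s = new Stream(fsStats, mergeOnClose)) {
      // Reads would happen here; the stream-level counter is already 1.
    }
    return fsStats.get("stream_read_analytics_opened");
  }

  public static void main(String[] args) {
    System.out.println("merged on close:     " + fsCounterAfterOneStream(true));
    System.out.println("not merged on close: " + fsCounterAfterOneStream(false));
  }
}
```

In this model the stream-level assertion always passes, while the filesystem-level assertion passes only when the merge happens. That matches the symptom described above without claiming to identify its cause.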
> S3A: Add initial support for analytics-accelerator-s3
> -----------------------------------------------------
>
> Key: HADOOP-19348
> URL: https://issues.apache.org/jira/browse/HADOOP-19348
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.2
> Reporter: Ahmar Suhail
> Assignee: Ahmar Suhail
> Priority: Major
> Labels: pull-request-available
>
> S3 recently released the [Analytics Accelerator Library for Amazon
> S3|https://github.com/awslabs/analytics-accelerator-s3] as an Alpha release.
> The library provides an input stream, with an initial goal of improving
> performance for Apache Spark workloads on Parquet datasets.
> For example, it implements optimisations such as footer prefetching, and so
> avoids the multiple GETs S3AInputStream currently makes for the footer bytes
> and PageIndex structures.
> The library also tracks columns currently being read by a query using the
> parquet metadata, and then prefetches these bytes when parquet files with the
> same schema are opened.
> This ticket tracks the work required for the basic initial integration. There
> is still more work to be done, such as VectoredIO support, which we will
> identify and follow up on.
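
As background for the footer prefetching mentioned in the description: a Parquet file ends with a 4-byte little-endian footer length followed by the 4-byte magic `PAR1`, so a reader that fetches only the last 8 bytes still needs a second request for the footer itself. The sketch below computes the footer byte range from that tail. It illustrates the file layout only; `ParquetFooterSketch` and `footerRange` are hypothetical names, and this is not how the analytics accelerator library is implemented.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Illustrative only: locates the Parquet footer from the last 8 bytes of a file. */
final class ParquetFooterSketch {

  /** The file tail: 4-byte little-endian footer length, then the magic "PAR1". */
  static final int TAIL_LENGTH = 8;

  /**
   * Returns {start, endExclusive} of the footer metadata within the file.
   *
   * @param fileLength total object length in bytes
   * @param tail the final 8 bytes of the object
   */
  static long[] footerRange(long fileLength, byte[] tail) {
    if (tail.length != TAIL_LENGTH) {
      throw new IllegalArgumentException("tail must be exactly 8 bytes");
    }
    String magic = new String(tail, 4, 4, StandardCharsets.US_ASCII);
    if (!"PAR1".equals(magic)) {
      throw new IllegalArgumentException("not a Parquet file: bad trailing magic");
    }
    int footerLength = ByteBuffer.wrap(tail, 0, 4)
        .order(ByteOrder.LITTLE_ENDIAN)
        .getInt();
    long end = fileLength - TAIL_LENGTH;
    long start = end - footerLength;
    // The first 4 bytes of the file hold the leading magic, so the footer cannot start before them.
    if (footerLength < 0 || start < 4) {
      throw new IllegalArgumentException("footer length is inconsistent with file length");
    }
    return new long[] {start, end};
  }

  public static void main(String[] args) {
    byte[] tail = {100, 0, 0, 0, 'P', 'A', 'R', '1'};
    long[] range = footerRange(1000L, tail);
    System.out.println("footer bytes: [" + range[0] + ", " + range[1] + ")");
  }
}
```

A stream that reads a large enough tail in one request can serve both the 8-byte trailer and the footer from a single GET, which is the general idea behind prefetching the footer.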