[ 
https://issues.apache.org/jira/browse/HADOOP-19348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931226#comment-17931226
 ] 

ASF GitHub Bot commented on HADOOP-19348:
-----------------------------------------

ahmarsuhail commented on code in PR #7433:
URL: https://github.com/apache/hadoop/pull/7433#discussion_r1974002838


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.assertj.core.api.Assertions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import 
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static 
org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests integration of the
+ * <a href="https://github.com/awslabs/analytics-accelerator-s3">analytics accelerator library</a>
+ *
+ * Certain tests in this class rely on reading local parquet files stored in 
resources.
+ * These files are copied from local to S3 and then read via the analytics 
stream.
+ * This is done to ensure AAL can read the parquet format, and handles 
exceptions from malformed
+ * parquet files.
+ *
+ */
+public class ITestS3AAnalyticsAcceleratorStreamReading extends 
AbstractS3ATestBase {
+
+  private static final String PHYSICAL_IO_PREFIX = "physicalio";
+  private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+  private Path externalTestFile;
+
+  @Before
+  public void setUp() throws Exception {
+    super.setup();
+    externalTestFile = getExternalData(getConfiguration());
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration configuration = super.createConfiguration();
+    enableAnalyticsAccelerator(configuration);
+    return configuration;
+  }
+
+  @Test
+  public void testConnectorFrameWorkIntegration() throws Throwable {
+    describe("Verify S3 connector framework integration");
+
+    S3AFileSystem fs =
+        (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), 
getConfiguration());
+    byte[] buffer = new byte[500];
+    IOStatistics ioStats;
+
+    try (FSDataInputStream inputStream =
+        fs.openFile(externalTestFile)
+            .must(FS_OPTION_OPENFILE_READ_POLICY, 
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .build().get()) {
+      ioStats = inputStream.getIOStatistics();
+      inputStream.seek(5);
+      inputStream.read(buffer, 0, 500);
+
+      final InputStream wrappedStream = inputStream.getWrappedStream();
+      ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+
+      
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
+      Assertions.assertThat(objectInputStream.getInputPolicy())
+          .isEqualTo(S3AInputPolicy.Sequential);
+    }
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(),

Review Comment:
   @steveloughran I can't get this one to work.





> S3A: Add initial support for analytics-accelerator-s3
> -----------------------------------------------------
>
>                 Key: HADOOP-19348
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19348
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.2
>            Reporter: Ahmar Suhail
>            Assignee: Ahmar Suhail
>            Priority: Major
>              Labels: pull-request-available
>
> S3 recently released the [Analytics Accelerator Library for Amazon 
> S3|https://github.com/awslabs/analytics-accelerator-s3] as an Alpha release. 
> The library provides an input stream, with an initial goal of improving 
> performance for Apache Spark workloads on Parquet datasets. 
> For example, it implements optimisations such as footer prefetching, and so 
> avoids the multiple GET requests that S3AInputStream currently makes for the 
> footer bytes and PageIndex structures.
> The library also tracks columns currently being read by a query using the 
> parquet metadata, and then prefetches these bytes when parquet files with the 
> same schema are opened. 
> This ticket tracks the work required for the basic initial integration. More 
> work remains to be done, such as VectoredIO support, which we will identify 
> and follow up on. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to