[
https://issues.apache.org/jira/browse/HADOOP-19348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930341#comment-17930341
]
ASF GitHub Bot commented on HADOOP-19348:
-----------------------------------------
ahmarsuhail commented on code in PR #7334:
URL: https://github.com/apache/hadoop/pull/7334#discussion_r1969801801
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+
+
+import org.assertj.core.api.Assertions;
+
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+ private Configuration conf;
+ private Path testFile;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ testFile = getExternalData(conf);
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ if (isUsingDefaultExternalDataFile(configuration)) {
+ S3ATestUtils.removeBaseAndBucketOverrides(configuration,
+ ENDPOINT);
+ }
+ enableAnalyticsAccelerator(configuration);
+ return configuration;
+ }
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
+ conf.set(INPUT_FADVISE, "whole-file");
+
+ S3AFileSystem fs =
+ (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = fs.open(testFile)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+
+ final InputStream wrappedStream = inputStream.getWrappedStream();
+ ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+ assertEquals(objectInputStream.streamType(), InputStreamType.Analytics);
+      assertEquals(objectInputStream.getInputPolicy(), S3AInputPolicy.Sequential);
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ @Test
+ public void testMalformedParquetFooter() throws IOException {
+ describe("Reading a malformed parquet file should not throw an exception");
+
+    // File with malformed footer taken from
+    // https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
+    // This test ensures AAL does not throw exceptions if footer parsing fails.
+    // It will only emit a WARN log, "Unable to parse parquet footer for
+    // test/malformedFooter.parquet, parquet prefetch optimisations will be
+    // disabled for this key."
Review Comment:
yeah, AAL logs are very noisy right now, too much gets logged at info. I will
downgrade everything to debug before the next AAL release (will upgrade in
hadoop before 3.4.2)
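
Until that release lands, a downstream user could quiet the library in their
log4j configuration. This is a sketch only: it assumes AAL's loggers are named
after the `software.amazon.s3.analyticsaccelerator` package visible in the
imports above.

```properties
# Assumption: AAL loggers follow the package name seen in the imports above.
# Raises the threshold so only WARN and above from AAL reaches the logs.
log4j.logger.software.amazon.s3.analyticsaccelerator=WARN
```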
> S3A: Add initial support for analytics-accelerator-s3
> -----------------------------------------------------
>
> Key: HADOOP-19348
> URL: https://issues.apache.org/jira/browse/HADOOP-19348
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.2
> Reporter: Ahmar Suhail
> Priority: Major
> Labels: pull-request-available
>
> S3 recently released [Analytics Accelerator Library for Amazon
> S3|https://github.com/awslabs/analytics-accelerator-s3] as an Alpha release,
> which is an input stream, with an initial goal of improving performance for
> Apache Spark workloads on Parquet datasets.
> For example, it implements optimisations such as footer prefetching, and so
> avoids the multiple GETs S3AInputStream currently makes for the footer bytes
> and PageIndex structures.
> The library also tracks columns currently being read by a query using the
> parquet metadata, and then prefetches these bytes when parquet files with the
> same schema are opened.
> This ticket tracks the work required for the basic initial integration. There
> is still more work to be done, such as VectoredIO support etc, which we will
> identify and follow up with.
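
As an illustration of what the integration exposes, enabling the new stream
from core-site.xml might look like the fragment below. The test above only
shows the `enableAnalyticsAccelerator(configuration)` helper, so the property
name and value here are assumptions, not confirmed by this diff.

```xml
<!-- Sketch only: the exact key and value are assumptions based on the PR,
     not shown in the quoted diff, and may change before release. -->
<property>
  <name>fs.s3a.input.stream.type</name>
  <value>analytics</value>
</property>
```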
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]