[
https://issues.apache.org/jira/browse/HADOOP-18410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585290#comment-17585290
]
ASF GitHub Bot commented on HADOOP-18410:
-----------------------------------------
steveloughran commented on code in PR #4766:
URL: https://github.com/apache/hadoop/pull/4766#discussion_r955898795
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestSDKStreamDrainer.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import com.amazonaws.internal.SdkFilterInputStream;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE;
+import static
org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for stream draining.
+ */
+public class TestSDKStreamDrainer extends HadoopTestBase {
+
+ /**
+ * Default length of the fake SDK input streams created by {@code stream()}.
+ */
+ public static final int BYTES = 100;
+
+ /**
+ * Aborting does as asked.
+ */
+ @Test
+ public void testDrainerAborted() throws Throwable {
+ assertAborted(drainer(BYTES, true, stream()));
+ }
+
+ /**
+ * Create a stream of the default length.
+ * @return a stream.
+ */
+ private static FakeSDKInputStream stream() {
+ return new FakeSDKInputStream(BYTES);
+ }
+
+ /**
+ * a normal drain; all bytes are read. No abort.
+ */
+ @Test
+ public void testDrainerDrained() throws Throwable {
+ assertBytesReadNotAborted(
+ drainer(BYTES, false, stream()),
+ BYTES);
+ }
+
+ /**
+ * Empty streams are fine.
+ */
+ @Test
+ public void testEmptyStream() throws Throwable {
+ int size = 0;
+ assertBytesReadNotAborted(
+ drainer(size, false, new FakeSDKInputStream(size)),
+ size);
+ }
+
+ /**
+ * Single char read; just a safety check on the test stream more than
+ * the production code.
+ */
+ @Test
+ public void testSingleChar() throws Throwable {
+ int size = 1;
+ assertBytesReadNotAborted(
+ drainer(size, false, new FakeSDKInputStream(size)),
+ size);
+ }
+
+ /**
+ * a read spanning multiple buffers.
+ */
+ @Test
+ public void testMultipleBuffers() throws Throwable {
+ int size = DRAIN_BUFFER_SIZE + 1;
+ assertBytesReadNotAborted(
+ drainer(size, false, new FakeSDKInputStream(size)),
+ size);
+ }
+
+ /**
+ * Read of exactly one buffer.
+ */
+ @Test
+ public void testExactlyOneBuffer() throws Throwable {
+ int size = DRAIN_BUFFER_SIZE;
+ assertBytesReadNotAborted(
+ drainer(size, false, new FakeSDKInputStream(size)),
+ size);
+ }
+
+ /**
+ * Less data than expected came back. not escalated.
+ */
+ @Test
+ public void testStreamUnderflow() throws Throwable {
+ int size = 50;
+ assertBytesReadNotAborted(
+ drainer(BYTES, false, new FakeSDKInputStream(size)),
+ size);
+ }
+
+ /**
+ * A drain where a read() raises an IOException. The exception must
+ * surface from applyRaisingException(), and the drainer must escalate
+ * to an abort.
+ */
+ @Test
+ public void testReadFailure() throws Throwable {
+ final int failureThreshold = 50;
+ FakeSDKInputStream failing =
+ new FakeSDKInputStream(BYTES, failureThreshold);
+ SDKStreamDrainer draining = new SDKStreamDrainer(
+ "s3://example/",
+ null,
+ failing,
+ false,
+ BYTES,
+ EMPTY_INPUT_STREAM_STATISTICS,
+ "test");
+ intercept(IOException.class, "",
+ () -> draining.applyRaisingException());
+ assertAborted(draining);
+ }
+
+ /**
+ * An aborting drainer never calls read(), so the stream's configured
+ * read failure is never triggered and no exception escapes.
+ */
+ @Test
+ public void testReadFailureDoesNotSurfaceInAbort() throws Throwable {
+ final int failureThreshold = 50;
+ FakeSDKInputStream failing =
+ new FakeSDKInputStream(BYTES, failureThreshold);
+ SDKStreamDrainer aborting = new SDKStreamDrainer(
+ "s3://example/",
+ null,
+ failing,
+ true,
+ BYTES,
+ EMPTY_INPUT_STREAM_STATISTICS,
+ "test");
+ aborting.applyRaisingException();
+ assertAborted(aborting);
+ }
+
+ /**
+ * Sanity check of the fake stream itself: reading it one byte at a
+ * time until end-of-stream yields exactly {@link #BYTES} bytes.
+ * Per the {@code InputStream.read()} contract, data bytes are 0-255
+ * and only -1 marks end-of-stream, so a zero byte must be counted.
+ */
+ @Test
+ public void testFakeStreamRead() throws Throwable {
+ FakeSDKInputStream stream = stream();
+ int count = 0;
+ // ">= 0", not "> 0": a 0x00 byte is valid data, not EOF.
+ while (stream.read() >= 0) {
+ count++;
+ }
+ Assertions.assertThat(count)
+ .describedAs("bytes read from %s", stream)
+ .isEqualTo(BYTES);
+ }
+
+ /**
+ * Create a drainer and invoke it, rethrowing any exception
+ * which occurred during the draining.
+ * @param remaining bytes remaining in the stream
+ * @param shouldAbort should we abort?
+ * @param in input stream.
+ * @return the drainer
+ * @throws Throwable something went wrong
+ */
+ private SDKStreamDrainer drainer(int remaining,
+ boolean shouldAbort,
+ FakeSDKInputStream in) throws Throwable {
+ SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
+ null,
+ in,
+ shouldAbort,
+ remaining,
+ EMPTY_INPUT_STREAM_STATISTICS, "test");
+ drainer.applyRaisingException();
+ return drainer;
+ }
+
+
+ /**
+ * The draining aborted.
+ * @param drainer drainer to assert on.
+ * @return the drainer.
+ */
+ private SDKStreamDrainer assertAborted(SDKStreamDrainer drainer) {
+ Assertions.assertThat(drainer)
+ .matches(SDKStreamDrainer::isAborted, "isAborted");
+ return drainer;
Review Comment:
not yet...
> S3AInputStream.unbuffer() async drain not releasing http connections
> --------------------------------------------------------------------
>
> Key: HADOOP-18410
> URL: https://issues.apache.org/jira/browse/HADOOP-18410
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.3.9
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Blocker
> Labels: pull-request-available
>
> An Impala TPC-DS setup against S3 is hitting timeouts when fetching HTTP
> connections from the S3A filesystem's connection pool. Disabling S3A async
> drain makes this problem *go away*. Assumption: either those async
> operations are blocking, or they are not releasing references properly.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]