This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 690a19cb37e389c351c755635146b5b61dbdb22f Author: Yong Zhang <[email protected]> AuthorDate: Thu Aug 11 11:05:44 2022 +0800 [improve][functions][admin] Improve the package download process (#16365) * Improve the package download process --- *Motivation* Improve the package download process to handle the download body more efficient. --- .../broker/admin/v3/PackagesApiNotEnabledTest.java | 17 +++- .../pulsar/broker/admin/v3/PackagesApiTest.java | 18 +++- .../pulsar/client/admin/internal/PackagesImpl.java | 98 ++++++++++++++++------ 3 files changed, 105 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java index 3fd39d30d2d..2c39fbbaf8d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java @@ -20,6 +20,10 @@ package org.apache.pulsar.broker.admin.v3; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.packages.management.core.common.PackageMetadata; @@ -45,14 +49,23 @@ public class PackagesApiNotEnabledTest extends MockedPulsarServiceBaseTest { } @Test(timeOut = 60000) - public void testPackagesOperationsWithoutPackagesServiceEnabled() { + public void testPackagesOperationsWithoutPackagesServiceEnabled() throws Exception { // download package api should return 503 Service Unavailable exception String unknownPackageName = "function://public/default/unknown@v1"; + Path tmp = Files.createTempDirectory("package-test-tmp"); try { - admin.packages().download(unknownPackageName, "/test/unknown"); + admin.packages().download(unknownPackageName, tmp.toAbsolutePath().toString() + "/unknown"); fail("should throw 503 error"); } catch (PulsarAdminException e) { assertEquals(503, e.getStatusCode()); + } finally { + Files.walk(tmp).sorted(Comparator.reverseOrder()).forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } // get metadata api should return 503 Service Unavailable exception diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java index 69331c02c7d..dd082681b23 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java @@ -32,7 +32,11 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collections; +import java.util.Comparator; import java.util.List; @Test(groups = "broker-admin") @@ -101,14 +105,24 @@ public class PackagesApiTest extends MockedPulsarServiceBaseTest { } @Test(timeOut = 60000) - public void testPackagesOperationsFailed() { + public void testPackagesOperationsFailed() throws IOException { // download a non-existent package should return not found exception String unknownPackageName = "function://public/default/unknown@v1"; + + Path tmp = Files.createTempDirectory("package-test-tmp"); try { - admin.packages().download(unknownPackageName, "/test/unknown"); + admin.packages().download(unknownPackageName, tmp.toAbsolutePath() + "/unknown"); fail("should throw 404 error"); } catch (PulsarAdminException e) { assertEquals(404, e.getStatusCode()); + } finally { + Files.walk(tmp).sorted(Comparator.reverseOrder()).forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } // get the metadata of a non-existent package should return not found exception diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java index 1ed3e5da367..885e39c1ce6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java @@ -18,18 +18,19 @@ */ package org.apache.pulsar.client.admin.internal; +import static org.asynchttpclient.Dsl.get; import com.google.gson.Gson; +import io.netty.handler.codec.http.HttpHeaders; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; +import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; import java.util.List; import java.util.concurrent.CompletableFuture; import javax.ws.rs.client.Entity; -import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -39,8 +40,11 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.packages.management.core.common.PackageMetadata; import org.apache.pulsar.packages.management.core.common.PackageName; +import org.asynchttpclient.AsyncHandler; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.Dsl; +import org.asynchttpclient.HttpResponseBodyPart; +import org.asynchttpclient.HttpResponseStatus; import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.request.body.multipart.FilePart; import org.asynchttpclient.request.body.multipart.StringPart; @@ -125,30 +129,76 @@ public class PackagesImpl extends ComponentResource implements Packages { public CompletableFuture<Void> downloadAsync(String packageName, String path) { WebTarget webTarget = packages.path(PackageName.get(packageName).toRestPath()); final CompletableFuture<Void> future = new CompletableFuture<>(); - asyncGetRequest(webTarget, new InvocationCallback<Response>(){ - @Override - public void completed(Response response) { - if (response.getStatus() == Response.Status.OK.getStatusCode()) { - try (InputStream inputStream = response.readEntity(InputStream.class)) { - Path destinyPath = Paths.get(path); - if (destinyPath.getParent() != null) { - Files.createDirectories(destinyPath.getParent()); + try { + Path destinyPath = Paths.get(path); + if (destinyPath.getParent() != null) { + Files.createDirectories(destinyPath.getParent()); + } + + FileChannel os = new FileOutputStream(destinyPath.toFile()).getChannel(); + RequestBuilder builder = get(webTarget.getUri().toASCIIString()); + + CompletableFuture<HttpResponseStatus> statusFuture = + httpClient.executeRequest(addAuthHeaders(webTarget, builder).build(), + new AsyncHandler<HttpResponseStatus>() { + private HttpResponseStatus status; + + @Override + public State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception { + status = httpResponseStatus; + if (status.getStatusCode() != Response.Status.OK.getStatusCode()) { + return State.ABORT; + } + return State.CONTINUE; } - Files.copy(inputStream, destinyPath, StandardCopyOption.REPLACE_EXISTING); - future.complete(null); + + @Override + public State onHeadersReceived(HttpHeaders httpHeaders) throws Exception { + return State.CONTINUE; + } + + @Override + public State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception { + os.write(httpResponseBodyPart.getBodyByteBuffer()); + return State.CONTINUE; + } + + @Override + public void onThrowable(Throwable throwable) { + // we don't need to handle that throwable and use the returned future to handle it. + } + + @Override + public HttpResponseStatus onCompleted() throws Exception { + return status; + } + }).toCompletableFuture(); + statusFuture + .whenComplete((status, throwable) -> { + try { + os.close(); } catch (IOException e) { - future.completeExceptionally(e); + future.completeExceptionally(getApiException(throwable)); } - } else { - future.completeExceptionally(getApiException(response)); - } - } - - @Override - public void failed(Throwable throwable) { - future.completeExceptionally(throwable); - } - }); + }) + .thenAccept(status -> { + if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) { + future.completeExceptionally( + getApiException(Response + .status(status.getStatusCode()) + .entity(status.getStatusText()) + .build())); + } else { + future.complete(null); + } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; + }); + } catch (Exception e) { + future.completeExceptionally(getApiException(e)); + } return future; }
