This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1d25d044477 [improve][proxy] Add regression tests for package upload with 'Expect: 100-continue' (#25211)
1d25d044477 is described below
commit 1d25d044477c3e1784ba1d446bc12ee877dd3c3b
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Feb 5 17:34:07 2026 +0800
[improve][proxy] Add regression tests for package upload with 'Expect: 100-continue' (#25211)
Co-authored-by: Copilot <[email protected]>
(cherry picked from commit e8fedb16ca6b8b02b4981b325899000f1c828395)
---
.../management/core/MockedPackagesStorage.java | 10 +-
.../management/core/MockedPackagesStorageTest.java | 52 ++++++++
pulsar-proxy/pom.xml | 8 ++
.../pulsar/proxy/server/AdminProxyHandler.java | 79 ++---------
.../proxy/server/ProxyPackagesUploadTest.java | 144 +++++++++++++++++++++
5 files changed, 220 insertions(+), 73 deletions(-)
diff --git
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
index 1a4b8010d51..6e76d142c12 100644
---
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
+++
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.packages.management.core;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -45,8 +46,13 @@ public class MockedPackagesStorage implements
PackagesStorage {
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try {
- byte[] bytes = new byte[inputStream.available()];
- inputStream.read(bytes);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] buffer = new byte[8192];
+ int read;
+ while ((read = inputStream.read(buffer)) != -1) {
+ baos.write(buffer, 0, read);
+ }
+ byte[] bytes = baos.toByteArray();
storage.put(path, bytes);
future.complete(null);
} catch (IOException e) {
diff --git
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java
new file mode 100644
index 00000000000..eb48f02680d
--- /dev/null
+++
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.core;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import lombok.Cleanup;
+import org.testng.annotations.Test;
+
+public class MockedPackagesStorageTest {
+
+ @Test
+ public void testWriteAndRead() throws Exception {
+ PackagesStorageProvider provider = new MockedPackagesStorageProvider();
+ PackagesStorage storage =
provider.getStorage(mock(PackagesStorageConfiguration.class));
+ storage.initialize();
+
+ // Test data
+ byte[] testBytes = new byte[1 * 1024 * 1024];
+
+ // Write
+ storage.writeAsync("test/path", new
ByteArrayInputStream(testBytes)).get();
+
+ // Read
+ @Cleanup
+ ByteArrayOutputStream readBaos = new ByteArrayOutputStream();
+ storage.readAsync("test/path", readBaos).get();
+
+ // Verify
+ assertEquals(readBaos.toByteArray(), testBytes);
+
+ storage.closeAsync().get();
+ }
+}
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 93610d2d9ba..a875faeb8aa 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -197,6 +197,14 @@
<version>${consolecaptor.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-package-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
<plugins>
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index eaed42a532a..3a21da53d93 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -30,9 +30,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
-import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -54,9 +52,7 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.proxy.ProxyServlet;
-import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,75 +101,16 @@ class AdminProxyHandler extends ProxyServlet {
@Override
protected HttpClient createHttpClient() throws ServletException {
- ServletConfig config = getServletConfig();
-
- HttpClient client = newHttpClient();
-
- client.setFollowRedirects(true);
-
- // Must not store cookies, otherwise cookies of different clients will mix.
- client.setCookieStore(new HttpCookieStore.Empty());
-
- Executor executor;
- String value = config.getInitParameter("maxThreads");
- if (value == null || "-".equals(value)) {
- executor = (Executor)
getServletContext().getAttribute("org.eclipse.jetty.server.Executor");
- if (executor == null) {
- throw new IllegalStateException("No server executor for
proxy");
- }
- } else {
- QueuedThreadPool qtp = new
QueuedThreadPool(Integer.parseInt(value));
- String servletName = config.getServletName();
- int dot = servletName.lastIndexOf('.');
- if (dot >= 0) {
- servletName = servletName.substring(dot + 1);
- }
- qtp.setName(servletName);
- executor = qtp;
- }
-
- client.setExecutor(executor);
-
- value = config.getInitParameter("maxConnections");
- if (value == null) {
- value = "256";
- }
- client.setMaxConnectionsPerDestination(Integer.parseInt(value));
-
- value = config.getInitParameter("idleTimeout");
- if (value == null) {
- value = "30000";
- }
- client.setIdleTimeout(Long.parseLong(value));
-
- value = config.getInitParameter(INIT_PARAM_REQUEST_BUFFER_SIZE);
- if (value != null) {
- client.setRequestBufferSize(Integer.parseInt(value));
- }
-
- value = config.getInitParameter("responseBufferSize");
- if (value != null){
- client.setResponseBufferSize(Integer.parseInt(value));
- }
-
- try {
- client.start();
-
- // Content must not be decoded, otherwise the client gets confused.
- // Allow encoded content, such as "Content-Encoding: gzip", to pass through without decoding it.
- client.getContentDecoderFactories().clear();
-
- // Pass traffic to the client, only intercept what's necessary.
- ProtocolHandlers protocolHandlers = client.getProtocolHandlers();
- protocolHandlers.clear();
- protocolHandlers.put(new RedirectProtocolHandler(client));
-
- return client;
- } catch (Exception x) {
- throw new ServletException(x);
- }
+ HttpClient httpClient = super.createHttpClient();
+ customizeHttpClient(httpClient);
+ return httpClient;
}
+ private void customizeHttpClient(HttpClient httpClient) {
+ httpClient.setFollowRedirects(true);
+ ProtocolHandlers protocolHandlers = httpClient.getProtocolHandlers();
+ protocolHandlers.put(new RedirectProtocolHandler(httpClient));
+ }
// This class allows the request body to be replayed, the default implementation
// does not
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java
new file mode 100644
index 00000000000..8575be21207
--- /dev/null
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static com.google.common.net.HttpHeaders.EXPECT;
+import static org.assertj.core.api.Assertions.assertThat;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
+import org.apache.pulsar.packages.management.core.common.PackageMetadata;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.request.body.multipart.FilePart;
+import org.asynchttpclient.request.body.multipart.StringPart;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+public class ProxyPackagesUploadTest extends MockedPulsarServiceBaseTest {
+
+ private static final int FILE_SIZE = 8 * 1024 * 1024; // 8 MB
+ private static final ObjectMapper MAPPER = ObjectMapperFactory.create();
+ private WebServer webServer;
+ private PulsarAdmin proxyAdmin;
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ conf.setEnablePackagesManagement(true);
+
conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
+ super.internalSetup();
+
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setBrokerWebServiceURL(brokerUrl.toString());
+
+ webServer = new WebServer(proxyConfig, new AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig, true)));
+ webServer.addServlet("/", new ServletHolder(new
AdminProxyHandler(proxyConfig, null, null)));
+ webServer.start();
+
+ proxyAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl("http://localhost:" +
webServer.getListenPortHTTP().get())
+ .build();
+
+ admin.tenants().createTenant("public", createDefaultTenantInfo());
+ admin.namespaces().createNamespace("public/default");
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ if (proxyAdmin != null) {
+ proxyAdmin.close();
+ }
+ if (webServer != null) {
+ webServer.stop();
+ }
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testUploadPackageThroughProxy() throws Exception {
+ Path packageFile = Files.createTempFile("pkg-sdk", ".nar");
+ packageFile.toFile().deleteOnExit();
+ Files.write(packageFile, new byte[FILE_SIZE]);
+
+ String pkgName = "function://public/default/pkg-sdk@v1";
+ PackageMetadata meta =
PackageMetadata.builder().description("sdk-test").build();
+
+ proxyAdmin.packages().upload(meta, pkgName, packageFile.toString());
+
+ verifyDownload(pkgName, FILE_SIZE);
+ }
+
+ @Test
+ public void testUploadWithExpect100Continue() throws Exception {
+ Path packageFile = Files.createTempFile("pkg-ahc", ".nar");
+ packageFile.toFile().deleteOnExit();
+ Files.write(packageFile, new byte[FILE_SIZE]);
+
+ String pkgName = "function://public/default/expect-test@v1";
+ String uploadUrl =
String.format("http://localhost:%d/admin/v3/packages/function/public/default/expect-test/v1",
+ webServer.getListenPortHTTP().orElseThrow());
+
+ @Cleanup
+ AsyncHttpClient client = new DefaultAsyncHttpClient(new
DefaultAsyncHttpClientConfig.Builder().build());
+
+ Response response = client.executeRequest(new RequestBuilder("POST")
+ .setUrl(uploadUrl)
+ .addHeader(EXPECT, "100-continue")
+ .addBodyPart(new FilePart("file", packageFile.toFile()))
+ .addBodyPart(new StringPart("metadata",
MAPPER.writeValueAsString(
+
PackageMetadata.builder().description("ahc-test").build()), "application/json"))
+ .build()).get();
+
+ assertThat(response.getStatusCode()).isEqualTo(204);
+
+ verifyDownload(pkgName, FILE_SIZE);
+ }
+
+ private void verifyDownload(String packageName, int expectedSize) throws
Exception {
+ Path fromBroker = Files.createTempFile("from-broker", ".nar");
+ fromBroker.toFile().deleteOnExit();
+ admin.packages().download(packageName, fromBroker.toString());
+ assertThat(Files.size(fromBroker)).isEqualTo(expectedSize);
+ Files.deleteIfExists(fromBroker);
+
+ Path fromProxy = Files.createTempFile("from-proxy", ".nar");
+ fromProxy.toFile().deleteOnExit();
+ proxyAdmin.packages().download(packageName, fromProxy.toString());
+ assertThat(Files.size(fromProxy)).isEqualTo(expectedSize);
+ }
+}