This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1d25d044477 [improve][proxy] Add regression tests for package upload 
with 'Expect: 100-continue' (#25211)
1d25d044477 is described below

commit 1d25d044477c3e1784ba1d446bc12ee877dd3c3b
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Feb 5 17:34:07 2026 +0800

    [improve][proxy] Add regression tests for package upload with 'Expect: 
100-continue' (#25211)
    
    Co-authored-by: Copilot <[email protected]>
    (cherry picked from commit e8fedb16ca6b8b02b4981b325899000f1c828395)
---
 .../management/core/MockedPackagesStorage.java     |  10 +-
 .../management/core/MockedPackagesStorageTest.java |  52 ++++++++
 pulsar-proxy/pom.xml                               |   8 ++
 .../pulsar/proxy/server/AdminProxyHandler.java     |  79 ++---------
 .../proxy/server/ProxyPackagesUploadTest.java      | 144 +++++++++++++++++++++
 5 files changed, 220 insertions(+), 73 deletions(-)

diff --git 
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
 
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
index 1a4b8010d51..6e76d142c12 100644
--- 
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
+++ 
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.packages.management.core;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -45,8 +46,13 @@ public class MockedPackagesStorage implements 
PackagesStorage {
         CompletableFuture<Void> future = new CompletableFuture<>();
         CompletableFuture.runAsync(() -> {
             try {
-                byte[] bytes = new byte[inputStream.available()];
-                inputStream.read(bytes);
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                byte[] buffer = new byte[8192];
+                int read;
+                while ((read = inputStream.read(buffer)) != -1) {
+                    baos.write(buffer, 0, read);
+                }
+                byte[] bytes = baos.toByteArray();
                 storage.put(path, bytes);
                 future.complete(null);
             } catch (IOException e) {
diff --git 
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java
 
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java
new file mode 100644
index 00000000000..eb48f02680d
--- /dev/null
+++ 
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.core;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import lombok.Cleanup;
+import org.testng.annotations.Test;
+
+public class MockedPackagesStorageTest {
+
+    @Test
+    public void testWriteAndRead() throws Exception {
+        PackagesStorageProvider provider = new MockedPackagesStorageProvider();
+        PackagesStorage storage = 
provider.getStorage(mock(PackagesStorageConfiguration.class));
+        storage.initialize();
+
+        // Test data
+        byte[] testBytes = new byte[1 * 1024 * 1024];
+
+        // Write
+        storage.writeAsync("test/path", new 
ByteArrayInputStream(testBytes)).get();
+
+        // Read
+        @Cleanup
+        ByteArrayOutputStream readBaos = new ByteArrayOutputStream();
+        storage.readAsync("test/path", readBaos).get();
+
+        // Verify
+        assertEquals(readBaos.toByteArray(), testBytes);
+
+        storage.closeAsync().get();
+    }
+}
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 93610d2d9ba..a875faeb8aa 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -197,6 +197,14 @@
       <version>${consolecaptor.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-package-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
   <build>
     <plugins>
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index eaed42a532a..3a21da53d93 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -30,9 +30,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import javax.net.ssl.SSLContext;
-import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -54,9 +52,7 @@ import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
 import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.proxy.ProxyServlet;
-import org.eclipse.jetty.util.HttpCookieStore;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,75 +101,16 @@ class AdminProxyHandler extends ProxyServlet {
 
     @Override
     protected HttpClient createHttpClient() throws ServletException {
-        ServletConfig config = getServletConfig();
-
-        HttpClient client = newHttpClient();
-
-        client.setFollowRedirects(true);
-
-        // Must not store cookies, otherwise cookies of different clients will 
mix.
-        client.setCookieStore(new HttpCookieStore.Empty());
-
-        Executor executor;
-        String value = config.getInitParameter("maxThreads");
-        if (value == null || "-".equals(value)) {
-            executor = (Executor) 
getServletContext().getAttribute("org.eclipse.jetty.server.Executor");
-            if (executor == null) {
-                throw new IllegalStateException("No server executor for 
proxy");
-            }
-        } else {
-            QueuedThreadPool qtp = new 
QueuedThreadPool(Integer.parseInt(value));
-            String servletName = config.getServletName();
-            int dot = servletName.lastIndexOf('.');
-            if (dot >= 0) {
-                servletName = servletName.substring(dot + 1);
-            }
-            qtp.setName(servletName);
-            executor = qtp;
-        }
-
-        client.setExecutor(executor);
-
-        value = config.getInitParameter("maxConnections");
-        if (value == null) {
-            value = "256";
-        }
-        client.setMaxConnectionsPerDestination(Integer.parseInt(value));
-
-        value = config.getInitParameter("idleTimeout");
-        if (value == null) {
-            value = "30000";
-        }
-        client.setIdleTimeout(Long.parseLong(value));
-
-        value = config.getInitParameter(INIT_PARAM_REQUEST_BUFFER_SIZE);
-        if (value != null) {
-            client.setRequestBufferSize(Integer.parseInt(value));
-        }
-
-        value = config.getInitParameter("responseBufferSize");
-        if (value != null){
-            client.setResponseBufferSize(Integer.parseInt(value));
-        }
-
-        try {
-            client.start();
-
-            // Content must not be decoded, otherwise the client gets confused.
-            // Allow encoded content, such as "Content-Encoding: gzip", to 
pass through without decoding it.
-            client.getContentDecoderFactories().clear();
-
-            // Pass traffic to the client, only intercept what's necessary.
-            ProtocolHandlers protocolHandlers = client.getProtocolHandlers();
-            protocolHandlers.clear();
-            protocolHandlers.put(new RedirectProtocolHandler(client));
-
-            return client;
-        } catch (Exception x) {
-            throw new ServletException(x);
-        }
+        HttpClient httpClient = super.createHttpClient();
+        customizeHttpClient(httpClient);
+        return httpClient;
     }
 
+    private void customizeHttpClient(HttpClient httpClient) {
+        httpClient.setFollowRedirects(true);
+        ProtocolHandlers protocolHandlers = httpClient.getProtocolHandlers();
+        protocolHandlers.put(new RedirectProtocolHandler(httpClient));
+    }
 
     // This class allows the request body to be replayed, the default 
implementation
     // does not
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java
new file mode 100644
index 00000000000..8575be21207
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static com.google.common.net.HttpHeaders.EXPECT;
+import static org.assertj.core.api.Assertions.assertThat;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import 
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
+import org.apache.pulsar.packages.management.core.common.PackageMetadata;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.request.body.multipart.FilePart;
+import org.asynchttpclient.request.body.multipart.StringPart;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+public class ProxyPackagesUploadTest extends MockedPulsarServiceBaseTest {
+
+    private static final int FILE_SIZE = 8 * 1024 * 1024; // 8 MB
+    private static final ObjectMapper MAPPER = ObjectMapperFactory.create();
+    private WebServer webServer;
+    private PulsarAdmin proxyAdmin;
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        conf.setEnablePackagesManagement(true);
+        
conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
+        super.internalSetup();
+
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setBrokerWebServiceURL(brokerUrl.toString());
+
+        webServer = new WebServer(proxyConfig, new AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(proxyConfig, true)));
+        webServer.addServlet("/", new ServletHolder(new 
AdminProxyHandler(proxyConfig, null, null)));
+        webServer.start();
+
+        proxyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl("http://localhost:"; + 
webServer.getListenPortHTTP().get())
+                .build();
+
+        admin.tenants().createTenant("public", createDefaultTenantInfo());
+        admin.namespaces().createNamespace("public/default");
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        if (proxyAdmin != null) {
+            proxyAdmin.close();
+        }
+        if (webServer != null) {
+            webServer.stop();
+        }
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testUploadPackageThroughProxy() throws Exception {
+        Path packageFile = Files.createTempFile("pkg-sdk", ".nar");
+        packageFile.toFile().deleteOnExit();
+        Files.write(packageFile, new byte[FILE_SIZE]);
+
+        String pkgName = "function://public/default/pkg-sdk@v1";
+        PackageMetadata meta = 
PackageMetadata.builder().description("sdk-test").build();
+
+        proxyAdmin.packages().upload(meta, pkgName, packageFile.toString());
+
+        verifyDownload(pkgName, FILE_SIZE);
+    }
+
+    @Test
+    public void testUploadWithExpect100Continue() throws Exception {
+        Path packageFile = Files.createTempFile("pkg-ahc", ".nar");
+        packageFile.toFile().deleteOnExit();
+        Files.write(packageFile, new byte[FILE_SIZE]);
+
+        String pkgName = "function://public/default/expect-test@v1";
+        String uploadUrl = 
String.format("http://localhost:%d/admin/v3/packages/function/public/default/expect-test/v1";,
+                webServer.getListenPortHTTP().orElseThrow());
+
+        @Cleanup
+        AsyncHttpClient client = new DefaultAsyncHttpClient(new 
DefaultAsyncHttpClientConfig.Builder().build());
+
+        Response response = client.executeRequest(new RequestBuilder("POST")
+                .setUrl(uploadUrl)
+                .addHeader(EXPECT, "100-continue")
+                .addBodyPart(new FilePart("file", packageFile.toFile()))
+                .addBodyPart(new StringPart("metadata", 
MAPPER.writeValueAsString(
+                        
PackageMetadata.builder().description("ahc-test").build()), "application/json"))
+                .build()).get();
+
+        assertThat(response.getStatusCode()).isEqualTo(204);
+
+        verifyDownload(pkgName, FILE_SIZE);
+    }
+
+    private void verifyDownload(String packageName, int expectedSize) throws 
Exception {
+        Path fromBroker = Files.createTempFile("from-broker", ".nar");
+        fromBroker.toFile().deleteOnExit();
+        admin.packages().download(packageName, fromBroker.toString());
+        assertThat(Files.size(fromBroker)).isEqualTo(expectedSize);
+        Files.deleteIfExists(fromBroker);
+
+        Path fromProxy = Files.createTempFile("from-proxy", ".nar");
+        fromProxy.toFile().deleteOnExit();
+        proxyAdmin.packages().download(packageName, fromProxy.toString());
+        assertThat(Files.size(fromProxy)).isEqualTo(expectedSize);
+    }
+}

Reply via email to