Copilot commented on code in PR #25211: URL: https://github.com/apache/pulsar/pull/25211#discussion_r2766984824
########## pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import static com.google.common.net.HttpHeaders.EXPECT; +import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; +import org.apache.pulsar.packages.management.core.common.PackageMetadata; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.RequestBuilder; +import org.asynchttpclient.Response; +import org.asynchttpclient.request.body.multipart.FilePart; +import org.asynchttpclient.request.body.multipart.StringPart; +import org.eclipse.jetty.ee8.servlet.ServletHolder; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +public class ProxyPackagesUploadTest extends MockedPulsarServiceBaseTest { + + private static final int FILE_SIZE = 8 * 1024 * 1024; // 8 MB + private static final ObjectMapper MAPPER = ObjectMapperFactory.create(); + private WebServer webServer; + private PulsarAdmin proxyAdmin; + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setEnablePackagesManagement(true); + conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName()); + super.internalSetup(); + + ProxyConfiguration proxyConfig = new ProxyConfiguration(); + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setBrokerWebServiceURL(brokerUrl.toString()); + + webServer = new WebServer(proxyConfig, new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig, true))); + webServer.addServlet("/", new ServletHolder(new AdminProxyHandler(proxyConfig, null, null))); + webServer.start(); + + proxyAdmin = PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:" + webServer.getListenPortHTTP().get()) + .build(); + + admin.tenants().createTenant("public", createDefaultTenantInfo()); + admin.namespaces().createNamespace("public/default"); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + if (proxyAdmin != null) proxyAdmin.close(); + if (webServer != null) webServer.stop(); + super.internalCleanup(); + } + + @Test + public void testUploadPackageThroughProxy() throws Exception { + Path packageFile = Files.createTempFile("pkg-sdk", ".nar"); + packageFile.toFile().deleteOnExit(); + Files.write(packageFile, new byte[FILE_SIZE]); + + String pkgName = "function://public/default/pkg-sdk@v1"; + PackageMetadata meta = PackageMetadata.builder().description("sdk-test").build(); + + proxyAdmin.packages().upload(meta, pkgName, packageFile.toString()); + + verifyDownload(pkgName, FILE_SIZE); + + Files.deleteIfExists(packageFile); + } + + @Test + public void testUploadWithExpect100Continue() throws Exception { + Path packageFile = Files.createTempFile("pkg-ahc", ".nar"); + packageFile.toFile().deleteOnExit(); + Files.write(packageFile, new byte[FILE_SIZE]); + + String pkgName = "function://public/default/expect-test@v1"; + String uploadUrl = String.format("http://localhost:%d/admin/v3/packages/function/public/default/expect-test/v1", + webServer.getListenPortHTTP().orElseThrow()); + + @Cleanup + AsyncHttpClient client = new DefaultAsyncHttpClient(new DefaultAsyncHttpClientConfig.Builder().build()); + + Response response = client.executeRequest(new RequestBuilder("POST") + .setUrl(uploadUrl) + .addHeader(EXPECT, "100-continue") + .addBodyPart(new FilePart("file", packageFile.toFile())) + .addBodyPart(new StringPart("metadata", MAPPER.writeValueAsString( + PackageMetadata.builder().description("ahc-test").build()), "application/json")) + .build()).get(); + + assertThat(response.getStatusCode()).isEqualTo(204); + + verifyDownload(pkgName, FILE_SIZE); Review Comment: Inconsistent cleanup: The first test (testUploadPackageThroughProxy) explicitly deletes the temporary package file with Files.deleteIfExists(packageFile) at line 102, but this test does not. For consistency and to avoid accumulating temporary files during test execution, add Files.deleteIfExists(packageFile) after line 128. ```suggestion verifyDownload(pkgName, FILE_SIZE); Files.deleteIfExists(packageFile); ``` ########## pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java: ########## @@ -45,8 +46,15 @@ public CompletableFuture<Void> writeAsync(String path, InputStream inputStream) CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { try { - byte[] bytes = new byte[inputStream.available()]; - inputStream.read(bytes); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[8192]; + int read; + while ((read = inputStream.read(buffer)) != -1) { + if (read > 0) { + baos.write(buffer, 0, read); + } Review Comment: The check `if (read > 0)` is redundant. The method `InputStream.read(byte[])` returns -1 when end of stream is reached, otherwise it returns the number of bytes read which is always > 0. The while loop already checks for -1, so this additional check serves no purpose and can be removed. ```suggestion baos.write(buffer, 0, read); ``` ########## pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import static com.google.common.net.HttpHeaders.EXPECT; +import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; +import org.apache.pulsar.packages.management.core.common.PackageMetadata; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.RequestBuilder; +import org.asynchttpclient.Response; +import org.asynchttpclient.request.body.multipart.FilePart; +import org.asynchttpclient.request.body.multipart.StringPart; +import org.eclipse.jetty.ee8.servlet.ServletHolder; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +public class ProxyPackagesUploadTest extends MockedPulsarServiceBaseTest { + + private static final int FILE_SIZE = 8 * 1024 * 1024; // 8 MB + private static final ObjectMapper MAPPER = ObjectMapperFactory.create(); + private WebServer webServer; + private PulsarAdmin proxyAdmin; + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setEnablePackagesManagement(true); + conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName()); + super.internalSetup(); + + ProxyConfiguration proxyConfig = new ProxyConfiguration(); + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setBrokerWebServiceURL(brokerUrl.toString()); + + webServer = new WebServer(proxyConfig, new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig, true))); + webServer.addServlet("/", new ServletHolder(new AdminProxyHandler(proxyConfig, null, null))); + webServer.start(); + + proxyAdmin = PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:" + webServer.getListenPortHTTP().get()) + .build(); + + admin.tenants().createTenant("public", createDefaultTenantInfo()); + admin.namespaces().createNamespace("public/default"); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + if (proxyAdmin != null) proxyAdmin.close(); + if (webServer != null) webServer.stop(); + super.internalCleanup(); + } + + @Test + public void testUploadPackageThroughProxy() throws Exception { + Path packageFile = Files.createTempFile("pkg-sdk", ".nar"); + packageFile.toFile().deleteOnExit(); Review Comment: The use of deleteOnExit() followed by manual deleteIfExists() is redundant. The deleteOnExit() ensures the file is deleted when the JVM exits, but since the test already calls deleteIfExists() at the end, the deleteOnExit() serves no purpose and can be removed. This pattern appears in both test methods and the verifyDownload helper method. ########## pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java: ########## @@ -322,6 +248,12 @@ private String getWebServiceUrl() throws PulsarServerException { } } + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + super.service(request, response); + } + @Override Review Comment: The PR description states that "Added logic in AdminProxyHandler to detect requests with the 'Expect: 100-continue' header and wrap them using NoExpectRequestWrapper, ensuring the header is not forwarded to the broker" and "Introduced the NoExpectRequestWrapper inner class to filter out the 'Expect' header from incoming requests." However, the actual code changes do not include any NoExpectRequestWrapper class or logic to detect and filter the "Expect: 100-continue" header. The changes only remove the ProxyContinueProtocolHandler and simplify the createHttpClient() method to call super.createHttpClient(), and add an empty service() method override. This is a significant discrepancy between the PR description and the actual implementation. Please either update the PR description to accurately reflect what was changed, or add the missing NoExpectRequestWrapper implementation if it was intended to be part of this PR. ########## pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java: ########## @@ -322,6 +248,12 @@ private String getWebServiceUrl() throws PulsarServerException { } } + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + super.service(request, response); + } + Review Comment: The service() method override is empty and simply calls super.service(request, response). This appears to serve no functional purpose and should be removed unless there's a specific reason for its presence that isn't evident from the code. ```suggestion ``` ########## pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java: ########## @@ -117,84 +112,15 @@ class AdminProxyHandler extends ProxyServlet { @Override protected HttpClient createHttpClient() throws ServletException { - ServletConfig config = getServletConfig(); - - HttpClient client = newHttpClient(); - - client.setFollowRedirects(true); - - // Must not store cookies, otherwise cookies of different clients will mix. - client.setHttpCookieStore(new HttpCookieStore.Empty()); - - Executor executor; - String value = config.getInitParameter("maxThreads"); - if (value == null || "-".equals(value)) { - executor = (Executor) getServletContext().getAttribute("org.eclipse.jetty.server.Executor"); - if (executor == null) { - throw new IllegalStateException("No server executor for proxy"); - } - } else { - QueuedThreadPool qtp = new QueuedThreadPool(Integer.parseInt(value)); - String servletName = config.getServletName(); - int dot = servletName.lastIndexOf('.'); - if (dot >= 0) { - servletName = servletName.substring(dot + 1); - } - qtp.setName(servletName); - executor = qtp; - } - - client.setExecutor(executor); - - value = config.getInitParameter("maxConnections"); - if (value == null) { - value = "256"; - } - client.setMaxConnectionsPerDestination(Integer.parseInt(value)); - - value = config.getInitParameter("idleTimeout"); - if (value == null) { - value = "30000"; - } - client.setIdleTimeout(Long.parseLong(value)); - - value = config.getInitParameter(INIT_PARAM_REQUEST_BUFFER_SIZE); - if (value != null) { - client.setRequestBufferSize(Integer.parseInt(value)); - } - - value = config.getInitParameter("responseBufferSize"); - if (value != null){ - client.setResponseBufferSize(Integer.parseInt(value)); - } - - try { - client.start(); - - // Content must not be decoded, otherwise the client gets confused. - // Allow encoded content, such as "Content-Encoding: gzip", to pass through without decoding it. - client.getContentDecoderFactories().clear(); - - // Pass traffic to the client, only intercept what's necessary. - ProtocolHandlers protocolHandlers = client.getProtocolHandlers(); - protocolHandlers.clear(); - protocolHandlers.put(new RedirectProtocolHandler(client)); - protocolHandlers.put(new ProxyContinueProtocolHandler()); - - return client; - } catch (Exception x) { - throw new ServletException(x); - } + HttpClient httpClient = super.createHttpClient(); + customizeHttpClient(httpClient); + return httpClient; } - class ProxyContinueProtocolHandler extends ContinueProtocolHandler { - - @Override - protected Runnable onContinue(Request request) { - HttpServletRequest clientRequest = - (HttpServletRequest) request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE); - return AdminProxyHandler.this.onContinue(clientRequest, request); - } + private void customizeHttpClient(HttpClient httpClient) { + httpClient.setFollowRedirects(true); + ProtocolHandlers protocolHandlers = httpClient.getProtocolHandlers(); Review Comment: The key change in createHttpClient() is that the old implementation called protocolHandlers.clear() which removed all default Jetty protocol handlers, then added back only RedirectProtocolHandler and a custom ProxyContinueProtocolHandler. The new implementation does NOT clear the protocol handlers, which preserves Jetty's default protocol handlers (including proper handling of "Expect: 100-continue"). This is the actual fix for the Early EOF issue, not the NoExpectRequestWrapper mentioned in the PR description. While the fix appears sound, the PR description is misleading about the implementation approach. ########## pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.core; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import lombok.Cleanup; +import org.testng.annotations.Test; + +public class MockedPackagesStorageTest { + + @Test + public void testWriteAndRead() throws Exception { + PackagesStorageProvider provider = new MockedPackagesStorageProvider(); + PackagesStorage storage = provider.getStorage(mock(PackagesStorageConfiguration.class)); + storage.initialize(); + + // Test data + byte[] testBytes = new byte[1 * 1024 * 1024]; + + // Write + storage.writeAsync("test/path", new ByteArrayInputStream(testBytes)); Review Comment: Potential race condition: The writeAsync() call on line 40 does not wait for completion (missing .get()), but the readAsync() call immediately follows on line 45. This could cause the test to fail intermittently if the read happens before the write completes. Add .get() to line 40 to ensure the write completes before attempting to read. ```suggestion storage.writeAsync("test/path", new ByteArrayInputStream(testBytes)).get(); ``` ########## pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.core; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import lombok.Cleanup; +import org.testng.annotations.Test; + +public class MockedPackagesStorageTest { + + @Test + public void testWriteAndRead() throws Exception { + PackagesStorageProvider provider = new MockedPackagesStorageProvider(); + PackagesStorage storage = provider.getStorage(mock(PackagesStorageConfiguration.class)); + storage.initialize(); + + // Test data + byte[] testBytes = new byte[1 * 1024 * 1024]; Review Comment: The test creates a byte array of all zeros (new byte[1 * 1024 * 1024]) which may not adequately test the buffered reading implementation. Consider initializing the array with varied data (e.g., using Random or a pattern) to better validate that the buffer-based read/write correctly preserves the entire data stream, especially at buffer boundaries. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
