This is an automated email from the ASF dual-hosted git repository. kwin pushed a commit to branch feature/jetty-12.1 in repository https://gitbox.apache.org/repos/asf/maven-resolver.git
commit c54dcec187b87c1fbac43921b2209922f3726018 Author: Konrad Windszus <[email protected]> AuthorDate: Tue Jan 13 19:16:46 2026 +0100 Update to Jetty 12.1 This affects both Server used for Tests and Jetty Transport (for HTTP) This increases Java requirements to 17 for Jetty Transport --- maven-resolver-test-http/pom.xml | 14 +- .../aether/internal/test/util/http/HttpServer.java | 252 +++++++++---------- maven-resolver-transport-jetty/pom.xml | 8 +- .../transport/jetty/JettyRFC9457Reporter.java | 4 +- .../aether/transport/jetty/JettyTransporter.java | 18 +- .../transport/jetty/PutTaskRequestContent.java | 280 ++++++++++++++++----- .../transport/jetty/JettyTransporterTest.java | 5 + pom.xml | 4 +- 8 files changed, 366 insertions(+), 219 deletions(-) diff --git a/maven-resolver-test-http/pom.xml b/maven-resolver-test-http/pom.xml index af6d7ca63..9617490fa 100644 --- a/maven-resolver-test-http/pom.xml +++ b/maven-resolver-test-http/pom.xml @@ -32,7 +32,7 @@ <description>A collection of utility classes to ease testing of HTTP transports.</description> <properties> - <javaVersion>11</javaVersion> + <javaVersion>17</javaVersion> </properties> <dependencies> @@ -75,7 +75,16 @@ </dependency> <dependency> <groupId>org.eclipse.jetty.http2</groupId> - <artifactId>http2-server</artifactId> + <artifactId>jetty-http2-server</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty.compression</groupId> + <artifactId>jetty-compression-server</artifactId> + </dependency> + <dependency> + <groupId>jakarta.servlet</groupId> + <artifactId>jakarta.servlet-api</artifactId> + <version>6.1.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> @@ -84,7 +93,6 @@ <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> - <version>${slf4jVersion}</version> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> diff --git a/maven-resolver-test-http/src/main/java/org/eclipse/aether/internal/test/util/http/HttpServer.java b/maven-resolver-test-http/src/main/java/org/eclipse/aether/internal/test/util/http/HttpServer.java index b08270b43..0ca20510c 100644 --- a/maven-resolver-test-http/src/main/java/org/eclipse/aether/internal/test/util/http/HttpServer.java +++ b/maven-resolver-test-http/src/main/java/org/eclipse/aether/internal/test/util/http/HttpServer.java @@ -18,36 +18,38 @@ */ package org.eclipse.aether.internal.test.util.http; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; import java.util.Collections; -import java.util.Enumeration; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import com.google.gson.Gson; +import jakarta.servlet.http.HttpServletResponse; import org.eclipse.aether.internal.impl.checksum.Sha1ChecksumAlgorithmFactory; import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper; import org.eclipse.aether.spi.connector.transport.http.RFC9457.RFC9457Payload; import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; +import org.eclipse.jetty.http.DateGenerator; +import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Request; @@ -56,10 +58,8 @@ import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; -import org.eclipse.jetty.server.handler.AbstractHandler; -import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; -import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -287,20 +287,18 @@ public class HttpServer { return this; } - HandlerList handlers = new HandlerList(); - handlers.addHandler(new ConnectionClosingHandler()); - handlers.addHandler(new ServerErrorHandler()); - handlers.addHandler(new LogHandler()); - handlers.addHandler(new ProxyAuthHandler()); - handlers.addHandler(new AuthHandler()); - handlers.addHandler(new RedirectHandler()); - handlers.addHandler(new RepoHandler()); - handlers.addHandler(new RFC9457Handler()); - server = new Server(); httpConnector = new ServerConnector(server); server.addConnector(httpConnector); - server.setHandler(handlers); + server.setHandler(new Handler.Sequence( + new ConnectionClosingHandler(), + new ServerErrorHandler(), + new LogHandler(), + new ProxyAuthHandler(), + new AuthHandler(), + new RedirectHandler(), + new RepoHandler(), + new RFC9457Handler())); server.start(); return this; @@ -315,152 +313,142 @@ public class HttpServer { } } - private class ConnectionClosingHandler extends AbstractHandler { + private class ConnectionClosingHandler extends Handler.Abstract { + @Override - public void handle(String target, Request req, HttpServletRequest request, HttpServletResponse response) { + public boolean handle(Request request, Response response, Callback callback) throws Exception { if (connectionsToClose.getAndDecrement() > 0) { - Response jettyResponse = (Response) response; - jettyResponse.getHttpChannel().getConnection().close(); + request.getConnectionMetaData().getConnection().close(); } + return false; } } - private class ServerErrorHandler extends AbstractHandler { + private class ServerErrorHandler extends Handler.Abstract { @Override - public void handle(String target, Request req, HttpServletRequest request, HttpServletResponse response) - throws IOException { + public boolean handle(Request request, Response response, Callback callback) throws IOException { if (serverErrorsBeforeWorks.getAndDecrement() > 0) { response.setStatus(serverErrorStatusCode); - writeResponseBodyMessage(response, "Oops, come back later!"); + writeResponseBodyMessage(request, response, "Oops, come back later!"); + callback.succeeded(); + return true; } + return false; } } - private class LogHandler extends AbstractHandler { + private class LogHandler extends Handler.Abstract { @Override - public void handle(String target, Request req, HttpServletRequest request, HttpServletResponse response) { + public boolean handle(Request req, Response response, Callback callback) { LOGGER.info( "{} {}{}", req.getMethod(), - req.getRequestURL(), - req.getQueryString() != null ? "?" + req.getQueryString() : ""); + req.getHttpURI().getDecodedPath(), + req.getHttpURI().getQuery() != null ? "?" + req.getHttpURI().getQuery() : ""); Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - for (Enumeration<String> en = req.getHeaderNames(); en.hasMoreElements(); ) { - String name = en.nextElement(); - StringBuilder buffer = new StringBuilder(128); - for (Enumeration<String> ien = req.getHeaders(name); ien.hasMoreElements(); ) { - if (buffer.length() > 0) { - buffer.append(", "); - } - buffer.append(ien.nextElement()); - } - headers.put(name, buffer.toString()); + for (HttpField header : req.getHeaders()) { + headers.put(header.getName(), header.getValueList().stream().collect(Collectors.joining(", "))); } - logEntries.add(new LogEntry(req.getMethod(), req.getOriginalURI(), Collections.unmodifiableMap(headers))); + logEntries.add(new LogEntry( + req.getMethod(), req.getHttpURI().getPathQuery(), Collections.unmodifiableMap(headers))); + return false; } } private static final Pattern SIMPLE_RANGE = Pattern.compile("bytes=([0-9])+-"); - private class RepoHandler extends AbstractHandler { + private class RepoHandler extends Handler.Abstract { @Override - public void handle(String target, Request req, HttpServletRequest request, HttpServletResponse response) - throws IOException { - String path = req.getPathInfo().substring(1); + public boolean handle(Request req, Response response, Callback callback) throws Exception { + String path = req.getHttpURI().getDecodedPath().substring(1); if (!path.startsWith("repo/")) { - return; + return false; } - req.setHandled(true); - if (ExpectContinue.FAIL.equals(expectContinue) && request.getHeader(HttpHeader.EXPECT.asString()) != null) { + if (ExpectContinue.FAIL.equals(expectContinue) && req.getHeaders().get(HttpHeader.EXPECT) != null) { response.setStatus(HttpServletResponse.SC_EXPECTATION_FAILED); - writeResponseBodyMessage(response, "Expectation was set to fail"); - return; + writeResponseBodyMessage(req, response, "Expectation was set to fail"); + callback.succeeded(); + return true; } File file = new File(repoDir, path.substring(5)); if (HttpMethod.GET.is(req.getMethod()) || HttpMethod.HEAD.is(req.getMethod())) { - if (!file.isFile() || path.endsWith(URIUtil.SLASH)) { + if (!file.isFile() || path.endsWith("/")) { response.setStatus(HttpServletResponse.SC_NOT_FOUND); - writeResponseBodyMessage(response, "Not found"); - return; + writeResponseBodyMessage(req, response, "Not found"); + callback.succeeded(); + return true; } - long ifUnmodifiedSince = request.getDateHeader(HttpHeader.IF_UNMODIFIED_SINCE.asString()); + long ifUnmodifiedSince = req.getHeaders().getDateField(HttpHeader.IF_UNMODIFIED_SINCE); if (ifUnmodifiedSince != -1L && file.lastModified() > ifUnmodifiedSince) { response.setStatus(HttpServletResponse.SC_PRECONDITION_FAILED); - writeResponseBodyMessage(response, "Precondition failed"); - return; + writeResponseBodyMessage(req, response, "Precondition failed"); + callback.succeeded(); + return true; } long offset = 0L; - String range = request.getHeader(HttpHeader.RANGE.asString()); + String range = req.getHeaders().get(HttpHeader.RANGE); if (range != null && rangeSupport) { Matcher m = SIMPLE_RANGE.matcher(range); if (m.matches()) { offset = Long.parseLong(m.group(1)); if (offset >= file.length()) { response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE); - writeResponseBodyMessage(response, "Range not satisfiable"); - return; + writeResponseBodyMessage(req, response, "Range not satisfiable"); + callback.succeeded(); + return true; } } - String encoding = request.getHeader(HttpHeader.ACCEPT_ENCODING.asString()); + String encoding = req.getHeaders().get(HttpHeader.ACCEPT_ENCODING); if ((encoding != null && !"identity".equals(encoding)) || ifUnmodifiedSince == -1L) { response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - return; + callback.succeeded(); + return true; } } response.setStatus((offset > 0L) ? HttpServletResponse.SC_PARTIAL_CONTENT : HttpServletResponse.SC_OK); - response.setDateHeader(HttpHeader.LAST_MODIFIED.asString(), file.lastModified()); - response.setHeader(HttpHeader.CONTENT_LENGTH.asString(), Long.toString(file.length() - offset)); + response.getHeaders().add(HttpHeader.LAST_MODIFIED, DateGenerator.formatDate(file.lastModified())); + response.getHeaders().add(HttpHeader.CONTENT_LENGTH, Long.toString(file.length() - offset)); if (offset > 0L) { - response.setHeader( - HttpHeader.CONTENT_RANGE.asString(), - "bytes " + offset + "-" + (file.length() - 1L) + "/" + file.length()); + response.getHeaders() + .add( + HttpHeader.CONTENT_RANGE, + "bytes " + offset + "-" + (file.length() - 1L) + "/" + file.length()); } if (checksumHeader != null) { Map<String, String> checksums = ChecksumAlgorithmHelper.calculate( file, Collections.singletonList(new Sha1ChecksumAlgorithmFactory())); if (checksumHeader == ChecksumHeader.NEXUS) { - response.setHeader(HttpHeader.ETAG.asString(), "{SHA1{" + checksums.get("SHA-1") + "}}"); + response.getHeaders().add(HttpHeader.ETAG.asString(), "{SHA1{" + checksums.get("SHA-1") + "}}"); } else if (checksumHeader == ChecksumHeader.XCHECKSUM) { - response.setHeader("x-checksum-sha1", checksums.get(Sha1ChecksumAlgorithmFactory.NAME)); + response.getHeaders().add("x-checksum-sha1", checksums.get(Sha1ChecksumAlgorithmFactory.NAME)); } } if (HttpMethod.HEAD.is(req.getMethod())) { - return; + callback.succeeded(); + return true; } - try (FileInputStream is = new FileInputStream(file)) { + try (FileInputStream is = new FileInputStream(file); + OutputStream os = Response.asBufferedOutputStream(req, response)) { if (offset > 0L) { long skipped = is.skip(offset); while (skipped < offset && is.read() >= 0) { skipped++; } } - IO.copy(is, response.getOutputStream()); + IO.copy(is, os); } } else if (HttpMethod.PUT.is(req.getMethod())) { if (!webDav) { file.getParentFile().mkdirs(); } if (file.getParentFile().exists()) { - try { - FileOutputStream os = null; - try { - os = new FileOutputStream(file); - IO.copy(request.getInputStream(), os); - os.close(); - os = null; - } finally { - try { - if (os != null) { - os.close(); - } - } catch (final IOException e) { - // Suppressed due to an exception already thrown in the try block. - } - } + try (InputStream is = Content.Source.asInputStream(req); + FileOutputStream os = new FileOutputStream(file)) { + IO.copy(is, os); } catch (IOException e) { file.delete(); throw e; @@ -471,9 +459,9 @@ public class HttpServer { } } else if (HttpMethod.OPTIONS.is(req.getMethod())) { if (webDav) { - response.setHeader("DAV", "1,2"); + response.getHeaders().add("DAV", "1,2"); } - response.setHeader(HttpHeader.ALLOW.asString(), "GET, PUT, HEAD, OPTIONS"); + response.getHeaders().add(HttpHeader.ALLOW, "GET, PUT, HEAD, OPTIONS"); response.setStatus(HttpServletResponse.SC_OK); } else if (webDav && "MKCOL".equals(req.getMethod())) { if (file.exists()) { @@ -486,33 +474,29 @@ public class HttpServer { } else { response.setStatus(HttpServletResponse.SC_METHOD_NOT_ALLOWED); } + callback.succeeded(); + return true; } } - private void writeResponseBodyMessage(HttpServletResponse response, String message) throws IOException { - try (OutputStream outputStream = response.getOutputStream()) { + private void writeResponseBodyMessage(Request request, Response response, String message) throws IOException { + try (OutputStream outputStream = Response.asBufferedOutputStream(request, response)) { outputStream.write(message.getBytes(StandardCharsets.UTF_8)); } } - private class RFC9457Handler extends AbstractHandler { + private class RFC9457Handler extends Handler.Abstract { @Override - public void handle( - final String target, - final Request req, - final HttpServletRequest request, - final HttpServletResponse response) - throws IOException, ServletException { - String path = req.getPathInfo().substring(1); + public boolean handle(Request req, Response response, Callback callback) throws Exception { + String path = req.getHttpURI().getPath().substring(1); if (!path.startsWith("rfc9457/")) { - return; + return false; } - req.setHandled(true); if (HttpMethod.GET.is(req.getMethod())) { response.setStatus(HttpServletResponse.SC_FORBIDDEN); - response.setHeader(HttpHeader.CONTENT_TYPE.asString(), "application/problem+json"); + response.getHeaders().add(HttpHeader.CONTENT_TYPE.asString(), "application/problem+json"); RFC9457Payload rfc9457Payload; if (path.endsWith("missing_fields.txt")) { rfc9457Payload = new RFC9457Payload(null, null, null, null, null); @@ -524,8 +508,10 @@ public class HttpServer { "Your current balance is 30, but that costs 50.", URI.create("/account/12345/msgs/abc")); } - writeResponseBodyMessage(response, buildRFC9457Message(rfc9457Payload)); + writeResponseBodyMessage(req, response, buildRFC9457Message(rfc9457Payload)); } + callback.succeeded(); + return true; } } @@ -533,64 +519,70 @@ public class HttpServer { return new Gson().toJson(payload, RFC9457Payload.class); } - private class RedirectHandler extends AbstractHandler { + private class RedirectHandler extends Handler.Abstract { @Override - public void handle(String target, Request req, HttpServletRequest request, HttpServletResponse response) { - String path = req.getPathInfo(); + public boolean handle(Request req, Response response, Callback callback) throws Exception { + String path = req.getHttpURI().getPath(); if (!path.startsWith("/redirect/")) { - return; + return false; } - req.setHandled(true); StringBuilder location = new StringBuilder(128); - String scheme = req.getParameter("scheme"); - location.append(scheme != null ? scheme : req.getScheme()); + String scheme = Request.getParameters(req).getValue("scheme"); + location.append(scheme != null ? scheme : req.getHttpURI().getScheme()); location.append("://"); - location.append(req.getServerName()); + location.append(Request.getServerName(req)); location.append(":"); if ("http".equalsIgnoreCase(scheme)) { location.append(getHttpPort()); } else if ("https".equalsIgnoreCase(scheme)) { location.append(getHttpsPort()); } else { - location.append(req.getServerPort()); + location.append(Request.getServerPort(req)); } location.append("/repo").append(path.substring(9)); - response.setStatus(HttpServletResponse.SC_MOVED_PERMANENTLY); - response.setHeader(HttpHeader.LOCATION.asString(), location.toString()); + Response.sendRedirect( + req, response, callback, HttpServletResponse.SC_MOVED_PERMANENTLY, location.toString(), false); + callback.succeeded(); + return true; } } - private class AuthHandler extends AbstractHandler { + private class AuthHandler extends Handler.Abstract { @Override - public void handle(String target, Request req, HttpServletRequest request, HttpServletResponse response) - throws IOException { + public boolean handle(Request request, Response response, Callback callback) throws Exception { if (ExpectContinue.BROKEN.equals(expectContinue) - && "100-continue".equalsIgnoreCase(request.getHeader(HttpHeader.EXPECT.asString()))) { - request.getInputStream(); + && "100-continue".equalsIgnoreCase(request.getHeaders().get(HttpHeader.EXPECT))) { + // TODO: what is this for? + Request.asInputStream(request); } if (username != null && password != null) { - if (checkBasicAuth(request.getHeader(HttpHeader.AUTHORIZATION.asString()), username, password)) { - return; + if (checkBasicAuth(request.getHeaders().get(HttpHeader.AUTHORIZATION), username, password)) { + return false; } - req.setHandled(true); - response.setHeader(HttpHeader.WWW_AUTHENTICATE.asString(), "basic realm=\"Test-Realm\""); + response.getHeaders().add(HttpHeader.WWW_AUTHENTICATE, "Basic realm=\"Test-Realm\""); response.setStatus(HttpServletResponse.SC_UNAUTHORIZED); + callback.succeeded(); + return true; } + return false; } } - private class ProxyAuthHandler extends AbstractHandler { + private class ProxyAuthHandler extends Handler.Abstract { @Override - public void handle(String target, Request req, HttpServletRequest request, HttpServletResponse response) { + public boolean handle(Request req, Response response, Callback callback) throws Exception { if (proxyUsername != null && proxyPassword != null) { if (checkBasicAuth( - request.getHeader(HttpHeader.PROXY_AUTHORIZATION.asString()), proxyUsername, proxyPassword)) { - return; + req.getHeaders().get(HttpHeader.PROXY_AUTHORIZATION), proxyUsername, proxyPassword)) { + return false; } - req.setHandled(true); - response.setHeader(HttpHeader.PROXY_AUTHENTICATE.asString(), "basic realm=\"Test-Realm\""); + response.getHeaders().add(HttpHeader.PROXY_AUTHENTICATE, "basic realm=\"Test-Realm\""); response.setStatus(HttpServletResponse.SC_PROXY_AUTHENTICATION_REQUIRED); + callback.succeeded(); + return true; + } else { + return false; } } } diff --git a/maven-resolver-transport-jetty/pom.xml b/maven-resolver-transport-jetty/pom.xml index 8925783f7..c398e2901 100644 --- a/maven-resolver-transport-jetty/pom.xml +++ b/maven-resolver-transport-jetty/pom.xml @@ -30,10 +30,10 @@ <packaging>jar</packaging> <name>Maven Artifact Resolver Transport Jetty</name> - <description>Maven Artifact Transport Jetty.</description> + <description>Maven Artifact Transport based on Jetty 12.1.</description> <properties> - <javaVersion>11</javaVersion> + <javaVersion>17</javaVersion> </properties> <dependencies> @@ -66,11 +66,11 @@ </dependency> <dependency> <groupId>org.eclipse.jetty.http2</groupId> - <artifactId>http2-client</artifactId> + <artifactId>jetty-http2-client</artifactId> </dependency> <dependency> <groupId>org.eclipse.jetty.http2</groupId> - <artifactId>http2-http-client-transport</artifactId> + <artifactId>jetty-http2-client-transport</artifactId> </dependency> <dependency> diff --git a/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/JettyRFC9457Reporter.java b/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/JettyRFC9457Reporter.java index 1defcf471..2c5e5ca39 100644 --- a/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/JettyRFC9457Reporter.java +++ b/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/JettyRFC9457Reporter.java @@ -27,8 +27,8 @@ import java.util.concurrent.TimeoutException; import org.eclipse.aether.spi.connector.transport.http.HttpTransporterException; import org.eclipse.aether.spi.connector.transport.http.RFC9457.RFC9457Reporter; -import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.util.InputStreamResponseListener; +import org.eclipse.jetty.client.InputStreamResponseListener; +import org.eclipse.jetty.client.Response; import org.eclipse.jetty.http.HttpHeader; public class JettyRFC9457Reporter extends RFC9457Reporter<InputStreamResponseListener, HttpTransporterException> { diff --git a/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/JettyTransporter.java b/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/JettyTransporter.java index a4fc66885..3a90132a7 100644 --- a/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/JettyTransporter.java +++ b/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/JettyTransporter.java @@ -55,18 +55,18 @@ import org.eclipse.aether.spi.io.PathProcessor; import org.eclipse.aether.transfer.NoTransporterException; import org.eclipse.aether.transfer.TransferCancelledException; import org.eclipse.aether.util.ConfigUtils; +import org.eclipse.jetty.client.Authentication; +import org.eclipse.jetty.client.BasicAuthentication; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpProxy; -import org.eclipse.jetty.client.api.Authentication; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; -import org.eclipse.jetty.client.http.HttpClientConnectionFactory; -import org.eclipse.jetty.client.util.BasicAuthentication; -import org.eclipse.jetty.client.util.InputStreamResponseListener; +import org.eclipse.jetty.client.InputStreamResponseListener; +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.client.Response; +import org.eclipse.jetty.client.transport.HttpClientConnectionFactory; +import org.eclipse.jetty.client.transport.HttpClientTransportDynamic; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http2.client.HTTP2Client; -import org.eclipse.jetty.http2.client.http.ClientConnectionFactoryOverHTTP2; +import org.eclipse.jetty.http2.client.transport.ClientConnectionFactoryOverHTTP2; import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -353,7 +353,7 @@ final class JettyTransporter extends AbstractTransporter implements HttpTranspor if (preemptiveAuth || preemptivePutAuth) { mayApplyPreemptiveAuth(request); } - request.body(new PutTaskRequestContent(task)); + request.body(PutTaskRequestContent.from(task)); AtomicBoolean started = new AtomicBoolean(false); Response response; try { diff --git a/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/PutTaskRequestContent.java b/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/PutTaskRequestContent.java index b14d29c17..361ed92be 100644 --- a/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/PutTaskRequestContent.java +++ b/maven-resolver-transport-jetty/src/main/java/org/eclipse/aether/transport/jetty/PutTaskRequestContent.java @@ -18,118 +18,260 @@ */ package org.eclipse.aether.transport.jetty; -import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; import java.nio.channels.Channels; +import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.StandardOpenOption; +import java.util.Objects; import org.eclipse.aether.spi.connector.transport.PutTask; -import org.eclipse.jetty.client.util.AbstractRequestContent; +import org.eclipse.jetty.client.ByteBufferRequestContent; +import org.eclipse.jetty.client.Request; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.internal.ByteChannelContentSource; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.SerializedInvoker; -class PutTaskRequestContent extends AbstractRequestContent { - private final PutTask putTask; - private final int bufferSize; - private ByteBufferPool bufferPool; - private boolean useDirectByteBuffers = true; +/** + * Heavily inspired by Jetty's org.eclipse.jetty.io.internal.ByteChannelContentSource but adjusted to deal with + * ReadableByteChannels. + * Also Jetty's ByteChannelContentSource is an internal package so should not be used directly. + */ +public class PutTaskRequestContent extends ByteBufferRequestContent implements Request.Content { - PutTaskRequestContent(PutTask putTask) { - this(putTask, 4096); + public static Request.Content from(PutTask putTask) throws IOException { + ReadableByteChannel channel; + if (putTask.getDataPath() != null) { + channel = Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ); + } else { + // TODO: support rewind for retries when using InputStream + channel = Channels.newChannel(putTask.newInputStream()); + } + return new PutTaskRequestContent(null, channel, 0L, putTask.getDataLength()); } - PutTaskRequestContent(PutTask putTask, int bufferSize) { - super("application/octet-stream"); - this.putTask = putTask; - this.bufferSize = bufferSize; + private final AutoLock lock = new AutoLock(); + private final SerializedInvoker invoker = new SerializedInvoker(ByteChannelContentSource.class); + private final ByteBufferPool.Sized byteBufferPool; + private ReadableByteChannel byteChannel; + private final long offset; + private final long length; + private RetainableByteBuffer buffer; + private long offsetRemaining; + private long totalRead; + private Runnable demandCallback; + private Content.Chunk terminal; + + /** + * Create a {@link ByteChannelContentSource} which reads from a {@link ByteChannel}. + * @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers. + * @param byteChannel The {@link ByteChannel}s to use as the source. + */ + protected PutTaskRequestContent(ByteBufferPool.Sized byteBufferPool, ReadableByteChannel byteChannel) { + this(byteBufferPool, byteChannel, 0L, -1L); } - @Override - public long getLength() { - return putTask.getDataLength(); + /** + * Create a {@link ByteChannelContentSource} which reads from a {@link ByteChannel}. + * If the {@link ByteChannel} is an instance of {@link SeekableByteChannel} the implementation will use + * {@link SeekableByteChannel#position(long)} to navigate to the starting offset. + * @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers. + * @param byteChannel The {@link ByteChannel}s to use as the source. + * @param offset the offset byte of the content to start from. + * Must be greater than or equal to 0 and less than the content length (if known). + * @param length the length of the content to make available, -1 for the full length. + * If the size of the content is known, the length may be truncated to the content size minus the offset. + * @throws IndexOutOfBoundsException if the offset or length are out of range. + * @see TypeUtil#checkOffsetLengthSize(long, long, long) + */ + protected PutTaskRequestContent( + ByteBufferPool.Sized byteBufferPool, ReadableByteChannel byteChannel, long offset, long length) { + this.byteBufferPool = Objects.requireNonNullElse(byteBufferPool, ByteBufferPool.SIZED_NON_POOLING); + this.byteChannel = byteChannel; + this.offset = offset; + this.length = TypeUtil.checkOffsetLengthSize(offset, length, -1L); + offsetRemaining = offset; } - @Override - public boolean isReproducible() { - return true; + protected ReadableByteChannel open() throws IOException { + return byteChannel; } - public ByteBufferPool getByteBufferPool() { - return bufferPool; + @Override + public void demand(Runnable demandCallback) { + try (AutoLock ignored = lock.lock()) { + if (this.demandCallback != null) { + throw new IllegalStateException("demand pending"); + } + this.demandCallback = demandCallback; + } + invoker.run(this::invokeDemandCallback); } - public void setByteBufferPool(ByteBufferPool byteBufferPool) { - this.bufferPool = byteBufferPool; + private void invokeDemandCallback() { + Runnable demandCallback; + try (AutoLock ignored = lock.lock()) { + demandCallback = this.demandCallback; + this.demandCallback = null; + } + if (demandCallback != null) { + ExceptionUtil.run(demandCallback, this::fail); + } } - public boolean isUseDirectByteBuffers() { - return useDirectByteBuffers; + protected void lockedSetTerminal(Content.Chunk terminal) { + assert lock.isHeldByCurrentThread(); + if (terminal == null) { + terminal = Objects.requireNonNull(terminal); + } else { + ExceptionUtil.addSuppressedIfNotAssociated(terminal.getFailure(), terminal.getFailure()); + } + IO.close(byteChannel); + if (buffer != null) { + buffer.release(); + } + buffer = null; } - public void setUseDirectByteBuffers(boolean useDirectByteBuffers) { - this.useDirectByteBuffers = useDirectByteBuffers; + private void lockedEnsureOpenOrTerminal() { + assert lock.isHeldByCurrentThread(); + if (terminal == null && (byteChannel == null || !byteChannel.isOpen())) { + try { + byteChannel = open(); + if (byteChannel == null || !byteChannel.isOpen()) { + lockedSetTerminal(Content.Chunk.from(new ClosedChannelException(), true)); + } else if (byteChannel instanceof SeekableByteChannel) { + ((SeekableByteChannel) byteChannel).position(offset); + offsetRemaining = 0; + } + } catch (IOException e) { + lockedSetTerminal(Content.Chunk.from(e, true)); + } + } } @Override - protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent) { - return new SubscriptionImpl(consumer, emitInitialContent); - } + public Content.Chunk read() { + try (AutoLock ignored = lock.lock()) { + lockedEnsureOpenOrTerminal(); - private class SubscriptionImpl extends AbstractSubscription { - private ReadableByteChannel channel; - private long readTotal; + if (terminal != null) { + return terminal; + } - private SubscriptionImpl(Consumer consumer, boolean emitInitialContent) { - super(consumer, emitInitialContent); - } + if (length == 0) { + lockedSetTerminal(Content.Chunk.EOF); + return Content.Chunk.EOF; + } + + if (buffer == null) { + buffer = byteBufferPool.acquire(); + } else if (buffer.isRetained()) { + buffer.release(); + buffer = byteBufferPool.acquire(); + } + + try { + ByteBuffer byteBuffer = buffer.getByteBuffer(); + if (offsetRemaining > 0) { + // Discard all bytes read until we reach the staring offset. + while (offsetRemaining > 0) { + BufferUtil.clearToFill(byteBuffer); + byteBuffer.limit((int) Math.min(buffer.capacity(), offsetRemaining)); + int read = byteChannel.read(byteBuffer); + if (read < 0) { + lockedSetTerminal(Content.Chunk.EOF); + return terminal; + } + if (read == 0) { + return null; + } + + offsetRemaining -= read; + } + } - @Override - protected boolean produceContent(Producer producer) throws IOException { - ByteBuffer buffer; - boolean last; - if (channel == null) { - if (putTask.getDataPath() != null) { - channel = Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ); - } else { - channel = Channels.newChannel(putTask.newInputStream()); + BufferUtil.clearToFill(byteBuffer); + if (length > 0) { + byteBuffer.limit((int) Math.min(buffer.capacity(), length - totalRead)); + } + int read = byteChannel.read(byteBuffer); + BufferUtil.flipToFlush(byteBuffer, 0); + if (read == 0) { + return null; } + if (read > 0) { + totalRead += read; + buffer.retain(); + if (length < 0 || totalRead < length) { + return Content.Chunk.asChunk(byteBuffer, false, buffer); + } + + Content.Chunk last = Content.Chunk.asChunk(byteBuffer, true, buffer); + lockedSetTerminal(Content.Chunk.EOF); + return last; + } + lockedSetTerminal(Content.Chunk.EOF); + } catch (Throwable t) { + lockedSetTerminal(Content.Chunk.from(t, true)); } + } + return terminal; + } - buffer = bufferPool == null - ? BufferUtil.allocate(bufferSize, isUseDirectByteBuffers()) - : bufferPool.acquire(bufferSize, isUseDirectByteBuffers()); + @Override + public void fail(Throwable failure) { + try (AutoLock ignored = lock.lock()) { + lockedSetTerminal(Content.Chunk.from(failure, true)); + } + } + + @Override + public long getLength() { + return length; + } - BufferUtil.clearToFill(buffer); - int read = channel.read(buffer); - BufferUtil.flipToFlush(buffer, 0); - if (!channel.isOpen() && read < 0) { - throw new EOFException("EOF reached for " + putTask); + @Override + public boolean rewind() { + try (AutoLock ignored = lock.lock()) { + // We can only rewind if we have a SeekableByteChannel. + if (!(byteChannel instanceof SeekableByteChannel)) { + return false; } - if (read > 0) { - readTotal += read; + + // We can remove terminal condition for a rewind that is likely to occur + if (terminal != null + && !Content.Chunk.isFailure(terminal) + && (byteChannel == null || byteChannel instanceof SeekableByteChannel)) { + terminal = null; } - last = readTotal == getLength(); - if (last) { - IO.close(channel); + + lockedEnsureOpenOrTerminal(); + if (terminal != null || byteChannel == null || !byteChannel.isOpen()) { + return false; } - return producer.produce(buffer, last, Callback.from(() -> release(buffer))); - } - private void release(ByteBuffer buffer) { - if (bufferPool != null) { - bufferPool.release(buffer); + try { + ((SeekableByteChannel) byteChannel).position(offset); + offsetRemaining = 0; + totalRead = 0; + return true; + } catch (Throwable t) { + lockedSetTerminal(Content.Chunk.from(t, true)); } - } - @Override - public void fail(Throwable failure) { - super.fail(failure); - IO.close(channel); + return true; } } } diff --git a/maven-resolver-transport-jetty/src/test/java/org/eclipse/aether/transport/jetty/JettyTransporterTest.java b/maven-resolver-transport-jetty/src/test/java/org/eclipse/aether/transport/jetty/JettyTransporterTest.java index 94bfaa92e..5f3c5f247 100644 --- a/maven-resolver-transport-jetty/src/test/java/org/eclipse/aether/transport/jetty/JettyTransporterTest.java +++ b/maven-resolver-transport-jetty/src/test/java/org/eclipse/aether/transport/jetty/JettyTransporterTest.java @@ -43,6 +43,11 @@ class JettyTransporterTest extends HttpTransporterTest { @Test protected void testPut_Unauthenticated() {} + @Override + @Disabled("Currently cannot retry PUT requests based on InputStreams") + @Test + protected void testPut_AuthCache() throws Exception {} + @Override @Disabled @Test diff --git a/pom.xml b/pom.xml index 17bd2226c..aa166e1f5 100644 --- a/pom.xml +++ b/pom.xml @@ -109,8 +109,8 @@ <testcontainersVersion>2.0.3</testcontainersVersion> <bouncycastleVersion>1.83</bouncycastleVersion> - <!-- Used by Jetty Transport (client) and Test HTTP (server) --> - <jettyVersion>10.0.26</jettyVersion> + <!-- Used by Jetty Transport (client) and Test HTTP (server), 12.1 introduced support for different compressions (https://github.com/jetty/jetty.project/issues/8769) --> + <jettyVersion>12.1.5</jettyVersion> <!-- used by supplier and demo only --> <maven3Version>3.9.12</maven3Version> <maven4Version>4.0.0-rc-5</maven4Version>
