This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit deca3219983b6f906f0d6b20f5c87277de028862 Author: Ian Maxon <[email protected]> AuthorDate: Wed Oct 29 11:26:23 2025 -0700 [ASTERIXDB-3636][API] Migrate UDF API to domain sockets - user model changes: yes - storage format changes: no - interface changes: no Details: Migrate the UDF API to use a domain socket, instead of being restricted to localhost on the normal NC servlet Change-Id: I5f8ac2170fd6b2beef14d99c38b9141af0f12ba3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20492 Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> --- .../apache/asterix/translator/Receptionist.java | 8 +- asterixdb/asterix-app/pom.xml | 30 ++ .../api/http/server/AbstractNCUdfServlet.java | 276 ++++++++++++++++++- .../asterix/api/http/server/NCUdfApiServlet.java | 302 ++------------------- .../asterix/api/http/server/NCUdfDSApiServlet.java | 116 ++++++++ .../api/http/server/NCUdfRecoveryServlet.java | 18 +- .../asterix/hyracks/bootstrap/NCApplication.java | 48 +++- .../api/common/CloudStorageIntegrationUtil.java | 10 +- .../asterix/app/external/CloudUDFLibrarian.java | 112 ++++++++ .../asterix/app/external/ExternalUDFLibrarian.java | 13 +- .../app/external/IExternalUDFLibrarian.java | 12 +- .../test/cloud_storage/CloudPythonTest.java | 7 +- .../apache/asterix/test/common/TestExecutor.java | 13 +- .../src/test/resources/cc-cloud-storage-main.conf | 4 +- .../src/main/user-defined_function/udf.md | 4 +- .../asterix/test/podman/PodmanUDFLibrarian.java | 14 +- .../control/common/controllers/NCConfig.java | 8 + .../apache/hyracks/http/api/IServletRequest.java | 4 + .../apache/hyracks/http/server/BaseRequest.java | 19 ++ .../org/apache/hyracks/http/server/CLFLogger.java | 8 +- .../org/apache/hyracks/http/server/HttpServer.java | 95 ++++++- .../hyracks/http/server/HttpServerInitializer.java | 6 +- 22 files changed, 762 insertions(+), 365 deletions(-) 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java index 524af430e6..12619a96fd 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.translator; -import java.net.InetSocketAddress; import java.util.UUID; import org.apache.asterix.common.api.IClientRequest; @@ -31,18 +30,15 @@ import org.apache.http.HttpHeaders; import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.http.api.IServletRequest; -import org.apache.hyracks.util.NetworkUtil; public class Receptionist implements IReceptionist { @Override public IRequestReference welcome(IServletRequest request) { final String uuid = UUID.randomUUID().toString(); - final InetSocketAddress localAddress = request.getLocalAddress(); - final RequestReference ref = - RequestReference.of(uuid, NetworkUtil.toHostPort(localAddress), System.currentTimeMillis()); + final RequestReference ref = RequestReference.of(uuid, request.getHostPort(), System.currentTimeMillis()); ref.setUserAgent(request.getHeader(HttpHeaders.USER_AGENT)); - ref.setRemoteAddr(NetworkUtil.toHostPort(request.getRemoteAddress())); + ref.setRemoteAddr(request.getRemotePort()); return ref; } diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index 3b27957944..c9486c6149 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -842,6 +842,36 @@ <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> </dependency> + <dependency> + <groupId>org.apache.httpcomponents.client5</groupId> + <artifactId>httpclient5</artifactId> + <version>5.5</version> + 
<scope>test</scope> + </dependency> + <dependency> + <groupId>io.projectreactor.netty</groupId> + <artifactId>reactor-netty</artifactId> + <version>1.2.10</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport-classes-epoll</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-epoll</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-kqueue</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport-classes-kqueue</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.apache.asterix</groupId> <artifactId>asterix-lang-common</artifactId> diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java index 8c5a93b213..3f22014547 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java @@ -18,38 +18,71 @@ */ package org.apache.asterix.api.http.server; +import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER; import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA; import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON; +import static org.apache.asterix.common.library.LibraryDescriptor.FIELD_HASH; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.PrintWriter; +import java.net.InetAddress; import java.net.URI; import java.nio.file.Files; import java.nio.file.OpenOption; import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.MessageDigest; +import java.util.ArrayList; +import 
java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import org.apache.asterix.app.message.CreateLibraryRequestMessage; +import org.apache.asterix.app.message.DropLibraryRequestMessage; +import org.apache.asterix.app.message.InternalRequestResponse; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.api.IReceptionist; +import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.functions.ExternalFunctionLanguage; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.common.metadata.Namespace; +import org.apache.asterix.external.util.ExternalLibraryUtils; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.IFormattedException; -import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.common.work.SynchronizableWork; import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import 
org.apache.hyracks.http.server.AbstractServlet; import org.apache.hyracks.http.server.utils.HttpUtil; +import org.apache.hyracks.util.JSONUtil; +import org.apache.hyracks.util.file.FileUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; + +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpScheme; import io.netty.handler.codec.http.multipart.FileUpload; import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; import io.netty.handler.codec.http.multipart.InterfaceHttpData; @@ -61,8 +94,13 @@ public abstract class AbstractNCUdfServlet extends AbstractServlet { INCServiceContext srvCtx; protected final IApplicationContext plainAppCtx; - private final HttpScheme httpServerProtocol; - private final int httpServerPort; + protected final IReceptionist receptionist; + protected final int timeout; + protected ILibraryManager libraryManager; + protected Path workingDir; + protected String sysAuthHeader; + + private static final Logger LOGGER = LogManager.getLogger(); public static final String GET_UDF_DIST_ENDPOINT = "/dist"; public static final String TYPE_PARAMETER = "type"; @@ -90,15 +128,148 @@ public abstract class AbstractNCUdfServlet extends AbstractServlet { } - public AbstractNCUdfServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, - HttpScheme httpServerProtocol, int httpServerPort) { + public AbstractNCUdfServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx) { super(ctx, paths); this.plainAppCtx = appCtx; - this.httpServerProtocol = httpServerProtocol; - this.httpServerPort = httpServerPort; + this.timeout = appCtx.getExternalProperties().getLibraryDeployTimeout(); + this.receptionist = appCtx.getReceptionist(); + } + + public void init() throws 
IOException { + appCtx = (INcApplicationContext) plainAppCtx; + this.libraryManager = appCtx.getLibraryManager(); + srvCtx = this.appCtx.getServiceContext(); + workingDir = Paths.get(appCtx.getLibraryManager().getDistributionDir().getAbsolutePath()).normalize(); + initAuth(); + initStorage(); + } + + protected void initAuth() { + sysAuthHeader = (String) ctx.get(SYS_AUTH_HEADER); + } + + protected void initStorage() throws IOException { + // prepare working directory + if (Files.isDirectory(workingDir)) { + try { + FileUtils.cleanDirectory(workingDir.toFile()); + } catch (IOException e) { + LOGGER.warn("Could not clean directory: " + workingDir, e); + } + } else { + Files.deleteIfExists(workingDir); + FileUtil.forceMkdirs(workingDir.toFile()); + } + } + + protected enum LibraryOperation { + UPSERT, + DELETE + } + + protected Map<String, String> additionalHttpHeadersFromRequest(IServletRequest request) { + return Collections.emptyMap(); } - void readFromFile(Path filePath, IServletResponse response, String contentType, OpenOption opt) throws Exception { + protected void doCreate(Namespace libNamespace, String libraryName, ExternalFunctionLanguage language, String hash, + URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference, + IServletRequest request) throws Exception { + INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); + MessageFuture responseFuture = ncMb.registerMessageFuture(); + CreateLibraryRequestMessage req = new CreateLibraryRequestMessage(srvCtx.getNodeId(), + responseFuture.getFutureId(), libNamespace, libraryName, language, hash, downloadURI, replaceIfExists, + sysAuthHeader, requestReference, additionalHttpHeadersFromRequest(request)); + sendMessage(req, responseFuture); + } + + protected void doDrop(Namespace namespace, String libraryName, boolean replaceIfExists, + IRequestReference requestReference, IServletRequest request) throws Exception { + INCMessageBroker ncMb = 
(INCMessageBroker) srvCtx.getMessageBroker(); + MessageFuture responseFuture = ncMb.registerMessageFuture(); + DropLibraryRequestMessage req = new DropLibraryRequestMessage(srvCtx.getNodeId(), responseFuture.getFutureId(), + namespace, libraryName, replaceIfExists, requestReference, additionalHttpHeadersFromRequest(request)); + sendMessage(req, responseFuture); + } + + private void sendMessage(ICcAddressedMessage requestMessage, MessageFuture responseFuture) throws Exception { + // Running on NC -> send 'execute' message to CC + INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); + InternalRequestResponse responseMsg; + try { + ncMb.sendMessageToPrimaryCC(requestMessage); + responseMsg = (InternalRequestResponse) responseFuture.get(timeout, TimeUnit.SECONDS); + } finally { + ncMb.deregisterMessageFuture(responseFuture.getFutureId()); + } + + Throwable err = responseMsg.getError(); + if (err != null) { + if (err instanceof Error) { + throw (Error) err; + } else if (err instanceof Exception) { + throw (Exception) err; + } else { + throw new Exception(err.toString(), err); + } + } + } + + protected void handleModification(IServletRequest request, IServletResponse response, LibraryOperation op) { + HttpRequest httpRequest = request.getHttpRequest(); + Path libraryTempFile = null; + FileOutputStream libTmpOut = null; + HttpPostRequestDecoder requestDecoder = null; + String localPath = localPath(request); + try { + Pair<Namespace, String> namespaceAndName = decodeDvAndLibFromLocalPath(localPath); + String libName = namespaceAndName.second; + Namespace libNamespace = namespaceAndName.first; + DataverseName libDv = libNamespace.getDataverseName(); + IRequestReference requestReference = receptionist.welcome(request); + if (op == LibraryOperation.UPSERT) { + requestDecoder = new HttpPostRequestDecoder(httpRequest); + LibraryUploadData uploadData = decodeMultiPartLibraryOptions(requestDecoder); + ExternalFunctionLanguage language = uploadData.type; + String 
fileExt = FilenameUtils.getExtension(uploadData.fileUpload.getFilename()); + MessageDigest digest = MessageDigest.getInstance("MD5"); + distributeLibrary(uploadData, libDv, libName, fileExt, language, digest, libNamespace, requestReference, + request); + } else if (op == LibraryOperation.DELETE) { + //DELETE semantics imply ifExists + doDrop(libNamespace, libName, false, requestReference, request); + } + response.setStatus(HttpResponseStatus.OK); + PrintWriter responseWriter = response.writer(); + String emptyJson = "{}"; + responseWriter.write(emptyJson); + responseWriter.flush(); + } catch (Exception e) { + writeException(e, response); + LOGGER.info("Error modifying library", e); + } finally { + if (requestDecoder != null) { + requestDecoder.destroy(); + } + try { + if (libraryTempFile != null) { + if (libTmpOut != null) { + libTmpOut.close(); + } + Files.deleteIfExists(libraryTempFile); + } + } catch (IOException e) { + LOGGER.warn("Could not delete temporary file " + libraryTempFile, e); + } + } + } + + protected void distributeLibrary(LibraryUploadData uploadData, DataverseName libDv, String libName, String fileExt, + ExternalFunctionLanguage language, MessageDigest digest, Namespace namespace, + IRequestReference requestReference, IServletRequest request) throws Exception { + } + + protected void readFromFile(Path filePath, IServletResponse response, String contentType, OpenOption opt) + throws Exception { class InputStreamGetter extends SynchronizableWork { private InputStream is; @@ -132,12 +303,6 @@ public abstract class AbstractNCUdfServlet extends AbstractServlet { return DATAVERSE_KEY; } - URI createDownloadURI(Path file) throws Exception { - String host = appCtx.getServiceContext().getAppConfig().getString(NCConfig.Option.PUBLIC_ADDRESS); - String path = paths[0].substring(0, servletPathLengths[0]) + GET_UDF_DIST_ENDPOINT + '/' + file.getFileName(); - return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null); - } - 
private boolean isNotAttribute(InterfaceHttpData field) { return field == null || !field.getHttpDataType().equals(InterfaceHttpData.HttpDataType.Attribute); } @@ -207,4 +372,85 @@ public abstract class AbstractNCUdfServlet extends AbstractServlet { return HttpResponseStatus.INTERNAL_SERVER_ERROR; } + @Override + protected void get(IServletRequest request, IServletResponse response) throws Exception { + String localPath = localPath(request); + try { + if (localPath.equals("/") || localPath.equals("")) { + //TODO: nicer way to get this into display form? + Map<Namespace, Map<String, String>> dvToLibHashes = + ExternalLibraryUtils.produceLibraryListing(libraryManager); + List<Map<String, Object>> libraryList = new ArrayList<>(); + for (Map.Entry<Namespace, Map<String, String>> dvAndLibs : dvToLibHashes.entrySet()) { + for (Map.Entry<String, String> libsInDv : dvAndLibs.getValue().entrySet()) { + Map<String, Object> libraryEntry = new HashMap<>(); + libraryEntry.put(getDataverseKey(), libraryManager.getNsOrDv(dvAndLibs.getKey())); + libraryEntry.put(NAME_KEY, libsInDv.getKey()); + libraryEntry.put(FIELD_HASH, libsInDv.getValue()); + libraryList.add(libraryEntry); + } + } + JsonNode libraryListing = OBJECT_MAPPER.valueToTree(libraryList); + response.setStatus(HttpResponseStatus.OK); + HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request); + PrintWriter responseWriter = response.writer(); + JSONUtil.writeNode(responseWriter, libraryListing); + responseWriter.flush(); + } else if (localPath(request).startsWith(GET_UDF_DIST_ENDPOINT)) { + localPath = localPath(request).substring(GET_UDF_DIST_ENDPOINT.length()); + while (localPath.startsWith("/")) { + localPath = localPath.substring(1); + } + if (localPath.isEmpty()) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + Path filePath = workingDir.resolve(localPath).normalize(); + if (!filePath.startsWith(workingDir)) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + 
return; + } + readFromFile(filePath, response, HttpUtil.ContentType.APPLICATION_OCTET_STREAM, null); + } else { + response.setStatus(HttpResponseStatus.NOT_FOUND); + } + } catch (Exception e) { + writeException(e, response); + LOGGER.error("Error reading library", e); + } + } + + protected void writeException(Exception e, IServletResponse response) { + response.setStatus(toHttpErrorStatus(e)); + PrintWriter responseWriter = response.writer(); + Map<String, String> error = Collections.singletonMap("error", e.getMessage()); + String errorJson = ""; + try { + errorJson = OBJECT_MAPPER.writeValueAsString(error); + } catch (JsonProcessingException ex) { + responseWriter.write("{ \"error\": \"Unable to process error message!\" }"); + } + responseWriter.write(errorJson); + responseWriter.flush(); + } + + abstract protected boolean isRequestPermitted(IServletRequest request, IServletResponse response) + throws IOException; + + protected boolean isRequestOnLoopback(IServletRequest request) { + if (request.getLocalAddress() != null && request.getRemoteAddress() != null) { + InetAddress local = request.getLocalAddress().getAddress(); + InetAddress remote = request.getRemoteAddress().getAddress(); + return remote.isLoopbackAddress() && local.isLoopbackAddress(); + } else { + return false; + } + } + + protected void rejectForbidden(IServletResponse response) throws IOException { + // TODO: why this JSON format, do we use this anywhere else? 
+ sendError(response, HttpUtil.ContentType.APPLICATION_JSON, HttpResponseStatus.FORBIDDEN, + "{ \"error\": \"Forbidden\" }"); + } + } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java index 02a99f657e..7194d5d86b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java @@ -18,245 +18,60 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER; -import static org.apache.asterix.common.library.LibraryDescriptor.FIELD_HASH; -import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME; -import static org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor; - import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.PrintWriter; -import java.net.InetAddress; import java.net.URI; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.security.DigestOutputStream; import java.security.MessageDigest; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import org.apache.asterix.app.message.CreateLibraryRequestMessage; -import org.apache.asterix.app.message.DropLibraryRequestMessage; -import org.apache.asterix.app.message.InternalRequestResponse; import org.apache.asterix.common.api.IApplicationContext; -import org.apache.asterix.common.api.INcApplicationContext; -import 
org.apache.asterix.common.api.IReceptionist; import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.functions.ExternalFunctionLanguage; -import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.common.library.LibraryDescriptor; -import org.apache.asterix.common.messaging.api.ICcAddressedMessage; -import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.common.metadata.Namespace; import org.apache.asterix.external.util.ExternalLibraryUtils; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.IFileHandle; -import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; -import org.apache.hyracks.http.server.utils.HttpUtil; -import org.apache.hyracks.util.JSONUtil; -import org.apache.hyracks.util.file.FileUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; - import io.netty.buffer.ByteBufInputStream; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpScheme; -import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; public class NCUdfApiServlet extends AbstractNCUdfServlet { - protected final IReceptionist receptionist; - - protected Path workingDir; - private String sysAuthHeader; - private ILibraryManager libraryManager; - private int timeout; + private 
final HttpScheme httpServerProtocol; + private final int httpServerPort; private static final Logger LOGGER = LogManager.getLogger(); public NCUdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, HttpScheme httpServerProtocol, int httpServerPort) { - super(ctx, paths, appCtx, httpServerProtocol, httpServerPort); - this.receptionist = appCtx.getReceptionist(); - this.timeout = appCtx.getExternalProperties().getLibraryDeployTimeout(); + super(ctx, paths, appCtx); + this.httpServerProtocol = httpServerProtocol; + this.httpServerPort = httpServerPort; } - private enum LibraryOperation { - UPSERT, - DELETE + URI createDownloadURI(Path file) throws Exception { + String host = appCtx.getServiceContext().getAppConfig().getString(NCConfig.Option.PUBLIC_ADDRESS); + String path = paths[0].substring(0, servletPathLengths[0]) + GET_UDF_DIST_ENDPOINT + '/' + file.getFileName(); + return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null); } @Override - public void init() throws IOException { - appCtx = (INcApplicationContext) plainAppCtx; - this.libraryManager = appCtx.getLibraryManager(); - srvCtx = this.appCtx.getServiceContext(); - workingDir = Paths.get(appCtx.getLibraryManager().getDistributionDir().getAbsolutePath()).normalize(); - initAuth(); - initStorage(); - } - - protected void initAuth() { - sysAuthHeader = (String) ctx.get(SYS_AUTH_HEADER); - } - - protected void initStorage() throws IOException { - // prepare working directory - if (Files.isDirectory(workingDir)) { - try { - FileUtils.cleanDirectory(workingDir.toFile()); - } catch (IOException e) { - LOGGER.warn("Could not clean directory: " + workingDir, e); - } - } else { - Files.deleteIfExists(workingDir); - FileUtil.forceMkdirs(workingDir.toFile()); - } - } - - protected Map<String, String> additionalHttpHeadersFromRequest(IServletRequest request) { - return Collections.emptyMap(); - } - - private void doCreate(Namespace libNamespace, 
String libraryName, ExternalFunctionLanguage language, String hash, - URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference, - IServletRequest request) throws Exception { - INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); - MessageFuture responseFuture = ncMb.registerMessageFuture(); - CreateLibraryRequestMessage req = new CreateLibraryRequestMessage(srvCtx.getNodeId(), - responseFuture.getFutureId(), libNamespace, libraryName, language, hash, downloadURI, replaceIfExists, - sysAuthHeader, requestReference, additionalHttpHeadersFromRequest(request)); - sendMessage(req, responseFuture); - } - - private void doDrop(Namespace namespace, String libraryName, boolean replaceIfExists, + protected void distributeLibrary(LibraryUploadData uploadData, DataverseName libDv, String libName, String fileExt, + ExternalFunctionLanguage language, MessageDigest digest, Namespace libNamespace, IRequestReference requestReference, IServletRequest request) throws Exception { - INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); - MessageFuture responseFuture = ncMb.registerMessageFuture(); - DropLibraryRequestMessage req = new DropLibraryRequestMessage(srvCtx.getNodeId(), responseFuture.getFutureId(), - namespace, libraryName, replaceIfExists, requestReference, additionalHttpHeadersFromRequest(request)); - sendMessage(req, responseFuture); - } - - private void sendMessage(ICcAddressedMessage requestMessage, MessageFuture responseFuture) throws Exception { - // Running on NC -> send 'execute' message to CC - INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); - InternalRequestResponse responseMsg; - try { - ncMb.sendMessageToPrimaryCC(requestMessage); - responseMsg = (InternalRequestResponse) responseFuture.get(timeout, TimeUnit.SECONDS); - } finally { - ncMb.deregisterMessageFuture(responseFuture.getFutureId()); - } - - Throwable err = responseMsg.getError(); - if (err != null) { - if 
(err instanceof Error) { - throw (Error) err; - } else if (err instanceof Exception) { - throw (Exception) err; - } else { - throw new Exception(err.toString(), err); - } - } - } - - @Override - protected void get(IServletRequest request, IServletResponse response) throws Exception { - String localPath = localPath(request); - try { - if (localPath.equals("/") || localPath.equals("")) { - //TODO: nicer way to get this into display form? - Map<Namespace, Map<String, String>> dvToLibHashes = - ExternalLibraryUtils.produceLibraryListing(libraryManager); - List<Map<String, Object>> libraryList = new ArrayList<>(); - for (Map.Entry<Namespace, Map<String, String>> dvAndLibs : dvToLibHashes.entrySet()) { - for (Map.Entry<String, String> libsInDv : dvAndLibs.getValue().entrySet()) { - Map<String, Object> libraryEntry = new HashMap<>(); - libraryEntry.put(getDataverseKey(), libraryManager.getNsOrDv(dvAndLibs.getKey())); - libraryEntry.put(NAME_KEY, libsInDv.getKey()); - libraryEntry.put(FIELD_HASH, libsInDv.getValue()); - libraryList.add(libraryEntry); - } - } - JsonNode libraryListing = OBJECT_MAPPER.valueToTree(libraryList); - response.setStatus(HttpResponseStatus.OK); - HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request); - PrintWriter responseWriter = response.writer(); - JSONUtil.writeNode(responseWriter, libraryListing); - responseWriter.flush(); - } else if (localPath(request).startsWith(GET_UDF_DIST_ENDPOINT)) { - localPath = localPath(request).substring(GET_UDF_DIST_ENDPOINT.length()); - while (localPath.startsWith("/")) { - localPath = localPath.substring(1); - } - if (localPath.isEmpty()) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - Path filePath = workingDir.resolve(localPath).normalize(); - if (!filePath.startsWith(workingDir)) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - readFromFile(filePath, response, HttpUtil.ContentType.APPLICATION_OCTET_STREAM, null); - } else { - 
response.setStatus(HttpResponseStatus.NOT_FOUND); - } - } catch (Exception e) { - writeException(e, response); - LOGGER.error("Error reading library", e); - } - } - - private void writeLibToCloud(LibraryUploadData uploadData, Namespace libNamespace, String libName, - MessageDigest digest, ExternalFunctionLanguage language) throws IOException { - FileReference libDir = libraryManager.getLibraryDir(libNamespace, libName); - IIOManager cloudIoMgr = libraryManager.getCloudIOManager(); - FileReference lib = libDir.getChild(ILibraryManager.LIBRARY_ARCHIVE_NAME); - if (!libDir.getFile().exists()) { - Files.createDirectories(lib.getFile().toPath().getParent()); - } - if (!lib.getFile().exists()) { - Files.createFile(lib.getFile().toPath()); - } - IFileHandle fh = cloudIoMgr.open(lib, IIOManager.FileReadWriteMode.READ_WRITE, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - WritableByteChannel outChannel = cloudIoMgr.newWritableChannel(fh); - byte[] writeBuf = new byte[4096]; - FileReference targetDescFile = libDir.getChild(DESCRIPTOR_FILE_NAME); - try (OutputStream outputStream = new DigestOutputStream(Channels.newOutputStream(outChannel), digest); - InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) { - IOUtils.copyLarge(ui, outputStream, writeBuf); - outputStream.flush(); - cloudIoMgr.sync(fh, true); - writeDescriptor(libraryManager, targetDescFile, - new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)), true, writeBuf); - } finally { - cloudIoMgr.close(fh); - } + URI downloadURI = cacheLibAndDistribute(uploadData, libDv, libName, fileExt, digest); + doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), downloadURI, true, + getSysAuthHeader(), requestReference, request); } private URI cacheLibAndDistribute(LibraryUploadData uploadData, DataverseName libDv, String libName, String fileExt, @@ -274,80 +89,11 @@ public class NCUdfApiServlet extends AbstractNCUdfServlet { return 
createDownloadURI(libraryTempFile); } - private void handleModification(IServletRequest request, IServletResponse response, LibraryOperation op) { - HttpRequest httpRequest = request.getHttpRequest(); - Path libraryTempFile = null; - FileOutputStream libTmpOut = null; - HttpPostRequestDecoder requestDecoder = null; - String localPath = localPath(request); - try { - Pair<Namespace, String> namespaceAndName = decodeDvAndLibFromLocalPath(localPath); - String libName = namespaceAndName.second; - Namespace libNamespace = namespaceAndName.first; - DataverseName libDv = libNamespace.getDataverseName(); - IRequestReference requestReference = receptionist.welcome(request); - if (op == LibraryOperation.UPSERT) { - requestDecoder = new HttpPostRequestDecoder(httpRequest); - LibraryUploadData uploadData = decodeMultiPartLibraryOptions(requestDecoder); - ExternalFunctionLanguage language = uploadData.type; - String fileExt = FilenameUtils.getExtension(uploadData.fileUpload.getFilename()); - MessageDigest digest = MessageDigest.getInstance("MD5"); - if (appCtx.isCloudDeployment()) { - writeLibToCloud(uploadData, libNamespace, libName, digest, language); - doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), null, - true, getSysAuthHeader(), requestReference, request); - } else { - URI downloadURI = cacheLibAndDistribute(uploadData, libDv, libName, fileExt, digest); - doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), - downloadURI, true, getSysAuthHeader(), requestReference, request); - } - } else if (op == LibraryOperation.DELETE) { - //DELETE semantics imply ifExists - doDrop(libNamespace, libName, false, requestReference, request); - } - response.setStatus(HttpResponseStatus.OK); - PrintWriter responseWriter = response.writer(); - String emptyJson = "{}"; - responseWriter.write(emptyJson); - responseWriter.flush(); - } catch (Exception e) { - writeException(e, response); - LOGGER.info("Error modifying 
library", e); - } finally { - if (requestDecoder != null) { - requestDecoder.destroy(); - } - try { - if (libraryTempFile != null) { - if (libTmpOut != null) { - libTmpOut.close(); - } - Files.deleteIfExists(libraryTempFile); - } - } catch (IOException e) { - LOGGER.warn("Could not delete temporary file " + libraryTempFile, e); - } - } - } - protected String getSysAuthHeader() { return sysAuthHeader; } - private void writeException(Exception e, IServletResponse response) { - response.setStatus(toHttpErrorStatus(e)); - PrintWriter responseWriter = response.writer(); - Map<String, String> error = Collections.singletonMap("error", e.getMessage()); - String errorJson = ""; - try { - errorJson = OBJECT_MAPPER.writeValueAsString(error); - } catch (JsonProcessingException ex) { - responseWriter.write("{ \"error\": \"Unable to process error message!\" }"); - } - responseWriter.write(errorJson); - responseWriter.flush(); - } - + @Override protected boolean isRequestPermitted(IServletRequest request, IServletResponse response) throws IOException { if (!isRequestOnLoopback(request)) { rejectForbidden(response); @@ -356,22 +102,6 @@ public class NCUdfApiServlet extends AbstractNCUdfServlet { return true; } - protected boolean isRequestOnLoopback(IServletRequest request) { - if (request.getLocalAddress() != null && request.getRemoteAddress() != null) { - InetAddress local = request.getLocalAddress().getAddress(); - InetAddress remote = request.getRemoteAddress().getAddress(); - return remote.isLoopbackAddress() && local.isLoopbackAddress(); - } else { - return false; - } - } - - protected void rejectForbidden(IServletResponse response) throws IOException { - // TODO: why this JSON format, do we use this anywhere else? 
- sendError(response, HttpUtil.ContentType.APPLICATION_JSON, HttpResponseStatus.FORBIDDEN, - "{ \"error\": \"Forbidden\" }"); - } - @Override protected void post(IServletRequest request, IServletResponse response) throws IOException { if (isRequestPermitted(request, response)) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfDSApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfDSApiServlet.java new file mode 100644 index 0000000000..d5311cedde --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfDSApiServlet.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.api.http.server; + +import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME; +import static org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Files; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.functions.ExternalFunctionLanguage; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.library.LibraryDescriptor; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.Namespace; +import org.apache.asterix.external.util.ExternalLibraryUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IFileHandle; +import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; + +import io.netty.buffer.ByteBufInputStream; + +public class NCUdfDSApiServlet extends AbstractNCUdfServlet { + + public NCUdfDSApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx) { + super(ctx, paths, appCtx); + } + + protected void distributeLibrary(LibraryUploadData uploadData, DataverseName libDv, String libName, String fileExt, + ExternalFunctionLanguage language, MessageDigest digest, Namespace namespace, + IRequestReference requestReference, IServletRequest request) throws Exception { + writeLibToCloud(uploadData, namespace, libName, digest, language); + doCreate(namespace, libName, language, 
ExternalLibraryUtils.digestToHexString(digest), null, true, + getSysAuthHeader(), requestReference, request); + } + + private void writeLibToCloud(LibraryUploadData uploadData, Namespace libNamespace, String libName, + MessageDigest digest, ExternalFunctionLanguage language) throws IOException { + FileReference libDir = libraryManager.getLibraryDir(libNamespace, libName); + IIOManager cloudIoMgr = libraryManager.getCloudIOManager(); + FileReference lib = libDir.getChild(ILibraryManager.LIBRARY_ARCHIVE_NAME); + if (!libDir.getFile().exists()) { + Files.createDirectories(lib.getFile().toPath().getParent()); + } + if (!lib.getFile().exists()) { + Files.createFile(lib.getFile().toPath()); + } + IFileHandle fh = cloudIoMgr.open(lib, IIOManager.FileReadWriteMode.READ_WRITE, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + WritableByteChannel outChannel = cloudIoMgr.newWritableChannel(fh); + byte[] writeBuf = new byte[4096]; + FileReference targetDescFile = libDir.getChild(DESCRIPTOR_FILE_NAME); + try (OutputStream outputStream = new DigestOutputStream(Channels.newOutputStream(outChannel), digest); + InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) { + IOUtils.copyLarge(ui, outputStream, writeBuf); + outputStream.flush(); + cloudIoMgr.sync(fh, true); + writeDescriptor(libraryManager, targetDescFile, + new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)), true, writeBuf); + } finally { + cloudIoMgr.close(fh); + } + } + + protected String getSysAuthHeader() { + return sysAuthHeader; + } + + @Override + protected boolean isRequestPermitted(IServletRequest request, IServletResponse response) throws IOException { + return true; + } + + @Override + protected void post(IServletRequest request, IServletResponse response) throws IOException { + if (isRequestPermitted(request, response)) { + handleModification(request, response, LibraryOperation.UPSERT); + } + } + + @Override + protected void delete(IServletRequest 
request, IServletResponse response) throws IOException { + if (isRequestPermitted(request, response)) { + handleModification(request, response, LibraryOperation.DELETE); + } + } + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java index 2c29d1480e..ca8bc3bcb6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java @@ -29,33 +29,33 @@ import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.utils.HttpUtil; -import io.netty.handler.codec.http.HttpScheme; - public class NCUdfRecoveryServlet extends AbstractNCUdfServlet { - ExternalLibraryManager libraryManager; - public static final String GET_ALL_UDF_ENDPOINT = "/all"; - public NCUdfRecoveryServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, - HttpScheme httpServerProtocol, int httpServerPort) { - super(ctx, paths, appCtx, httpServerProtocol, httpServerPort); + public NCUdfRecoveryServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx) { + super(ctx, paths, appCtx); } @Override public void init() { appCtx = (INcApplicationContext) plainAppCtx; srvCtx = this.appCtx.getServiceContext(); - this.libraryManager = (ExternalLibraryManager) appCtx.getLibraryManager(); } @Override protected void get(IServletRequest request, IServletResponse response) throws Exception { String localPath = localPath(request); if (localPath.equals(GET_ALL_UDF_ENDPOINT)) { - Path zippedLibs = libraryManager.zipAllLibs(); + Path zippedLibs = ((ExternalLibraryManager) libraryManager).zipAllLibs(); readFromFile(zippedLibs, response, HttpUtil.ContentType.APPLICATION_ZIP, 
StandardOpenOption.DELETE_ON_CLOSE); } } + + @Override + protected boolean isRequestPermitted(IServletRequest request, IServletResponse response) { + //gated by system auth + return true; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 7566206087..40a362e418 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -31,12 +31,18 @@ import static org.apache.hyracks.control.common.controllers.ControllerConfig.Opt import java.io.File; import java.io.IOException; import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.asterix.algebra.base.ILangExtension; import org.apache.asterix.api.http.server.BasicAuthServlet; @@ -44,6 +50,7 @@ import org.apache.asterix.api.http.server.NCQueryResultApiServlet; import org.apache.asterix.api.http.server.NCQueryServiceServlet; import org.apache.asterix.api.http.server.NCQueryStatusApiServlet; import org.apache.asterix.api.http.server.NCUdfApiServlet; +import org.apache.asterix.api.http.server.NCUdfDSApiServlet; import org.apache.asterix.api.http.server.NCUdfRecoveryServlet; import org.apache.asterix.api.http.server.NetDiagnosticsApiServlet; import org.apache.asterix.api.http.server.ServletConstants; @@ -98,6 +105,7 @@ import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.application.INCServiceContext; import 
org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.NodeStatus; +import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -131,6 +139,7 @@ public class NCApplication extends BaseNCApplication { protected boolean startupCompleted; protected WebManager webManager; private HttpServer apiServer; + private HttpServer udfServer; @Override public void registerConfig(IConfigManager configManager) { @@ -265,19 +274,38 @@ public class NCApplication extends BaseNCApplication { apiServer.setAttribute(ServletConstants.CREDENTIAL_MAP, parseCredentialMap(((NodeControllerService) ncServiceCtx.getControllerService()).getConfiguration() .getCredentialFilePath())); - Pair<Map<String, String>, Map<String, String>> auth = BasicAuthServlet.generateSysAuthHeader(apiServer.ctx()); - apiServer - .addServlet(new BasicAuthServlet(apiServer.ctx(), - new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF }, getApplicationContext(), - apiServer.getScheme(), apiServer.getAddress().getPort()), - auth.getFirst(), auth.getSecond())); - apiServer.addServlet(new BasicAuthServlet( - apiServer.ctx(), new NCUdfRecoveryServlet(apiServer.ctx(), new String[] { UDF_RECOVERY }, - getApplicationContext(), apiServer.getScheme(), apiServer.getAddress().getPort()), - auth.getFirst(), auth.getSecond())); + if (!getApplicationContext().isCloudDeployment()) { + Pair<Map<String, String>, Map<String, String>> auth = + BasicAuthServlet.generateSysAuthHeader(apiServer.ctx()); + apiServer.addServlet((new BasicAuthServlet(apiServer.ctx(), + new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF }, getApplicationContext(), + apiServer.getScheme(), externalProperties.getNcApiPort()), + auth.getFirst(), auth.getSecond()))); + apiServer.addServlet(new BasicAuthServlet(apiServer.ctx(), + new 
NCUdfRecoveryServlet(apiServer.ctx(), new String[] { UDF_RECOVERY }, getApplicationContext()), + auth.getFirst(), auth.getSecond())); + } else { + //UDF deployment server + IApplicationConfig appCfg = getApplicationContext().getServiceContext().getAppConfig(); + Path udfSockPath = Path.of(appCfg.getString(NCConfig.Option.UDF_API_DS_PATH)).toAbsolutePath(); + if (!Files.exists(udfSockPath.getParent())) { + Set<PosixFilePermission> noOthers = PosixFilePermissions.fromString("rwxr-x---"); + FileAttribute<?> permissions = PosixFilePermissions.asFileAttribute(noOthers); + Files.createDirectories(udfSockPath.getParent(), permissions); + } + udfServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), udfSockPath, config); + Pair<Map<String, String>, Map<String, String>> auth = + BasicAuthServlet.generateSysAuthHeader(apiServer.ctx()); + udfServer.addServlet(new BasicAuthServlet(apiServer.ctx(), + new NCUdfDSApiServlet(apiServer.ctx(), new String[] { UDF }, getApplicationContext()), + auth.getFirst(), auth.getSecond())); + webManager.add(udfServer); + + } apiServer.addServlet(new NCQueryStatusApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_STATUS)); apiServer.addServlet(new NCQueryResultApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_RESULT)); webManager.add(apiServer); + } protected List<AsterixExtension> getExtensions() throws Exception { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java index b86730b07d..4cac409039 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java @@ -28,14 +28,14 @@ import com.adobe.testing.s3mock.testcontainers.S3MockContainer; public class CloudStorageIntegrationUtil extends 
AsterixHyracksIntegrationUtil { public static final String RESOURCES_PATH = joinPath(getProjectPath().toString(), "src", "test", "resources"); - public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, "cc-cloud-storage-main.conf"); - public static final String CONFIG_FILE_TEMPLATE = joinPath(RESOURCES_PATH, "cc-cloud-storage-main.ftl"); + public static final String TARGET_PATH = joinPath(getProjectPath().toString(), "target"); + public static final String CONFIG_FILE = joinPath(TARGET_PATH, "cc-cloud-storage.conf"); + public static final String CONFIG_FILE_TEMPLATE = joinPath(RESOURCES_PATH, "cc-cloud-storage.conf.ftl"); public static void main(String[] args) throws Exception { - boolean cleanStart = Boolean.getBoolean("cleanup.start"); - LocalCloudUtilAdobeMock.startS3CloudEnvironment(cleanStart); - final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + boolean cleanStart = true; try (S3MockContainer s3Mock = LocalCloudUtilAdobeMock.startS3CloudEnvironment(cleanStart)) { + final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); fillConfigTemplate(MOCK_SERVER_HOSTNAME_FRAGMENT + s3Mock.getHttpServerPort(), CONFIG_FILE_TEMPLATE, CONFIG_FILE); integrationUtil.run(cleanStart, Boolean.getBoolean("cleanup.shutdown"), diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/CloudUDFLibrarian.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/CloudUDFLibrarian.java new file mode 100644 index 0000000000..fac50e7f93 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/CloudUDFLibrarian.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.external; + +import java.io.File; +import java.io.IOException; +import java.net.UnixDomainSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.hyracks.algebricks.common.utils.Pair; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.http.HttpResponseStatus; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientResponse; + +@SuppressWarnings("squid:S134") +public class CloudUDFLibrarian implements IExternalUDFLibrarian { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private String sockPath; + private final EventLoopGroup elg; + + public CloudUDFLibrarian(String sockPath) { + this.sockPath = sockPath; + //force nio even if epoll/kqueue are on classpath + this.elg = new NioEventLoopGroup(); + } + + private static String createAuthHeader(Pair<String, String> credentials) { + String auth = credentials.first + ":" + credentials.second; + byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encodedAuth); + } + + public void setCredentials(Pair<String, 
String> credentials) { + } + + public void setAddress(String address) { + this.sockPath = address; + } + + @Override + public void install(String path, String type, String libPath, Pair<String, String> credentials) throws Exception { + File lib = new File(libPath); + UnixDomainSocketAddress sockAddr = UnixDomainSocketAddress.of(sockPath); + + HttpClient hc = HttpClient.create().runOn(elg) + .headers(h -> h.set("Authorization", createAuthHeader(credentials))).remoteAddress(() -> sockAddr); + hc.post().uri(path) + .sendForm((req, form) -> form.multipart(true).attr("type", type).file("data", lib, + "application/octet-stream")) + .responseSingle((response, content) -> handleResponse(response, content)).block(); + } + + @Override + public void uninstall(String path, Pair<String, String> credentials) throws IOException, AsterixException { + UnixDomainSocketAddress sockAddr = UnixDomainSocketAddress.of(sockPath); + HttpClient hc = HttpClient.create().runOn(elg) + .headers(h -> h.set("Authorization", createAuthHeader(credentials))).remoteAddress(() -> sockAddr); + hc.delete().uri(path).responseSingle((response, content) -> handleResponse(response, content)).block(); + } + + private Mono<Void> handleResponse(HttpClientResponse response, reactor.netty.ByteBufMono content) { + return content.asString().defaultIfEmpty("").flatMap(body -> { + int respCode = response.status().code(); + if (respCode == HttpResponseStatus.OK.code()) { + return Mono.empty(); + } + + String errorMessage; + if (respCode == HttpResponseStatus.INTERNAL_SERVER_ERROR.code() + || respCode == HttpResponseStatus.BAD_REQUEST.code()) { + try { + errorMessage = OBJECT_MAPPER.readTree(body).get("error").asText(); + } catch (IOException e) { + errorMessage = "Failed to parse error response: " + body; + } + } else { + errorMessage = response.status().toString(); + } + return Mono.error(new AsterixException(errorMessage)); + }); + } + + @Override + public SocketType getSocketType() { + return SocketType.DOMAIN; + 
} +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java index e32f8eaafc..760adf441f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java @@ -57,8 +57,8 @@ public class ExternalUDFLibrarian implements IExternalUDFLibrarian { } @Override - public void install(URI path, String type, String libPath, Pair<String, String> credentials) throws Exception { - HttpClientContext hcCtx = createHttpClientContext(path, credentials); + public void install(String path, String type, String libPath, Pair<String, String> credentials) throws Exception { + HttpClientContext hcCtx = createHttpClientContext(URI.create(path), credentials); HttpPost post = new HttpPost(path); File lib = new File(libPath); MultipartEntityBuilder entity = MultipartEntityBuilder.create().setMode(HttpMultipartMode.STRICT); @@ -70,8 +70,8 @@ public class ExternalUDFLibrarian implements IExternalUDFLibrarian { } @Override - public void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException { - HttpClientContext hcCtx = createHttpClientContext(path, credentials); + public void uninstall(String path, Pair<String, String> credentials) throws IOException, AsterixException { + HttpClientContext hcCtx = createHttpClientContext(URI.create(path), credentials); HttpDelete del = new HttpDelete(path); HttpResponse response = hc.execute(del, hcCtx); handleResponse(response); @@ -103,4 +103,9 @@ public class ExternalUDFLibrarian implements IExternalUDFLibrarian { throw new AsterixException(resp); } } + + @Override + public SocketType getSocketType() { + return SocketType.LOOPBACK; + } } diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java index 998fa78ace..b47d07a733 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java @@ -19,13 +19,19 @@ package org.apache.asterix.app.external; import java.io.IOException; -import java.net.URI; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.hyracks.algebricks.common.utils.Pair; public interface IExternalUDFLibrarian { - void install(URI path, String type, String libPath, Pair<String, String> credentials) throws Exception; + public enum SocketType { + LOOPBACK, + DOMAIN + } - void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException; + void install(String path, String type, String libPath, Pair<String, String> credentials) throws Exception; + + void uninstall(String path, Pair<String, String> credentials) throws IOException, AsterixException; + + SocketType getSocketType(); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudPythonTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudPythonTest.java index 00dab0eeb3..ad6f6b46a7 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudPythonTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudPythonTest.java @@ -21,11 +21,13 @@ package org.apache.asterix.test.cloud_storage; import static org.apache.asterix.api.common.LocalCloudUtilAdobeMock.fillConfigTemplate; import static org.apache.asterix.test.cloud_storage.CloudStorageTest.MOCK_SERVER_HOSTNAME_FRAGMENT; import static org.apache.asterix.test.runtime.ExternalPythonFunctionIT.setNcEndpoints; +import 
static org.apache.hyracks.util.file.FileUtil.joinPath; import java.util.Collection; import java.util.List; import org.apache.asterix.api.common.LocalCloudUtilAdobeMock; +import org.apache.asterix.app.external.CloudUDFLibrarian; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.test.common.TestConstants; import org.apache.asterix.test.common.TestExecutor; @@ -48,7 +50,7 @@ import org.junit.runners.Parameterized.Parameters; import com.adobe.testing.s3mock.testcontainers.S3MockContainer; /** - * Run tests in cloud deployment environment + * Run Python UDF tests in cloud deployment environment */ @RunWith(Parameterized.class) @FixMethodOrder(MethodSorters.NAME_ASCENDING) @@ -63,6 +65,7 @@ public class CloudPythonTest { public static final String CONFIG_FILE_TEMPLATE = "src/test/resources/cc-cloud-storage.conf.ftl"; public static final String CONFIG_FILE = "target/cc-cloud-storage.conf"; private static final String EXCLUDED_TESTS = "MP"; + private static final String dsPath = joinPath("/", "tmp", "asterixdb_udf", "asterix_nc1_udf.sock"); public CloudPythonTest(TestCaseContext tcCtx) { this.tcCtx = tcCtx; @@ -78,7 +81,7 @@ public class CloudPythonTest { TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH); testExecutor.executorId = "cloud"; testExecutor.stripSubstring = "//DB:"; - LangExecutionUtil.setUp(CONFIG_FILE, testExecutor); + LangExecutionUtil.setUp(CONFIG_FILE, testExecutor, false, false, new CloudUDFLibrarian(dsPath)); setNcEndpoints(testExecutor); System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 81ad57a769..bb2aa34c5f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ 
-1475,7 +1475,13 @@ public class TestExecutor { //TODO: this is not right. URLEncoder does not properly encode paths. String dataverse = URLEncoder.encode(command[1], StandardCharsets.US_ASCII.name()); String library = URLEncoder.encode(command[2], StandardCharsets.US_ASCII.name()); - URI path = createEndpointURI("/admin/udf/" + dataverse + "/" + library); + String basePath = "/admin/udf/" + dataverse + "/" + library; + String path = ""; + switch (librarian.getSocketType()){ + case DOMAIN -> path = basePath; + case LOOPBACK -> path = createEndpointURI(basePath).toString(); + default -> path = createEndpointURI(basePath).toString(); + } if (command.length < 2) { throw new Exception("invalid library command: " + line); } @@ -2742,6 +2748,11 @@ public class TestExecutor { return uri; } + protected URI createUDFDomainSockURI(String pathAndQuery) { + URI uri = URI.create("unix://target/tmp/asterix_nc1/udf.sock" + pathAndQuery); + return uri; + } + public URI getEndpoint(String servlet) throws URISyntaxException { return createEndpointURI(Servlets.getAbsolutePath(getPath(servlet))); } diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf index 8745d9d7fd..9efabaa036 100644 --- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf +++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf @@ -21,6 +21,7 @@ core.dump.dir=target/tmp/asterix_nc1/coredump iodevices=target/tmp/asterix_nc1/iodevice1 iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2 nc.api.port=19004 +udf.api.ds.path=target/tmp/asterix_nc1/udf.sock #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006 [nc/asterix_nc2] @@ -29,6 +30,7 @@ txn.log.dir=target/tmp/asterix_nc2/txnlog core.dump.dir=target/tmp/asterix_nc2/coredump iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2 nc.api.port=19005 
+udf.api.ds.path=target/tmp/asterix_nc2/udf.sock #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007 [nc] @@ -68,7 +70,7 @@ storage.partitioning=static cloud.storage.scheme=s3 cloud.storage.bucket=cloud-storage-container cloud.storage.region=us-west-2 -cloud.storage.endpoint=http://127.0.0.1:8001 +cloud.storage.endpoint=http://127.0.0.1:46677 cloud.storage.anonymous.auth=true cloud.storage.cache.policy=selective cloud.max.write.requests.per.second=2000 diff --git a/asterixdb/asterix-doc/src/main/user-defined_function/udf.md b/asterixdb/asterix-doc/src/main/user-defined_function/udf.md index 71e0fa2e44..9768c81cd5 100644 --- a/asterixdb/asterix-doc/src/main/user-defined_function/udf.md +++ b/asterixdb/asterix-doc/src/main/user-defined_function/udf.md @@ -52,7 +52,7 @@ Now,restart the cluster if it was already started to allow the Cluster Controlle ## <a name="installingUDF">Installing a Java UDF Library</a> To install a UDF package to the cluster, we need to send a Multipart Form-data HTTP request to the `/admin/udf` endpoint -of the CC at the normal API port (`19004` by default). Any suitable tool will do, but for the example here I will use +of the cluster through the special UDF domain socket. Any suitable tool will do, but for the example here I will use `curl` which is widely available. For example, to install a library with the following criteria: @@ -65,7 +65,7 @@ For example, to install a library with the following criteria: we would execute - curl -v -u admin:admin -X POST -F 'data=@./lib.zip' -F 'type=java' localhost:19004/admin/udf/udfs/testlib + curl -v -u admin:admin --unix-socket udf.sock -F 'data=@./lib.zip' -F 'type=java' localhost/admin/udf/udfs/testlib Any response other than `200` indicates an error in deployment. 
diff --git a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java index 025f607a30..9a9bd85ebe 100644 --- a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java +++ b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java @@ -19,7 +19,6 @@ package org.apache.asterix.test.podman; import java.io.IOException; -import java.net.URI; import org.apache.asterix.app.external.IExternalUDFLibrarian; import org.apache.asterix.common.exceptions.AsterixException; @@ -42,7 +41,7 @@ public class PodmanUDFLibrarian implements IExternalUDFLibrarian { } @Override - public void install(URI path, String type, String libPath, Pair<String, String> credentials) throws Exception { + public void install(String path, String type, String libPath, Pair<String, String> credentials) throws Exception { Container.ExecResult curlResult = null; int retryCt = 0; while (retryCt < 10) { @@ -50,7 +49,7 @@ public class PodmanUDFLibrarian implements IExternalUDFLibrarian { curlResult = asterix.execInContainer("curl", "--no-progress-meter", "-X", "POST", "-u", credentials.first + ":" + credentials.second, "-F", "data=@" + "/var/tmp/asterix-app/" + libPath, "-F", "type=" + type, - "http://localhost:19004" + path.getRawPath()); + "http://localhost:19004" + path); handleResponse(curlResult); return; } catch (RuntimeException e) { @@ -62,16 +61,21 @@ public class PodmanUDFLibrarian implements IExternalUDFLibrarian { } @Override - public void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException { + public void uninstall(String path, Pair<String, String> credentials) throws IOException, AsterixException { try { Container.ExecResult curlResult = asterix.execInContainer("curl", "-X", "DELETE", "-u", - credentials.first + ":" + credentials.second, 
"http://localhost:19004" + path.getPath()); + credentials.first + ":" + credentials.second, "http://localhost:19004" + path); handleResponse(curlResult); } catch (InterruptedException e) { throw new IOException(e); } } + @Override + public SocketType getSocketType() { + return SocketType.LOOPBACK; + } + private void handleResponse(Container.ExecResult result) throws AsterixException, JsonProcessingException { if (result.getExitCode() != 0) { throw new AsterixException(result.getStderr()); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index f55dd59c79..c3571934c6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -27,6 +27,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTE import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_LONG_BYTE_UNIT; import static org.apache.hyracks.control.common.config.OptionTypes.STRING; import static org.apache.hyracks.control.common.config.OptionTypes.STRING_ARRAY; +import static org.apache.hyracks.control.common.utils.ConfigurationUtil.JAVA_IO_TMPDIR; import java.net.InetAddress; import java.util.ArrayList; @@ -103,6 +104,11 @@ public class NCConfig extends ControllerConfig { PYTHON_ARGS(STRING_ARRAY, (String[]) null), PYTHON_ENV(STRING_ARRAY, (String[]) null), PYTHON_DS_PATH(STRING, (String) null), + UDF_API_DS_PATH( + STRING, + appConfig -> FileUtil.joinPath(System.getProperty(JAVA_IO_TMPDIR), "asterixdb_udf", + appConfig.getString(NODE_ID) + "_udf.sock"), + "Defaults to temporary 
directory/asterixdb_udf/{$NODE_ID}_udf.sock. Path must be <100 characters"), LIBRARY_MAX_FILE_SIZE(POSITIVE_LONG_BYTE_UNIT, 250L * 1024 * 1024), //250MB LIBRARY_MAX_EXTRACTED_SIZE(POSITIVE_LONG_BYTE_UNIT, 1000L * 1024 * 1024), //1GB LIBRARY_MAX_ARCHIVE_ENTRIES(INTEGER, 4096), @@ -244,6 +250,8 @@ public class NCConfig extends ControllerConfig { return "Number of threads per partition used to write and read from storage"; case IO_QUEUE_SIZE: return "Length of the queue used for requests to write and read"; + case UDF_API_DS_PATH: + return "Path and name for domain socket used to deploy UDFs to the cluster"; case PYTHON_CMD: return "Absolute path to python interpreter"; case PYTHON_ADDITIONAL_PACKAGES: diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java index 59c0ae05f2..faab5d30cd 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java @@ -114,5 +114,9 @@ public interface IServletRequest { */ InetSocketAddress getLocalAddress(); + String getHostPort(); + + String getRemotePort(); + Channel getChannel(); } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java index 4a72d539b4..0aaf8feaf9 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java @@ -19,6 +19,7 @@ package org.apache.hyracks.http.server; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.ArrayList; import 
java.util.Collections; import java.util.HashMap; @@ -28,6 +29,7 @@ import java.util.Map; import java.util.Set; import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.util.NetworkUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -114,6 +116,23 @@ public class BaseRequest implements IServletRequest { return (InetSocketAddress) channel.localAddress(); } + @Override + public String getHostPort() { + return getPort(channel.localAddress()); + } + + @Override + public String getRemotePort() { + return getPort(channel.remoteAddress()); + } + + private String getPort(SocketAddress s) { /* formats the address passed in (not always the local one); non-inet addresses such as domain sockets yield "0" */ + if (s instanceof InetSocketAddress) { + return NetworkUtil.toHostPort((InetSocketAddress) s); + } + return "0"; + } + @Override public Channel getChannel() { return channel; diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java index 6476ab9e3d..16a29aa4af 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java @@ -30,6 +30,8 @@ import org.apache.logging.log4j.Logger; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioDomainSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpContent; @@ -71,7 +73,11 @@ public class CLFLogger extends ChannelDuplexHandler { if (msg instanceof HttpRequest) { HttpRequest req = (HttpRequest) msg; try { - clientIp = ((NioSocketChannel) 
ctx.channel()).remoteAddress().getAddress().toString().substring(1); + if (ctx.channel() instanceof SocketChannel) { + clientIp = ((NioSocketChannel) ctx.channel()).remoteAddress().getAddress().toString().substring(1); + } else if (ctx.channel() instanceof NioDomainSocketChannel) { + clientIp = ctx.channel().remoteAddress().toString(); + } } catch (Exception e) { LOGGER.debug("ignoring {} obtaining client ip for {}", e, ctx.channel()); clientIp = "-"; diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index 1198945eb9..4cc8bc9ae7 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -20,6 +20,10 @@ package org.apache.hyracks.http.server; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnixDomainSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -52,8 +56,11 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.ServerChannel; import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerDomainSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpScheme; @@ -70,6 +77,7 @@ public class HttpServer { new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK); protected 
static final int RECEIVE_BUFFER_SIZE = 4096; private static final Logger LOGGER = LogManager.getLogger(); + private static final Class<InetSocketAddress> INSA = InetSocketAddress.class; private static final int FAILED = -1; private static final int STOPPED = 0; private static final int STARTING = 1; @@ -85,7 +93,8 @@ public class HttpServer { private final ServletRegistry servlets; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; - private final Set<InetSocketAddress> addresses; + private final Set<SocketAddress> addresses; + private final Set<SocketAddress> boundAddresses; private final ThreadPoolExecutor executor; // Mutable members private volatile int state = STOPPED; @@ -93,6 +102,7 @@ public class HttpServer { private final List<Channel> channels; private Throwable cause; private HttpServerConfig config; + private Class<? extends ServerChannel> channelImpl = NioServerSocketChannel.class; private final GenericFutureListener<Future<Void>> channelCloseListener = f -> { // This listener is invoked from within a netty IO thread. 
Hence, we can never block it @@ -118,14 +128,30 @@ public class HttpServer { this(bossGroup, workerGroup, Collections.singletonList(address), config, closeHandler); } - public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, Collection<InetSocketAddress> addresses, - HttpServerConfig config, IChannelClosedHandler closeHandler) { + public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, Path path, HttpServerConfig config) { + this(bossGroup, workerGroup, Collections.singletonList(UnixDomainSocketAddress.of(path)), config, null); + } + + public static String getPortOrPath(SocketAddress addr) { + if (addr instanceof InetSocketAddress) { + return "port:" + ((InetSocketAddress) addr).getPort(); + } else if (addr instanceof UnixDomainSocketAddress) { + return "path:" + ((UnixDomainSocketAddress) addr).getPath().toString(); + } else { + return addr.toString(); + } + } + + public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, + Collection<? extends SocketAddress> addresses, HttpServerConfig config, + IChannelClosedHandler closeHandler) { if (addresses.isEmpty()) { throw new IllegalArgumentException("no addresses specified"); } this.bossGroup = bossGroup; this.workerGroup = workerGroup; this.addresses = new LinkedHashSet<>(addresses); + this.boundAddresses = new LinkedHashSet<>(addresses); this.closedHandler = closeHandler; this.config = config; channels = new ArrayList<>(); @@ -133,13 +159,21 @@ public class HttpServer { servlets = new ServletRegistry(); workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize()); int numExecutorThreads = config.getThreadCount(); - int[] ports = this.addresses.stream().mapToInt(InetSocketAddress::getPort).distinct().toArray(); + int[] ports = addresses.stream().filter(INSA::isInstance).map(INSA::cast).mapToInt(InetSocketAddress::getPort) + .distinct().toArray(); String desc; if (ports.length > 1) { - desc = this.addresses.stream().map(a -> a.getAddress().getHostAddress() + ":" + 
a.getPort()) + desc = addresses.stream().filter(INSA::isInstance).map(INSA::cast) + .map(a -> a.getAddress().getHostAddress() + ":" + a.getPort()) .collect(Collectors.joining(",", "[", "]")); - } else { + } else if (ports.length == 1) { desc = "port:" + ports[0]; + } else { + List<String> paths = addresses.stream().filter(UnixDomainSocketAddress.class::isInstance) + .map(UnixDomainSocketAddress.class::cast).map(UnixDomainSocketAddress::getPath) + .map(Object::toString).distinct().toList(); + desc = "paths: " + " [ " + String.join(" ],[ ", paths) + " ]"; + channelImpl = NioServerDomainSocketChannel.class; } executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue, runnable -> new Thread(runnable, "HttpExecutor(" + desc + ")-" + threadId.getAndIncrement())); @@ -278,18 +312,31 @@ public class HttpServer { private void bind() throws Exception { ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + b.group(bossGroup, workerGroup).channel(channelImpl) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE)) .childOption(ChannelOption.AUTO_READ, Boolean.FALSE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK) .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer()); - List<Pair<InetSocketAddress, ChannelFuture>> channelFutures = new ArrayList<>(); - for (InetSocketAddress address : addresses) { + List<Pair<SocketAddress, ChannelFuture>> channelFutures = new ArrayList<>(); + for (SocketAddress address : addresses) { + if(address instanceof UnixDomainSocketAddress udr){ + Path udrPath = udr.getPath(); + //if the socket can't be connected to, but: + // - the node exists + // - it is not a regular file + // - it is not a directory + // - and is 0 length + // then it should be safe to unlink + 
if(!sockAlive(udr) && Files.exists(udrPath) && !Files.isRegularFile(udrPath) && !Files.isDirectory(udrPath) && udrPath.toFile().length() == 0){ + Files.deleteIfExists(udrPath); + } + } channelFutures.add(org.apache.commons.lang3.tuple.Pair.of(address, b.bind(address))); + boundAddresses.add(address); } Exception failure = null; - for (Pair<InetSocketAddress, ChannelFuture> addressFuture : channelFutures) { + for (Pair<SocketAddress, ChannelFuture> addressFuture : channelFutures) { try { Channel channel = addressFuture.getRight().sync().channel(); channel.closeFuture().addListener(channelCloseListener); @@ -384,6 +431,7 @@ public class HttpServer { LOGGER.log(Level.ERROR, "Error while shutting down http server executor", e); } closeChannels(); + deleteDomainSockFiles(); } public IServlet getServlet(FullHttpRequest request) { @@ -394,7 +442,7 @@ public class HttpServer { return new HttpServerHandler<>(this, chunkSize); } - protected ChannelInitializer<SocketChannel> getChannelInitializer() { + protected ChannelInitializer<DuplexChannel> getChannelInitializer() { return new HttpServerInitializer(this); } @@ -429,7 +477,7 @@ public class HttpServer { } @Deprecated // this returns an arbitrary (the first supplied if collection is ordered) address - public InetSocketAddress getAddress() { + public SocketAddress getAddress() { return addresses.iterator().next(); } @@ -443,4 +491,27 @@ public class HttpServer { channels.clear(); } } + + // Domain socket nodes exist outside the lifetime of the socket itself, so we should attempt to remove them on close + void deleteDomainSockFiles() { + for (UnixDomainSocketAddress address : boundAddresses.stream().filter(UnixDomainSocketAddress.class::isInstance) + .map(UnixDomainSocketAddress.class::cast).toList()) { + try { + Files.deleteIfExists(address.getPath()); + LOGGER.info("Deleted domain socket file {}", address.getPath()); + } catch (IOException e) { + LOGGER.warn("Failed to delete domain socket file {}", address.getPath(), e); 
+ } + } + } + + //the only real way to know if the file given to us in the config is an active socket is to attempt to connect to it + boolean sockAlive(UnixDomainSocketAddress addr) { + try (var chan = java.nio.channels.SocketChannel.open(addr)) { + return true; + } catch (Exception e) { + return false; + } + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java index ad8a61f4a0..32b1dbbd22 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java @@ -20,11 +20,11 @@ package org.apache.hyracks.http.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.DuplexChannel; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; -public class HttpServerInitializer extends ChannelInitializer<SocketChannel> { +public class HttpServerInitializer extends ChannelInitializer<DuplexChannel> { private final HttpServer server; private final int maxRequestSize; @@ -44,7 +44,7 @@ public class HttpServerInitializer extends ChannelInitializer<SocketChannel> { } @Override - public void initChannel(SocketChannel ch) { + public void initChannel(DuplexChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(new HttpRequestCapacityController(server)); p.addLast(new HttpRequestDecoder(maxRequestInitialLineLength, maxRequestHeaderSize, maxRequestChunkSize));
