This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit deca3219983b6f906f0d6b20f5c87277de028862
Author: Ian Maxon <[email protected]>
AuthorDate: Wed Oct 29 11:26:23 2025 -0700

    [ASTERIXDB-3636][API] Migrate UDF API to domain sockets
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: no
    
    Details:
    Migrate the UDF API to use a domain socket instead of
    being restricted to localhost on the normal NC servlet.
    
    Change-Id: I5f8ac2170fd6b2beef14d99c38b9141af0f12ba3
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20492
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Ian Maxon <[email protected]>
---
 .../apache/asterix/translator/Receptionist.java    |   8 +-
 asterixdb/asterix-app/pom.xml                      |  30 ++
 .../api/http/server/AbstractNCUdfServlet.java      | 276 ++++++++++++++++++-
 .../asterix/api/http/server/NCUdfApiServlet.java   | 302 ++-------------------
 .../asterix/api/http/server/NCUdfDSApiServlet.java | 116 ++++++++
 .../api/http/server/NCUdfRecoveryServlet.java      |  18 +-
 .../asterix/hyracks/bootstrap/NCApplication.java   |  48 +++-
 .../api/common/CloudStorageIntegrationUtil.java    |  10 +-
 .../asterix/app/external/CloudUDFLibrarian.java    | 112 ++++++++
 .../asterix/app/external/ExternalUDFLibrarian.java |  13 +-
 .../app/external/IExternalUDFLibrarian.java        |  12 +-
 .../test/cloud_storage/CloudPythonTest.java        |   7 +-
 .../apache/asterix/test/common/TestExecutor.java   |  13 +-
 .../src/test/resources/cc-cloud-storage-main.conf  |   4 +-
 .../src/main/user-defined_function/udf.md          |   4 +-
 .../asterix/test/podman/PodmanUDFLibrarian.java    |  14 +-
 .../control/common/controllers/NCConfig.java       |   8 +
 .../apache/hyracks/http/api/IServletRequest.java   |   4 +
 .../apache/hyracks/http/server/BaseRequest.java    |  19 ++
 .../org/apache/hyracks/http/server/CLFLogger.java  |   8 +-
 .../org/apache/hyracks/http/server/HttpServer.java |  95 ++++++-
 .../hyracks/http/server/HttpServerInitializer.java |   6 +-
 22 files changed, 762 insertions(+), 365 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 524af430e6..12619a96fd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.translator;
 
-import java.net.InetSocketAddress;
 import java.util.UUID;
 
 import org.apache.asterix.common.api.IClientRequest;
@@ -31,18 +30,15 @@ import org.apache.http.HttpHeaders;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
-import org.apache.hyracks.util.NetworkUtil;
 
 public class Receptionist implements IReceptionist {
 
     @Override
     public IRequestReference welcome(IServletRequest request) {
         final String uuid = UUID.randomUUID().toString();
-        final InetSocketAddress localAddress = request.getLocalAddress();
-        final RequestReference ref =
-                RequestReference.of(uuid, 
NetworkUtil.toHostPort(localAddress), System.currentTimeMillis());
+        final RequestReference ref = RequestReference.of(uuid, 
request.getHostPort(), System.currentTimeMillis());
         ref.setUserAgent(request.getHeader(HttpHeaders.USER_AGENT));
-        ref.setRemoteAddr(NetworkUtil.toHostPort(request.getRemoteAddress()));
+        ref.setRemoteAddr(request.getRemotePort());
         return ref;
     }
 
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 3b27957944..c9486c6149 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -842,6 +842,36 @@
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents.client5</groupId>
+      <artifactId>httpclient5</artifactId>
+      <version>5.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.projectreactor.netty</groupId>
+      <artifactId>reactor-netty</artifactId>
+      <version>1.2.10</version>
+      <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-classes-epoll</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-kqueue</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-classes-kqueue</artifactId>
+          </exclusion>
+       </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.asterix</groupId>
       <artifactId>asterix-lang-common</artifactId>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java
index 8c5a93b213..3f22014547 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java
@@ -18,38 +18,71 @@
  */
 package org.apache.asterix.api.http.server;
 
+import static 
org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
 import static 
org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA;
 import static 
org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON;
+import static org.apache.asterix.common.library.LibraryDescriptor.FIELD_HASH;
 
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.OpenOption;
 import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.asterix.app.message.CreateLibraryRequestMessage;
+import org.apache.asterix.app.message.DropLibraryRequestMessage;
+import org.apache.asterix.app.message.InternalRequestResponse;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.Namespace;
+import org.apache.asterix.external.util.ExternalLibraryUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.IFormattedException;
-import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpScheme;
 import io.netty.handler.codec.http.multipart.FileUpload;
 import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
 import io.netty.handler.codec.http.multipart.InterfaceHttpData;
@@ -61,8 +94,13 @@ public abstract class AbstractNCUdfServlet extends AbstractServlet {
     INCServiceContext srvCtx;
 
     protected final IApplicationContext plainAppCtx;
-    private final HttpScheme httpServerProtocol;
-    private final int httpServerPort;
+    protected final IReceptionist receptionist;
+    protected final int timeout;
+    protected ILibraryManager libraryManager;
+    protected Path workingDir;
+    protected String sysAuthHeader;
+
+    private static final Logger LOGGER = LogManager.getLogger();
 
     public static final String GET_UDF_DIST_ENDPOINT = "/dist";
     public static final String TYPE_PARAMETER = "type";
@@ -90,15 +128,148 @@ public abstract class AbstractNCUdfServlet extends AbstractServlet {
 
     }
 
-    public AbstractNCUdfServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx,
-            HttpScheme httpServerProtocol, int httpServerPort) {
+    public AbstractNCUdfServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx) {
         super(ctx, paths);
         this.plainAppCtx = appCtx;
-        this.httpServerProtocol = httpServerProtocol;
-        this.httpServerPort = httpServerPort;
+        this.timeout = 
appCtx.getExternalProperties().getLibraryDeployTimeout();
+        this.receptionist = appCtx.getReceptionist();
+    }
+
+    public void init() throws IOException {
+        appCtx = (INcApplicationContext) plainAppCtx;
+        this.libraryManager = appCtx.getLibraryManager();
+        srvCtx = this.appCtx.getServiceContext();
+        workingDir = 
Paths.get(appCtx.getLibraryManager().getDistributionDir().getAbsolutePath()).normalize();
+        initAuth();
+        initStorage();
+    }
+
+    protected void initAuth() {
+        sysAuthHeader = (String) ctx.get(SYS_AUTH_HEADER);
+    }
+
+    protected void initStorage() throws IOException {
+        // prepare working directory
+        if (Files.isDirectory(workingDir)) {
+            try {
+                FileUtils.cleanDirectory(workingDir.toFile());
+            } catch (IOException e) {
+                LOGGER.warn("Could not clean directory: " + workingDir, e);
+            }
+        } else {
+            Files.deleteIfExists(workingDir);
+            FileUtil.forceMkdirs(workingDir.toFile());
+        }
+    }
+
+    protected enum LibraryOperation {
+        UPSERT,
+        DELETE
+    }
+
+    protected Map<String, String> 
additionalHttpHeadersFromRequest(IServletRequest request) {
+        return Collections.emptyMap();
     }
 
-    void readFromFile(Path filePath, IServletResponse response, String 
contentType, OpenOption opt) throws Exception {
+    protected void doCreate(Namespace libNamespace, String libraryName, 
ExternalFunctionLanguage language, String hash,
+            URI downloadURI, boolean replaceIfExists, String sysAuthHeader, 
IRequestReference requestReference,
+            IServletRequest request) throws Exception {
+        INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
+        MessageFuture responseFuture = ncMb.registerMessageFuture();
+        CreateLibraryRequestMessage req = new 
CreateLibraryRequestMessage(srvCtx.getNodeId(),
+                responseFuture.getFutureId(), libNamespace, libraryName, 
language, hash, downloadURI, replaceIfExists,
+                sysAuthHeader, requestReference, 
additionalHttpHeadersFromRequest(request));
+        sendMessage(req, responseFuture);
+    }
+
+    protected void doDrop(Namespace namespace, String libraryName, boolean 
replaceIfExists,
+            IRequestReference requestReference, IServletRequest request) 
throws Exception {
+        INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
+        MessageFuture responseFuture = ncMb.registerMessageFuture();
+        DropLibraryRequestMessage req = new 
DropLibraryRequestMessage(srvCtx.getNodeId(), responseFuture.getFutureId(),
+                namespace, libraryName, replaceIfExists, requestReference, 
additionalHttpHeadersFromRequest(request));
+        sendMessage(req, responseFuture);
+    }
+
+    private void sendMessage(ICcAddressedMessage requestMessage, MessageFuture 
responseFuture) throws Exception {
+        // Running on NC -> send 'execute' message to CC
+        INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
+        InternalRequestResponse responseMsg;
+        try {
+            ncMb.sendMessageToPrimaryCC(requestMessage);
+            responseMsg = (InternalRequestResponse) 
responseFuture.get(timeout, TimeUnit.SECONDS);
+        } finally {
+            ncMb.deregisterMessageFuture(responseFuture.getFutureId());
+        }
+
+        Throwable err = responseMsg.getError();
+        if (err != null) {
+            if (err instanceof Error) {
+                throw (Error) err;
+            } else if (err instanceof Exception) {
+                throw (Exception) err;
+            } else {
+                throw new Exception(err.toString(), err);
+            }
+        }
+    }
+
+    protected void handleModification(IServletRequest request, 
IServletResponse response, LibraryOperation op) {
+        HttpRequest httpRequest = request.getHttpRequest();
+        Path libraryTempFile = null;
+        FileOutputStream libTmpOut = null;
+        HttpPostRequestDecoder requestDecoder = null;
+        String localPath = localPath(request);
+        try {
+            Pair<Namespace, String> namespaceAndName = 
decodeDvAndLibFromLocalPath(localPath);
+            String libName = namespaceAndName.second;
+            Namespace libNamespace = namespaceAndName.first;
+            DataverseName libDv = libNamespace.getDataverseName();
+            IRequestReference requestReference = receptionist.welcome(request);
+            if (op == LibraryOperation.UPSERT) {
+                requestDecoder = new HttpPostRequestDecoder(httpRequest);
+                LibraryUploadData uploadData = 
decodeMultiPartLibraryOptions(requestDecoder);
+                ExternalFunctionLanguage language = uploadData.type;
+                String fileExt = 
FilenameUtils.getExtension(uploadData.fileUpload.getFilename());
+                MessageDigest digest = MessageDigest.getInstance("MD5");
+                distributeLibrary(uploadData, libDv, libName, fileExt, 
language, digest, libNamespace, requestReference,
+                        request);
+            } else if (op == LibraryOperation.DELETE) {
+                //DELETE semantics imply ifExists
+                doDrop(libNamespace, libName, false, requestReference, 
request);
+            }
+            response.setStatus(HttpResponseStatus.OK);
+            PrintWriter responseWriter = response.writer();
+            String emptyJson = "{}";
+            responseWriter.write(emptyJson);
+            responseWriter.flush();
+        } catch (Exception e) {
+            writeException(e, response);
+            LOGGER.info("Error modifying library", e);
+        } finally {
+            if (requestDecoder != null) {
+                requestDecoder.destroy();
+            }
+            try {
+                if (libraryTempFile != null) {
+                    if (libTmpOut != null) {
+                        libTmpOut.close();
+                    }
+                    Files.deleteIfExists(libraryTempFile);
+                }
+            } catch (IOException e) {
+                LOGGER.warn("Could not delete temporary file " + 
libraryTempFile, e);
+            }
+        }
+    }
+
+    protected void distributeLibrary(LibraryUploadData uploadData, 
DataverseName libDv, String libName, String fileExt,
+            ExternalFunctionLanguage language, MessageDigest digest, Namespace 
namespace,
+            IRequestReference requestReference, IServletRequest request) 
throws Exception {
+    }
+
+    protected void readFromFile(Path filePath, IServletResponse response, 
String contentType, OpenOption opt)
+            throws Exception {
         class InputStreamGetter extends SynchronizableWork {
             private InputStream is;
 
@@ -132,12 +303,6 @@ public abstract class AbstractNCUdfServlet extends AbstractServlet {
         return DATAVERSE_KEY;
     }
 
-    URI createDownloadURI(Path file) throws Exception {
-        String host = 
appCtx.getServiceContext().getAppConfig().getString(NCConfig.Option.PUBLIC_ADDRESS);
-        String path = paths[0].substring(0, servletPathLengths[0]) + 
GET_UDF_DIST_ENDPOINT + '/' + file.getFileName();
-        return new URI(httpServerProtocol.toString(), null, host, 
httpServerPort, path, null, null);
-    }
-
     private boolean isNotAttribute(InterfaceHttpData field) {
         return field == null || 
!field.getHttpDataType().equals(InterfaceHttpData.HttpDataType.Attribute);
     }
@@ -207,4 +372,85 @@ public abstract class AbstractNCUdfServlet extends AbstractServlet {
         return HttpResponseStatus.INTERNAL_SERVER_ERROR;
     }
 
+    @Override
+    protected void get(IServletRequest request, IServletResponse response) 
throws Exception {
+        String localPath = localPath(request);
+        try {
+            if (localPath.equals("/") || localPath.equals("")) {
+                //TODO: nicer way to get this into display form?
+                Map<Namespace, Map<String, String>> dvToLibHashes =
+                        
ExternalLibraryUtils.produceLibraryListing(libraryManager);
+                List<Map<String, Object>> libraryList = new ArrayList<>();
+                for (Map.Entry<Namespace, Map<String, String>> dvAndLibs : 
dvToLibHashes.entrySet()) {
+                    for (Map.Entry<String, String> libsInDv : 
dvAndLibs.getValue().entrySet()) {
+                        Map<String, Object> libraryEntry = new HashMap<>();
+                        libraryEntry.put(getDataverseKey(), 
libraryManager.getNsOrDv(dvAndLibs.getKey()));
+                        libraryEntry.put(NAME_KEY, libsInDv.getKey());
+                        libraryEntry.put(FIELD_HASH, libsInDv.getValue());
+                        libraryList.add(libraryEntry);
+                    }
+                }
+                JsonNode libraryListing = 
OBJECT_MAPPER.valueToTree(libraryList);
+                response.setStatus(HttpResponseStatus.OK);
+                HttpUtil.setContentType(response, 
HttpUtil.ContentType.APPLICATION_JSON, request);
+                PrintWriter responseWriter = response.writer();
+                JSONUtil.writeNode(responseWriter, libraryListing);
+                responseWriter.flush();
+            } else if (localPath(request).startsWith(GET_UDF_DIST_ENDPOINT)) {
+                localPath = 
localPath(request).substring(GET_UDF_DIST_ENDPOINT.length());
+                while (localPath.startsWith("/")) {
+                    localPath = localPath.substring(1);
+                }
+                if (localPath.isEmpty()) {
+                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
+                    return;
+                }
+                Path filePath = workingDir.resolve(localPath).normalize();
+                if (!filePath.startsWith(workingDir)) {
+                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
+                    return;
+                }
+                readFromFile(filePath, response, 
HttpUtil.ContentType.APPLICATION_OCTET_STREAM, null);
+            } else {
+                response.setStatus(HttpResponseStatus.NOT_FOUND);
+            }
+        } catch (Exception e) {
+            writeException(e, response);
+            LOGGER.error("Error reading library", e);
+        }
+    }
+
+    protected void writeException(Exception e, IServletResponse response) {
+        response.setStatus(toHttpErrorStatus(e));
+        PrintWriter responseWriter = response.writer();
+        Map<String, String> error = Collections.singletonMap("error", 
e.getMessage());
+        String errorJson = "";
+        try {
+            errorJson = OBJECT_MAPPER.writeValueAsString(error);
+        } catch (JsonProcessingException ex) {
+            responseWriter.write("{ \"error\": \"Unable to process error 
message!\" }");
+        }
+        responseWriter.write(errorJson);
+        responseWriter.flush();
+    }
+
+    abstract protected boolean isRequestPermitted(IServletRequest request, 
IServletResponse response)
+            throws IOException;
+
+    protected boolean isRequestOnLoopback(IServletRequest request) {
+        if (request.getLocalAddress() != null && request.getRemoteAddress() != 
null) {
+            InetAddress local = request.getLocalAddress().getAddress();
+            InetAddress remote = request.getRemoteAddress().getAddress();
+            return remote.isLoopbackAddress() && local.isLoopbackAddress();
+        } else {
+            return false;
+        }
+    }
+
+    protected void rejectForbidden(IServletResponse response) throws 
IOException {
+        // TODO: why this JSON format, do we use this anywhere else?
+        sendError(response, HttpUtil.ContentType.APPLICATION_JSON, 
HttpResponseStatus.FORBIDDEN,
+                "{ \"error\": \"Forbidden\" }");
+    }
+
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
index 02a99f657e..7194d5d86b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
@@ -18,245 +18,60 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
-import static org.apache.asterix.common.library.LibraryDescriptor.FIELD_HASH;
-import static 
org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
-import static 
org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor;
-
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.net.InetAddress;
 import java.net.URI;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.asterix.app.message.CreateLibraryRequestMessage;
-import org.apache.asterix.app.message.DropLibraryRequestMessage;
-import org.apache.asterix.app.message.InternalRequestResponse;
 import org.apache.asterix.common.api.IApplicationContext;
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.library.LibraryDescriptor;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.Namespace;
 import org.apache.asterix.external.util.ExternalLibraryUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
-import org.apache.hyracks.http.server.utils.HttpUtil;
-import org.apache.hyracks.util.JSONUtil;
-import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpScheme;
-import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
 
 public class NCUdfApiServlet extends AbstractNCUdfServlet {
 
-    protected final IReceptionist receptionist;
-
-    protected Path workingDir;
-    private String sysAuthHeader;
-    private ILibraryManager libraryManager;
-    private int timeout;
+    private final HttpScheme httpServerProtocol;
+    private final int httpServerPort;
 
     private static final Logger LOGGER = LogManager.getLogger();
 
     public NCUdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, 
IApplicationContext appCtx,
             HttpScheme httpServerProtocol, int httpServerPort) {
-        super(ctx, paths, appCtx, httpServerProtocol, httpServerPort);
-        this.receptionist = appCtx.getReceptionist();
-        this.timeout = 
appCtx.getExternalProperties().getLibraryDeployTimeout();
+        super(ctx, paths, appCtx);
+        this.httpServerProtocol = httpServerProtocol;
+        this.httpServerPort = httpServerPort;
     }
 
-    private enum LibraryOperation {
-        UPSERT,
-        DELETE
+    URI createDownloadURI(Path file) throws Exception {
+        String host = 
appCtx.getServiceContext().getAppConfig().getString(NCConfig.Option.PUBLIC_ADDRESS);
+        String path = paths[0].substring(0, servletPathLengths[0]) + 
GET_UDF_DIST_ENDPOINT + '/' + file.getFileName();
+        return new URI(httpServerProtocol.toString(), null, host, 
httpServerPort, path, null, null);
     }
 
     @Override
-    public void init() throws IOException {
-        appCtx = (INcApplicationContext) plainAppCtx;
-        this.libraryManager = appCtx.getLibraryManager();
-        srvCtx = this.appCtx.getServiceContext();
-        workingDir = 
Paths.get(appCtx.getLibraryManager().getDistributionDir().getAbsolutePath()).normalize();
-        initAuth();
-        initStorage();
-    }
-
-    protected void initAuth() {
-        sysAuthHeader = (String) ctx.get(SYS_AUTH_HEADER);
-    }
-
-    protected void initStorage() throws IOException {
-        // prepare working directory
-        if (Files.isDirectory(workingDir)) {
-            try {
-                FileUtils.cleanDirectory(workingDir.toFile());
-            } catch (IOException e) {
-                LOGGER.warn("Could not clean directory: " + workingDir, e);
-            }
-        } else {
-            Files.deleteIfExists(workingDir);
-            FileUtil.forceMkdirs(workingDir.toFile());
-        }
-    }
-
-    protected Map<String, String> 
additionalHttpHeadersFromRequest(IServletRequest request) {
-        return Collections.emptyMap();
-    }
-
-    private void doCreate(Namespace libNamespace, String libraryName, 
ExternalFunctionLanguage language, String hash,
-            URI downloadURI, boolean replaceIfExists, String sysAuthHeader, 
IRequestReference requestReference,
-            IServletRequest request) throws Exception {
-        INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
-        MessageFuture responseFuture = ncMb.registerMessageFuture();
-        CreateLibraryRequestMessage req = new 
CreateLibraryRequestMessage(srvCtx.getNodeId(),
-                responseFuture.getFutureId(), libNamespace, libraryName, 
language, hash, downloadURI, replaceIfExists,
-                sysAuthHeader, requestReference, 
additionalHttpHeadersFromRequest(request));
-        sendMessage(req, responseFuture);
-    }
-
-    private void doDrop(Namespace namespace, String libraryName, boolean 
replaceIfExists,
+    protected void distributeLibrary(LibraryUploadData uploadData, 
DataverseName libDv, String libName, String fileExt,
+            ExternalFunctionLanguage language, MessageDigest digest, Namespace 
libNamespace,
             IRequestReference requestReference, IServletRequest request) 
throws Exception {
-        INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
-        MessageFuture responseFuture = ncMb.registerMessageFuture();
-        DropLibraryRequestMessage req = new 
DropLibraryRequestMessage(srvCtx.getNodeId(), responseFuture.getFutureId(),
-                namespace, libraryName, replaceIfExists, requestReference, 
additionalHttpHeadersFromRequest(request));
-        sendMessage(req, responseFuture);
-    }
-
-    private void sendMessage(ICcAddressedMessage requestMessage, MessageFuture 
responseFuture) throws Exception {
-        // Running on NC -> send 'execute' message to CC
-        INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
-        InternalRequestResponse responseMsg;
-        try {
-            ncMb.sendMessageToPrimaryCC(requestMessage);
-            responseMsg = (InternalRequestResponse) 
responseFuture.get(timeout, TimeUnit.SECONDS);
-        } finally {
-            ncMb.deregisterMessageFuture(responseFuture.getFutureId());
-        }
-
-        Throwable err = responseMsg.getError();
-        if (err != null) {
-            if (err instanceof Error) {
-                throw (Error) err;
-            } else if (err instanceof Exception) {
-                throw (Exception) err;
-            } else {
-                throw new Exception(err.toString(), err);
-            }
-        }
-    }
-
-    @Override
-    protected void get(IServletRequest request, IServletResponse response) 
throws Exception {
-        String localPath = localPath(request);
-        try {
-            if (localPath.equals("/") || localPath.equals("")) {
-                //TODO: nicer way to get this into display form?
-                Map<Namespace, Map<String, String>> dvToLibHashes =
-                        
ExternalLibraryUtils.produceLibraryListing(libraryManager);
-                List<Map<String, Object>> libraryList = new ArrayList<>();
-                for (Map.Entry<Namespace, Map<String, String>> dvAndLibs : 
dvToLibHashes.entrySet()) {
-                    for (Map.Entry<String, String> libsInDv : 
dvAndLibs.getValue().entrySet()) {
-                        Map<String, Object> libraryEntry = new HashMap<>();
-                        libraryEntry.put(getDataverseKey(), 
libraryManager.getNsOrDv(dvAndLibs.getKey()));
-                        libraryEntry.put(NAME_KEY, libsInDv.getKey());
-                        libraryEntry.put(FIELD_HASH, libsInDv.getValue());
-                        libraryList.add(libraryEntry);
-                    }
-                }
-                JsonNode libraryListing = 
OBJECT_MAPPER.valueToTree(libraryList);
-                response.setStatus(HttpResponseStatus.OK);
-                HttpUtil.setContentType(response, 
HttpUtil.ContentType.APPLICATION_JSON, request);
-                PrintWriter responseWriter = response.writer();
-                JSONUtil.writeNode(responseWriter, libraryListing);
-                responseWriter.flush();
-            } else if (localPath(request).startsWith(GET_UDF_DIST_ENDPOINT)) {
-                localPath = 
localPath(request).substring(GET_UDF_DIST_ENDPOINT.length());
-                while (localPath.startsWith("/")) {
-                    localPath = localPath.substring(1);
-                }
-                if (localPath.isEmpty()) {
-                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
-                    return;
-                }
-                Path filePath = workingDir.resolve(localPath).normalize();
-                if (!filePath.startsWith(workingDir)) {
-                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
-                    return;
-                }
-                readFromFile(filePath, response, 
HttpUtil.ContentType.APPLICATION_OCTET_STREAM, null);
-            } else {
-                response.setStatus(HttpResponseStatus.NOT_FOUND);
-            }
-        } catch (Exception e) {
-            writeException(e, response);
-            LOGGER.error("Error reading library", e);
-        }
-    }
-
-    private void writeLibToCloud(LibraryUploadData uploadData, Namespace 
libNamespace, String libName,
-            MessageDigest digest, ExternalFunctionLanguage language) throws 
IOException {
-        FileReference libDir = libraryManager.getLibraryDir(libNamespace, 
libName);
-        IIOManager cloudIoMgr = libraryManager.getCloudIOManager();
-        FileReference lib = 
libDir.getChild(ILibraryManager.LIBRARY_ARCHIVE_NAME);
-        if (!libDir.getFile().exists()) {
-            Files.createDirectories(lib.getFile().toPath().getParent());
-        }
-        if (!lib.getFile().exists()) {
-            Files.createFile(lib.getFile().toPath());
-        }
-        IFileHandle fh = cloudIoMgr.open(lib, 
IIOManager.FileReadWriteMode.READ_WRITE,
-                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        WritableByteChannel outChannel = cloudIoMgr.newWritableChannel(fh);
-        byte[] writeBuf = new byte[4096];
-        FileReference targetDescFile = libDir.getChild(DESCRIPTOR_FILE_NAME);
-        try (OutputStream outputStream = new 
DigestOutputStream(Channels.newOutputStream(outChannel), digest);
-                InputStream ui = new 
ByteBufInputStream((uploadData.fileUpload).getByteBuf())) {
-            IOUtils.copyLarge(ui, outputStream, writeBuf);
-            outputStream.flush();
-            cloudIoMgr.sync(fh, true);
-            writeDescriptor(libraryManager, targetDescFile,
-                    new LibraryDescriptor(language, 
ExternalLibraryUtils.digestToHexString(digest)), true, writeBuf);
-        } finally {
-            cloudIoMgr.close(fh);
-        }
+        URI downloadURI = cacheLibAndDistribute(uploadData, libDv, libName, 
fileExt, digest);
+        doCreate(libNamespace, libName, language, 
ExternalLibraryUtils.digestToHexString(digest), downloadURI, true,
+                getSysAuthHeader(), requestReference, request);
     }
 
     private URI cacheLibAndDistribute(LibraryUploadData uploadData, 
DataverseName libDv, String libName, String fileExt,
@@ -274,80 +89,11 @@ public class NCUdfApiServlet extends AbstractNCUdfServlet {
         return createDownloadURI(libraryTempFile);
     }
 
-    private void handleModification(IServletRequest request, IServletResponse 
response, LibraryOperation op) {
-        HttpRequest httpRequest = request.getHttpRequest();
-        Path libraryTempFile = null;
-        FileOutputStream libTmpOut = null;
-        HttpPostRequestDecoder requestDecoder = null;
-        String localPath = localPath(request);
-        try {
-            Pair<Namespace, String> namespaceAndName = 
decodeDvAndLibFromLocalPath(localPath);
-            String libName = namespaceAndName.second;
-            Namespace libNamespace = namespaceAndName.first;
-            DataverseName libDv = libNamespace.getDataverseName();
-            IRequestReference requestReference = receptionist.welcome(request);
-            if (op == LibraryOperation.UPSERT) {
-                requestDecoder = new HttpPostRequestDecoder(httpRequest);
-                LibraryUploadData uploadData = 
decodeMultiPartLibraryOptions(requestDecoder);
-                ExternalFunctionLanguage language = uploadData.type;
-                String fileExt = 
FilenameUtils.getExtension(uploadData.fileUpload.getFilename());
-                MessageDigest digest = MessageDigest.getInstance("MD5");
-                if (appCtx.isCloudDeployment()) {
-                    writeLibToCloud(uploadData, libNamespace, libName, digest, 
language);
-                    doCreate(libNamespace, libName, language, 
ExternalLibraryUtils.digestToHexString(digest), null,
-                            true, getSysAuthHeader(), requestReference, 
request);
-                } else {
-                    URI downloadURI = cacheLibAndDistribute(uploadData, libDv, 
libName, fileExt, digest);
-                    doCreate(libNamespace, libName, language, 
ExternalLibraryUtils.digestToHexString(digest),
-                            downloadURI, true, getSysAuthHeader(), 
requestReference, request);
-                }
-            } else if (op == LibraryOperation.DELETE) {
-                //DELETE semantics imply ifExists
-                doDrop(libNamespace, libName, false, requestReference, 
request);
-            }
-            response.setStatus(HttpResponseStatus.OK);
-            PrintWriter responseWriter = response.writer();
-            String emptyJson = "{}";
-            responseWriter.write(emptyJson);
-            responseWriter.flush();
-        } catch (Exception e) {
-            writeException(e, response);
-            LOGGER.info("Error modifying library", e);
-        } finally {
-            if (requestDecoder != null) {
-                requestDecoder.destroy();
-            }
-            try {
-                if (libraryTempFile != null) {
-                    if (libTmpOut != null) {
-                        libTmpOut.close();
-                    }
-                    Files.deleteIfExists(libraryTempFile);
-                }
-            } catch (IOException e) {
-                LOGGER.warn("Could not delete temporary file " + 
libraryTempFile, e);
-            }
-        }
-    }
-
     protected String getSysAuthHeader() {
         return sysAuthHeader;
     }
 
-    private void writeException(Exception e, IServletResponse response) {
-        response.setStatus(toHttpErrorStatus(e));
-        PrintWriter responseWriter = response.writer();
-        Map<String, String> error = Collections.singletonMap("error", 
e.getMessage());
-        String errorJson = "";
-        try {
-            errorJson = OBJECT_MAPPER.writeValueAsString(error);
-        } catch (JsonProcessingException ex) {
-            responseWriter.write("{ \"error\": \"Unable to process error 
message!\" }");
-        }
-        responseWriter.write(errorJson);
-        responseWriter.flush();
-    }
-
+    @Override
     protected boolean isRequestPermitted(IServletRequest request, 
IServletResponse response) throws IOException {
         if (!isRequestOnLoopback(request)) {
             rejectForbidden(response);
@@ -356,22 +102,6 @@ public class NCUdfApiServlet extends AbstractNCUdfServlet {
         return true;
     }
 
-    protected boolean isRequestOnLoopback(IServletRequest request) {
-        if (request.getLocalAddress() != null && request.getRemoteAddress() != 
null) {
-            InetAddress local = request.getLocalAddress().getAddress();
-            InetAddress remote = request.getRemoteAddress().getAddress();
-            return remote.isLoopbackAddress() && local.isLoopbackAddress();
-        } else {
-            return false;
-        }
-    }
-
-    protected void rejectForbidden(IServletResponse response) throws 
IOException {
-        // TODO: why this JSON format, do we use this anywhere else?
-        sendError(response, HttpUtil.ContentType.APPLICATION_JSON, 
HttpResponseStatus.FORBIDDEN,
-                "{ \"error\": \"Forbidden\" }");
-    }
-
     @Override
     protected void post(IServletRequest request, IServletResponse response) 
throws IOException {
         if (isRequestPermitted(request, response)) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfDSApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfDSApiServlet.java
new file mode 100644
index 0000000000..d5311cedde
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfDSApiServlet.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import static 
org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
+import static 
org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.library.LibraryDescriptor;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.Namespace;
+import org.apache.asterix.external.util.ExternalLibraryUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+
+import io.netty.buffer.ByteBufInputStream;
+
+public class NCUdfDSApiServlet extends AbstractNCUdfServlet {
+
+    public NCUdfDSApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx) {
+        super(ctx, paths, appCtx);
+    }
+
+    /**
+     * Stores an uploaded library and registers it.
+     * The archive is first written through {@code writeLibToCloud}, then {@code doCreate} is invoked with the
+     * hex form of {@code digest} and a {@code null} fifth argument (the position where NCUdfApiServlet passes
+     * its download URI). {@code libDv} and {@code fileExt} are not used by this implementation.
+     */
+    protected void distributeLibrary(LibraryUploadData uploadData, 
DataverseName libDv, String libName, String fileExt,
+            ExternalFunctionLanguage language, MessageDigest digest, Namespace 
namespace,
+            IRequestReference requestReference, IServletRequest request) 
throws Exception {
+        writeLibToCloud(uploadData, namespace, libName, digest, language);
+        doCreate(namespace, libName, language, 
ExternalLibraryUtils.digestToHexString(digest), null, true,
+                getSysAuthHeader(), requestReference, request);
+    }
+
+    /**
+     * Copies the uploaded library archive into the library directory through the cloud IO manager, updating
+     * {@code digest} with every byte written, and then writes the library descriptor next to it.
+     *
+     * @param uploadData   decoded upload; its {@code fileUpload} buffer is the archive content
+     * @param libNamespace namespace used to resolve the library directory
+     * @param libName      library name used to resolve the library directory
+     * @param digest       digest fed by the copy; its hex form is stored in the descriptor
+     * @param language     language recorded in the descriptor
+     * @throws IOException if creating, writing, syncing or closing the archive or descriptor fails
+     */
+    private void writeLibToCloud(LibraryUploadData uploadData, Namespace libNamespace, String libName,
+            MessageDigest digest, ExternalFunctionLanguage language) throws IOException {
+        FileReference libDir = libraryManager.getLibraryDir(libNamespace, libName);
+        IIOManager cloudIoMgr = libraryManager.getCloudIOManager();
+        FileReference lib = libDir.getChild(ILibraryManager.LIBRARY_ARCHIVE_NAME);
+        if (!libDir.getFile().exists()) {
+            Files.createDirectories(lib.getFile().toPath().getParent());
+        }
+        if (!lib.getFile().exists()) {
+            Files.createFile(lib.getFile().toPath());
+        }
+        byte[] writeBuf = new byte[4096];
+        FileReference targetDescFile = libDir.getChild(DESCRIPTOR_FILE_NAME);
+        IFileHandle fh = cloudIoMgr.open(lib, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        try {
+            // everything after open() runs under the finally so the handle is released even if
+            // creating the channel fails
+            WritableByteChannel outChannel = cloudIoMgr.newWritableChannel(fh);
+            try (OutputStream outputStream = new DigestOutputStream(Channels.newOutputStream(outChannel), digest);
+                    InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) {
+                IOUtils.copyLarge(ui, outputStream, writeBuf);
+                outputStream.flush();
+                cloudIoMgr.sync(fh, true);
+                // the descriptor is written only after the archive has been synced
+                writeDescriptor(libraryManager, targetDescFile,
+                        new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)), true, writeBuf);
+            }
+        } finally {
+            cloudIoMgr.close(fh);
+        }
+    }
+
+    protected String getSysAuthHeader() {
+        return sysAuthHeader;
+    }
+
+    /**
+     * Admits every request without inspecting it; no address check is performed here.
+     * NOTE(review): presumably access is limited by who can reach the domain socket and by the wrapping
+     * auth servlet - confirm against the server setup.
+     */
+    @Override
+    protected boolean isRequestPermitted(IServletRequest request, 
IServletResponse response) throws IOException {
+        return true;
+    }
+
+    @Override
+    protected void post(IServletRequest request, IServletResponse response) 
throws IOException {
+        if (isRequestPermitted(request, response)) {
+            handleModification(request, response, LibraryOperation.UPSERT);
+        }
+    }
+
+    @Override
+    protected void delete(IServletRequest request, IServletResponse response) 
throws IOException {
+        if (isRequestPermitted(request, response)) {
+            handleModification(request, response, LibraryOperation.DELETE);
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java
index 2c29d1480e..ca8bc3bcb6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java
@@ -29,33 +29,33 @@ import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 
-import io.netty.handler.codec.http.HttpScheme;
-
 public class NCUdfRecoveryServlet extends AbstractNCUdfServlet {
 
-    ExternalLibraryManager libraryManager;
-
     public static final String GET_ALL_UDF_ENDPOINT = "/all";
 
-    public NCUdfRecoveryServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx,
-            HttpScheme httpServerProtocol, int httpServerPort) {
-        super(ctx, paths, appCtx, httpServerProtocol, httpServerPort);
+    public NCUdfRecoveryServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx) {
+        super(ctx, paths, appCtx);
     }
 
     @Override
     public void init() {
         appCtx = (INcApplicationContext) plainAppCtx;
         srvCtx = this.appCtx.getServiceContext();
-        this.libraryManager = (ExternalLibraryManager) 
appCtx.getLibraryManager();
     }
 
     @Override
     protected void get(IServletRequest request, IServletResponse response) 
throws Exception {
         String localPath = localPath(request);
         if (localPath.equals(GET_ALL_UDF_ENDPOINT)) {
-            Path zippedLibs = libraryManager.zipAllLibs();
+            Path zippedLibs = ((ExternalLibraryManager) 
libraryManager).zipAllLibs();
             readFromFile(zippedLibs, response, 
HttpUtil.ContentType.APPLICATION_ZIP,
                     StandardOpenOption.DELETE_ON_CLOSE);
         }
     }
+
+    @Override
+    protected boolean isRequestPermitted(IServletRequest request, 
IServletResponse response) {
+        //gated by system auth
+        return true;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 7566206087..40a362e418 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -31,12 +31,18 @@ import static 
org.apache.hyracks.control.common.controllers.ControllerConfig.Opt
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -44,6 +50,7 @@ import 
org.apache.asterix.api.http.server.NCQueryResultApiServlet;
 import org.apache.asterix.api.http.server.NCQueryServiceServlet;
 import org.apache.asterix.api.http.server.NCQueryStatusApiServlet;
 import org.apache.asterix.api.http.server.NCUdfApiServlet;
+import org.apache.asterix.api.http.server.NCUdfDSApiServlet;
 import org.apache.asterix.api.http.server.NCUdfRecoveryServlet;
 import org.apache.asterix.api.http.server.NetDiagnosticsApiServlet;
 import org.apache.asterix.api.http.server.ServletConstants;
@@ -98,6 +105,7 @@ import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -131,6 +139,7 @@ public class NCApplication extends BaseNCApplication {
     protected boolean startupCompleted;
     protected WebManager webManager;
     private HttpServer apiServer;
+    private HttpServer udfServer;
 
     @Override
     public void registerConfig(IConfigManager configManager) {
@@ -265,19 +274,38 @@ public class NCApplication extends BaseNCApplication {
         apiServer.setAttribute(ServletConstants.CREDENTIAL_MAP,
                 parseCredentialMap(((NodeControllerService) 
ncServiceCtx.getControllerService()).getConfiguration()
                         .getCredentialFilePath()));
-        Pair<Map<String, String>, Map<String, String>> auth = 
BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
-        apiServer
-                .addServlet(new BasicAuthServlet(apiServer.ctx(),
-                        new NCUdfApiServlet(apiServer.ctx(), new String[] { 
UDF }, getApplicationContext(),
-                                apiServer.getScheme(), 
apiServer.getAddress().getPort()),
-                        auth.getFirst(), auth.getSecond()));
-        apiServer.addServlet(new BasicAuthServlet(
-                apiServer.ctx(), new NCUdfRecoveryServlet(apiServer.ctx(), new 
String[] { UDF_RECOVERY },
-                        getApplicationContext(), apiServer.getScheme(), 
apiServer.getAddress().getPort()),
-                auth.getFirst(), auth.getSecond()));
+        if (!getApplicationContext().isCloudDeployment()) {
+            Pair<Map<String, String>, Map<String, String>> auth =
+                    BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
+            apiServer.addServlet((new BasicAuthServlet(apiServer.ctx(),
+                    new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF }, 
getApplicationContext(),
+                            apiServer.getScheme(), 
externalProperties.getNcApiPort()),
+                    auth.getFirst(), auth.getSecond())));
+            apiServer.addServlet(new BasicAuthServlet(apiServer.ctx(),
+                    new NCUdfRecoveryServlet(apiServer.ctx(), new String[] { 
UDF_RECOVERY }, getApplicationContext()),
+                    auth.getFirst(), auth.getSecond()));
+        } else {
+            //UDF deployment server
+            IApplicationConfig appCfg = 
getApplicationContext().getServiceContext().getAppConfig();
+            Path udfSockPath = 
Path.of(appCfg.getString(NCConfig.Option.UDF_API_DS_PATH)).toAbsolutePath();
+            if (!Files.exists(udfSockPath.getParent())) {
+                Set<PosixFilePermission> noOthers = 
PosixFilePermissions.fromString("rwxr-x---");
+                FileAttribute<?> permissions = 
PosixFilePermissions.asFileAttribute(noOthers);
+                Files.createDirectories(udfSockPath.getParent(), permissions);
+            }
+            udfServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), udfSockPath, config);
+            Pair<Map<String, String>, Map<String, String>> auth =
+                    BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
+            udfServer.addServlet(new BasicAuthServlet(apiServer.ctx(),
+                    new NCUdfDSApiServlet(apiServer.ctx(), new String[] { UDF 
}, getApplicationContext()),
+                    auth.getFirst(), auth.getSecond()));
+            webManager.add(udfServer);
+
+        }
         apiServer.addServlet(new NCQueryStatusApiServlet(apiServer.ctx(), 
getApplicationContext(), QUERY_STATUS));
         apiServer.addServlet(new NCQueryResultApiServlet(apiServer.ctx(), 
getApplicationContext(), QUERY_RESULT));
         webManager.add(apiServer);
+
     }
 
     protected List<AsterixExtension> getExtensions() throws Exception {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
index b86730b07d..4cac409039 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
@@ -28,14 +28,14 @@ import 
com.adobe.testing.s3mock.testcontainers.S3MockContainer;
 public class CloudStorageIntegrationUtil extends AsterixHyracksIntegrationUtil 
{
 
     public static final String RESOURCES_PATH = 
joinPath(getProjectPath().toString(), "src", "test", "resources");
-    public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, 
"cc-cloud-storage-main.conf");
-    public static final String CONFIG_FILE_TEMPLATE = joinPath(RESOURCES_PATH, 
"cc-cloud-storage-main.ftl");
+    public static final String TARGET_PATH = 
joinPath(getProjectPath().toString(), "target");
+    public static final String CONFIG_FILE = joinPath(TARGET_PATH, 
"cc-cloud-storage.conf");
+    public static final String CONFIG_FILE_TEMPLATE = joinPath(RESOURCES_PATH, 
"cc-cloud-storage.conf.ftl");
 
     public static void main(String[] args) throws Exception {
-        boolean cleanStart = Boolean.getBoolean("cleanup.start");
-        LocalCloudUtilAdobeMock.startS3CloudEnvironment(cleanStart);
-        final AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
+        boolean cleanStart = true;
         try (S3MockContainer s3Mock = 
LocalCloudUtilAdobeMock.startS3CloudEnvironment(cleanStart)) {
+            final AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
             fillConfigTemplate(MOCK_SERVER_HOSTNAME_FRAGMENT + 
s3Mock.getHttpServerPort(), CONFIG_FILE_TEMPLATE,
                     CONFIG_FILE);
             integrationUtil.run(cleanStart, 
Boolean.getBoolean("cleanup.shutdown"),
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/CloudUDFLibrarian.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/CloudUDFLibrarian.java
new file mode 100644
index 0000000000..fac50e7f93
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/CloudUDFLibrarian.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.UnixDomainSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
+
+@SuppressWarnings("squid:S134")
+public class CloudUDFLibrarian implements IExternalUDFLibrarian {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private String sockPath;
+    private final EventLoopGroup elg;
+
+    public CloudUDFLibrarian(String sockPath) {
+        this.sockPath = sockPath;
+        //force nio even if epoll/kqueue are on classpath
+        this.elg = new NioEventLoopGroup();
+    }
+
+    /**
+     * Builds the value of an HTTP Basic {@code Authorization} header.
+     *
+     * @param credentials pair whose {@code first} and {@code second} members are joined with ':'
+     * @return {@code "Basic "} followed by the Base64 encoding of the UTF-8 bytes of {@code first:second}
+     */
+    private static String createAuthHeader(Pair<String, String> credentials) {
+        String auth = credentials.first + ":" + credentials.second;
+        // encodeToString yields the Base64 text directly, so the result no longer depends on the
+        // platform default charset as new String(byte[]) did
+        return "Basic " + Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * No-op: the argument is discarded and nothing is stored. Credentials are instead passed on each
+     * {@code install} / {@code uninstall} call.
+     */
+    public void setCredentials(Pair<String, String> credentials) {
+    }
+
+    public void setAddress(String address) {
+        this.sockPath = address;
+    }
+
+    /**
+     * Uploads a library with an HTTP POST sent over the Unix domain socket at {@code sockPath}.
+     * The request is a multipart form carrying a {@code type} attribute and the file at {@code libPath} as
+     * the {@code data} part, with a Basic {@code Authorization} header built from {@code credentials}.
+     * Blocks until {@code handleResponse} has processed the response.
+     */
+    @Override
+    public void install(String path, String type, String libPath, Pair<String, 
String> credentials) throws Exception {
+        File lib = new File(libPath);
+        UnixDomainSocketAddress sockAddr = 
UnixDomainSocketAddress.of(sockPath);
+
+        HttpClient hc = HttpClient.create().runOn(elg)
+                .headers(h -> h.set("Authorization", 
createAuthHeader(credentials))).remoteAddress(() -> sockAddr);
+        hc.post().uri(path)
+                .sendForm((req, form) -> form.multipart(true).attr("type", 
type).file("data", lib,
+                        "application/octet-stream"))
+                .responseSingle((response, content) -> 
handleResponse(response, content)).block();
+    }
+
+    /**
+     * Removes a library with an HTTP DELETE to {@code path}, sent over the Unix domain socket at
+     * {@code sockPath} with a Basic {@code Authorization} header built from {@code credentials}.
+     * Blocks until {@code handleResponse} has processed the response.
+     */
+    @Override
+    public void uninstall(String path, Pair<String, String> credentials) 
throws IOException, AsterixException {
+        UnixDomainSocketAddress sockAddr = 
UnixDomainSocketAddress.of(sockPath);
+        HttpClient hc = HttpClient.create().runOn(elg)
+                .headers(h -> h.set("Authorization", 
createAuthHeader(credentials))).remoteAddress(() -> sockAddr);
+        hc.delete().uri(path).responseSingle((response, content) -> 
handleResponse(response, content)).block();
+    }
+
+    /**
+     * Turns an HTTP response into completion or failure.
+     * A 200 status completes empty. A 500 or 400 status fails with the {@code error} field of the JSON body,
+     * or with a "Failed to parse" message when the body is not JSON or has no usable {@code error} text.
+     * Any other status fails with the status line.
+     *
+     * @param response response whose status code is inspected
+     * @param content  response body; an absent body is treated as the empty string
+     * @return a {@link Mono} that completes empty on 200 and otherwise errors with an {@link AsterixException}
+     */
+    private Mono<Void> handleResponse(HttpClientResponse response, reactor.netty.ByteBufMono content) {
+        return content.asString().defaultIfEmpty("").flatMap(body -> {
+            int respCode = response.status().code();
+            if (respCode == HttpResponseStatus.OK.code()) {
+                return Mono.empty();
+            }
+
+            String errorMessage;
+            if (respCode == HttpResponseStatus.INTERNAL_SERVER_ERROR.code()
+                    || respCode == HttpResponseStatus.BAD_REQUEST.code()) {
+                try {
+                    // path() never returns null, so an empty body or a body without an "error" field
+                    // yields the fallback message instead of a NullPointerException
+                    String parsed = OBJECT_MAPPER.readTree(body).path("error").asText();
+                    errorMessage = parsed.isEmpty() ? "Failed to parse error response: " + body : parsed;
+                } catch (IOException e) {
+                    errorMessage = "Failed to parse error response: " + body;
+                }
+            } else {
+                errorMessage = response.status().toString();
+            }
+            return Mono.error(new AsterixException(errorMessage));
+        });
+    }
+
+    @Override
+    public SocketType getSocketType() {
+        return SocketType.DOMAIN;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
index e32f8eaafc..760adf441f 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
@@ -57,8 +57,8 @@ public class ExternalUDFLibrarian implements 
IExternalUDFLibrarian {
     }
 
     @Override
-    public void install(URI path, String type, String libPath, Pair<String, 
String> credentials) throws Exception {
-        HttpClientContext hcCtx = createHttpClientContext(path, credentials);
+    public void install(String path, String type, String libPath, Pair<String, 
String> credentials) throws Exception {
+        HttpClientContext hcCtx = createHttpClientContext(URI.create(path), 
credentials);
         HttpPost post = new HttpPost(path);
         File lib = new File(libPath);
         MultipartEntityBuilder entity = 
MultipartEntityBuilder.create().setMode(HttpMultipartMode.STRICT);
@@ -70,8 +70,8 @@ public class ExternalUDFLibrarian implements 
IExternalUDFLibrarian {
     }
 
     @Override
-    public void uninstall(URI path, Pair<String, String> credentials) throws 
IOException, AsterixException {
-        HttpClientContext hcCtx = createHttpClientContext(path, credentials);
+    public void uninstall(String path, Pair<String, String> credentials) 
throws IOException, AsterixException {
+        HttpClientContext hcCtx = createHttpClientContext(URI.create(path), 
credentials);
         HttpDelete del = new HttpDelete(path);
         HttpResponse response = hc.execute(del, hcCtx);
         handleResponse(response);
@@ -103,4 +103,9 @@ public class ExternalUDFLibrarian implements 
IExternalUDFLibrarian {
             throw new AsterixException(resp);
         }
     }
+
+    @Override
+    public SocketType getSocketType() {
+        return SocketType.LOOPBACK;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
index 998fa78ace..b47d07a733 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
@@ -19,13 +19,19 @@
 package org.apache.asterix.app.external;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
 public interface IExternalUDFLibrarian {
-    void install(URI path, String type, String libPath, Pair<String, String> 
credentials) throws Exception;
+    public enum SocketType {
+        LOOPBACK,
+        DOMAIN
+    }
 
-    void uninstall(URI path, Pair<String, String> credentials) throws 
IOException, AsterixException;
+    void install(String path, String type, String libPath, Pair<String, 
String> credentials) throws Exception;
+
+    void uninstall(String path, Pair<String, String> credentials) throws 
IOException, AsterixException;
+
+    SocketType getSocketType();
 }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudPythonTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudPythonTest.java
index 00dab0eeb3..ad6f6b46a7 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudPythonTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudPythonTest.java
@@ -21,11 +21,13 @@ package org.apache.asterix.test.cloud_storage;
 import static 
org.apache.asterix.api.common.LocalCloudUtilAdobeMock.fillConfigTemplate;
 import static 
org.apache.asterix.test.cloud_storage.CloudStorageTest.MOCK_SERVER_HOSTNAME_FRAGMENT;
 import static 
org.apache.asterix.test.runtime.ExternalPythonFunctionIT.setNcEndpoints;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
 
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.app.external.CloudUDFLibrarian;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.test.common.TestConstants;
 import org.apache.asterix.test.common.TestExecutor;
@@ -48,7 +50,7 @@ import org.junit.runners.Parameterized.Parameters;
 import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
 
 /**
- * Run tests in cloud deployment environment
+ * Run Python UDF tests in cloud deployment environment
  */
 @RunWith(Parameterized.class)
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
@@ -63,6 +65,7 @@ public class CloudPythonTest {
     public static final String CONFIG_FILE_TEMPLATE = 
"src/test/resources/cc-cloud-storage.conf.ftl";
     public static final String CONFIG_FILE = "target/cc-cloud-storage.conf";
     private static final String EXCLUDED_TESTS = "MP";
+    private static final String dsPath = joinPath("/", "tmp", "asterixdb_udf", 
"asterix_nc1_udf.sock");
 
     public CloudPythonTest(TestCaseContext tcCtx) {
         this.tcCtx = tcCtx;
@@ -78,7 +81,7 @@ public class CloudPythonTest {
         TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
         testExecutor.executorId = "cloud";
         testExecutor.stripSubstring = "//DB:";
-        LangExecutionUtil.setUp(CONFIG_FILE, testExecutor);
+        LangExecutionUtil.setUp(CONFIG_FILE, testExecutor, false, false, new 
CloudUDFLibrarian(dsPath));
         setNcEndpoints(testExecutor);
         System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE);
     }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 81ad57a769..bb2aa34c5f 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1475,7 +1475,13 @@ public class TestExecutor {
                     //TODO: this is not right. URLEncoder does not properly 
encode paths.
                     String dataverse = URLEncoder.encode(command[1], 
StandardCharsets.US_ASCII.name());
                     String library = URLEncoder.encode(command[2], 
StandardCharsets.US_ASCII.name());
-                    URI path = createEndpointURI("/admin/udf/" + dataverse + 
"/" + library);
+                    String basePath = "/admin/udf/" + dataverse + "/" + 
library;
+                    String path = "";
+                    switch (librarian.getSocketType()){
+                        case DOMAIN -> path = basePath;
+                        case LOOPBACK -> path = 
createEndpointURI(basePath).toString();
+                        default -> path = 
createEndpointURI(basePath).toString();
+                    }
                     if (command.length < 2) {
                         throw new Exception("invalid library command: " + 
line);
                     }
@@ -2742,6 +2748,11 @@ public class TestExecutor {
         return uri;
     }
 
+    protected URI createUDFDomainSockURI(String pathAndQuery) {
+        URI uri = URI.create("unix://target/tmp/asterix_nc1/udf.sock" + 
pathAndQuery);
+        return uri;
+    }
+
     public URI getEndpoint(String servlet) throws URISyntaxException {
         return createEndpointURI(Servlets.getAbsolutePath(getPath(servlet)));
     }
diff --git 
a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf 
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
index 8745d9d7fd..9efabaa036 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-main.conf
@@ -21,6 +21,7 @@ core.dump.dir=target/tmp/asterix_nc1/coredump
 iodevices=target/tmp/asterix_nc1/iodevice1
 iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
 nc.api.port=19004
+udf.api.ds.path=target/tmp/asterix_nc1/udf.sock
 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
 
 [nc/asterix_nc2]
@@ -29,6 +30,7 @@ txn.log.dir=target/tmp/asterix_nc2/txnlog
 core.dump.dir=target/tmp/asterix_nc2/coredump
 
iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
 nc.api.port=19005
+udf.api.ds.path=target/tmp/asterix_nc2/udf.sock
 #jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
 
 [nc]
@@ -68,7 +70,7 @@ storage.partitioning=static
 cloud.storage.scheme=s3
 cloud.storage.bucket=cloud-storage-container
 cloud.storage.region=us-west-2
-cloud.storage.endpoint=http://127.0.0.1:8001
+cloud.storage.endpoint=http://127.0.0.1:46677
 cloud.storage.anonymous.auth=true
 cloud.storage.cache.policy=selective
 cloud.max.write.requests.per.second=2000
diff --git a/asterixdb/asterix-doc/src/main/user-defined_function/udf.md 
b/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
index 71e0fa2e44..9768c81cd5 100644
--- a/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
+++ b/asterixdb/asterix-doc/src/main/user-defined_function/udf.md
@@ -52,7 +52,7 @@ Now,restart the cluster if it was already started to allow 
the Cluster Controlle
 ## <a name="installingUDF">Installing a Java UDF Library</a>
 
 To install a UDF package to the cluster, we need to send a Multipart Form-data 
HTTP request to the `/admin/udf` endpoint
-of the CC at the normal API port (`19004` by default). Any suitable tool will 
do, but for the example here I will use
+of the cluster through the special UDF domain socket. Any suitable tool will 
do, but for the example here I will use
 `curl` which is widely available.
 
 For example, to install a library with the following criteria:
@@ -65,7 +65,7 @@ For example, to install a library with the following criteria:
 
 we would execute
 
-    curl -v -u admin:admin -X POST -F 'data=@./lib.zip' -F 'type=java' 
localhost:19004/admin/udf/udfs/testlib
+    curl -v -u admin:admin --unix-socket udf.sock -F 'data=@./lib.zip' -F 
'type=java' localhost/admin/udf/udfs/testlib
 
 Any response other than `200` indicates an error in deployment.
 
diff --git 
a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
 
b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
index 025f607a30..9a9bd85ebe 100644
--- 
a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
+++ 
b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.podman;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.asterix.app.external.IExternalUDFLibrarian;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -42,7 +41,7 @@ public class PodmanUDFLibrarian implements 
IExternalUDFLibrarian {
     }
 
     @Override
-    public void install(URI path, String type, String libPath, Pair<String, 
String> credentials) throws Exception {
+    public void install(String path, String type, String libPath, Pair<String, 
String> credentials) throws Exception {
         Container.ExecResult curlResult = null;
         int retryCt = 0;
         while (retryCt < 10) {
@@ -50,7 +49,7 @@ public class PodmanUDFLibrarian implements 
IExternalUDFLibrarian {
                 curlResult = asterix.execInContainer("curl", 
"--no-progress-meter", "-X", "POST", "-u",
                         credentials.first + ":" + credentials.second, "-F",
                         "data=@" + "/var/tmp/asterix-app/" + libPath, "-F", 
"type=" + type,
-                        "http://localhost:19004" + path.getRawPath());
+                        "http://localhost:19004" + path);
                 handleResponse(curlResult);
                 return;
             } catch (RuntimeException e) {
@@ -62,16 +61,21 @@ public class PodmanUDFLibrarian implements 
IExternalUDFLibrarian {
     }
 
     @Override
-    public void uninstall(URI path, Pair<String, String> credentials) throws 
IOException, AsterixException {
+    public void uninstall(String path, Pair<String, String> credentials) 
throws IOException, AsterixException {
         try {
             Container.ExecResult curlResult = asterix.execInContainer("curl", 
"-X", "DELETE", "-u",
-                    credentials.first + ":" + credentials.second, 
"http://localhost:19004" + path.getPath());
+                    credentials.first + ":" + credentials.second, 
"http://localhost:19004" + path);
             handleResponse(curlResult);
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
     }
 
+    @Override
+    public SocketType getSocketType() {
+        return SocketType.LOOPBACK;
+    }
+
     private void handleResponse(Container.ExecResult result) throws 
AsterixException, JsonProcessingException {
         if (result.getExitCode() != 0) {
             throw new AsterixException(result.getStderr());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index f55dd59c79..c3571934c6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -27,6 +27,7 @@ import static 
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTE
 import static 
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_LONG_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.STRING_ARRAY;
+import static 
org.apache.hyracks.control.common.utils.ConfigurationUtil.JAVA_IO_TMPDIR;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -103,6 +104,11 @@ public class NCConfig extends ControllerConfig {
         PYTHON_ARGS(STRING_ARRAY, (String[]) null),
         PYTHON_ENV(STRING_ARRAY, (String[]) null),
         PYTHON_DS_PATH(STRING, (String) null),
+        UDF_API_DS_PATH(
+                STRING,
+                appConfig -> 
FileUtil.joinPath(System.getProperty(JAVA_IO_TMPDIR), "asterixdb_udf",
+                        appConfig.getString(NODE_ID) + "_udf.sock"),
+                "Defaults to temporary 
directory/asterixdb_udf/{$NODE_ID}_udf.sock. Path must be <100 characters"),
         LIBRARY_MAX_FILE_SIZE(POSITIVE_LONG_BYTE_UNIT, 250L * 1024 * 1024), 
//250MB
         LIBRARY_MAX_EXTRACTED_SIZE(POSITIVE_LONG_BYTE_UNIT, 1000L * 1024 * 
1024), //1GB
         LIBRARY_MAX_ARCHIVE_ENTRIES(INTEGER, 4096),
@@ -244,6 +250,8 @@ public class NCConfig extends ControllerConfig {
                     return "Number of threads per partition used to write and 
read from storage";
                 case IO_QUEUE_SIZE:
                     return "Length of the queue used for requests to write and 
read";
+                case UDF_API_DS_PATH:
+                    return "Path and name for domain socket used to deploy 
UDFs to the cluster";
                 case PYTHON_CMD:
                     return "Absolute path to python interpreter";
                 case PYTHON_ADDITIONAL_PACKAGES:
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
index 59c0ae05f2..faab5d30cd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java
@@ -114,5 +114,9 @@ public interface IServletRequest {
      */
     InetSocketAddress getLocalAddress();
 
+    String getHostPort();
+
+    String getRemotePort();
+
     Channel getChannel();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
index 4a72d539b4..0aaf8feaf9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/BaseRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.http.server;
 
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.util.NetworkUtil;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
@@ -114,6 +116,23 @@ public class BaseRequest implements IServletRequest {
         return (InetSocketAddress) channel.localAddress();
     }
 
+    @Override
+    public String getHostPort() {
+        return getPort(channel.localAddress());
+    }
+
+    @Override
+    public String getRemotePort() {
+        return getPort(channel.remoteAddress());
+    }
+
+    private String getPort(SocketAddress s) {
+        if (s instanceof InetSocketAddress) {
+            return NetworkUtil.toHostPort((InetSocketAddress) s); // format the address passed in, not always the local one
+        }
+        return "0";
+    }
+
     @Override
     public Channel getChannel() {
         return channel;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
index 6476ab9e3d..16a29aa4af 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/CLFLogger.java
@@ -30,6 +30,8 @@ import org.apache.logging.log4j.Logger;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDomainSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpContent;
@@ -71,7 +73,11 @@ public class CLFLogger extends ChannelDuplexHandler {
         if (msg instanceof HttpRequest) {
             HttpRequest req = (HttpRequest) msg;
             try {
-                clientIp = ((NioSocketChannel) 
ctx.channel()).remoteAddress().getAddress().toString().substring(1);
+                if (ctx.channel() instanceof SocketChannel) {
+                    clientIp = ((NioSocketChannel) 
ctx.channel()).remoteAddress().getAddress().toString().substring(1);
+                } else if (ctx.channel() instanceof NioDomainSocketChannel) {
+                    clientIp = ctx.channel().remoteAddress().toString();
+                }
             } catch (Exception e) {
                 LOGGER.debug("ignoring {} obtaining client ip for {}", e, 
ctx.channel());
                 clientIp = "-";
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 1198945eb9..4cc8bc9ae7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -20,6 +20,10 @@ package org.apache.hyracks.http.server;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnixDomainSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -52,8 +56,11 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.ServerChannel;
 import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.socket.DuplexChannel;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerDomainSocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpScheme;
@@ -70,6 +77,7 @@ public class HttpServer {
             new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, 
HIGH_WRITE_BUFFER_WATER_MARK);
     protected static final int RECEIVE_BUFFER_SIZE = 4096;
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final Class<InetSocketAddress> INSA = 
InetSocketAddress.class;
     private static final int FAILED = -1;
     private static final int STOPPED = 0;
     private static final int STARTING = 1;
@@ -85,7 +93,8 @@ public class HttpServer {
     private final ServletRegistry servlets;
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
-    private final Set<InetSocketAddress> addresses;
+    private final Set<SocketAddress> addresses;
+    private final Set<SocketAddress> boundAddresses;
     private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
@@ -93,6 +102,7 @@ public class HttpServer {
     private final List<Channel> channels;
     private Throwable cause;
     private HttpServerConfig config;
+    private Class<? extends ServerChannel> channelImpl = 
NioServerSocketChannel.class;
 
     private final GenericFutureListener<Future<Void>> channelCloseListener = f 
-> {
         // This listener is invoked from within a netty IO thread. Hence, we 
can never block it
@@ -118,14 +128,30 @@ public class HttpServer {
         this(bossGroup, workerGroup, Collections.singletonList(address), 
config, closeHandler);
     }
 
-    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, 
Collection<InetSocketAddress> addresses,
-            HttpServerConfig config, IChannelClosedHandler closeHandler) {
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, 
Path path, HttpServerConfig config) {
+        this(bossGroup, workerGroup, 
Collections.singletonList(UnixDomainSocketAddress.of(path)), config, null);
+    }
+
+    public static String getPortOrPath(SocketAddress addr) {
+        if (addr instanceof InetSocketAddress) {
+            return "port:" + ((InetSocketAddress) addr).getPort();
+        } else if (addr instanceof UnixDomainSocketAddress) {
+            return "path:" + ((UnixDomainSocketAddress) 
addr).getPath().toString();
+        } else {
+            return addr.toString();
+        }
+    }
+
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
+            Collection<? extends SocketAddress> addresses, HttpServerConfig 
config,
+            IChannelClosedHandler closeHandler) {
         if (addresses.isEmpty()) {
             throw new IllegalArgumentException("no addresses specified");
         }
         this.bossGroup = bossGroup;
         this.workerGroup = workerGroup;
         this.addresses = new LinkedHashSet<>(addresses);
+        this.boundAddresses = new LinkedHashSet<>(addresses);
         this.closedHandler = closeHandler;
         this.config = config;
         channels = new ArrayList<>();
@@ -133,13 +159,21 @@ public class HttpServer {
         servlets = new ServletRegistry();
         workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
         int numExecutorThreads = config.getThreadCount();
-        int[] ports = 
this.addresses.stream().mapToInt(InetSocketAddress::getPort).distinct().toArray();
+        int[] ports = 
addresses.stream().filter(INSA::isInstance).map(INSA::cast).mapToInt(InetSocketAddress::getPort)
+                .distinct().toArray();
         String desc;
         if (ports.length > 1) {
-            desc = this.addresses.stream().map(a -> 
a.getAddress().getHostAddress() + ":" + a.getPort())
+            desc = addresses.stream().filter(INSA::isInstance).map(INSA::cast)
+                    .map(a -> a.getAddress().getHostAddress() + ":" + 
a.getPort())
                     .collect(Collectors.joining(",", "[", "]"));
-        } else {
+        } else if (ports.length == 1) {
             desc = "port:" + ports[0];
+        } else {
+            List<String> paths = 
addresses.stream().filter(UnixDomainSocketAddress.class::isInstance)
+                    
.map(UnixDomainSocketAddress.class::cast).map(UnixDomainSocketAddress::getPath)
+                    .map(Object::toString).distinct().toList();
+            desc = "paths: " + " [ " + String.join(" ],[ ", paths) + " ]";
+            channelImpl = NioServerDomainSocketChannel.class;
         }
         executor = new ThreadPoolExecutor(numExecutorThreads, 
numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
                 runnable -> new Thread(runnable, "HttpExecutor(" + desc + ")-" 
+ threadId.getAndIncrement()));
@@ -278,18 +312,31 @@ public class HttpServer {
 
     private void bind() throws Exception {
         ServerBootstrap b = new ServerBootstrap();
-        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+        b.group(bossGroup, workerGroup).channel(channelImpl)
                 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new 
FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
                 .childOption(ChannelOption.AUTO_READ, Boolean.FALSE)
                 .childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
WRITE_BUFFER_WATER_MARK)
                 .handler(new 
LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
-        List<Pair<InetSocketAddress, ChannelFuture>> channelFutures = new 
ArrayList<>();
-        for (InetSocketAddress address : addresses) {
+        List<Pair<SocketAddress, ChannelFuture>> channelFutures = new 
ArrayList<>();
+        for (SocketAddress address : addresses) {
+            if(address instanceof UnixDomainSocketAddress udr){
+                Path udrPath = udr.getPath();
+                //if the socket can't be connected to, but:
+                // - the node exists
+                // - it is not a regular file
+                // - it is not a directory
+                // - and is 0 length
+                // then it should be safe to unlink
+                if(!sockAlive(udr) && Files.exists(udrPath) && 
!Files.isRegularFile(udrPath)  && !Files.isDirectory(udrPath) && 
udrPath.toFile().length() == 0){
+                    Files.deleteIfExists(udrPath);
+                }
+            }
             channelFutures.add(org.apache.commons.lang3.tuple.Pair.of(address, 
b.bind(address)));
+            boundAddresses.add(address);
         }
         Exception failure = null;
-        for (Pair<InetSocketAddress, ChannelFuture> addressFuture : 
channelFutures) {
+        for (Pair<SocketAddress, ChannelFuture> addressFuture : 
channelFutures) {
             try {
                 Channel channel = addressFuture.getRight().sync().channel();
                 channel.closeFuture().addListener(channelCloseListener);
@@ -384,6 +431,7 @@ public class HttpServer {
             LOGGER.log(Level.ERROR, "Error while shutting down http server 
executor", e);
         }
         closeChannels();
+        deleteDomainSockFiles();
     }
 
     public IServlet getServlet(FullHttpRequest request) {
@@ -394,7 +442,7 @@ public class HttpServer {
         return new HttpServerHandler<>(this, chunkSize);
     }
 
-    protected ChannelInitializer<SocketChannel> getChannelInitializer() {
+    protected ChannelInitializer<DuplexChannel> getChannelInitializer() {
         return new HttpServerInitializer(this);
     }
 
@@ -429,7 +477,7 @@ public class HttpServer {
     }
 
     @Deprecated // this returns an arbitrary (the first supplied if collection 
is ordered) address
-    public InetSocketAddress getAddress() {
+    public SocketAddress getAddress() {
         return addresses.iterator().next();
     }
 
@@ -443,4 +491,27 @@ public class HttpServer {
             channels.clear();
         }
     }
+
+    // Domain socket nodes exist outside the lifetime of the socket itself, so 
we should attempt to remove them on close
+    void deleteDomainSockFiles() {
+        for (UnixDomainSocketAddress address : 
boundAddresses.stream().filter(UnixDomainSocketAddress.class::isInstance)
+                .map(UnixDomainSocketAddress.class::cast).toList()) {
+            try {
+                Files.deleteIfExists(address.getPath());
+                LOGGER.info("Deleted domain socket file {}", 
address.getPath());
+            } catch (IOException e) {
+                LOGGER.warn("Failed to delete domain socket file {}", 
address.getPath(), e);
+            }
+        }
+    }
+
+    //the only real way to know if the file given to us in the config is an 
active socket is to attempt to connect to it
+    boolean sockAlive(UnixDomainSocketAddress addr) {
+        try (var chan = java.nio.channels.SocketChannel.open(addr)) {
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index ad8a61f4a0..32b1dbbd22 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -20,11 +20,11 @@ package org.apache.hyracks.http.server;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.DuplexChannel;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 
-public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
+public class HttpServerInitializer extends ChannelInitializer<DuplexChannel> {
 
     private final HttpServer server;
     private final int maxRequestSize;
@@ -44,7 +44,7 @@ public class HttpServerInitializer extends 
ChannelInitializer<SocketChannel> {
     }
 
     @Override
-    public void initChannel(SocketChannel ch) {
+    public void initChannel(DuplexChannel ch) {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new HttpRequestCapacityController(server));
         p.addLast(new HttpRequestDecoder(maxRequestInitialLineLength, 
maxRequestHeaderSize, maxRequestChunkSize));


Reply via email to