This is an automated email from the ASF dual-hosted git repository. dlych pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 01302d57d0bd1b11737e8c84b9ad85188178744a Author: Ian Maxon <[email protected]> AuthorDate: Thu Feb 11 11:09:48 2021 -0800 [NO ISSUE][REP] Rebalance UDFs - user model changes: no - storage format changes: yes - interface changes: yes Details: - Add a servlet that lets all the UDFs on the cluster be retrieved - Allow libraries to be copied to a new NC added if the cluster is in ACTIVE state - Store the hash of the uploaded library in the library descriptor - Also store the hash of the uploaded library in Metadata.Library - Send node servlet secrets as part of the registration request - Fix ExternalPythonFunctionIT - Add -Dno.shim to skip putting msgpack in asterix-app Change-Id: Ib6e6ce7debc9c2e07d24163542c1f98886792164 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9083 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> --- asterixdb/asterix-app/pom.xml | 6 +- .../api/http/server/AbstractNCUdfServlet.java | 153 +++++++++++++ .../asterix/api/http/server/BasicAuthServlet.java | 20 +- .../asterix/api/http/server/NCUdfApiServlet.java | 132 ++--------- .../api/http/server/NCUdfRecoveryServlet.java | 61 +++++ ...braryUtil.java => ExternalLibraryJobUtils.java} | 7 +- .../app/message/CreateLibraryRequestMessage.java | 8 +- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 2 +- .../asterix/app/nc/task/RetrieveLibrariesTask.java | 113 +++++++++ .../app/replication/NcLifecycleCoordinator.java | 65 +++++- .../message/RegistrationTasksRequestMessage.java | 27 ++- .../asterix/app/translator/QueryTranslator.java | 26 ++- .../asterix/hyracks/bootstrap/NCApplication.java | 27 ++- .../test/runtime/ExternalPythonFunctionIT.java | 23 +- .../asterix/common/library/ILibraryManager.java | 14 ++ .../asterix/common/library/LibraryDescriptor.java | 16 +- .../org/apache/asterix/common/utils/Servlets.java | 1 + .../external/library/ExternalLibraryManager.java | 252 ++++++++++++++++++++- 
.../LibraryDeployPrepareOperatorDescriptor.java | 176 ++------------ .../external/util/ExternalLibraryUtils.java | 39 ++++ .../common/statement/CreateLibraryStatement.java | 8 +- .../metadata/MetadataTransactionContext.java | 2 +- .../metadata/bootstrap/MetadataRecordTypes.java | 1 + .../apache/asterix/metadata/entities/Library.java | 8 +- .../LibraryTupleTranslator.java | 19 +- asterixdb/asterix-server/pom.xml | 114 ++++++++++ asterixdb/pom.xml | 23 +- .../apache/hyracks/http/server/utils/HttpUtil.java | 2 + 28 files changed, 1015 insertions(+), 330 deletions(-) diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index e309132..1f22924 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -170,7 +170,7 @@ <executions> <execution> <id>venv</id> - <phase>${pyro-shim.stage}</phase> + <phase>${prepare-env.stage}</phase> <goals> <goal>exec</goal> </goals> @@ -187,7 +187,7 @@ </execution> <execution> <id>shiv-install</id> - <phase>${pyro-shim.stage}</phase> + <phase>${prepare-env.stage}</phase> <goals> <goal>exec</goal> </goals> @@ -209,7 +209,7 @@ </execution> <execution> <id>shiv-msgpack-shim</id> - <phase>${pyro-shim.stage}</phase> + <phase>${shim.stage}</phase> <goals> <goal>exec</goal> </goals> diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java new file mode 100644 index 0000000..ced9b40 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.api.http.server; + +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA; +import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON; + +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.functions.ExternalFunctionLanguage; +import org.apache.asterix.common.library.LibraryDescriptor; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.IFormattedException; +import org.apache.hyracks.control.common.work.SynchronizableWork; +import 
org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.http.server.utils.HttpUtil; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpScheme; + +public abstract class AbstractNCUdfServlet extends AbstractServlet { + + INcApplicationContext appCtx; + INCServiceContext srvCtx; + + protected final IApplicationContext plainAppCtx; + private final HttpScheme httpServerProtocol; + private final int httpServerPort; + + public AbstractNCUdfServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, + HttpScheme httpServerProtocol, int httpServerPort) { + + super(ctx, paths); + this.plainAppCtx = appCtx; + this.httpServerProtocol = httpServerProtocol; + this.httpServerPort = httpServerPort; + } + + void readFromFile(Path filePath, IServletResponse response, String contentType, OpenOption opt) throws Exception { + class InputStreamGetter extends SynchronizableWork { + private InputStream is; + + @Override + protected void doRun() throws Exception { + if (opt != null) { + is = Files.newInputStream(filePath, opt); + } else { + is = Files.newInputStream(filePath); + } + } + } + + InputStreamGetter r = new InputStreamGetter(); + ((NodeControllerService) srvCtx.getControllerService()).getWorkQueue().scheduleAndSync(r); + + if (r.is == null) { + response.setStatus(HttpResponseStatus.NOT_FOUND); + return; + } + try { + response.setStatus(HttpResponseStatus.OK); + HttpUtil.setContentType(response, contentType); + IOUtils.copyLarge(r.is, response.outputStream()); + } finally { + r.is.close(); + } + } + + URI createDownloadURI(Path file) throws Exception { + String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName(); + String host = getHyracksClientConnection().getHost(); + return new 
URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null); + } + + IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR + IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); + if (hcc == null) { + throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR); + } + return hcc; + } + + Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException { + String[] path = StringUtils.split(localPath(request), '/'); + int ln = path.length; + if (ln < 2) { + return null; + } + String libraryName = path[ln - 1]; + DataverseName dataverseName = DataverseName.create(Arrays.asList(path), 0, ln - 1); + return new Pair<>(dataverseName, libraryName); + } + + static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) { + switch (fileExtension) { + case LibraryDescriptor.FILE_EXT_ZIP: + return JAVA; + case LibraryDescriptor.FILE_EXT_PYZ: + return PYTHON; + default: + return null; + } + } + + HttpResponseStatus toHttpErrorStatus(Exception e) { + if (e instanceof IFormattedException) { + IFormattedException fe = (IFormattedException) e; + if (ErrorCode.ASTERIX.equals(fe.getComponent())) { + switch (fe.getErrorCode()) { + case ErrorCode.UNKNOWN_DATAVERSE: + case ErrorCode.UNKNOWN_LIBRARY: + return HttpResponseStatus.NOT_FOUND; + } + } + } + return HttpResponseStatus.INTERNAL_SERVER_ERROR; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java index f25d223..e062cdc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import 
org.apache.commons.lang3.RandomStringUtils; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.http.api.IServlet; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; @@ -47,24 +48,31 @@ public class BasicAuthServlet implements IServlet { private Base64.Decoder b64Decoder; Map<String, String> storedCredentials; Map<String, String> ephemeralCredentials; - private String sysAuthHeader; private final IServlet delegate; private ConcurrentMap<String, Object> ctx; - public BasicAuthServlet(ConcurrentMap<String, Object> ctx, IServlet delegate) { + public BasicAuthServlet(ConcurrentMap<String, Object> ctx, IServlet delegate, Map<String, String> storedCredentials, + Map<String, String> ephemeralCredentials) { + this.ctx = ctx; this.delegate = delegate; b64Decoder = Base64.getDecoder(); - storedCredentials = (Map<String, String>) ctx.get(CREDENTIAL_MAP); - this.ctx = ctx; + this.storedCredentials = storedCredentials; + this.ephemeralCredentials = ephemeralCredentials; + } + + public static Pair<Map<String, String>, Map<String, String>> generateSysAuthHeader( + ConcurrentMap<String, Object> ctx) { + Map<String, String> storedCredentials = (Map<String, String>) ctx.get(CREDENTIAL_MAP); // generate internal user String sysUser; do { sysUser = generateRandomString(32); } while (storedCredentials.containsKey(sysUser)); String sysPassword = generateRandomString(128); - ephemeralCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword)); - sysAuthHeader = createAuthHeader(sysUser, sysPassword); + Map<String, String> ephemeralCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword)); + String sysAuthHeader = createAuthHeader(sysUser, sysPassword); ctx.put(SYS_AUTH_HEADER, sysAuthHeader); + return new Pair<>(storedCredentials, ephemeralCredentials); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java index 7af7fd4..fc08f49 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java @@ -18,19 +18,19 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER; -import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA; -import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.PrintWriter; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Arrays; +import java.security.DigestOutputStream; +import java.security.MessageDigest; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -43,33 +43,25 @@ import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.IReceptionist; import org.apache.asterix.common.api.IRequestReference; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.functions.ExternalFunctionLanguage; -import org.apache.asterix.common.library.LibraryDescriptor; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.common.metadata.DataverseName; import 
org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.external.util.ExternalLibraryUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.api.application.INCServiceContext; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.exceptions.IFormattedException; -import org.apache.hyracks.control.common.work.SynchronizableWork; -import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; -import org.apache.hyracks.http.server.AbstractServlet; import org.apache.hyracks.http.server.utils.HttpUtil; import org.apache.hyracks.util.file.FileUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import io.netty.buffer.ByteBufInputStream; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpScheme; @@ -77,14 +69,7 @@ import io.netty.handler.codec.http.multipart.FileUpload; import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; import io.netty.handler.codec.http.multipart.InterfaceHttpData; -public class NCUdfApiServlet extends AbstractServlet { - - INcApplicationContext appCtx; - INCServiceContext srvCtx; - - protected final IApplicationContext plainAppCtx; - private final HttpScheme httpServerProtocol; - private final int httpServerPort; +public class NCUdfApiServlet extends AbstractNCUdfServlet { protected final ILangCompilationProvider compilationProvider; protected final IReceptionist receptionist; @@ -96,13 +81,9 @@ public class NCUdfApiServlet extends AbstractServlet { public NCUdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, 
IApplicationContext appCtx, ILangCompilationProvider compilationProvider, HttpScheme httpServerProtocol, int httpServerPort) { - - super(ctx, paths); - this.plainAppCtx = appCtx; + super(ctx, paths, appCtx, httpServerProtocol, httpServerPort); this.compilationProvider = compilationProvider; this.receptionist = appCtx.getReceptionist(); - this.httpServerProtocol = httpServerProtocol; - this.httpServerPort = httpServerPort; } @Override @@ -137,12 +118,12 @@ public class NCUdfApiServlet extends AbstractServlet { } private void doCreate(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language, - URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference, - IServletRequest request, IServletResponse response) throws Exception { + String hash, URI downloadURI, boolean replaceIfExists, String sysAuthHeader, + IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception { INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker(); MessageFuture responseFuture = ncMb.registerMessageFuture(); CreateLibraryRequestMessage req = new CreateLibraryRequestMessage(srvCtx.getNodeId(), - responseFuture.getFutureId(), dataverseName, libraryName, language, downloadURI, replaceIfExists, + responseFuture.getFutureId(), dataverseName, libraryName, language, hash, downloadURI, replaceIfExists, sysAuthHeader, requestReference, additionalHttpHeadersFromRequest(request)); sendMessage(req, responseFuture, requestReference, request, response); } @@ -197,7 +178,7 @@ public class NCUdfApiServlet extends AbstractServlet { response.setStatus(HttpResponseStatus.BAD_REQUEST); return; } - readFromFile(filePath, response); + readFromFile(filePath, response, HttpUtil.ContentType.APPLICATION_OCTET_STREAM, null); } @Override @@ -209,6 +190,7 @@ public class NCUdfApiServlet extends AbstractServlet { return; } Path libraryTempFile = null; + FileOutputStream libTmpOut = null; 
HttpPostRequestDecoder requestDecoder = new HttpPostRequestDecoder(httpRequest); try { if (!requestDecoder.hasNext() || requestDecoder.getBodyHttpDatas().size() != 1) { @@ -234,9 +216,15 @@ public class NCUdfApiServlet extends AbstractServlet { LOGGER.debug("Created temporary file " + libraryTempFile + " for library " + libraryName.first + "." + libraryName.second); } - fileUpload.renameTo(libraryTempFile.toFile()); + MessageDigest digest = MessageDigest.getInstance("MD5"); + libTmpOut = new FileOutputStream(libraryTempFile.toFile()); + OutputStream outStream = new DigestOutputStream(libTmpOut, digest); + InputStream uploadInput = new ByteBufInputStream(((FileUpload) httpData).getByteBuf()); + IOUtils.copyLarge(uploadInput, outStream); + outStream.close(); URI downloadURI = createDownloadURI(libraryTempFile); - doCreate(libraryName.first, libraryName.second, language, downloadURI, true, sysAuthHeader, + doCreate(libraryName.first, libraryName.second, language, + ExternalLibraryUtils.digestToHexString(digest), downloadURI, true, sysAuthHeader, requestReference, request, response); response.setStatus(HttpResponseStatus.OK); } catch (Exception e) { @@ -250,6 +238,9 @@ public class NCUdfApiServlet extends AbstractServlet { requestDecoder.destroy(); if (libraryTempFile != null) { try { + if (libTmpOut != null) { + libTmpOut.close(); + } Files.deleteIfExists(libraryTempFile); } catch (IOException e) { LOGGER.warn("Could not delete temporary file " + libraryTempFile, e); @@ -258,20 +249,6 @@ public class NCUdfApiServlet extends AbstractServlet { } } - private URI createDownloadURI(Path file) throws Exception { - String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName(); - String host = getHyracksClientConnection().getHost(); - return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null); - } - - private IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR - IHyracksClientConnection hcc = 
(IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); - if (hcc == null) { - throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR); - } - return hcc; - } - @Override protected void delete(IServletRequest request, IServletResponse response) { Pair<DataverseName, String> libraryName = parseLibraryName(request); @@ -292,65 +269,4 @@ public class NCUdfApiServlet extends AbstractServlet { } } - private void readFromFile(Path filePath, IServletResponse response) throws Exception { - class InputStreamGetter extends SynchronizableWork { - private InputStream is; - - @Override - protected void doRun() throws Exception { - is = Files.newInputStream(filePath); - } - } - - InputStreamGetter r = new InputStreamGetter(); - ((NodeControllerService) srvCtx.getControllerService()).getWorkQueue().scheduleAndSync(r); - - if (r.is == null) { - response.setStatus(HttpResponseStatus.NOT_FOUND); - return; - } - try { - response.setStatus(HttpResponseStatus.OK); - HttpUtil.setContentType(response, "application/octet-stream"); - IOUtils.copyLarge(r.is, response.outputStream()); - } finally { - r.is.close(); - } - } - - private Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException { - String[] path = StringUtils.split(localPath(request), '/'); - int ln = path.length; - if (ln < 2) { - return null; - } - String libraryName = path[ln - 1]; - DataverseName dataverseName = DataverseName.create(Arrays.asList(path), 0, ln - 1); - return new Pair<>(dataverseName, libraryName); - } - - private static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) { - switch (fileExtension) { - case LibraryDescriptor.FILE_EXT_ZIP: - return JAVA; - case LibraryDescriptor.FILE_EXT_PYZ: - return PYTHON; - default: - return null; - } - } - - private HttpResponseStatus toHttpErrorStatus(Exception e) { - if (e instanceof IFormattedException) { - IFormattedException fe = (IFormattedException) e; - if 
(ErrorCode.ASTERIX.equals(fe.getComponent())) { - switch (fe.getErrorCode()) { - case ErrorCode.UNKNOWN_DATAVERSE: - case ErrorCode.UNKNOWN_LIBRARY: - return HttpResponseStatus.NOT_FOUND; - } - } - } - return HttpResponseStatus.INTERNAL_SERVER_ERROR; - } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java new file mode 100644 index 0000000..2c29d14 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.api.http.server; + +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.external.library.ExternalLibraryManager; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.utils.HttpUtil; + +import io.netty.handler.codec.http.HttpScheme; + +public class NCUdfRecoveryServlet extends AbstractNCUdfServlet { + + ExternalLibraryManager libraryManager; + + public static final String GET_ALL_UDF_ENDPOINT = "/all"; + + public NCUdfRecoveryServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, + HttpScheme httpServerProtocol, int httpServerPort) { + super(ctx, paths, appCtx, httpServerProtocol, httpServerPort); + } + + @Override + public void init() { + appCtx = (INcApplicationContext) plainAppCtx; + srvCtx = this.appCtx.getServiceContext(); + this.libraryManager = (ExternalLibraryManager) appCtx.getLibraryManager(); + } + + @Override + protected void get(IServletRequest request, IServletResponse response) throws Exception { + String localPath = localPath(request); + if (localPath.equals(GET_ALL_UDF_ENDPOINT)) { + Path zippedLibs = libraryManager.zipAllLibs(); + readFromFile(zippedLibs, response, HttpUtil.ContentType.APPLICATION_ZIP, + StandardOpenOption.DELETE_ON_CLOSE); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java similarity index 98% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java index 
dd1b736..6c6691b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java @@ -45,9 +45,9 @@ import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -public class ExternalLibraryUtil { +public class ExternalLibraryJobUtils { - private ExternalLibraryUtil() { + private ExternalLibraryJobUtils() { } public static Triple<JobSpecification, JobSpecification, JobSpecification> buildCreateLibraryJobSpec( @@ -137,4 +137,5 @@ public class ExternalLibraryUtil { } return splits.toArray(new FileSplit[0]); } -} \ No newline at end of file + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java index 818a098..f23b6e4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java @@ -32,24 +32,26 @@ public final class CreateLibraryRequestMessage extends AbstractInternalRequestMe final DataverseName dataverseName; final String libraryName; final ExternalFunctionLanguage lang; + final String hash; final URI location; final boolean replaceIfExists; final String authToken; private static final long serialVersionUID = 1L; public CreateLibraryRequestMessage(String nodeRequestId, long requestMessageId, DataverseName dataverseName, - String libraryName, ExternalFunctionLanguage lang, URI location, boolean replaceIfExists, String authToken, - IRequestReference requestReference, Map<String, String> additionalParams) { + String libraryName, ExternalFunctionLanguage lang, String hash, URI location, boolean replaceIfExists, + 
String authToken, IRequestReference requestReference, Map<String, String> additionalParams) { super(nodeRequestId, requestMessageId, requestReference, additionalParams); this.dataverseName = dataverseName; this.libraryName = libraryName; this.lang = lang; + this.hash = hash; this.location = location; this.replaceIfExists = replaceIfExists; this.authToken = authToken; } protected Statement produceStatement() { - return new CreateLibraryStatement(dataverseName, libraryName, lang, location, replaceIfExists, authToken); + return new CreateLibraryStatement(dataverseName, libraryName, lang, hash, location, replaceIfExists, authToken); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 8ac13be..e058d39 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -259,7 +259,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService(); FileReference appDir = ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath()); - libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir); + libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir, ioManager); libraryManager.initialize(resetStorageData); /* diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RetrieveLibrariesTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RetrieveLibrariesTask.java new file mode 100644 index 0000000..18d303c --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RetrieveLibrariesTask.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.nc.task; + +import static org.apache.asterix.api.http.server.NCUdfRecoveryServlet.GET_ALL_UDF_ENDPOINT; +import static org.apache.asterix.common.utils.Servlets.UDF_RECOVERY; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.http.client.utils.URIBuilder; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.util.file.FileUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class RetrieveLibrariesTask implements INCLifecycleTask { + + private static final long serialVersionUID = 
1L; + private static final Logger LOGGER = LogManager.getLogger(); + private final List<Pair<URI, String>> nodes; + + public RetrieveLibrariesTask(List<Pair<URI, String>> nodes) { + this.nodes = nodes; + if (nodes.size() <= 0) { + throw new IllegalArgumentException("No nodes specified to retrieve from"); + } + } + + @Override + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); + boolean success = false; + for (Pair<URI, String> referenceNode : nodes) { + try { + LOGGER.info("Retrieving UDFs from " + referenceNode.getFirst().getHost()); + retrieveLibrary(referenceNode.getFirst(), referenceNode.getSecond(), appContext); + success = true; + break; + } catch (HyracksDataException e) { + LOGGER.error("Unable to retrieve UDFs from: " + referenceNode.getFirst() + ", trying another node.", e); + } + } + if (!success) { + LOGGER.error("Unable to retrieve UDFs from any participant node"); + throw HyracksDataException.create(ErrorCode.TIMEOUT); + } + } + + private void retrieveLibrary(URI baseURI, String authToken, INcApplicationContext appContext) + throws HyracksDataException { + ILibraryManager libraryManager = appContext.getLibraryManager(); + FileReference distributionDir = appContext.getLibraryManager().getDistributionDir(); + URI libraryURI = getNCUdfRetrievalURL(baseURI); + try { + FileUtil.forceMkdirs(distributionDir.getFile()); + Path targetFile = Files.createTempFile(Paths.get(distributionDir.getAbsolutePath()), "all_", ".zip"); + FileReference targetFileRef = distributionDir.getChild(targetFile.getFileName().toString()); + libraryManager.download(targetFileRef, authToken, libraryURI); + Path outputDirPath = libraryManager.getStorageDir().getFile().toPath().toAbsolutePath().normalize(); + FileReference outPath = appContext.getIoManager().resolveAbsolutePath(outputDirPath.toString()); + libraryManager.unzip(targetFileRef, outPath); + } catch 
(IOException e) { + LOGGER.error("Unable to retrieve UDFs from " + libraryURI.toString() + " before timeout"); + throw HyracksDataException.create(e); + } + } + + public URI getNCUdfRetrievalURL(URI baseURL) { + String endpoint = UDF_RECOVERY.substring(0, UDF_RECOVERY.length() - 1) + GET_ALL_UDF_ENDPOINT; + URIBuilder builder = new URIBuilder(baseURL).setPath(endpoint); + try { + return builder.build(); + } catch (URISyntaxException e) { + LOGGER.error("Could not find URL for NC recovery", e); + } + return null; + } + + @Override + public String toString() { + return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }"; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index d22e9fc..c4e4f82 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -18,14 +18,20 @@ */ package org.apache.asterix.app.replication; +import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER; +import static org.apache.asterix.common.config.ExternalProperties.Option.NC_API_PORT; import static org.apache.hyracks.api.exceptions.ErrorCode.NODE_FAILED; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -34,6 +40,7 @@ import org.apache.asterix.app.nc.task.CheckpointTask; import org.apache.asterix.app.nc.task.ExportMetadataNodeTask; import org.apache.asterix.app.nc.task.LocalRecoveryTask; import org.apache.asterix.app.nc.task.MetadataBootstrapTask; +import 
org.apache.asterix.app.nc.task.RetrieveLibrariesTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; import org.apache.asterix.app.nc.task.StartReplicationServiceTask; import org.apache.asterix.app.nc.task.UpdateNodeStatusTask; @@ -42,6 +49,7 @@ import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage; import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; @@ -53,14 +61,20 @@ import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.replication.messaging.ReplicaFailedMessage; +import org.apache.http.client.utils.URIBuilder; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.NodeStatus; +import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.control.IGatekeeper; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import io.netty.handler.codec.http.HttpScheme; + public class NcLifecycleCoordinator implements INcLifecycleCoordinator { private static final Logger LOGGER = LogManager.getLogger(); @@ -70,12 +84,14 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { 
protected final ICCMessageBroker messageBroker; private final boolean replicationEnabled; private final IGatekeeper gatekeeper; + Map<String, Map<String, Object>> nodeSecretsMap; public NcLifecycleCoordinator(ICCServiceContext serviceCtx, boolean replicationEnabled) { this.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); this.replicationEnabled = replicationEnabled; this.gatekeeper = ((ClusterControllerService) serviceCtx.getControllerService()).getApplication().getGatekeeper(); + this.nodeSecretsMap = new HashMap<>(); } @Override @@ -121,6 +137,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { final String nodeId = msg.getNodeId(); + nodeSecretsMap.put(nodeId, msg.getSecrets()); List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState()); RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); try { @@ -193,12 +210,12 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } } - protected List<INCLifecycleTask> buildIdleNcRegTasks(String nodeId, boolean metadataNode, SystemState state) { + protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) { final List<INCLifecycleTask> tasks = new ArrayList<>(); tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING)); if (state == SystemState.CORRUPTED) { // need to perform local recovery for node partitions - LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(nodeId)) + LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId)) .map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); tasks.add(rt); } @@ -210,6 +227,16 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } tasks.add(new CheckpointTask()); tasks.add(new 
StartLifecycleComponentsTask()); + if (isLibraryFetchEnabled() && clusterManager.getState() == ClusterState.ACTIVE) { + Set<String> nodes = clusterManager.getParticipantNodes(true); + if (nodes.size() > 0) { + try { + tasks.add(nodesToLibraryTask(newNodeId, nodes)); + } catch (HyracksDataException e) { + LOGGER.error("Could not construct library recovery task", e); + } + } + } if (metadataNode) { tasks.add(new ExportMetadataNodeTask(true)); tasks.add(new BindMetadataNodeTask()); @@ -227,6 +254,23 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } } + protected String getNCAuthToken(String node) { + return (String) nodeSecretsMap.get(node).get(SYS_AUTH_HEADER); + } + + protected URI constructNCRecoveryUri(String nodeId) throws HyracksDataException { + Map<IOption, Object> nodeConfig = clusterManager.getNcConfiguration().get(nodeId); + String host = (String) nodeConfig.get(NCConfig.Option.PUBLIC_ADDRESS); + int port = (Integer) nodeConfig.get(NC_API_PORT); + URIBuilder builder = new URIBuilder().setScheme(HttpScheme.HTTP.toString()).setHost(host).setPort(port); + try { + return builder.build(); + } catch (URISyntaxException e) { + LOGGER.error("Could not find URL for NC recovery", e); + throw HyracksDataException.create(e); + } + } + private void requestMetadataNodeTakeover(String node) throws HyracksDataException { MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(true, clusterManager.getMetadataPartition().getPartitionId()); @@ -237,6 +281,23 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } } + protected RetrieveLibrariesTask nodesToLibraryTask(String newNodeId, Set<String> referenceNodes) + throws HyracksDataException { + List<Pair<URI, String>> referenceNodeLocAndAuth = new ArrayList<>(); + for (String node : referenceNodes) { + referenceNodeLocAndAuth.add(new Pair<>(constructNCRecoveryUri(node), getNCAuthToken(node))); + } + return getRetrieveLibrariesTask(referenceNodeLocAndAuth); + } + + 
protected RetrieveLibrariesTask getRetrieveLibrariesTask(List<Pair<URI, String>> referenceNodeLocAndAuth) { + return new RetrieveLibrariesTask(referenceNodeLocAndAuth); + } + + protected boolean isLibraryFetchEnabled() { + return true; + } + private void notifyFailedReplica(IClusterStateManager clusterManager, String nodeID, InetSocketAddress replicaAddress) { LOGGER.info("notify replica failure of nodeId {} at {}", nodeID, replicaAddress); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java index 3c7182d..c2cc63c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.app.replication.message; +import java.util.HashMap; +import java.util.Map; + import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; @@ -34,22 +37,25 @@ import org.apache.logging.log4j.Logger; public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage { private static final Logger LOGGER = LogManager.getLogger(); - private static final long serialVersionUID = 1L; - private final SystemState state; - private final String nodeId; - private final NodeStatus nodeStatus; + private static final long serialVersionUID = 2L; + protected final SystemState state; + protected final String nodeId; + protected final NodeStatus nodeStatus; + protected final Map<String, Object> secrets; - public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state) { + public 
RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state, + Map<String, Object> secretsEphemeral) { this.state = state; this.nodeId = nodeId; this.nodeStatus = nodeStatus; + this.secrets = new HashMap<>(secretsEphemeral); } - public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState) - throws HyracksDataException { + public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState, + Map<String, Object> secretsEphemeral) throws HyracksDataException { try { RegistrationTasksRequestMessage msg = - new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState); + new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral); ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg); } catch (Exception e) { LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e); @@ -79,4 +85,7 @@ public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICc return MessageType.REGISTRATION_TASKS_REQUEST; } -} \ No newline at end of file + public Map<String, Object> getSecrets() { + return secrets; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 40a3c10..d62c151 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -49,7 +49,7 @@ import org.apache.asterix.api.http.server.ApiServlet; import org.apache.asterix.app.active.ActiveEntityEventsListener; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.active.FeedEventsListener; -import org.apache.asterix.app.external.ExternalLibraryUtil; +import 
org.apache.asterix.app.external.ExternalLibraryJobUtils; import org.apache.asterix.app.result.ExecutionError; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; @@ -1539,7 +1539,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. prepare jobs which will drop corresponding libraries. List<Library> libraries = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, dataverseName); for (Library library : libraries) { - jobsToExecute.add(ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName, library.getName(), + jobsToExecute.add(ExternalLibraryJobUtils.buildDropLibraryJobSpec(dataverseName, library.getName(), metadataProvider)); } @@ -2430,16 +2430,17 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen CreateLibraryStatement cls = (CreateLibraryStatement) stmt; DataverseName dataverseName = getActiveDataverseName(cls.getDataverseName()); String libraryName = cls.getLibraryName(); + String libraryHash = cls.getHash(); lockUtil.createLibraryBegin(lockManager, metadataProvider.getLocks(), dataverseName, libraryName); try { - doCreateLibrary(metadataProvider, dataverseName, libraryName, cls, hcc); + doCreateLibrary(metadataProvider, dataverseName, libraryName, libraryHash, cls, hcc); } finally { metadataProvider.getLocks().unlock(); } } private void doCreateLibrary(MetadataProvider metadataProvider, DataverseName dataverseName, String libraryName, - CreateLibraryStatement cls, IHyracksClientConnection hcc) throws Exception { + String libraryHash, CreateLibraryStatement cls, IHyracksClientConnection hcc) throws Exception { JobUtils.ProgressState progress = ProgressState.NO_PROGRESS; boolean prepareJobSuccessful = false; JobSpecification abortJobSpec = null; @@ -2461,7 +2462,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. 
add/update library with PendingAddOp Library libraryPendingAdd = - new Library(dataverseName, libraryName, language.name(), MetadataUtil.PENDING_ADD_OP); + new Library(dataverseName, libraryName, language.name(), libraryHash, MetadataUtil.PENDING_ADD_OP); if (existingLibrary == null) { MetadataManager.INSTANCE.addLibrary(mdTxnCtx, libraryPendingAdd); } else { @@ -2470,7 +2471,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. prepare to create library artifacts in NC. Triple<JobSpecification, JobSpecification, JobSpecification> jobSpecs = - ExternalLibraryUtil.buildCreateLibraryJobSpec(dataverseName, libraryName, language, + ExternalLibraryJobUtils.buildCreateLibraryJobSpec(dataverseName, libraryName, language, cls.getLocation(), cls.getAuthToken(), metadataProvider); JobSpecification prepareJobSpec = jobSpecs.first; JobSpecification commitJobSpec = jobSpecs.second; @@ -2490,7 +2491,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - Library newLibrary = new Library(dataverseName, libraryName, language.name(), MetadataUtil.PENDING_NO_OP); + Library newLibrary = + new Library(dataverseName, libraryName, language.name(), libraryHash, MetadataUtil.PENDING_NO_OP); MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, newLibrary); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -2511,8 +2513,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } else if (existingLibrary == null) { // 'commit' job failed for a new library -> try removing the library try { - JobSpecification dropLibraryJobSpec = ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName, - libraryName, metadataProvider); + JobSpecification dropLibraryJobSpec = ExternalLibraryJobUtils + .buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider); runJob(hcc, dropLibraryJobSpec, jobFlags); } catch (Exception e2) { 
e.addSuppressed(e2); @@ -2592,12 +2594,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. mark the existing library as PendingDropOp // do drop instead of update because drop will fail if the library is used by functions/adapters MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName); - MetadataManager.INSTANCE.addLibrary(mdTxnCtx, - new Library(dataverseName, libraryName, library.getLanguage(), MetadataUtil.PENDING_DROP_OP)); + MetadataManager.INSTANCE.addLibrary(mdTxnCtx, new Library(dataverseName, libraryName, library.getLanguage(), + library.getHash(), MetadataUtil.PENDING_DROP_OP)); // #. drop library artifacts in NCs. JobSpecification jobSpec = - ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider); + ExternalLibraryJobUtils.buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index fd5ecb5..b0cb4de 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -19,16 +19,19 @@ package org.apache.asterix.hyracks.bootstrap; import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER; import static org.apache.asterix.common.utils.Servlets.QUERY_RESULT; import static org.apache.asterix.common.utils.Servlets.QUERY_SERVICE; import static org.apache.asterix.common.utils.Servlets.QUERY_STATUS; import static org.apache.asterix.common.utils.Servlets.UDF; +import static org.apache.asterix.common.utils.Servlets.UDF_RECOVERY; import 
java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,6 +41,7 @@ import org.apache.asterix.algebra.base.ILangExtension; import org.apache.asterix.api.http.server.BasicAuthServlet; import org.apache.asterix.api.http.server.NCQueryServiceServlet; import org.apache.asterix.api.http.server.NCUdfApiServlet; +import org.apache.asterix.api.http.server.NCUdfRecoveryServlet; import org.apache.asterix.api.http.server.NetDiagnosticsApiServlet; import org.apache.asterix.api.http.server.QueryResultApiServlet; import org.apache.asterix.api.http.server.QueryStatusApiServlet; @@ -83,6 +87,7 @@ import org.apache.asterix.utils.RedactionUtil; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.config.IConfigManager; @@ -106,11 +111,12 @@ import org.apache.logging.log4j.Logger; public class NCApplication extends BaseNCApplication { private static final Logger LOGGER = LogManager.getLogger(); protected NCExtensionManager ncExtensionManager; - private INcApplicationContext runtimeContext; + protected INcApplicationContext runtimeContext; private String nodeId; private boolean stopInitiated; - private boolean startupCompleted; + protected boolean startupCompleted; protected WebManager webManager; + private HttpServer apiServer; @Override public void registerConfig(IConfigManager configManager) { @@ -203,8 +209,8 @@ public class NCApplication extends BaseNCApplication { final ExternalProperties externalProperties = getApplicationContext().getExternalProperties(); final HttpServerConfig config = 
HttpServerConfigBuilder.custom().setMaxRequestSize(externalProperties.getMaxWebRequestSize()).build(); - HttpServer apiServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), - externalProperties.getNcApiPort(), config); + apiServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getNcApiPort(), + config); apiServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ncServiceCtx); apiServer.setAttribute(HYRACKS_CONNECTION_ATTR, getApplicationContext().getHcc()); apiServer.addServlet(new StorageApiServlet(apiServer.ctx(), getApplicationContext(), Servlets.STORAGE)); @@ -217,9 +223,15 @@ public class NCApplication extends BaseNCApplication { apiServer.setAttribute(ServletConstants.CREDENTIAL_MAP, parseCredentialMap(((NodeControllerService) ncServiceCtx.getControllerService()).getConfiguration() .getCredentialFilePath())); + Pair<Map<String, String>, Map<String, String>> auth = BasicAuthServlet.generateSysAuthHeader(apiServer.ctx()); apiServer.addServlet(new BasicAuthServlet(apiServer.ctx(), new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF }, getApplicationContext(), - sqlppCompilationProvider, apiServer.getScheme(), apiServer.getAddress().getPort()))); + sqlppCompilationProvider, apiServer.getScheme(), apiServer.getAddress().getPort()), + auth.getFirst(), auth.getSecond())); + apiServer.addServlet(new BasicAuthServlet( + apiServer.ctx(), new NCUdfRecoveryServlet(apiServer.ctx(), new String[] { UDF_RECOVERY }, + getApplicationContext(), apiServer.getScheme(), apiServer.getAddress().getPort()), + auth.getFirst(), auth.getSecond())); apiServer.addServlet(new QueryStatusApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_STATUS)); apiServer.addServlet(new QueryResultApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_RESULT)); webManager.add(apiServer); @@ -283,8 +295,11 @@ public class NCApplication extends BaseNCApplication { final NodeStatus currentStatus = ncs.getNodeStatus(); final 
SystemState systemState = isPendingStartupTasks(currentStatus, ncs.getPrimaryCcId(), ccId) ? getCurrentSystemState() : SystemState.HEALTHY; + final Map httpSecrets = + apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER)) + : Collections.emptyMap(); RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(), - currentStatus, systemState); + currentStatus, systemState, httpSecrets); } @Override diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExternalPythonFunctionIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExternalPythonFunctionIT.java index ac2bc6b..d72d494 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExternalPythonFunctionIT.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExternalPythonFunctionIT.java @@ -20,10 +20,16 @@ package org.apache.asterix.test.runtime; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.hyracks.control.nc.NodeControllerService; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -40,7 +46,9 @@ public class ExternalPythonFunctionIT { @BeforeClass public static void setUp() throws Exception { - LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor()); + final TestExecutor testExecutor = new TestExecutor(); + LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor); + setNcEndpoints(testExecutor); } @AfterClass @@ -63,4 +71,17 @@ public class ExternalPythonFunctionIT { public void test() throws Exception { LangExecutionUtil.test(tcCtx); } + + private static void setNcEndpoints(TestExecutor 
testExecutor) { + final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs; + final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>(); + final String ip = InetAddress.getLoopbackAddress().getHostAddress(); + for (NodeControllerService nc : ncs) { + final String nodeId = nc.getId(); + final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext(); + int apiPort = appCtx.getExternalProperties().getNcApiPort(); + ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort)); + } + testExecutor.setNcEndPoints(ncEndPoints); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java index 6d1c059..ef390f4 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java @@ -19,9 +19,15 @@ package org.apache.asterix.common.library; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.security.MessageDigest; + import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.external.ipc.ExternalFunctionResultRouter; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.ipc.impl.IPCSystem; @@ -33,6 +39,8 @@ public interface ILibraryManager { // deployment helpers + FileReference getStorageDir(); + FileReference getLibraryDir(DataverseName dataverseName, String libraryName) throws HyracksDataException; FileReference getDistributionDir(); @@ -44,4 +52,10 @@ public interface ILibraryManager { ExternalFunctionResultRouter getRouter(); IPCSystem getIPCI(); + + MessageDigest download(FileReference targetFile, String authToken, URI 
libLocation) throws HyracksException; + + void unzip(FileReference sourceFile, FileReference outputDir) throws IOException; + + void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuf) throws IOException; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java index 72bde09..6f128dc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java @@ -31,9 +31,10 @@ import com.fasterxml.jackson.databind.node.ObjectNode; */ public class LibraryDescriptor implements IJsonSerializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private static final String FIELD_LANGUAGE = "lang"; + private static final String FIELD_HASH = "hash"; public static final String FILE_EXT_ZIP = "zip"; @@ -44,23 +45,32 @@ public class LibraryDescriptor implements IJsonSerializable { */ private final ExternalFunctionLanguage lang; - public LibraryDescriptor(ExternalFunctionLanguage language) { + private final String hash; + + public LibraryDescriptor(ExternalFunctionLanguage language, String hash) { this.lang = language; + this.hash = hash; } public ExternalFunctionLanguage getLanguage() { return lang; } + public String getHash() { + return hash; + } + public JsonNode toJson(IPersistedResourceRegistry registry) { ObjectNode jsonNode = registry.getClassIdentifier(LibraryDescriptor.class, serialVersionUID); jsonNode.put(FIELD_LANGUAGE, lang.name()); + jsonNode.put(FIELD_HASH, hash); return jsonNode; } public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) { String langText = json.get(FIELD_LANGUAGE).asText(); ExternalFunctionLanguage lang = 
ExternalFunctionLanguage.valueOf(langText); - return new LibraryDescriptor(lang); + String hash = json.get(FIELD_HASH).asText(); + return new LibraryDescriptor(lang, hash); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java index 69e9267..5edc186 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java @@ -36,6 +36,7 @@ public class Servlets { public static final String STORAGE = "/admin/storage/*"; public static final String NET_DIAGNOSTICS = "/admin/net/*"; public static final String UDF = "/admin/udf/*"; + public static final String UDF_RECOVERY = "/admin/libraryrecovery/*"; private Servlets() { } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java index 2ae8612..c5b9b53 100755 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java @@ -21,15 +21,35 @@ package org.apache.asterix.external.library; import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY; import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.file.FileVisitResult; import 
java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.BasicFileAttributes; +import java.security.DigestOutputStream; +import java.security.KeyStore; +import java.security.MessageDigest; +import java.util.Enumeration; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + +import javax.net.ssl.SSLContext; import org.apache.asterix.common.functions.ExternalFunctionLanguage; import org.apache.asterix.common.library.ILibrary; @@ -37,12 +57,30 @@ import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.library.LibraryDescriptor; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.external.ipc.ExternalFunctionResultRouter; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.ssl.SSLContexts; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IFileHandle; +import org.apache.hyracks.api.io.IIOManager; import 
org.apache.hyracks.api.io.IPersistedResourceRegistry; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; +import org.apache.hyracks.api.network.INetworkSecurityConfig; +import org.apache.hyracks.api.network.INetworkSecurityManager; import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; @@ -77,6 +115,8 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle public static final String DISTRIBUTION_DIR = "dist"; + private static final int DOWNLOAD_RETRY_COUNT = 10; + private static final Logger LOGGER = LogManager.getLogger(ExternalLibraryManager.class); private final NodeControllerService ncs; @@ -91,8 +131,11 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle private final Map<Pair<DataverseName, String>, ILibrary> libraries = new HashMap<>(); private IPCSystem pythonIPC; private final ExternalFunctionResultRouter router; + private final IIOManager ioManager; + private boolean sslEnabled; - public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir) { + public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir, + IIOManager ioManager) { this.ncs = ncs; this.reg = reg; baseDir = appDir.getChild(LIBRARY_MANAGER_BASE_DIR_NAME); @@ -103,6 +146,8 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle trashDirPath = trashDir.getFile().toPath().normalize(); objectMapper = createObjectMapper(); router = new ExternalFunctionResultRouter(); + this.sslEnabled = ncs.getConfiguration().isSslEnabled(); + this.ioManager = ioManager; } public void initialize(boolean resetStorageData) throws HyracksDataException { @@ -170,6 +215,11 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle } } + @Override + public FileReference getStorageDir() { + 
return storageDir; + } + private FileReference getDataverseDir(DataverseName dataverseName) throws HyracksDataException { return getChildFileRef(storageDir, dataverseName.getCanonicalForm()); } @@ -208,10 +258,7 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle throw new HyracksDataException("Cannot find library: " + dataverseName + '.' + libraryName); } try { - FileReference descFile = libRevDir.getChild(DESCRIPTOR_FILE_NAME); - byte[] descData = Files.readAllBytes(descFile.getFile().toPath()); - LibraryDescriptor desc = deserializeLibraryDescriptor(descData); - ExternalFunctionLanguage libLang = desc.getLanguage(); + ExternalFunctionLanguage libLang = getLibraryDescriptor(libRevDir).getLanguage(); switch (libLang) { case JAVA: return new JavaLibrary(libContentsDir.getFile()); @@ -240,6 +287,13 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle return (LibraryDescriptor) reg.deserialize(jsonNode); } + private LibraryDescriptor getLibraryDescriptor(FileReference revDir) throws IOException { + FileReference descFile = revDir.getChild(DESCRIPTOR_FILE_NAME); + byte[] descData = Files.readAllBytes(descFile.getFile().toPath()); + return deserializeLibraryDescriptor(descData); + + } + private FileReference findLibraryRevDir(DataverseName dataverseName, String libraryName) throws HyracksDataException { FileReference libraryBaseDir = getLibraryDir(dataverseName, libraryName); @@ -277,6 +331,42 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle return new Pair<>(dataverseName, libraryName); } + public Path zipAllLibs() throws IOException { + byte[] copyBuf = new byte[4096]; + Path outDir = Paths.get(baseDir.getAbsolutePath(), DISTRIBUTION_DIR); + FileUtil.forceMkdirs(outDir.toFile()); + Path outZip = Files.createTempFile(outDir, "all_", ".zip"); + try (FileOutputStream out = new FileOutputStream(outZip.toFile()); + ZipArchiveOutputStream zipOut = new 
ZipArchiveOutputStream(out)) { + Files.walkFileTree(storageDirPath, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path currPath, BasicFileAttributes attrs) throws IOException { + ZipArchiveEntry e = + new ZipArchiveEntry(currPath.toFile(), storageDirPath.relativize(currPath).toString()); + zipOut.putArchiveEntry(e); + try (FileInputStream fileRead = new FileInputStream(currPath.toFile())) { + IOUtils.copyLarge(fileRead, zipOut, copyBuf); + zipOut.closeArchiveEntry(); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult preVisitDirectory(Path currPath, BasicFileAttributes attrs) throws IOException { + if (currPath.equals(storageDirPath)) { + return FileVisitResult.CONTINUE; + } + ZipArchiveEntry e = + new ZipArchiveEntry(currPath.toFile(), storageDirPath.relativize(currPath).toString()); + zipOut.putArchiveEntry(e); + return FileVisitResult.CONTINUE; + } + }); + zipOut.finish(); + } + return outZip; + } + @Override public void dropLibraryPath(FileReference fileRef) throws HyracksDataException { // does not flush any directories @@ -338,4 +428,156 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle } } } + + @Override + public MessageDigest download(FileReference targetFile, String authToken, URI libLocation) throws HyracksException { + try { + targetFile.getFile().createNewFile(); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + IFileHandle fHandle = ioManager.open(targetFile, IIOManager.FileReadWriteMode.READ_WRITE, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + + MessageDigest digest = DigestUtils.getDigest("MD5"); + try { + CloseableHttpClient httpClient = newClient(); + try { + // retry 10 times at maximum for downloading binaries + HttpGet request = new HttpGet(libLocation); + request.setHeader(HttpHeaders.AUTHORIZATION, authToken); + int tried = 0; + Exception trace = null; + while (tried < DOWNLOAD_RETRY_COUNT) { + tried++; + 
CloseableHttpResponse response = null; + try { + response = httpClient.execute(request); + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + throw new IOException("Http Error: " + response.getStatusLine().getStatusCode()); + } + HttpEntity e = response.getEntity(); + if (e == null) { + throw new IOException("No response"); + } + WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle); + OutputStream outStream = new DigestOutputStream(Channels.newOutputStream(outChannel), digest); + e.writeTo(outStream); + outStream.flush(); + ioManager.sync(fHandle, true); + return digest; + } catch (IOException e) { + LOGGER.error("Unable to download library", e); + trace = e; + try { + ioManager.truncate(fHandle, 0); + digest.reset(); + } catch (IOException e2) { + throw HyracksDataException.create(e2); + } + } finally { + if (response != null) { + try { + response.close(); + } catch (IOException e) { + LOGGER.warn("Failed to close", e); + } + } + } + } + + throw HyracksDataException.create(trace); + } finally { + try { + httpClient.close(); + } catch (IOException e) { + LOGGER.warn("Failed to close", e); + } + } + } finally { + try { + ioManager.close(fHandle); + } catch (HyracksDataException e) { + LOGGER.warn("Failed to close", e); + } + } + } + + @Override + public void unzip(FileReference sourceFile, FileReference outputDir) throws IOException { + boolean logTraceEnabled = LOGGER.isTraceEnabled(); + Set<Path> newDirs = new HashSet<>(); + Path outputDirPath = outputDir.getFile().toPath().toAbsolutePath().normalize(); + try (ZipFile zipFile = new ZipFile(sourceFile.getFile())) { + Enumeration<? 
extends ZipEntry> entries = zipFile.entries(); + byte[] writeBuf = new byte[4096]; + while (entries.hasMoreElements()) { + ZipEntry entry = entries.nextElement(); + if (entry.isDirectory()) { + continue; + } + Path entryOutputPath = outputDirPath.resolve(entry.getName()).toAbsolutePath().normalize(); + if (!entryOutputPath.startsWith(outputDirPath)) { + throw new IOException("Malformed ZIP archive: " + entry.getName()); + } + Path entryOutputDir = entryOutputPath.getParent(); + Files.createDirectories(entryOutputDir); + // remember new directories so we can flush them later + for (Path p = entryOutputDir; !p.equals(outputDirPath); p = p.getParent()) { + newDirs.add(p); + } + try (InputStream in = zipFile.getInputStream(entry)) { + FileReference entryOutputFileRef = ioManager.resolveAbsolutePath(entryOutputPath.toString()); + if (logTraceEnabled) { + LOGGER.trace("Extracting file {}", entryOutputFileRef); + } + writeAndForce(entryOutputFileRef, in, writeBuf); + } + } + } + for (Path newDir : newDirs) { + IoUtil.flushDirectory(newDir); + } + } + + @Override + public void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer) throws IOException { + outputFile.getFile().createNewFile(); + IFileHandle fHandle = ioManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + try { + WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle); + OutputStream outputStream = Channels.newOutputStream(outChannel); + IOUtils.copyLarge(dataStream, outputStream, copyBuffer); + outputStream.flush(); + ioManager.sync(fHandle, true); + } finally { + ioManager.close(fHandle); + } + } + + //TODO: this should probably be static so it could be reused somewhere else, or made such that the trust store is not + // reloaded from disk on every client initialization?
+ private CloseableHttpClient newClient() { + if (sslEnabled) { + try { + final INetworkSecurityManager networkSecurityManager = ncs.getNetworkSecurityManager(); + final INetworkSecurityConfig configuration = networkSecurityManager.getConfiguration(); + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try (FileInputStream trustStoreFile = new FileInputStream(configuration.getTrustStoreFile())) { + trustStore.load(trustStoreFile, configuration.getKeyStorePassword().toCharArray()); + } + SSLContext sslcontext = SSLContexts.custom().loadTrustMaterial(trustStore, null).build(); + SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslcontext, + new String[] { "TLSv1.2" }, null, SSLConnectionSocketFactory.getDefaultHostnameVerifier()); + return HttpClients.custom().setSSLSocketFactory(sslsf).build(); + + } catch (Exception e) { + throw new IllegalStateException(e); + } + } else { + return HttpClients.createDefault(); + } + } + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java index ddf3d66..4f91b1a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java @@ -26,39 +26,21 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.net.URI; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Enumeration; -import java.util.HashSet; -import java.util.Set; -import java.util.zip.ZipEntry; -import 
java.util.zip.ZipFile; +import java.security.MessageDigest; import org.apache.asterix.common.functions.ExternalFunctionLanguage; import org.apache.asterix.common.library.LibraryDescriptor; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.external.library.ExternalLibraryManager; +import org.apache.asterix.external.util.ExternalLibraryUtils; import org.apache.commons.io.FilenameUtils; -import org.apache.commons.io.IOUtils; -import org.apache.http.HttpEntity; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.IFileHandle; -import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.util.file.FileUtil; import org.apache.logging.log4j.LogManager; @@ -68,8 +50,6 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera private static final long serialVersionUID = 1L; - private static final int DOWNLOAD_RETRY_COUNT = 10; - private static final Logger LOGGER = LogManager.getLogger(LibraryDeployPrepareOperatorDescriptor.class); private final ExternalFunctionLanguage language; @@ -89,7 +69,7 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { return new AbstractLibraryNodePushable(ctx) { - private 
byte[] copyBuffer; + private final byte[] copyBuf = new byte[4096]; @Override protected void execute() throws IOException { @@ -139,8 +119,7 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera if (LOGGER.isDebugEnabled()) { LOGGER.debug("Downloading library from {} into {}", libLocation, targetFile); } - download(targetFile); - + MessageDigest digest = libraryManager.download(targetFile, authToken, libLocation); // extract from the archive FileReference contentsDir = stageDir.getChild(ExternalLibraryManager.CONTENTS_DIR_NAME); mkdir(contentsDir); @@ -155,7 +134,7 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera // shouldn't happen throw new IOException("Unexpected file type: " + fileExt); } - unzip(targetFile, contentsDir); + libraryManager.unzip(targetFile, contentsDir); break; case PYTHON: if (!LibraryDescriptor.FILE_EXT_PYZ.equals(fileExt)) { @@ -176,172 +155,45 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera if (LOGGER.isTraceEnabled()) { LOGGER.trace("Writing library descriptor into {}", targetDescFile); } - writeDescriptor(targetDescFile, new LibraryDescriptor(language)); + writeDescriptor(targetDescFile, + new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest))); flushDirectory(contentsDir); flushDirectory(stageDir); } - private void download(FileReference targetFile) throws HyracksException { - try { - targetFile.getFile().createNewFile(); - } catch (IOException e) { - throw HyracksDataException.create(e); - } - IFileHandle fHandle = ioManager.open(targetFile, IIOManager.FileReadWriteMode.READ_WRITE, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - try { - CloseableHttpClient httpClient = HttpClientBuilder.create().build(); - try { - // retry 10 times at maximum for downloading binaries - HttpGet request = new HttpGet(libLocation); - request.setHeader(HttpHeaders.AUTHORIZATION, authToken); - int tried = 0; - Exception trace = 
null; - while (tried < DOWNLOAD_RETRY_COUNT) { - tried++; - CloseableHttpResponse response = null; - try { - response = httpClient.execute(request); - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw new IOException("Http Error: " + response.getStatusLine().getStatusCode()); - } - HttpEntity e = response.getEntity(); - if (e == null) { - throw new IOException("No response"); - } - WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle); - OutputStream outStream = Channels.newOutputStream(outChannel); - e.writeTo(outStream); - outStream.flush(); - ioManager.sync(fHandle, true); - return; - } catch (IOException e) { - LOGGER.error("Unable to download library", e); - trace = e; - try { - ioManager.truncate(fHandle, 0); - } catch (IOException e2) { - throw HyracksDataException.create(e2); - } - } finally { - if (response != null) { - try { - response.close(); - } catch (IOException e) { - LOGGER.warn("Failed to close", e); - } - } - } - } - - throw HyracksDataException.create(trace); - } finally { - try { - httpClient.close(); - } catch (IOException e) { - LOGGER.warn("Failed to close", e); - } - } - } finally { - try { - ioManager.close(fHandle); - } catch (HyracksDataException e) { - LOGGER.warn("Failed to close", e); - } - } - } - - private void unzip(FileReference sourceFile, FileReference outputDir) throws IOException { - boolean logTraceEnabled = LOGGER.isTraceEnabled(); - Set<Path> newDirs = new HashSet<>(); - Path outputDirPath = outputDir.getFile().toPath().toAbsolutePath().normalize(); - try (ZipFile zipFile = new ZipFile(sourceFile.getFile())) { - Enumeration<? 
extends ZipEntry> entries = zipFile.entries(); - while (entries.hasMoreElements()) { - ZipEntry entry = entries.nextElement(); - if (entry.isDirectory()) { - continue; - } - Path entryOutputPath = outputDirPath.resolve(entry.getName()).toAbsolutePath().normalize(); - if (!entryOutputPath.startsWith(outputDirPath)) { - throw new IOException("Malformed ZIP archive: " + entry.getName()); - } - Path entryOutputDir = entryOutputPath.getParent(); - Files.createDirectories(entryOutputDir); - // remember new directories so we can flush them later - for (Path p = entryOutputDir; !p.equals(outputDirPath); p = p.getParent()) { - newDirs.add(p); - } - try (InputStream in = zipFile.getInputStream(entry)) { - FileReference entryOutputFileRef = - ioManager.resolveAbsolutePath(entryOutputPath.toString()); - if (logTraceEnabled) { - LOGGER.trace("Extracting file {}", entryOutputFileRef); - } - writeAndForce(entryOutputFileRef, in); - } - } - } - for (Path newDir : newDirs) { - flushDirectory(newDir); - } - } - private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir, boolean writeMsgpack) throws IOException { FileReference msgpack = stageDir.getChild("msgpack.pyz"); if (writeMsgpack) { - writeShim(msgpack, writeMsgpack); + writeShim(msgpack); File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc"); FileReference msgPackFolderRef = new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath()); - unzip(msgpack, msgPackFolderRef); + libraryManager.unzip(msgpack, msgPackFolderRef); Files.delete(msgpack.getFile().toPath()); } - unzip(sourceFile, contentsDir); - writeShim(contentsDir.getChild("entrypoint.py"), false); + libraryManager.unzip(sourceFile, contentsDir); + writeShim(contentsDir.getChild("entrypoint.py")); } - private boolean writeShim(FileReference outputFile, boolean optional) throws IOException { + private void writeShim(FileReference outputFile) throws IOException { InputStream is = 
getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName()); if (is == null) { throw new IOException("Classpath does not contain necessary Python resources!"); } try { - writeAndForce(outputFile, is); + libraryManager.writeAndForce(outputFile, is, copyBuf); } finally { is.close(); } - return true; } private void writeDescriptor(FileReference descFile, LibraryDescriptor desc) throws IOException { byte[] bytes = libraryManager.serializeLibraryDescriptor(desc); - writeAndForce(descFile, new ByteArrayInputStream(bytes)); + libraryManager.writeAndForce(descFile, new ByteArrayInputStream(bytes), copyBuf); } - private void writeAndForce(FileReference outputFile, InputStream dataStream) throws IOException { - outputFile.getFile().createNewFile(); - IFileHandle fHandle = ioManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - try { - WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle); - OutputStream outputStream = Channels.newOutputStream(outChannel); - IOUtils.copyLarge(dataStream, outputStream, getCopyBuffer()); - outputStream.flush(); - ioManager.sync(fHandle, true); - } finally { - ioManager.close(fHandle); - } - } - - private byte[] getCopyBuffer() { - if (copyBuffer == null) { - copyBuffer = new byte[4096]; - } - return copyBuffer; - } }; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalLibraryUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalLibraryUtils.java new file mode 100644 index 0000000..6e3be21 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalLibraryUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.util; + +import java.io.IOException; +import java.io.StringWriter; +import java.security.MessageDigest; + +import org.apache.hyracks.util.bytes.HexPrinter; + +public class ExternalLibraryUtils { + + private ExternalLibraryUtils() { + + } + + public static String digestToHexString(MessageDigest digest) throws IOException { + byte[] hashBytes = digest.digest(); + StringWriter hashBuilder = new StringWriter(); + HexPrinter.printHexString(hashBytes, 0, hashBytes.length, hashBuilder); + return hashBuilder.toString(); + } +} diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java index ebdeeef0..d832175 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java @@ -32,18 +32,20 @@ public final class CreateLibraryStatement extends AbstractStatement { private final DataverseName dataverseName; private final String libraryName; private final ExternalFunctionLanguage lang; + 
private final String hash; private final URI location; private final boolean replaceIfExists; private final String authToken; public CreateLibraryStatement(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage lang, - URI location, boolean replaceIfExists, String authToken) { + String hash, URI location, boolean replaceIfExists, String authToken) { this.dataverseName = dataverseName; this.libraryName = libraryName; this.lang = lang; this.location = location; this.replaceIfExists = replaceIfExists; this.authToken = authToken; + this.hash = hash; } public DataverseName getDataverseName() { @@ -58,6 +60,10 @@ public final class CreateLibraryStatement extends AbstractStatement { return lang; } + public String getHash() { + return hash; + } + public URI getLocation() { return location; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java index 0bf8c3d..8da01aa 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java @@ -172,7 +172,7 @@ public class MetadataTransactionContext extends MetadataCache { } public void dropLibrary(DataverseName dataverseName, String libraryName) { - Library library = new Library(dataverseName, libraryName, null, MetadataUtil.PENDING_NO_OP); + Library library = new Library(dataverseName, libraryName, null, null, MetadataUtil.PENDING_NO_OP); droppedCache.addLibraryIfNotExists(library); logAndApply(new MetadataLogicalOperation(library, false)); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java index 8430f44..10f5047 100644 --- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java @@ -75,6 +75,7 @@ public final class MetadataRecordTypes { public static final String FIELD_NAME_IS_PRIMARY = "IsPrimary"; public static final String FIELD_NAME_KIND = "Kind"; public static final String FIELD_NAME_LANGUAGE = "Language"; + public static final String FIELD_NAME_HASH = "MD5Hash"; public static final String FIELD_NAME_LIBRARY_DATAVERSE_NAME = "LibraryDataverseName"; public static final String FIELD_NAME_LIBRARY_NAME = "LibraryName"; public static final String FIELD_NAME_LAST_REFRESH_TIME = "LastRefreshTime"; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java index 494b5f1..4a6a512 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java @@ -29,12 +29,14 @@ public class Library implements IMetadataEntity<Library> { private final DataverseName dataverse; private final String name; private final String language; + private final String hash; private final int pendingOp; - public Library(DataverseName dataverseName, String libraryName, String language, int pendingOp) { + public Library(DataverseName dataverseName, String libraryName, String language, String hash, int pendingOp) { this.dataverse = dataverseName; this.name = libraryName; this.language = language; + this.hash = hash; this.pendingOp = pendingOp; } @@ -50,6 +52,10 @@ public class Library implements IMetadataEntity<Library> { return language; } + public String getHash() { + return hash; + } + public int getPendingOp() { return pendingOp; } diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java index 4792398..03abec8 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java @@ -19,6 +19,7 @@ package org.apache.asterix.metadata.entitytupletranslators; +import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_HASH; import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_LANGUAGE; import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_PENDING_OP; @@ -68,7 +69,10 @@ public class LibraryTupleTranslator extends AbstractTupleTranslator<Library> { String language = languageIdx >= 0 ? ((AString) libraryRecord.getValueByPos(languageIdx)).getStringValue() : ExternalFunctionLanguage.JAVA.name(); - return new Library(dataverseName, libraryName, language, pendingOp); + int hashIdx = libraryRecordType.getFieldIndex(FIELD_NAME_HASH); + String hash = hashIdx >= 0 ? 
((AString) libraryRecord.getValueByPos(hashIdx)).getStringValue() : null; + + return new Library(dataverseName, libraryName, language, hash, pendingOp); } @Override @@ -119,6 +123,7 @@ public class LibraryTupleTranslator extends AbstractTupleTranslator<Library> { protected void writeOpenFields(Library library) throws HyracksDataException { writeLanguage(library); writePendingOp(library); + writeHash(library); } private void writeLanguage(Library library) throws HyracksDataException { @@ -133,6 +138,18 @@ public class LibraryTupleTranslator extends AbstractTupleTranslator<Library> { recordBuilder.addField(fieldName, fieldValue); } + private void writeHash(Library library) throws HyracksDataException { + String hash = library.getHash(); + + fieldName.reset(); + aString.setValue(FIELD_NAME_HASH); + stringSerde.serialize(aString, fieldName.getDataOutput()); + fieldValue.reset(); + aString.setValue(hash); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(fieldName, fieldValue); + } + private void writePendingOp(Library library) throws HyracksDataException { int pendingOp = library.getPendingOp(); diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml index 7a3206c..459f370 100644 --- a/asterixdb/asterix-server/pom.xml +++ b/asterixdb/asterix-server/pom.xml @@ -38,6 +38,8 @@ <properties> <root.dir>${basedir}/..</root.dir> + <pip.path>${project.build.directory}/bin/pip3</pip.path> + <shiv.path>${project.build.directory}/bin/shiv</shiv.path> </properties> <build> @@ -469,6 +471,94 @@ </usedDependencies> </configuration> </plugin> + <!-- TODO: this is just grody. 
workaround for not being able to use the proper dir in our integration tests on jenkins --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <executions> + <execution> + <id>venv</id> + <phase>${prepare-env.stage}</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <!--suppress UnresolvedMavenProperty --> + <executable>${python.path}</executable> + <workingDirectory>${project.build.directory}</workingDirectory> + <arguments> + <argument>-m</argument> + <argument>venv</argument> + <argument>${project.build.directory}</argument> + </arguments> + </configuration> + </execution> + <execution> + <id>shiv-install</id> + <phase>${prepare-env.stage}</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <executable>${pip.path}</executable> + <workingDirectory>${project.build.directory}</workingDirectory> + <arguments> + <argument>install</argument> + <argument>--exists-action</argument> + <argument>w</argument> + <argument>--upgrade</argument> + <argument>shiv</argument> + </arguments> + <environmentVariables> + <VIRTUALENV>${project.build.directory}</VIRTUALENV> + <PATH>${project.build.directory}${path.separator}${env.PATH}${file.separator}bin</PATH> + </environmentVariables> + </configuration> + </execution> + <execution> + <id>shiv-msgpack-shim</id> + <phase>${shim.stage}</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <executable>${shiv.path}</executable> + <workingDirectory>${project.build.directory}</workingDirectory> + <arguments> + <argument>-o </argument> + <argument>${project.build.directory}${file.separator}classes${file.separator}msgpack.pyz</argument> + <argument>msgpack</argument> + </arguments> + <environmentVariables> + <VIRTUALENV>${project.build.directory}</VIRTUALENV> + <PATH>${project.build.directory}${path.separator}${env.PATH}${file.separator}bin</PATH> + </environmentVariables> + </configuration> + </execution> + <execution> + <id>shiv-test-lib</id> + 
<phase>${pytestlib.stage}</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <executable>${shiv.path}</executable> + <workingDirectory>${project.build.directory}</workingDirectory> + <arguments> + <argument>-o </argument> + <argument>${project.build.directory}${file.separator}TweetSent.pyz</argument> + <argument>--site-packages</argument> + <argument>${project.build.directory}${file.separator}..${file.separator}..${file.separator}asterix-app${file.separator}src${file.separator}test${file.separator}resources${file.separator}TweetSent</argument> + <argument>scikit-learn</argument> + </arguments> + <environmentVariables> + <VIRTUALENV>${project.build.directory}</VIRTUALENV> + <PATH>${project.build.directory}${path.separator}${env.PATH}${file.separator}bin</PATH> + </environmentVariables> + </configuration> + </execution> + </executions> + </plugin> </plugins> <pluginManagement> <plugins> @@ -501,6 +591,18 @@ </build> <profiles> <profile> + <id>windows.python.envs</id> + <activation> + <os> + <family>Windows</family> + </os> + </activation> + <properties> + <pip.path>${project.build.directory}\Scripts\pip3.exe</pip.path> + <shiv.path>${project.build.directory}\Scripts\shiv.exe</shiv.path> + </properties> + </profile> + <profile> <id>opt-modules</id> <activation> <file> @@ -744,5 +846,17 @@ <artifactId>kite-data-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpmime</artifactId> + </dependency> </dependencies> </project> diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index 3cd433c..5cc17af 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -75,7 +75,8 @@ <itest.excludes>${global.itest.excludes}</itest.excludes> 
<license.stage>compile</license.stage> <resource.stage>process-classes</resource.stage> - <pyro-shim.stage>none</pyro-shim.stage> + <prepare-env.stage>none</prepare-env.stage> + <shim.stage>none</shim.stage> <pytestlib.stage>none</pytestlib.stage> <!-- Versions under dependencymanagement or used in many projects via properties --> @@ -664,7 +665,25 @@ </file> </activation> <properties> - <pyro-shim.stage>process-classes</pyro-shim.stage> + <prepare-env.stage>process-classes</prepare-env.stage> + <shim.stage>process-classes</shim.stage> + <pytestlib.stage>generate-test-resources</pytestlib.stage> + <global.itest.excludes/> + </properties> + </profile> + <profile> + <id>python-udf-test-only</id> + <activation> + <property> + <name>no.shim</name> + </property> + <file> + <exists>${python.path}</exists> + </file> + </activation> + <properties> + <prepare-env.stage>process-classes</prepare-env.stage> + <shim.stage>none</shim.stage> <pytestlib.stage>generate-test-resources</pytestlib.stage> <global.itest.excludes/> </properties> diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java index 787e823..01baa69 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java @@ -239,6 +239,8 @@ public class HttpUtil { public static final String IMG_PNG = "image/png"; public static final String TEXT_HTML = "text/html"; public static final String TEXT_PLAIN = "text/plain"; + public static final String APPLICATION_ZIP = "application/zip"; + public static final String APPLICATION_OCTET_STREAM = "application/octet-stream"; private ContentType() { }
