This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 409e6a88b9e4083e0c30c3abb83d1cf3a66b3411 Author: Ian Maxon <[email protected]> AuthorDate: Tue May 31 17:02:24 2022 -0700 [ASTERIXDB-3034][RT] Fenced UDFs - user model changes: yes - storage format changes: no - interface changes: yes Details: - Allow UDFs to be run via domain socket activated systemd services . This makes it so the UDF is run as a different user than the NC process itself Change-Id: Ibeb6228f2dc8edbf642e61cd5633c71913e18972 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16364 Reviewed-by: Wail Alkowaileet <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- asterixdb/asterix-app/pom.xml | 2 +- .../asterix-app/src/main/resources/entrypoint.py | 1 + .../api/common/AsterixHyracksIntegrationUtil.java | 7 +- .../asterix/test/runtime/LangExecutionUtil.java | 15 +- .../resources/runtimets/testsuite_it_python.xml | 14 +- asterixdb/asterix-docker/pom.xml | 68 ------- .../external/api/IExternalLangIPCProto.java | 105 +++++++++++ .../asterix/external/api/ILibraryEvaluator.java | 40 ++++ ...onIPCProto.java => AbstractPythonIPCProto.java} | 157 +++------------- .../external/ipc/PythonDomainSocketProto.java | 161 ++++++++++++++++ .../asterix/external/ipc/PythonMessageBuilder.java | 10 + .../asterix/external/ipc/PythonTCPSocketProto.java | 85 +++++++++ .../library/AbstractLibrarySocketEvaluator.java | 100 ++++++++++ .../ExternalScalarPythonFunctionEvaluator.java | 5 +- .../PythonLibraryDomainSocketEvaluator.java | 126 +++++++++++++ .../external/library/PythonLibraryEvaluator.java | 209 --------------------- .../library/PythonLibraryEvaluatorFactory.java | 163 +++++++++------- .../library/PythonLibraryTCPSocketEvaluator.java | 127 +++++++++++++ .../ExternalAssignBatchRuntimeFactory.java | 42 +++-- .../asterix/external/util/ExternalDataUtils.java | 74 ++++++-- .../docker/.gitattributes | 0 .../docker/Dockerfile | 0 .../docker/asterix-configuration.xml | 0 .../docker/fbm.adm | 0 
.../docker/fbu.adm | 0 .../docker/supervisord.conf | 0 .../docker/twm.adm | 0 .../docker/twu.adm | 0 asterixdb/asterix-podman/pom.xml | 156 +++++++++++++++ .../test/podman/PodmanPythonFunctionIT.java | 103 ++++++++++ .../asterix/test/podman/PodmanUDFLibrarian.java | 85 +++++++++ .../src/test/resources/cc.conf} | 36 ++-- asterixdb/asterix-podman/src/test/resources/passwd | 1 + .../asterix-podman/src/test/resources/setup.sh | 8 + .../src/test/resources/socktest/Containerfile | 17 ++ .../asterix-podman/src/test/resources/testenv.conf | 3 + asterixdb/asterix-server/pom.xml | 67 +++++-- asterixdb/asterix-server/src/deb/control/control | 5 +- asterixdb/asterix-server/src/deb/control/postinst | 3 +- asterixdb/asterix-server/src/deb/control/preinst | 4 + .../src/deb/systemd/asterix-cc.service | 3 +- .../src/deb/systemd/asterix-nc.service | 1 + .../src/deb/systemd/cc.conf} | 33 ++-- .../systemd/{asterix-nc.service => pyudf.socket} | 18 +- .../systemd/{asterix-cc.service => [email protected]} | 16 +- .../src/deb/udf_listener.py} | 49 ++--- asterixdb/pom.xml | 2 +- .../control/common/controllers/NCConfig.java | 3 + 48 files changed, 1506 insertions(+), 618 deletions(-) diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index 51ede69956..8262ce6e99 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -388,7 +388,7 @@ <profile> <id>asterix-gerrit-asterix-app</id> <properties> - <test.excludes>**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/AqlExecutionTest.java,**/*Compression*Test.java,**/*Ssl*Test.java</test.excludes> + <test.excludes>**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/AqlExecutionTest.java,**/*Compression*Test.java,**/*Ssl*Test.java,**/Podman*.java</test.excludes> <itest.excludes>**/*.java</itest.excludes> </properties> <build> diff --git 
a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-app/src/main/resources/entrypoint.py index 7bad7ef485..918596ca33 100755 --- a/asterixdb/asterix-app/src/main/resources/entrypoint.py +++ b/asterixdb/asterix-app/src/main/resources/entrypoint.py @@ -168,6 +168,7 @@ class Wrapper(object): def quit(self): self.alive = False + self.disconnect_sock() return True def handle_call(self): diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 86d8ed4ad1..e8c2c1d00f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -43,6 +43,7 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.hyracks.bootstrap.NCApplication; +import org.apache.asterix.lang.common.util.ExpressionUtils; import org.apache.asterix.test.dataflow.TestLsmIoOpCallbackFactory; import org.apache.asterix.test.dataflow.TestPrimaryIndexOperationTrackerFactory; import org.apache.commons.io.FileUtils; @@ -132,13 +133,13 @@ public class AsterixHyracksIntegrationUtil { cc = new ClusterControllerService(ccConfig, ccApplication); nodeNames = ccConfig.getConfigManager().getNodeNames(); - if (deleteOldInstanceData) { + if (deleteOldInstanceData && nodeNames != null) { deleteTransactionLogs(); removeTestStorageFiles(); deleteCCFiles(); } final List<NodeControllerService> nodeControllers = new ArrayList<>(); - for (String nodeId : nodeNames) { + for (String nodeId : ExpressionUtils.emptyIfNull(nodeNames)) { // mark this NC as virtual, so that the CC doesn't try to start via NCService... 
configManager.set(nodeId, NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED); final INCApplication ncApplication = createNCApplication(); @@ -303,7 +304,7 @@ public class AsterixHyracksIntegrationUtil { stopCC(false); - if (deleteOldInstanceData) { + if (deleteOldInstanceData && nodeNames != null) { deleteTransactionLogs(); removeTestStorageFiles(); deleteCCFiles(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java index 408882d082..d704d8e456 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.List; import org.apache.asterix.app.external.ExternalUDFLibrarian; +import org.apache.asterix.app.external.IExternalUDFLibrarian; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; @@ -65,11 +66,17 @@ public class LangExecutionUtil { } public static void setUp(String configFile, TestExecutor executor, boolean startHdfs) throws Exception { + setUp(configFile, executor, startHdfs, false, new ExternalUDFLibrarian()); + } + + public static void setUp(String configFile, TestExecutor executor, boolean startHdfs, boolean disableLangExec, + IExternalUDFLibrarian librarian) throws Exception { testExecutor = executor; File outdir = new File(PATH_ACTUAL); outdir.mkdirs(); - ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null); - librarian = new ExternalUDFLibrarian(); + if (!disableLangExec) { + ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null); + } testExecutor.setLibrarian(librarian); if (repeat != 1) { System.out.println("FYI: 
each test will be run " + repeat + " times."); @@ -151,7 +158,9 @@ public class LangExecutionUtil { NodeControllerService[] ncs = integrationUtil.ncs; // Checks that dataset files are uniformly distributed across each io device. for (NodeControllerService nc : ncs) { - checkNcStore(nc); + if (nc != null) { + checkNcStore(nc); + } } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml index 686ede22e9..284c2fd242 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml @@ -52,15 +52,7 @@ <test-case FilePath="external-library" check-warnings="true"> <compilation-unit name="py_function_error"> <output-dir compare="Clean-JSON">py_function_error</output-dir> - <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Traceback (most recent call last): - File "entrypoint.py", line 181, in handle_call - result[0].append(self.next_tuple(*arg, key=self.mid)) - File "entrypoint.py", line 99, in next_tuple - return self.wrapped_fns[key](*args) - File "site-packages/roundtrip.py", line 32, in warning - raise ArithmeticError("oof") -ArithmeticError: oof - (in line 28, at column 1)</expected-warn> + <expected-warn>ArithmeticError: oof</expected-warn> </compilation-unit> </test-case> <test-case FilePath="external-library"> @@ -76,8 +68,8 @@ ArithmeticError: oof <test-case FilePath="external-library" check-warnings="true"> <compilation-unit name="crash"> <output-dir compare="Text">crash</output-dir> - <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Function externallibtest:crash#0 failed to execute (in line 23, at column 1)</expected-warn> - <expected-warn>ASX0201: External UDF returned exception. 
Returned exception was: java.io.IOException: Python process exited with code: 1 (in line 23, at column 1)</expected-warn> + <expected-warn>ASX0201: External UDF returned exception.</expected-warn> + <expected-warn>ASX0201: External UDF returned exception.</expected-warn> </compilation-unit> </test-case> <test-case FilePath="external-library"> diff --git a/asterixdb/asterix-docker/pom.xml b/asterixdb/asterix-docker/pom.xml deleted file mode 100644 index 6c54337dd5..0000000000 --- a/asterixdb/asterix-docker/pom.xml +++ /dev/null @@ -1,68 +0,0 @@ -<!-- - ! Licensed to the Apache Software Foundation (ASF) under one - ! or more contributor license agreements. See the NOTICE file - ! distributed with this work for additional information - ! regarding copyright ownership. The ASF licenses this file - ! to you under the Apache License, Version 2.0 (the - ! "License"); you may not use this file except in compliance - ! with the License. You may obtain a copy of the License at - ! - ! http://www.apache.org/licenses/LICENSE-2.0 - ! - ! Unless required by applicable law or agreed to in writing, - ! software distributed under the License is distributed on an - ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ! KIND, either express or implied. See the License for the - ! specific language governing permissions and limitations - ! under the License. 
- !--> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <artifactId>apache-asterixdb</artifactId> - <groupId>org.apache.asterix</groupId> - <version>0.9.8-SNAPSHOT</version> - </parent> - <artifactId>asterix-docker</artifactId> - - <properties> - <root.dir>${basedir}/..</root.dir> - </properties> - - <licenses> - <license> - <name>Apache License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - <comments>A business-friendly OSS license</comments> - </license> - </licenses> - - <profiles> - <profile> - <id>docker</id> - <build> - <plugins> - <plugin> - <groupId>com.spotify</groupId> - <artifactId>docker-maven-plugin</artifactId> - <version>0.2.11</version> - <configuration> - <imageName>asterixdb/demo</imageName> - <dockerDirectory>docker</dockerDirectory> - <resources> - <resource> - <targetPath>/</targetPath> - <directory>../asterix-server/target/</directory> - <include>asterix-server-${project.version}-binary-assembly.zip</include> - </resource> - </resources> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> - -</project> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalLangIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalLangIPCProto.java new file mode 100644 index 0000000000..35e59610f3 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalLangIPCProto.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.api; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.ipc.MessageType; +import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor; +import org.apache.asterix.om.pointables.AFlatValuePointable; +import org.apache.asterix.om.pointables.AListVisitablePointable; +import org.apache.asterix.om.pointables.ARecordVisitablePointable; +import org.apache.asterix.om.pointables.PointableAllocator; +import org.apache.asterix.om.pointables.base.IVisitablePointable; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.EnumDeserializer; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.types.TypeTagUtil; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; + +public interface IExternalLangIPCProto { + static void visitValueRef(IAType type, DataOutput out, IValueReference valueReference, + PointableAllocator pointableAllocator, MsgPackPointableVisitor pointableVisitor, boolean visitNull) + throws IOException { + IVisitablePointable pointable; + switch (type.getTypeTag()) { + case OBJECT: + 
pointable = pointableAllocator.allocateRecordValue(type); + pointable.set(valueReference); + pointableVisitor.visit((ARecordVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out)); + break; + case ARRAY: + case MULTISET: + pointable = pointableAllocator.allocateListValue(type); + pointable.set(valueReference); + pointableVisitor.visit((AListVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out)); + break; + case ANY: + ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER + .deserialize(valueReference.getByteArray()[valueReference.getStartOffset()]); + IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag); + visitValueRef(rtType, out, valueReference, pointableAllocator, pointableVisitor, visitNull); + break; + case MISSING: + case NULL: + if (!visitNull) { + return; + } + default: + pointable = pointableAllocator.allocateFieldValue(type); + pointable.set(valueReference); + pointableVisitor.visit((AFlatValuePointable) pointable, pointableVisitor.getTypeInfo(type, out)); + break; + } + } + + void start(); + + void helo() throws IOException, AsterixException; + + long init(String module, String clazz, String fn) throws IOException, AsterixException; + + ByteBuffer call(long functionId, IAType[] argTypes, IValueReference[] argValues, boolean nullCall) + throws IOException, AsterixException; + + ByteBuffer callMulti(long key, ArrayBackedValueStorage args, int numTuples) throws IOException, AsterixException; + + //For future use with interpreter reuse between jobs. 
+ void quit() throws HyracksDataException; + + void receiveMsg() throws IOException, AsterixException; + + void sendHeader(long key, int msgLen) throws IOException; + + void sendMsg(ArrayBackedValueStorage content) throws IOException; + + void sendMsg() throws IOException; + + MessageType getResponseType(); + + long getRouteId(); + + DataOutputStream getSockOut(); +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ILibraryEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ILibraryEvaluator.java new file mode 100644 index 0000000000..8c6538b181 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ILibraryEvaluator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.api; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.om.functions.IExternalFunctionInfo; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.api.resources.IDeallocatable; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; + +public interface ILibraryEvaluator extends IDeallocatable { + + void start() throws IOException, AsterixException; + + long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException; + + ByteBuffer call(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall) throws IOException; + + ByteBuffer callMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException; +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/AbstractPythonIPCProto.java similarity index 51% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/AbstractPythonIPCProto.java index c803517f45..00d1dccf77 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/AbstractPythonIPCProto.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,7 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,7 +20,6 @@ package org.apache.asterix.external.ipc; import static org.apache.hyracks.ipc.impl.Message.HEADER_SIZE; -import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -26,16 +27,10 @@ import java.nio.ByteBuffer; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.api.IExternalLangIPCProto; import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor; -import org.apache.asterix.om.pointables.AFlatValuePointable; -import org.apache.asterix.om.pointables.AListVisitablePointable; -import org.apache.asterix.om.pointables.ARecordVisitablePointable; import org.apache.asterix.om.pointables.PointableAllocator; -import org.apache.asterix.om.pointables.base.IVisitablePointable; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.EnumDeserializer; import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.types.TypeTagUtil; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; @@ -45,28 +40,24 @@ import org.msgpack.core.MessagePack; import org.msgpack.core.MessageUnpacker; import org.msgpack.core.buffer.ArrayBufferInput; -public class PythonIPCProto { - - private final PythonMessageBuilder messageBuilder; - private final DataOutputStream sockOut; - private final ByteBuffer headerBuffer = ByteBuffer.allocate(21); - private ByteBuffer recvBuffer = ByteBuffer.allocate(32768); - private final 
ExternalFunctionResultRouter router; - private long routeId; - private Pair<ByteBuffer, Exception> bufferBox; - private final Process pythonProc; - private long maxFunctionId; - private final ArrayBufferInput unpackerInput; - private final MessageUnpacker unpacker; - private final ArrayBackedValueStorage argsStorage; - private final PointableAllocator pointableAllocator; - private final MsgPackPointableVisitor pointableVisitor; - - public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, Process pythonProc) { - this.sockOut = new DataOutputStream(sockOut); +public abstract class AbstractPythonIPCProto { + public static final int HEADER_SIZE_LEN_INCLUSIVE = 21; + protected final PythonMessageBuilder messageBuilder; + protected final DataOutputStream sockOut; + protected final ArrayBufferInput unpackerInput; + protected final MessageUnpacker unpacker; + protected final ArrayBackedValueStorage argsStorage; + protected final PointableAllocator pointableAllocator; + protected final MsgPackPointableVisitor pointableVisitor; + private final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_LEN_INCLUSIVE); + protected ByteBuffer recvBuffer = ByteBuffer.allocate(32768); + protected long routeId; + protected Pair<ByteBuffer, Exception> bufferBox; + protected long maxFunctionId; + + public AbstractPythonIPCProto(OutputStream sockOut) { messageBuilder = new PythonMessageBuilder(); - this.router = router; - this.pythonProc = pythonProc; + this.sockOut = new DataOutputStream(sockOut); this.maxFunctionId = 0L; unpackerInput = new ArrayBufferInput(new byte[0]); unpacker = MessagePack.newDefaultUnpacker(unpackerInput); @@ -75,12 +66,6 @@ public class PythonIPCProto { this.pointableVisitor = new MsgPackPointableVisitor(); } - public void start() { - Pair<Long, Pair<ByteBuffer, Exception>> keyAndBufferBox = router.insertRoute(recvBuffer); - this.routeId = keyAndBufferBox.getFirst(); - this.bufferBox = keyAndBufferBox.getSecond(); - } - public void 
helo() throws IOException, AsterixException { recvBuffer.clear(); recvBuffer.position(0); @@ -121,8 +106,8 @@ public class PythonIPCProto { messageBuilder.reset(); argsStorage.reset(); for (int i = 0; i < argTypes.length; i++) { - visitValueRef(argTypes[i], argsStorage.getDataOutput(), argValues[i], pointableAllocator, pointableVisitor, - nullCall); + IExternalLangIPCProto.visitValueRef(argTypes[i], argsStorage.getDataOutput(), argValues[i], + pointableAllocator, pointableVisitor, nullCall); } int len = argsStorage.getLength() + 5; sendHeader(functionId, len); @@ -154,42 +139,11 @@ public class PythonIPCProto { return recvBuffer; } - //For future use with interpreter reuse between jobs. public void quit() throws HyracksDataException { messageBuilder.quit(); - router.removeRoute(routeId); } - public void receiveMsg() throws IOException, AsterixException { - Exception except; - try { - synchronized (bufferBox) { - while ((bufferBox.getFirst().limit() == 0 || bufferBox.getSecond() != null) && pythonProc.isAlive()) { - bufferBox.wait(100); - } - } - except = router.getAndRemoveException(routeId); - if (!pythonProc.isAlive()) { - except = new IOException("Python process exited with code: " + pythonProc.exitValue()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e); - } - if (except != null) { - throw new AsterixException(except); - } - if (bufferBox.getFirst() != recvBuffer) { - recvBuffer = bufferBox.getFirst(); - } - messageBuilder.readHead(recvBuffer); - if (messageBuilder.type == MessageType.ERROR) { - unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(), - recvBuffer.remaining()); - unpacker.reset(unpackerInput); - throw new AsterixException(unpacker.unpackString()); - } - } + public abstract void receiveMsg() throws IOException, AsterixException; public void sendHeader(long key, int msgLen) throws IOException { headerBuffer.clear(); 
@@ -226,65 +180,4 @@ public class PythonIPCProto { public DataOutputStream getSockOut() { return sockOut; } - - public static void visitValueRef(IAType type, DataOutput out, IValueReference valueReference, - PointableAllocator pointableAllocator, MsgPackPointableVisitor pointableVisitor, boolean visitNull) - throws IOException { - IVisitablePointable pointable; - switch (type.getTypeTag()) { - case OBJECT: - pointable = pointableAllocator.allocateRecordValue(type); - pointable.set(valueReference); - pointableVisitor.visit((ARecordVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out)); - break; - case ARRAY: - case MULTISET: - pointable = pointableAllocator.allocateListValue(type); - pointable.set(valueReference); - pointableVisitor.visit((AListVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out)); - break; - case ANY: - ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER - .deserialize(valueReference.getByteArray()[valueReference.getStartOffset()]); - IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag); - switch (rtTypeTag) { - case OBJECT: - pointable = pointableAllocator.allocateRecordValue(rtType); - pointable.set(valueReference); - pointableVisitor.visit((ARecordVisitablePointable) pointable, - pointableVisitor.getTypeInfo(rtType, out)); - break; - case ARRAY: - case MULTISET: - pointable = pointableAllocator.allocateListValue(rtType); - pointable.set(valueReference); - pointableVisitor.visit((AListVisitablePointable) pointable, - pointableVisitor.getTypeInfo(rtType, out)); - break; - case MISSING: - case NULL: - if (!visitNull) { - return; - } - default: - pointable = pointableAllocator.allocateFieldValue(rtType); - pointable.set(valueReference); - pointableVisitor.visit((AFlatValuePointable) pointable, - pointableVisitor.getTypeInfo(rtType, out)); - break; - } - break; - case MISSING: - case NULL: - if (!visitNull) { - return; - } - default: - pointable = pointableAllocator.allocateFieldValue(type); - 
pointable.set(valueReference); - pointableVisitor.visit((AFlatValuePointable) pointable, pointableVisitor.getTypeInfo(type, out)); - break; - } - } - } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonDomainSocketProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonDomainSocketProto.java new file mode 100644 index 0000000000..89f240a9ee --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonDomainSocketProto.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.ipc; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.api.IExternalLangIPCProto; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.msgpack.core.MessagePack; + +public class PythonDomainSocketProto extends AbstractPythonIPCProto implements IExternalLangIPCProto { + private final String wd; + SocketChannel chan; + private ByteBuffer headerBuffer; + private ProcessHandle pid; + public static final int HYR_HEADER_SIZE = 21; // 4 (sz) + 8 (mid) + 8 (rmid) + 1 (flags) + public static final int HYR_HEADER_SIZE_NOSZ = 17; // 8 + 8 + 1 + + public PythonDomainSocketProto(OutputStream sockOut, SocketChannel chan, String wd) { + super(sockOut); + this.chan = chan; + this.wd = wd; + headerBuffer = ByteBuffer.allocate(HYR_HEADER_SIZE); + } + + @Override + public void start() { + } + + @Override + public void helo() throws IOException, AsterixException { + recvBuffer.clear(); + recvBuffer.position(0); + recvBuffer.limit(0); + messageBuilder.reset(); + messageBuilder.helloDS(wd); + sendHeader(routeId, messageBuilder.getLength()); + sendMsg(true); + receiveMsg(true); + byte pidType = recvBuffer.get(); + if (pidType != MessagePack.Code.UINT32 && pidType != MessagePack.Code.UINT16) { + throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, + "Returned pid type is incorrect: " + pidType); + } + switch (pidType) { + case MessagePack.Code.UINT32: + pid = ProcessHandle.of(recvBuffer.getInt()).get(); + break; + case MessagePack.Code.UINT16: + pid = ProcessHandle.of(recvBuffer.getShort()).get(); + break; + case MessagePack.Code.UINT8: + pid = ProcessHandle.of(recvBuffer.get()).get(); + break; + default: + throw 
AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, + "Returned pid type is incorrect: " + pidType); + } + if (getResponseType() != MessageType.HELO) { + throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE, + "Expected HELO, recieved " + getResponseType().name()); + } + } + + @Override + public void sendMsg() throws IOException { + sendMsg(false); + } + + @Override + public void sendMsg(ArrayBackedValueStorage args) throws IOException { + sendMsg(false, args); + } + + public void sendMsg(boolean sendIfDead) throws IOException { + if (!sendIfDead && (pid == null || !pid.isAlive())) { + return; + } + super.sendMsg(); + } + + public void sendMsg(boolean sendIfDead, ArrayBackedValueStorage args) throws IOException { + if (!sendIfDead && (pid == null || !pid.isAlive())) { + return; + } + super.sendMsg(args); + } + + @Override + public void receiveMsg() throws IOException, AsterixException { + receiveMsg(false); + } + + public void receiveMsg(boolean sendIfDead) throws IOException, AsterixException { + if (!sendIfDead && (pid == null || !pid.isAlive())) { + throw new AsterixException("Python process exited unexpectedly"); + } + readFully(headerBuffer.capacity(), headerBuffer); + if (headerBuffer.remaining() < Integer.BYTES) { + recvBuffer.limit(0); + throw new AsterixException("Python process exited unexpectedly"); + } + int msgSz = headerBuffer.getInt() - HYR_HEADER_SIZE_NOSZ; + if (recvBuffer.capacity() < msgSz) { + recvBuffer = ByteBuffer.allocate(((msgSz / 32768) + 1) * 32768); + } + readFully(msgSz, recvBuffer); + messageBuilder.readHead(recvBuffer); + if (messageBuilder.type == MessageType.ERROR) { + unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(), + recvBuffer.remaining()); + unpacker.reset(unpackerInput); + throw new AsterixException(unpacker.unpackString().replace('\0', ' ')); + } + } + + private void readFully(int msgSz, ByteBuffer buf) throws IOException, AsterixException { 
+ buf.limit(msgSz); + buf.clear(); + int read; + int size = msgSz; + while (size > 0) { + read = chan.read(buf); + if (read < 0) { + throw new AsterixException("Socket closed"); + } + size -= read; + } + buf.flip(); + } + + @Override + public void quit() throws HyracksDataException { + messageBuilder.quit(); + } + + public ProcessHandle getPid() { + return pid; + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java index 5429657fe7..20f8306274 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java @@ -82,6 +82,16 @@ public class PythonMessageBuilder { buf.put(serAddr); } + public void helloDS(String modulePath) throws IOException { + this.type = MessageType.HELO; + // sum(string lengths) + 2 from fix array tag and message type + dataLength = PythonMessageBuilder.getStringLength(modulePath) + 2; + packHeader(); + MessagePackUtils.packFixArrayHeader(buf, (byte) 2); + MessagePackUtils.packStr(buf, "HELLO"); + MessagePackUtils.packStr(buf, modulePath); + } + public void quit() throws HyracksDataException { this.type = MessageType.QUIT; dataLength = getStringLength("QUIT"); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonTCPSocketProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonTCPSocketProto.java new file mode 100644 index 0000000000..7fd3de49f9 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonTCPSocketProto.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.ipc; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.api.IExternalLangIPCProto; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class PythonTCPSocketProto extends AbstractPythonIPCProto + implements org.apache.asterix.external.api.IExternalLangIPCProto { + + private final ExternalFunctionResultRouter router; + private final Process proc; + + public PythonTCPSocketProto(OutputStream sockOut, ExternalFunctionResultRouter router, Process pythonProc) { + super(sockOut); + this.router = router; + this.proc = pythonProc; + } + + @Override + public void start() { + Pair<Long, Pair<ByteBuffer, Exception>> keyAndBufferBox = router.insertRoute(recvBuffer); + this.routeId = keyAndBufferBox.getFirst(); + this.bufferBox = keyAndBufferBox.getSecond(); + } + + @Override + public void quit() throws HyracksDataException { + messageBuilder.quit(); + router.removeRoute(routeId); + } + + @Override + public void receiveMsg() throws IOException, AsterixException { + Exception except; + 
try { + synchronized (bufferBox) { + while ((bufferBox.getFirst().limit() == 0 || bufferBox.getSecond() != null) && proc.isAlive()) { + bufferBox.wait(100); + } + } + except = router.getAndRemoveException(routeId); + if (!proc.isAlive()) { + except = new IOException("Python process exited with code: " + proc.exitValue()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e); + } + if (except != null) { + throw new AsterixException(except); + } + if (bufferBox.getFirst() != recvBuffer) { + recvBuffer = bufferBox.getFirst(); + } + messageBuilder.readHead(recvBuffer); + if (messageBuilder.type == MessageType.ERROR) { + unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(), + recvBuffer.remaining()); + unpacker.reset(unpackerInput); + throw new AsterixException(unpacker.unpackString()); + } + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/AbstractLibrarySocketEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/AbstractLibrarySocketEvaluator.java new file mode 100644 index 0000000000..6fcfdcff3c --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/AbstractLibrarySocketEvaluator.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.library; + +import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_UDF_EXCEPTION; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.api.IExternalLangIPCProto; +import org.apache.asterix.external.api.ILibraryEvaluator; +import org.apache.asterix.om.functions.IExternalFunctionInfo; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.api.dataflow.TaskAttemptId; +import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.exceptions.Warning; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.std.base.AbstractStateObject; + +public abstract class AbstractLibrarySocketEvaluator extends AbstractStateObject implements ILibraryEvaluator { + + protected IExternalLangIPCProto proto; + protected TaskAttemptId task; + protected IWarningCollector warningCollector; + protected SourceLocation sourceLoc; + + public AbstractLibrarySocketEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, TaskAttemptId task, + IWarningCollector warningCollector, SourceLocation sourceLoc) { + super(jobId, evaluatorId); + this.task = task; + this.warningCollector = warningCollector; + this.sourceLoc = sourceLoc; + } + + @Override + 
public long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException { + List<String> externalIdents = finfo.getExternalIdentifier(); + String packageModule = externalIdents.get(0); + String clazz; + String fn; + String externalIdent1 = externalIdents.get(1); + int idx = externalIdent1.lastIndexOf('.'); + if (idx >= 0) { + clazz = externalIdent1.substring(0, idx); + fn = externalIdent1.substring(idx + 1); + } else { + clazz = null; + fn = externalIdent1; + } + return proto.init(packageModule, clazz, fn); + } + + @Override + public ByteBuffer call(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall) + throws IOException { + ByteBuffer ret = null; + try { + ret = proto.call(id, argTypes, valueReferences, nullCall); + } catch (AsterixException e) { + if (warningCollector.shouldWarn()) { + warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage())); + } + } + return ret; + } + + @Override + public ByteBuffer callMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException { + ByteBuffer ret = null; + try { + ret = proto.callMulti(id, arguments, numTuples); + } catch (AsterixException e) { + if (warningCollector.shouldWarn()) { + warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage())); + } + } + return ret; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java index 94a4dd2cf8..fb8d761520 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import 
org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.api.ILibraryEvaluator; import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.om.functions.IExternalFunctionInfo; @@ -49,7 +50,7 @@ import org.msgpack.core.buffer.ArrayBufferInput; class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvaluator { - private final PythonLibraryEvaluator libraryEvaluator; + private final ILibraryEvaluator libraryEvaluator; private final ArrayBackedValueStorage resultBuffer = new ArrayBackedValueStorage(); private final ByteBuffer argHolder; @@ -115,7 +116,7 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua return; } try { - ByteBuffer res = libraryEvaluator.callPython(fnId, argTypes, argValues, nullCall); + ByteBuffer res = libraryEvaluator.call(fnId, argTypes, argValues, nullCall); resultBuffer.reset(); wrap(res, resultBuffer.getDataOutput()); } catch (Exception e) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryDomainSocketEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryDomainSocketEvaluator.java new file mode 100644 index 0000000000..056aa9a014 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryDomainSocketEvaluator.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.library; + +import java.io.IOException; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.lang.invoke.VarHandle; +import java.net.ProtocolFamily; +import java.net.SocketAddress; +import java.net.StandardProtocolFamily; +import java.nio.channels.Channels; +import java.nio.channels.SocketChannel; +import java.nio.file.Path; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.external.ipc.PythonDomainSocketProto; +import org.apache.asterix.om.functions.IExternalFunctionInfo; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.TaskAttemptId; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class PythonLibraryDomainSocketEvaluator extends AbstractLibrarySocketEvaluator { + + private final ILibraryManager libMgr; + private final Path sockPath; + SocketChannel chan; + ProcessHandle pid; + private static final Logger LOGGER = LogManager.getLogger(ExternalLibraryManager.class); + + public 
PythonLibraryDomainSocketEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr, + TaskAttemptId task, IWarningCollector warningCollector, SourceLocation sourceLoc, Path sockPath) { + super(jobId, evaluatorId, task, warningCollector, sourceLoc); + this.libMgr = libMgr; + this.sockPath = sockPath; + } + + public void start() throws IOException, AsterixException { + PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id; + PythonLibrary library = + (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName()); + String wd = library.getFile().getAbsolutePath(); + MethodHandles.Lookup lookup = MethodHandles.lookup(); + SocketAddress sockAddr; + try { + VarHandle sockEnum = lookup.in(StandardProtocolFamily.class) + .findStaticVarHandle(StandardProtocolFamily.class, "UNIX", StandardProtocolFamily.class); + Class domainSock = Class.forName("java.net.UnixDomainSocketAddress"); + MethodType unixDomainSockAddrType = MethodType.methodType(domainSock, Path.class); + MethodHandle unixDomainSockAddr = lookup.findStatic(domainSock, "of", unixDomainSockAddrType); + MethodType sockChanMethodType = MethodType.methodType(SocketChannel.class, ProtocolFamily.class); + MethodHandle sockChanOpen = lookup.findStatic(SocketChannel.class, "open", sockChanMethodType); + sockAddr = ((SocketAddress) unixDomainSockAddr.invoke(sockPath)); + chan = (SocketChannel) sockChanOpen.invoke(sockEnum.get()); + } catch (Throwable e) { + throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR, e); + } + chan.connect(sockAddr); + proto = new PythonDomainSocketProto(Channels.newOutputStream(chan), chan, wd); + proto.start(); + proto.helo(); + this.pid = ((PythonDomainSocketProto) proto).getPid(); + } + + @Override + public void deallocate() { + try { + if (proto != null) { + proto.quit(); + } + if (chan != null) { + chan.close(); + } + } catch (IOException e) { + LOGGER.error("Caught exception exiting Python UDF:", e); + } + if (pid != 
null && pid.isAlive()) { + LOGGER.error("Python UDF " + pid.pid() + " did not exit as expected."); + } + } + + static PythonLibraryDomainSocketEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr, + IHyracksTaskContext ctx, IWarningCollector warningCollector, SourceLocation sourceLoc) + throws IOException, AsterixException { + PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(), + finfo.getLibraryName(), Thread.currentThread()); + PythonLibraryDomainSocketEvaluator evaluator = + (PythonLibraryDomainSocketEvaluator) ctx.getStateObject(evaluatorId); + if (evaluator == null) { + Path sockPath = Path.of(ctx.getJobletContext().getServiceContext().getAppConfig() + .getString(NCConfig.Option.PYTHON_DS_PATH)); + evaluator = new PythonLibraryDomainSocketEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr, + ctx.getTaskAttemptId(), warningCollector, sourceLoc, sockPath); + ctx.getJobletContext().registerDeallocatable(evaluator); + evaluator.start(); + ctx.setStateObject(evaluator); + } + return evaluator; + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java deleted file mode 100644 index f82b30d91d..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.library; - -import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_UDF_EXCEPTION; -import static org.msgpack.core.MessagePack.Code.ARRAY16; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.external.ipc.ExternalFunctionResultRouter; -import org.apache.asterix.external.ipc.PythonIPCProto; -import org.apache.asterix.external.library.msgpack.MessagePackUtils; -import org.apache.asterix.om.functions.IExternalFunctionInfo; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.EnumDeserializer; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.types.TypeTagUtil; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.TaskAttemptId; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.exceptions.IWarningCollector; -import org.apache.hyracks.api.exceptions.SourceLocation; -import org.apache.hyracks.api.exceptions.Warning; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.resources.IDeallocatable; -import org.apache.hyracks.data.std.api.IValueReference; -import org.apache.hyracks.data.std.primitive.TaggedValuePointable; 
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import org.apache.hyracks.dataflow.std.base.AbstractStateObject; -import org.apache.hyracks.ipc.impl.IPCSystem; - -public class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable { - - public static final String ENTRYPOINT = "entrypoint.py"; - public static final String SITE_PACKAGES = "site-packages"; - - private Process p; - private ILibraryManager libMgr; - private File pythonHome; - private PythonIPCProto proto; - private ExternalFunctionResultRouter router; - private IPCSystem ipcSys; - private String sitePkgs; - private List<String> pythonArgs; - private Map<String, String> pythonEnv; - private TaskAttemptId task; - private IWarningCollector warningCollector; - private SourceLocation sourceLoc; - - public PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr, - File pythonHome, String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv, - ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task, - IWarningCollector warningCollector, SourceLocation sourceLoc) { - super(jobId, evaluatorId); - this.libMgr = libMgr; - this.pythonHome = pythonHome; - this.sitePkgs = sitePkgs; - this.pythonArgs = pythonArgs; - this.pythonEnv = pythonEnv; - this.router = router; - this.task = task; - this.ipcSys = ipcSys; - this.warningCollector = warningCollector; - this.sourceLoc = sourceLoc; - } - - private void initialize() throws IOException, AsterixException { - PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id; - PythonLibrary library = - (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName()); - String wd = library.getFile().getAbsolutePath(); - int port = ipcSys.getSocketAddress().getPort(); - List<String> args = new ArrayList<>(); - args.add(pythonHome.getAbsolutePath()); - args.addAll(pythonArgs); - args.add(ENTRYPOINT); - 
args.add(InetAddress.getLoopbackAddress().getHostAddress()); - args.add(Integer.toString(port)); - args.add(sitePkgs); - ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0])); - pb.environment().putAll(pythonEnv); - pb.directory(new File(wd)); - p = pb.start(); - proto = new PythonIPCProto(p.getOutputStream(), router, p); - proto.start(); - proto.helo(); - } - - public long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException { - List<String> externalIdents = finfo.getExternalIdentifier(); - String packageModule = externalIdents.get(0); - String clazz; - String fn; - String externalIdent1 = externalIdents.get(1); - int idx = externalIdent1.lastIndexOf('.'); - if (idx >= 0) { - clazz = externalIdent1.substring(0, idx); - fn = externalIdent1.substring(idx + 1); - } else { - clazz = null; - fn = externalIdent1; - } - return proto.init(packageModule, clazz, fn); - } - - public ByteBuffer callPython(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall) - throws IOException { - ByteBuffer ret = null; - try { - ret = proto.call(id, argTypes, valueReferences, nullCall); - } catch (AsterixException e) { - if (warningCollector.shouldWarn()) { - warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage())); - } - } - return ret; - } - - public ByteBuffer callPythonMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException { - ByteBuffer ret = null; - try { - ret = proto.callMulti(id, arguments, numTuples); - } catch (AsterixException e) { - if (warningCollector.shouldWarn()) { - warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage())); - } - } - return ret; - } - - @Override - public void deallocate() { - if (p != null) { - boolean dead = false; - try { - p.destroy(); - dead = p.waitFor(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - //gonna kill it anyway - } - if (!dead) { - p.destroyForcibly(); - } - } - 
router.removeRoute(proto.getRouteId()); - } - - public static ATypeTag peekArgument(IAType type, IValueReference valueReference) throws HyracksDataException { - ATypeTag tag = type.getTypeTag(); - if (tag == ATypeTag.ANY) { - TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable(); - pointy.set(valueReference); - ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag()); - IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag); - return MessagePackUtils.peekUnknown(rtType); - } else { - return MessagePackUtils.peekUnknown(type); - } - } - - public static void setVoidArgument(ArrayBackedValueStorage argHolder) throws IOException { - argHolder.getDataOutput().writeByte(ARRAY16); - argHolder.getDataOutput().writeShort((short) 0); - } - - public static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr, - ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx, - String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv, IWarningCollector warningCollector, - SourceLocation sourceLoc) throws IOException, AsterixException { - PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(), - finfo.getLibraryName(), Thread.currentThread()); - PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId); - if (evaluator == null) { - evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr, pythonHome, - sitePkgs, pythonArgs, pythonEnv, router, ipcSys, ctx.getTaskAttemptId(), warningCollector, - sourceLoc); - ctx.getJobletContext().registerDeallocatable(evaluator); - evaluator.initialize(); - ctx.setStateObject(evaluator); - } - return evaluator; - } - -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java index 06c9bc99a6..63a6ec37c6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java @@ -18,10 +18,12 @@ */ package org.apache.asterix.external.library; -import static org.apache.asterix.external.library.PythonLibraryEvaluator.SITE_PACKAGES; +import static org.apache.asterix.external.library.PythonLibraryTCPSocketEvaluator.SITE_PACKAGES; import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -31,8 +33,10 @@ import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.external.api.ILibraryEvaluator; import org.apache.asterix.external.ipc.ExternalFunctionResultRouter; import org.apache.asterix.om.functions.IExternalFunctionInfo; +import org.apache.commons.lang3.SystemUtils; import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.SourceLocation; @@ -40,83 +44,116 @@ import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.ipc.impl.IPCSystem; public class PythonLibraryEvaluatorFactory { - private final ILibraryManager libraryManager; - private final IPCSystem ipcSys; - private final File pythonPath; - private final IHyracksTaskContext ctx; - private final ExternalFunctionResultRouter router; - private final String sitePackagesPath; - private final List<String> pythonArgs; - private final Map<String, 
String> pythonEnv; + + private ILibraryManager libraryManager; + private IPCSystem ipcSys; + private File pythonPath; + private IHyracksTaskContext ctx; + private ExternalFunctionResultRouter router; + private String sitePackagesPath; + private List<String> pythonArgs; + private Map<String, String> pythonEnv; + + private boolean domainSockEnable; public PythonLibraryEvaluatorFactory(IHyracksTaskContext ctx) throws AsterixException { this.ctx = ctx; + String dsPath = + ctx.getJobletContext().getServiceContext().getAppConfig().getString(NCConfig.Option.PYTHON_DS_PATH); + config(dsPath == null ? null : Path.of(dsPath)); libraryManager = ((INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext()) .getLibraryManager(); - router = libraryManager.getRouter(); - ipcSys = libraryManager.getIPCI(); - IApplicationConfig appCfg = ctx.getJobletContext().getServiceContext().getAppConfig(); - String pythonPathCmd = appCfg.getString(NCConfig.Option.PYTHON_CMD); - boolean findPython = appCfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE); - pythonArgs = new ArrayList<>(); - if (pythonPathCmd == null) { - if (findPython) { - //if absolute path to interpreter is not specified, try to use environmental python - pythonPathCmd = "/usr/bin/env"; - pythonArgs.add("python3"); - } else { - throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, "Python interpreter not specified, and " - + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini() + " is false"); + if (!domainSockEnable) { + router = libraryManager.getRouter(); + ipcSys = libraryManager.getIPCI(); + IApplicationConfig appCfg = ctx.getJobletContext().getServiceContext().getAppConfig(); + String pythonPathCmd = appCfg.getString(NCConfig.Option.PYTHON_CMD); + boolean findPython = appCfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE); + pythonArgs = new ArrayList<>(); + if (pythonPathCmd == null) { + if (findPython) { + //if absolute path to interpreter is not specified, try to use environmental 
python + pythonPathCmd = "/usr/bin/env"; + pythonArgs.add("python3"); + } else { + throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, + "Python interpreter not specified or domain socket not found, and " + + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini() + " is false"); + } } - } - pythonEnv = new HashMap<>(); - String[] envRaw = appCfg.getStringArray((NCConfig.Option.PYTHON_ENV)); - if (envRaw != null) { - for (String rawEnvArg : envRaw) { - //TODO: i think equals is shared among all unixes and windows. but it needs verification - if (rawEnvArg.length() < 1) { - continue; + pythonEnv = new HashMap<>(); + String[] envRaw = appCfg.getStringArray((NCConfig.Option.PYTHON_ENV)); + if (envRaw != null) { + for (String rawEnvArg : envRaw) { + //TODO: i think equals is shared among all unixes and windows. but it needs verification + if (rawEnvArg.length() < 1) { + continue; + } + String[] rawArgSplit = rawEnvArg.split("(?<!\\\\)=", 2); + if (rawArgSplit.length < 2) { + throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, + "Invalid environment variable format detected."); + } + pythonEnv.put(rawArgSplit[0], rawArgSplit[1]); } - String[] rawArgSplit = rawEnvArg.split("(?<!\\\\)=", 2); - if (rawArgSplit.length < 2) { - throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, - "Invalid environment variable format detected."); + } + pythonPath = new File(pythonPathCmd); + List<String> sitePkgs = new ArrayList<>(); + sitePkgs.add(SITE_PACKAGES); + String[] addlSitePackages = appCfg.getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES)); + for (String sitePkg : addlSitePackages) { + if (sitePkg.length() > 0) { + sitePkgs.add(sitePkg); } - pythonEnv.put(rawArgSplit[0], rawArgSplit[1]); } - } - pythonPath = new File(pythonPathCmd); - List<String> sitePkgs = new ArrayList<>(); - sitePkgs.add(SITE_PACKAGES); - String[] addlSitePackages = appCfg.getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES)); - for (String sitePkg : addlSitePackages) { 
- if (sitePkg.length() > 0) { - sitePkgs.add(sitePkg); + if (appCfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) { + sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator); } - } - if (appCfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) { - sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator); - } - String[] pythonArgsRaw = appCfg.getStringArray(NCConfig.Option.PYTHON_ARGS); - if (pythonArgsRaw != null) { - for (String arg : pythonArgsRaw) { - if (arg.length() > 0) { - pythonArgs.add(arg); + String[] pythonArgsRaw = appCfg.getStringArray(NCConfig.Option.PYTHON_ARGS); + if (pythonArgsRaw != null) { + for (String arg : pythonArgsRaw) { + if (arg.length() > 0) { + pythonArgs.add(arg); + } } } + StringBuilder sitePackagesPathBuilder = new StringBuilder(); + for (int i = 0; i < sitePkgs.size() - 1; i++) { + sitePackagesPathBuilder.append(sitePkgs.get(i)); + sitePackagesPathBuilder.append(File.pathSeparator); + } + sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1)); + sitePackagesPath = sitePackagesPathBuilder.toString(); } - StringBuilder sitePackagesPathBuilder = new StringBuilder(); - for (int i = 0; i < sitePkgs.size() - 1; i++) { - sitePackagesPathBuilder.append(sitePkgs.get(i)); - sitePackagesPathBuilder.append(File.pathSeparator); - } - sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1)); - sitePackagesPath = sitePackagesPathBuilder.toString(); } - public PythonLibraryEvaluator getEvaluator(IExternalFunctionInfo fnInfo, SourceLocation sourceLoc) + public ILibraryEvaluator getEvaluator(IExternalFunctionInfo fnInfo, SourceLocation sourceLoc) throws IOException, AsterixException { - return PythonLibraryEvaluator.getInstance(fnInfo, libraryManager, router, ipcSys, pythonPath, ctx, - sitePackagesPath, pythonArgs, pythonEnv, ctx.getWarningCollector(), sourceLoc); + if (domainSockEnable) { + return PythonLibraryDomainSocketEvaluator.getInstance(fnInfo, libraryManager, ctx, + 
ctx.getWarningCollector(), sourceLoc); + } else { + return PythonLibraryTCPSocketEvaluator.getInstance(fnInfo, libraryManager, router, ipcSys, pythonPath, ctx, + sitePackagesPath, pythonArgs, pythonEnv, ctx.getWarningCollector(), sourceLoc); + } + } + + private void config(Path sockPath) throws AsterixException { + if (sockPath == null) { + domainSockEnable = false; + return; + } + Runtime rt = Runtime.getRuntime(); + if (rt.version().feature() >= 17 && SystemUtils.IS_OS_LINUX) { + if (Files.exists(sockPath)) { + domainSockEnable = true; + } else { + throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, + "Domain socket was not found at specified path"); + } + } else { + throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, + "Domain socket path specified, but Java version is below 17 or OS is not Linux"); + } } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java new file mode 100644 index 0000000000..385d738f83 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.library; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.external.ipc.ExternalFunctionResultRouter; +import org.apache.asterix.external.ipc.PythonTCPSocketProto; +import org.apache.asterix.om.functions.IExternalFunctionInfo; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.TaskAttemptId; +import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.ipc.impl.IPCSystem; + +public class PythonLibraryTCPSocketEvaluator extends AbstractLibrarySocketEvaluator { + + public static final String ENTRYPOINT = "entrypoint.py"; + public static final String SITE_PACKAGES = "site-packages"; + + private Process p; + private ILibraryManager libMgr; + private File pythonHome; + private ExternalFunctionResultRouter router; + private IPCSystem ipcSys; + private String sitePkgs; + private List<String> pythonArgs; + private Map<String, String> pythonEnv; + + public PythonLibraryTCPSocketEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr, + File pythonHome, String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv, 
+ ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task, + IWarningCollector warningCollector, SourceLocation sourceLoc) { + super(jobId, evaluatorId, task, warningCollector, sourceLoc); + this.libMgr = libMgr; + this.pythonHome = pythonHome; + this.sitePkgs = sitePkgs; + this.pythonArgs = pythonArgs; + this.pythonEnv = pythonEnv; + this.router = router; + this.ipcSys = ipcSys; + } + + @Override + public void start() throws IOException, AsterixException { + PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id; + PythonLibrary library = + (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName()); + String wd = library.getFile().getAbsolutePath(); + int port = ipcSys.getSocketAddress().getPort(); + List<String> args = new ArrayList<>(); + args.add(pythonHome.getAbsolutePath()); + args.addAll(pythonArgs); + args.add(ENTRYPOINT); + args.add(InetAddress.getLoopbackAddress().getHostAddress()); + args.add(Integer.toString(port)); + args.add(sitePkgs); + ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0])); + pb.environment().putAll(pythonEnv); + pb.directory(new File(wd)); + p = pb.start(); + proto = new PythonTCPSocketProto(p.getOutputStream(), router, p); + proto.start(); + proto.helo(); + } + + @Override + public void deallocate() { + if (p != null) { + boolean dead = false; + try { + p.destroy(); + dead = p.waitFor(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + //gonna kill it anyway + } + if (!dead) { + p.destroyForcibly(); + } + } + router.removeRoute(proto.getRouteId()); + } + + static PythonLibraryTCPSocketEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr, + ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx, + String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv, IWarningCollector warningCollector, + SourceLocation sourceLoc) throws IOException, AsterixException { + 
PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(), + finfo.getLibraryName(), Thread.currentThread()); + PythonLibraryTCPSocketEvaluator evaluator = (PythonLibraryTCPSocketEvaluator) ctx.getStateObject(evaluatorId); + if (evaluator == null) { + evaluator = new PythonLibraryTCPSocketEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr, + pythonHome, sitePkgs, pythonArgs, pythonEnv, router, ipcSys, ctx.getTaskAttemptId(), + warningCollector, sourceLoc); + ctx.getJobletContext().registerDeallocatable(evaluator); + evaluator.start(); + ctx.setStateObject(evaluator); + } + return evaluator; + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java index 741dad2f85..5f8a3f01b5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java @@ -33,12 +33,13 @@ import java.util.List; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.ipc.PythonIPCProto; -import org.apache.asterix.external.library.PythonLibraryEvaluator; +import org.apache.asterix.external.api.IExternalLangIPCProto; +import org.apache.asterix.external.api.ILibraryEvaluator; import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory; import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM; import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor; import org.apache.asterix.external.util.ExternalDataConstants; +import 
org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.om.functions.IExternalFunctionDescriptor; import org.apache.asterix.om.pointables.PointableAllocator; import org.apache.asterix.om.types.ATypeTag; @@ -50,6 +51,7 @@ import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneO import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.Warning; +import org.apache.hyracks.data.std.primitive.TaggedValuePointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; @@ -87,7 +89,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne private ArrayBackedValueStorage outputWrapper; private List<ArrayBackedValueStorage> argHolders; ArrayTupleBuilder tupleBuilder; - private List<Pair<Long, PythonLibraryEvaluator>> libraryEvaluators; + private List<Pair<Long, ILibraryEvaluator>> libraryEvaluators; private ATypeTag[][] nullCalls; private int[] numCalls; private VoidPointable ref; @@ -97,6 +99,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne private MessageUnpackerToADM unpackerToADM; private PointableAllocator pointableAllocator; private MsgPackPointableVisitor pointableVisitor; + private TaggedValuePointable anyPointer; @Override public void open() throws HyracksDataException { @@ -109,7 +112,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne try { PythonLibraryEvaluatorFactory evalFactory = new PythonLibraryEvaluatorFactory(ctx); for (IExternalFunctionDescriptor fnDesc : fnDescs) { - PythonLibraryEvaluator eval = evalFactory.getEvaluator(fnDesc.getFunctionInfo(), sourceLoc); + ILibraryEvaluator eval = evalFactory.getEvaluator(fnDesc.getFunctionInfo(), sourceLoc); long id = 
eval.initialize(fnDesc.getFunctionInfo()); libraryEvaluators.add(new Pair<>(id, eval)); } @@ -133,6 +136,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne unpackerToADM = new MessageUnpackerToADM(); pointableAllocator = new PointableAllocator(); pointableVisitor = new MsgPackPointableVisitor(); + anyPointer = TaggedValuePointable.FACTORY.createPointable(); } private void resetBuffers(int numTuples, int[] numCalls) { @@ -177,8 +181,12 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne int numEntries = unpacker.unpackArrayHeader(); for (int j = 0; j < numEntries; j++) { if (ctx.getWarningCollector().shouldWarn()) { - ctx.getWarningCollector().warn(Warning.of(sourceLoc, - ErrorCode.EXTERNAL_UDF_EXCEPTION, unpacker.unpackString())); + //TODO: in domain socket mode, a NUL can appear at the end of the stacktrace strings. + // this should probably not happen but warnings with control characters should + // also be properly escaped + ctx.getWarningCollector() + .warn(Warning.of(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION, + unpacker.unpackString().replace('\0', ' '))); } } } catch (MessagePackException e) { @@ -211,8 +219,8 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne for (int colIdx = 0; colIdx < cols.length; colIdx++) { ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]), tRef.getFieldLength(cols[colIdx])); - ATypeTag argumentPresence = PythonLibraryEvaluator - .peekArgument(fnDescs[func].getArgumentTypes()[colIdx], ref); + ATypeTag argumentPresence = ExternalDataUtils + .peekArgument(fnDescs[func].getArgumentTypes()[colIdx], ref, anyPointer); argumentStatus = handleNullMatrix(func, t, argumentPresence, argumentStatus); } } @@ -224,7 +232,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne for (int colIdx = 0; colIdx < cols.length; colIdx++) { ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]), 
tRef.getFieldLength(cols[colIdx])); - PythonIPCProto.visitValueRef(fnDescs[func].getArgumentTypes()[colIdx], + IExternalLangIPCProto.visitValueRef(fnDescs[func].getArgumentTypes()[colIdx], argHolders.get(func).getDataOutput(), ref, pointableAllocator, pointableVisitor, fnDescs[func].getFunctionInfo().getNullCall()); } @@ -232,21 +240,25 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne numCalls[func]--; } if (cols.length == 0) { - PythonLibraryEvaluator.setVoidArgument(argHolders.get(func)); + ExternalDataUtils.setVoidArgument(argHolders.get(func)); } } } //TODO: maybe this could be done in parallel for each unique library evaluator? for (int argHolderIdx = 0; argHolderIdx < argHolders.size(); argHolderIdx++) { - Pair<Long, PythonLibraryEvaluator> fnEval = libraryEvaluators.get(argHolderIdx); - ByteBuffer columnResult = fnEval.getSecond().callPythonMulti(fnEval.getFirst(), + Pair<Long, ILibraryEvaluator> fnEval = libraryEvaluators.get(argHolderIdx); + ByteBuffer columnResult = fnEval.getSecond().callMulti(fnEval.getFirst(), argHolders.get(argHolderIdx), numCalls[argHolderIdx]); if (columnResult != null) { Pair<ByteBuffer, Counter> resultholder = batchResults.get(argHolderIdx); - if (resultholder.getFirst().capacity() < columnResult.capacity()) { - ByteBuffer realloc = ctx.reallocateFrame(resultholder.getFirst(), - columnResult.capacity() * 2, false); + if (resultholder.getFirst().capacity() < columnResult.remaining()) { + ByteBuffer realloc = + ctx.reallocateFrame(resultholder.getFirst(), + ctx.getInitialFrameSize() + * ((columnResult.remaining() / ctx.getInitialFrameSize()) + 1), + false); + realloc.limit(columnResult.limit()); resultholder.setFirst(realloc); } ByteBuffer resultBuf = resultholder.getFirst(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 
62dc07425f..5bf584487e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -35,6 +35,7 @@ import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.val import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties; import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties; import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS; +import static org.msgpack.core.MessagePack.Code.ARRAY16; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -66,11 +67,15 @@ import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; import org.apache.asterix.external.library.JavaLibrary; +import org.apache.asterix.external.library.msgpack.MessagePackUtils; import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions; import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.AUnionType; +import org.apache.asterix.om.types.EnumDeserializer; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.types.TypeTagUtil; import org.apache.asterix.runtime.evaluators.common.NumberUtils; import org.apache.asterix.runtime.projection.DataProjectionInfo; import org.apache.asterix.runtime.projection.FunctionCallInformation; @@ -78,6 +83,9 @@ import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.TaggedValuePointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory; @@ -386,7 +394,8 @@ public class ExternalDataUtils { /** * Fills the configuration of the external dataset and its adapter with default values if not provided by user. * - * @param configuration external data configuration + * @param configuration + * external data configuration */ public static void defaultConfiguration(Map<String, String> configuration) { String format = configuration.get(ExternalDataConstants.KEY_FORMAT); @@ -408,8 +417,10 @@ public class ExternalDataUtils { * Prepares the configuration of the external data and its adapter by filling the information required by * adapters and parsers. * - * @param adapterName adapter name - * @param configuration external data configuration + * @param adapterName + * adapter name + * @param configuration + * external data configuration */ public static void prepare(String adapterName, Map<String, String> configuration) { if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) { @@ -431,7 +442,8 @@ public class ExternalDataUtils { * Normalizes the values of certain parameters of the adapter configuration. This should happen before persisting * the metadata (e.g. when creating external datasets or feeds) and when creating an adapter factory. 
* - * @param configuration external data configuration + * @param configuration + * external data configuration */ public static void normalize(Map<String, String> configuration) { // normalize the "format" parameter @@ -451,8 +463,10 @@ public class ExternalDataUtils { /** * Validates the parameter values of the adapter configuration. This should happen after normalizing the values. * - * @param configuration external data configuration - * @throws HyracksDataException HyracksDataException + * @param configuration + * external data configuration + * @throws HyracksDataException + * HyracksDataException */ public static void validate(Map<String, String> configuration) throws HyracksDataException { String format = configuration.get(ExternalDataConstants.KEY_FORMAT); @@ -514,7 +528,8 @@ public class ExternalDataUtils { * Validates adapter specific external dataset properties. Specific properties for different adapters should be * validated here * - * @param configuration properties + * @param configuration + * properties */ public static void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc, IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { @@ -542,7 +557,8 @@ public class ExternalDataUtils { /** * Regex matches all the provided patterns against the provided path * - * @param path path to check against + * @param path + * path to check against * @return {@code true} if all patterns match, {@code false} otherwise */ public static boolean matchPatterns(List<Matcher> matchers, String path) { @@ -557,7 +573,8 @@ public class ExternalDataUtils { /** * Converts the wildcard to proper regex * - * @param pattern wildcard pattern to convert + * @param pattern + * wildcard pattern to convert * @return regex expression */ public static String patternToRegex(String pattern) { @@ -646,7 +663,8 @@ public class ExternalDataUtils { /** * Adjusts the prefix (if needed) and returns it * - * @param 
configuration configuration + * @param configuration + * configuration */ public static String getPrefix(Map<String, String> configuration) { return getPrefix(configuration, true); @@ -661,8 +679,10 @@ public class ExternalDataUtils { } /** - * @param configuration configuration map - * @throws CompilationException Compilation exception + * @param configuration + * configuration map + * @throws CompilationException + * Compilation exception */ public static void validateIncludeExclude(Map<String, String> configuration) throws CompilationException { // Ensure that include and exclude are not provided at the same time + ensure valid format or property @@ -746,8 +766,10 @@ public class ExternalDataUtils { /** * Validate Parquet dataset's declared type and configuration * - * @param properties external dataset configuration - * @param datasetRecordType dataset declared type + * @param properties + * external dataset configuration + * @param datasetRecordType + * dataset declared type */ public static void validateParquetTypeAndConfiguration(Map<String, String> properties, ARecordType datasetRecordType) throws CompilationException { @@ -780,7 +802,8 @@ public class ExternalDataUtils { /** * Serialize {@link ARecordType} as Base64 string to pass it to {@link org.apache.hadoop.conf.Configuration} * - * @param expectedType expected type + * @param expectedType + * expected type * @return the expected type as Base64 string */ private static String serializeExpectedTypeToString(ARecordType expectedType) throws IOException { @@ -799,7 +822,8 @@ public class ExternalDataUtils { * Serialize {@link FunctionCallInformation} map as Base64 string to pass it to * {@link org.apache.hadoop.conf.Configuration} * - * @param functionCallInfoMap function information map + * @param functionCallInfoMap + * function information map * @return function information map as Base64 string */ static String serializeFunctionCallInfoToString(Map<String, FunctionCallInformation> functionCallInfoMap) 
@@ -830,4 +854,22 @@ public class ExternalDataUtils { public static Optional<String> getFirstNotNull(Map<String, String> configuration, String... parameters) { return Arrays.stream(parameters).filter(field -> configuration.get(field) != null).findFirst(); } + + public static ATypeTag peekArgument(IAType type, IValueReference valueReference, TaggedValuePointable pointy) + throws HyracksDataException { + ATypeTag tag = type.getTypeTag(); + if (tag == ATypeTag.ANY) { + pointy.set(valueReference); + ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag()); + IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag); + return MessagePackUtils.peekUnknown(rtType); + } else { + return MessagePackUtils.peekUnknown(type); + } + } + + public static void setVoidArgument(ArrayBackedValueStorage argHolder) throws IOException { + argHolder.getDataOutput().writeByte(ARRAY16); + argHolder.getDataOutput().writeShort((short) 0); + } } diff --git a/asterixdb/asterix-docker/docker/.gitattributes b/asterixdb/asterix-podman/docker/.gitattributes similarity index 100% rename from asterixdb/asterix-docker/docker/.gitattributes rename to asterixdb/asterix-podman/docker/.gitattributes diff --git a/asterixdb/asterix-docker/docker/Dockerfile b/asterixdb/asterix-podman/docker/Dockerfile similarity index 100% rename from asterixdb/asterix-docker/docker/Dockerfile rename to asterixdb/asterix-podman/docker/Dockerfile diff --git a/asterixdb/asterix-docker/docker/asterix-configuration.xml b/asterixdb/asterix-podman/docker/asterix-configuration.xml similarity index 100% rename from asterixdb/asterix-docker/docker/asterix-configuration.xml rename to asterixdb/asterix-podman/docker/asterix-configuration.xml diff --git a/asterixdb/asterix-docker/docker/fbm.adm b/asterixdb/asterix-podman/docker/fbm.adm similarity index 100% rename from asterixdb/asterix-docker/docker/fbm.adm rename to asterixdb/asterix-podman/docker/fbm.adm diff --git 
a/asterixdb/asterix-docker/docker/fbu.adm b/asterixdb/asterix-podman/docker/fbu.adm similarity index 100% rename from asterixdb/asterix-docker/docker/fbu.adm rename to asterixdb/asterix-podman/docker/fbu.adm diff --git a/asterixdb/asterix-docker/docker/supervisord.conf b/asterixdb/asterix-podman/docker/supervisord.conf similarity index 100% copy from asterixdb/asterix-docker/docker/supervisord.conf copy to asterixdb/asterix-podman/docker/supervisord.conf diff --git a/asterixdb/asterix-docker/docker/twm.adm b/asterixdb/asterix-podman/docker/twm.adm similarity index 100% rename from asterixdb/asterix-docker/docker/twm.adm rename to asterixdb/asterix-podman/docker/twm.adm diff --git a/asterixdb/asterix-docker/docker/twu.adm b/asterixdb/asterix-podman/docker/twu.adm similarity index 100% rename from asterixdb/asterix-docker/docker/twu.adm rename to asterixdb/asterix-podman/docker/twu.adm diff --git a/asterixdb/asterix-podman/pom.xml b/asterixdb/asterix-podman/pom.xml new file mode 100644 index 0000000000..3d32518587 --- /dev/null +++ b/asterixdb/asterix-podman/pom.xml @@ -0,0 +1,156 @@ +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. 
+ !--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>apache-asterixdb</artifactId> + <groupId>org.apache.asterix</groupId> + <version>0.9.8-SNAPSHOT</version> + </parent> + <artifactId>asterix-podman</artifactId> + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>asterix-server</artifactId> + <version>${project.version}</version> + <type>deb</type> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.asterix</groupId> + <artifactId>asterix-app</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.asterix</groupId> + <artifactId>asterix-test-framework</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>1.17.1</version> + <scope>test</scope> + </dependency> + </dependencies> + + <properties> + <root.dir>${basedir}/..</root.dir> + </properties> + + <licenses> + <license> + <name>Apache License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + <comments>A business-friendly OSS license</comments> + </license> + </licenses> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/setup.sh</exclude> + <exclude>src/test/resources/passwd</exclude> + 
<exclude>src/test/resources/socktest/Containerfile</exclude> + <exclude>src/test/resources/testenv.conf</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + <profiles> + <profile> + <id>podman.tests</id> + <properties> + <test.excludes>**/*.java</test.excludes> + <itest.includes>**/PodmanPythonFunctionIT.java</itest.includes> + <failIfNoTests>false</failIfNoTests> + </properties> + <build> + <plugins> + <plugin> + <groupId>nl.lexemmens</groupId> + <artifactId>podman-maven-plugin</artifactId> + <version>1.8.0</version> + <executions> + <execution> + <goals> + <goal>build</goal> + </goals> + <phase>generate-test-resources</phase> + </execution> + </executions> + <configuration> + <skipAuth>true</skipAuth> + <images> + <image> + <name>asterixdb/socktest</name> + <build> + <pull>false</pull> + <createLatestTag>true</createLatestTag> + <containerFileDir>src/test/resources/socktest</containerFileDir> + </build> + </image> + </images> + </configuration> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-external-data-resources</id> + <phase>generate-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>target/</outputDirectory> + <overwrite>true</overwrite> + <resources> + <resource> + <directory>../asterix-server/target</directory> + <includes> + <include>asterix-server*.deb</include> + </includes> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> diff --git a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanPythonFunctionIT.java b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanPythonFunctionIT.java new file mode 100644 index 0000000000..f0f89cd67f --- /dev/null +++ b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanPythonFunctionIT.java @@ -0,0 
+1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.podman;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import com.github.dockerjava.api.DockerClient;
+
+/**
+ * Runs the Python UDF tests within a container using domain sockets.
+ * <p>
+ * The container exposes the NC API (19004), a JDWP debug port (5006) and the HTTP API used by the
+ * {@link TestExecutor} (19002), and bind-mounts {@code ../asterix-app/} at
+ * {@code /var/tmp/asterix-app/}.
+ */
+@RunWith(Parameterized.class)
+public class PodmanPythonFunctionIT {
+    public static final DockerImageName ASTERIX_IMAGE = DockerImageName.parse("asterixdb/socktest");
+    @ClassRule
+    public static GenericContainer<?> asterix = new GenericContainer<>(ASTERIX_IMAGE)
+            .withExposedPorts(19004, 5006, 19002)
+            .withFileSystemBind("../asterix-app/", "/var/tmp/asterix-app/", BindMode.READ_WRITE);
+    protected static final String TEST_CONFIG_FILE_NAME = "../asterix-app/src/test/resources/cc.conf";
+    private static final boolean CLEANUP_ON_STOP = true;
+
+    /**
+     * Runs {@code /opt/setup.sh} in the container, sets up {@link LangExecutionUtil} with a
+     * {@link PodmanUDFLibrarian}, and waits for the cluster to become active.
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        final TestExecutor testExecutor = new TestExecutor(
+                List.of(InetSocketAddress.createUnresolved(asterix.getHost(), asterix.getMappedPort(19002))));
+        asterix.execInContainer("/opt/setup.sh");
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor, false, true, new PodmanUDFLibrarian(asterix));
+        setEndpoints(testExecutor);
+        testExecutor.waitForClusterActive(60, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Tears down {@link ExecutionTestUtil} and then removes the container image, even if the teardown fails.
+     */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        try {
+            ExecutionTestUtil.tearDown(CLEANUP_ON_STOP);
+        } finally {
+            DockerClient dc = DockerClientFactory.instance().client();
+            dc.removeImageCmd(ASTERIX_IMAGE.asCanonicalNameString()).withForce(true).exec();
+        }
+    }
+
+    @Parameters(name = "PodmanPythonFunctionIT {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_it_python.xml",
+                "../asterix-app/src/test/resources/runtimets");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public PodmanPythonFunctionIT(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    /**
+     * Registers the container's mapped NC API port (19004) as the NC endpoint used by the test executor.
+     */
+    private static void setEndpoints(TestExecutor testExecutor) {
+        final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+        final String ip = asterix.getHost();
+        // NOTE(review): the container's cc.conf (src/test/resources) names the NC asterix_nc1; confirm this id.
+        final String nodeId = "asterix_nc";
+        int apiPort = asterix.getMappedPort(19004);
+        ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+        testExecutor.setNcEndPoints(ncEndPoints);
+    }
+}
diff --git a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
new file mode 100644
index 0000000000..025f607a30
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.podman;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.asterix.app.external.IExternalUDFLibrarian;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * {@link IExternalUDFLibrarian} that installs and uninstalls UDF libraries by running {@code curl} inside the
+ * test container against the NC API on port 19004.
+ */
+public class PodmanUDFLibrarian implements IExternalUDFLibrarian {
+    /** Total number of attempts {@link #install} makes before rethrowing the last failure. */
+    private static final int MAX_INSTALL_ATTEMPTS = 10;
+    // Configured once at class initialization: the mapper is shared, so it must not be reconfigured per instance.
+    private static final ObjectMapper OBJECT_MAPPER =
+            new ObjectMapper().configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
+
+    private final GenericContainer<?> asterix;
+
+    public PodmanUDFLibrarian(GenericContainer<?> asterix) {
+        this.asterix = asterix;
+    }
+
+    /**
+     * Uploads {@code libPath}, resolved under {@code /var/tmp/asterix-app/} in the container, to the NC API
+     * endpoint {@code path}. An attempt that fails with a {@link RuntimeException} is retried; after
+     * {@code MAX_INSTALL_ATTEMPTS} failed attempts the last exception is rethrown.
+     */
+    @Override
+    public void install(URI path, String type, String libPath, Pair<String, String> credentials) throws Exception {
+        for (int attempt = 1;; attempt++) {
+            try {
+                Container.ExecResult curlResult = asterix.execInContainer("curl", "--no-progress-meter", "-X",
+                        "POST", "-u", credentials.first + ":" + credentials.second, "-F",
+                        "data=@" + "/var/tmp/asterix-app/" + libPath, "-F", "type=" + type,
+                        "http://localhost:19004" + path.getRawPath());
+                handleResponse(curlResult);
+                return;
+            } catch (RuntimeException e) {
+                if (attempt >= MAX_INSTALL_ATTEMPTS) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /**
+     * Deletes the library at the NC API endpoint {@code path} by running {@code curl} in the container.
+     */
+    @Override
+    public void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException {
+        try {
+            Container.ExecResult curlResult = asterix.execInContainer("curl", "-X", "DELETE", "-u",
+                    credentials.first + ":" + credentials.second, "http://localhost:19004" + path.getRawPath());
+            handleResponse(curlResult);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Fails if curl exited non-zero (using its stderr as the message) or the JSON reply has an {@code error} field.
+     */
+    private void handleResponse(Container.ExecResult result) throws AsterixException, JsonProcessingException {
+        if (result.getExitCode() != 0) {
+            throw new AsterixException(result.getStderr());
+        }
+        // Replace NUL characters before parsing (presumably stray NULs can appear in the captured output).
+        JsonNode resp = OBJECT_MAPPER.readTree(result.getStdout().replace('\0', ' '));
+        if (resp.has("error")) {
+            throw new AsterixException(resp.get("error").toString());
+        }
+    }
+}
diff --git a/asterixdb/asterix-docker/docker/supervisord.conf b/asterixdb/asterix-podman/src/test/resources/cc.conf
similarity index 55%
copy from asterixdb/asterix-docker/docker/supervisord.conf
copy to asterixdb/asterix-podman/src/test/resources/cc.conf
index 20f1797ed8..e4cbd73e48 100644
--- a/asterixdb/asterix-docker/docker/supervisord.conf
+++ b/asterixdb/asterix-podman/src/test/resources/cc.conf
@@ -15,26 +15,22 @@
 ; specific language governing permissions and limitations
 ; under the License.
 
-[supervisord]
-nodaemon=true
+[nc/asterix_nc1]
+txn.log.dir=/opt/apache-asterixdb/data/txnlog
+core.dump.dir=/opt/apache-asterixdb/logs/coredump
+iodevices=/opt/apache-asterixdb/data/
+nc.api.port=19004
 
-[program:asterixnc1]
-command=/asterixdb/bin/asterixncservice -logdir - -config-file "/asterixdb/opt/local/conf/blue.conf"
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
+[nc]
+address=127.0.0.1
+command=asterixnc
+credential.file=/opt/apache-asterixdb/etc/passwd
+jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5006
+python.ds.path = /tmp/pyudf.socket
 
-[program:asterixnc2]
-command=/asterixdb/bin/asterixncservice -logdir -
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
+[cc]
+address = 127.0.0.1
 
-[program:asterixcc]
-command=/asterixdb/bin/asterixcc -config-file "/asterixdb/opt/local/conf/cc.conf"
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
+[common]
+log.level = INFO
+log.dir = /opt/apache-asterixdb/logs/ diff --git a/asterixdb/asterix-podman/src/test/resources/passwd b/asterixdb/asterix-podman/src/test/resources/passwd new file mode 100644 index 0000000000..a1ea5b03a2 --- /dev/null +++ b/asterixdb/asterix-podman/src/test/resources/passwd @@ -0,0 +1 @@ +admin:$2a$12$JxgDzf/uOn1NS2Y3exhrDOf7JY/eUHQH7HeH90s5Ye2gALoO0FsQy diff --git a/asterixdb/asterix-podman/src/test/resources/setup.sh b/asterixdb/asterix-podman/src/test/resources/setup.sh new file mode 100644 index 0000000000..e3523aaa5e --- /dev/null +++ b/asterixdb/asterix-podman/src/test/resources/setup.sh @@ -0,0 +1,8 @@ +#!/bin/bash +cd /var/tmp/asterix-app/ +shiv -o target/TweetSent.pyz --site-packages src/test/resources/TweetSent scikit-learn +cp -a /var/tmp/asterix-app/data/classifications /opt/apache-asterixdb/data/ +cp -a /var/tmp/asterix-app/data/twitter /opt/apache-asterixdb/data/ +cp -a /var/tmp/asterix-app/data/big-object /opt/apache-asterixdb/data/ +mkdir -p /opt/apache-asterixdb/target/data/ +cp -a /var/tmp/asterix-app/target/data/big-object /opt/apache-asterixdb/target/data/ \ No newline at end of file diff --git a/asterixdb/asterix-podman/src/test/resources/socktest/Containerfile b/asterixdb/asterix-podman/src/test/resources/socktest/Containerfile new file mode 100644 index 0000000000..a7546d5a7d --- /dev/null +++ b/asterixdb/asterix-podman/src/test/resources/socktest/Containerfile @@ -0,0 +1,17 @@ +FROM ubuntu:22.04 +RUN apt -y update +RUN DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt -y install systemd openjdk-17-jre-headless unzip wget curl python3-pip python3-venv python3-systemd +RUN pip3 install shiv msgpack +COPY target/asterix-server_*all.deb . 
+RUN dpkg -i asterix-server*.deb +COPY src/test/resources/cc.conf /opt/apache-asterixdb/cc.conf +COPY src/test/resources/passwd /opt/apache-asterixdb/etc/passwd +RUN mkdir -p /etc/systemd/system/[email protected]/ +COPY src/test/resources/testenv.conf /etc/systemd/system/[email protected]/ +COPY src/test/resources/setup.sh /opt +RUN chmod +x /opt/setup.sh +RUN systemctl enable asterix-nc asterix-cc pyudf.socket + +EXPOSE 19001 19002 19004 + +CMD [ "/lib/systemd/systemd" ] diff --git a/asterixdb/asterix-podman/src/test/resources/testenv.conf b/asterixdb/asterix-podman/src/test/resources/testenv.conf new file mode 100644 index 0000000000..0c2f182c35 --- /dev/null +++ b/asterixdb/asterix-podman/src/test/resources/testenv.conf @@ -0,0 +1,3 @@ +[Service] +Environment="FOO=BAR=BAZ" +Environment="BAR=BAZ" diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml index 6c2a05adc3..e1d09640ba 100644 --- a/asterixdb/asterix-server/pom.xml +++ b/asterixdb/asterix-server/pom.xml @@ -978,7 +978,7 @@ <plugin> <artifactId>jdeb</artifactId> <groupId>org.vafer</groupId> - <version>1.5</version> + <version>1.8</version> <executions> <execution> <phase>package</phase> @@ -988,26 +988,36 @@ <configuration> <dataSet> <data> - <src>${project.build.directory}/${project.build.finalName}-binary-assembly/apache-asterixdb-${project.version}/</src> - <excludes>bin/**</excludes> + <src>${project.build.directory}/${project.build.finalName}-binary-assembly/apache-asterixdb-${project.version}</src> <type>directory</type> <mapper> <type>perm</type> - <prefix>/opt/apache-asterixdb-${project.version}/</prefix> - <user>asterixdb</user> - <group>asterixdb</group> + <prefix>/opt/apache-asterixdb/</prefix> + <user>root</user> + <group>root</group> + <filemode>755</filemode> + </mapper> + </data> + <data> + <type>file</type> + <src>src/deb/systemd/cc.conf</src> + <mapper> + <prefix>/opt/apache-asterixdb/</prefix> + <type>perm</type> + <user>root</user> + <group>root</group> 
<filemode>644</filemode> </mapper> </data> <data> - <src>${project.build.directory}/${project.build.finalName}-binary-assembly/apache-asterixdb-${project.version}/bin</src> - <type>directory</type> + <type>file</type> + <src>src/deb/udf_listener.py</src> <mapper> + <prefix>/opt/apache-asterixdb/bin</prefix> <type>perm</type> - <prefix>/opt/apache-asterixdb-${project.version}/bin</prefix> - <user>asterixdb</user> - <group>asterixdb</group> - <filemode>754</filemode> + <user>root</user> + <group>root</group> + <filemode>555</filemode> </mapper> </data> <data> @@ -1030,6 +1040,39 @@ <group>root</group> </mapper> </data> + <data> + <type>file</type> + <src>src/deb/systemd/pyudf.socket</src> + <mapper> + <prefix>/lib/systemd/system</prefix> + <type>perm</type> + <user>root</user> + <group>root</group> + </mapper> + </data> + <data> + <type>file</type> + <src>src/deb/systemd/[email protected]</src> + <mapper> + <prefix>/lib/systemd/system</prefix> + <type>perm</type> + <user>root</user> + <group>root</group> + </mapper> + </data> + <data> + <type>template</type> + <paths> + <path>/opt/apache-asterixdb/logs</path> + <path>/opt/apache-asterixdb/data</path> + </paths> + <mapper> + <type>perm</type> + <user>asterixdb</user> + <group>asterixdb</group> + <filemode>750</filemode> + </mapper> + </data> </dataSet> </configuration> </execution> diff --git a/asterixdb/asterix-server/src/deb/control/control b/asterixdb/asterix-server/src/deb/control/control index 1f6c213e95..77bbd1df78 100644 --- a/asterixdb/asterix-server/src/deb/control/control +++ b/asterixdb/asterix-server/src/deb/control/control @@ -17,8 +17,7 @@ Version: [[version]] Section: databases Priority: extra Architecture: all -Depends: jdk (>= 1.8) +Depends: java17-runtime-headless Maintainer: Ian Maxon <[email protected]> Description: Apache AsterixDB - a scalable, open source Big Data Management System (BDMS) -Distribution: development -Depends: default-jre | java8-runtime +Distribution: development \ No newline at 
end of file diff --git a/asterixdb/asterix-server/src/deb/control/postinst b/asterixdb/asterix-server/src/deb/control/postinst index 896ca28b4e..fe5c912155 100644 --- a/asterixdb/asterix-server/src/deb/control/postinst +++ b/asterixdb/asterix-server/src/deb/control/postinst @@ -13,5 +13,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -adduser --system --group --quiet --home /opt/apache-asterixdb/ \ ---no-create-home --disabled-login --force-badname asterixdb +chmod -R 755 /opt/apache-asterixdb/ \ No newline at end of file diff --git a/asterixdb/asterix-server/src/deb/control/preinst b/asterixdb/asterix-server/src/deb/control/preinst index 4509c90586..8d14847d47 100644 --- a/asterixdb/asterix-server/src/deb/control/preinst +++ b/asterixdb/asterix-server/src/deb/control/preinst @@ -13,3 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+adduser --system --group --quiet --home /opt/apache-asterixdb/ \ +--no-create-home --disabled-login --force-badname asterixdb +adduser --system --group --quiet --home /opt/apache-asterixdb/ \ +--no-create-home --disabled-login --force-badname asterixdb-udf \ No newline at end of file diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service b/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service index 9711fba6c1..2a52e2def9 100644 --- a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service +++ b/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service @@ -19,8 +19,9 @@ After=network.target [Service] Type=simple User=asterixdb -ExecStart=/opt/apache-asterixdb/bin/asterixcc --config-file /opt/apache-asterixdb/cc.conf +ExecStart=/opt/apache-asterixdb/bin/asterixcc -config-file "/opt/apache-asterixdb/cc.conf" Restart=on-abort +WorkingDirectory=/opt/apache-asterixdb [Install] WantedBy=multi-user.target diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service b/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service index bfe62966cd..e09d8e8202 100644 --- a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service +++ b/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service @@ -21,6 +21,7 @@ Type=simple User=asterixdb ExecStart=/opt/apache-asterixdb/bin/asterixncservice Restart=on-abort +WorkingDirectory=/opt/apache-asterixdb [Install] WantedBy=multi-user.target diff --git a/asterixdb/asterix-docker/docker/supervisord.conf b/asterixdb/asterix-server/src/deb/systemd/cc.conf similarity index 55% rename from asterixdb/asterix-docker/docker/supervisord.conf rename to asterixdb/asterix-server/src/deb/systemd/cc.conf index 20f1797ed8..0af967a395 100644 --- a/asterixdb/asterix-docker/docker/supervisord.conf +++ b/asterixdb/asterix-server/src/deb/systemd/cc.conf @@ -15,26 +15,19 @@ ; specific language governing permissions and limitations ; under the License. 
-[supervisord] -nodaemon=true +[nc/asterix_nc1] +txn.log.dir=/opt/apache-asterixdb/data/txnlog +core.dump.dir=/opt/apache-asterixdb/logs/coredump +iodevices=/opt/apache-asterixdb/data/ +nc.api.port=19004 -[program:asterixnc1] -command=/asterixdb/bin/asterixncservice -logdir - -config-file "/asterixdb/opt/local/conf/blue.conf" -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +[nc] +address=127.0.0.1 +command=asterixnc -[program:asterixnc2] -command=/asterixdb/bin/asterixncservice -logdir - -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +[cc] +address = 127.0.0.1 -[program:asterixcc] -command=/asterixdb/bin/asterixcc -config-file "/asterixdb/opt/local/conf/cc.conf" -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 +[common] +log.level = INFO +log.dir = /opt/apache-asterixdb/logs/ diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service b/asterixdb/asterix-server/src/deb/systemd/pyudf.socket similarity index 78% copy from asterixdb/asterix-server/src/deb/systemd/asterix-nc.service copy to asterixdb/asterix-server/src/deb/systemd/pyudf.socket index bfe62966cd..4e731db8b4 100644 --- a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service +++ b/asterixdb/asterix-server/src/deb/systemd/pyudf.socket @@ -13,14 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
[Unit] -Description=Apache AsterixDB Node Controller Daemon -After=network.target +Description=AsterixDB UDF Domain Socket +PartOf=asterixdb_udf.service -[Service] -Type=simple -User=asterixdb -ExecStart=/opt/apache-asterixdb/bin/asterixncservice -Restart=on-abort +[Socket] +ListenStream=/tmp/pyudf.socket +SocketMode=0660 +SocketUser=asterixdb-udf +SocketGroup=asterixdb +Accept=true +DeferAcceptSec=1 [Install] -WantedBy=multi-user.target +WantedBy=sockets.target \ No newline at end of file diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service b/asterixdb/asterix-server/src/deb/systemd/[email protected] similarity index 75% copy from asterixdb/asterix-server/src/deb/systemd/asterix-cc.service copy to asterixdb/asterix-server/src/deb/systemd/[email protected] index 9711fba6c1..9856142e97 100644 --- a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service +++ b/asterixdb/asterix-server/src/deb/systemd/[email protected] @@ -1,3 +1,4 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -13,14 +14,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
[Unit] -Description=Apache AsterixDB Cluster Controller -After=network.target +Description=AsterixDB UDF Executor Service +After=network.target pyudf.socket +Requires=pyudf.socket [Service] +User=asterixdb-udf Type=simple -User=asterixdb -ExecStart=/opt/apache-asterixdb/bin/asterixcc --config-file /opt/apache-asterixdb/cc.conf -Restart=on-abort +ExecStart=/usr/bin/python3 /opt/apache-asterixdb/bin/udf_listener.py +TimeoutStopSec=5 +StandardOutput=journal +StandardError=journal [Install] -WantedBy=multi-user.target +WantedBy=default.target \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-server/src/deb/udf_listener.py old mode 100755 new mode 100644 similarity index 88% copy from asterixdb/asterix-app/src/main/resources/entrypoint.py copy to asterixdb/asterix-server/src/deb/udf_listener.py index 7bad7ef485..03874b2136 --- a/asterixdb/asterix-app/src/main/resources/entrypoint.py +++ b/asterixdb/asterix-server/src/deb/udf_listener.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -16,12 +17,10 @@ # under the License.
import sys -from os import pathsep -addr = str(sys.argv[1]) -port = str(sys.argv[2]) -paths = sys.argv[3] -for p in paths.split(pathsep): - sys.path.append(p) +from systemd.daemon import listen_fds +from os import chdir +from os import getcwd +from os import getpid from struct import * import signal import msgpack @@ -32,6 +31,7 @@ from pathlib import Path from enum import IntEnum from io import BytesIO + PROTO_VERSION = 1 HEADER_SZ = 8 + 8 + 1 REAL_HEADER_SZ = 4 + 8 + 8 + 1 @@ -66,8 +66,8 @@ class Wrapper(object): resp = None unpacked_msg = None msg_type = None - packer = msgpack.Packer(autoreset=False) - unpacker = msgpack.Unpacker() + packer = msgpack.Packer(autoreset=False, use_bin_type=False) + unpacker = msgpack.Unpacker(raw=False) response_buf = BytesIO() stdin_buf = BytesIO() wrapped_fns = {} @@ -102,6 +102,7 @@ class Wrapper(object): cwd = Path('.').resolve() module_path = Path(module.__file__).resolve() return cwd in module_path.parents + return True def read_header(self, readbuf): self.sz, self.mid, self.rmid, self.flag = unpack( @@ -130,14 +131,19 @@ class Wrapper(object): self.send_msg() self.packer.reset() + def cd(self, basedir): + chdir(basedir + "/site-packages") + sys.path.insert(0,getcwd()) + def helo(self): # need to ack the connection back before sending actual HELO - self.init_remote_ipc() + # self.init_remote_ipc() + self.cd(self.unpacked_msg[1][1]) self.flag = MessageFlags.NORMAL self.response_buf.seek(0) self.packer.pack(int(MessageType.HELO)) - self.packer.pack("HELO") - dlen = 5 # tag(1) + body(4) + self.packer.pack(int(getpid())) + dlen = len(self.packer.bytes()) # tag(1) + body(4) resp_len = self.write_header(self.response_buf, dlen) self.response_buf.write(self.packer.bytes()) self.resp = self.response_buf.getbuffer()[0:resp_len] @@ -198,7 +204,7 @@ class Wrapper(object): self.flag = MessageFlags.NORMAL self.packer.reset() self.response_buf.seek(0) - body = msgpack.packb(e) + body = msgpack.packb(str(e)) dlen = len(body) + 1 # 1 for 
tag resp_len = self.write_header(self.response_buf, dlen) self.packer.pack(int(MessageType.ERROR)) @@ -217,9 +223,8 @@ class Wrapper(object): MessageType.CALL: handle_call } - def connect_sock(self, addr, port): - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.connect((addr, int(port))) + def connect_sock(self): + self.sock = socket.fromfd(listen_fds()[0], socket.AF_UNIX, socket.SOCK_STREAM) def disconnect_sock(self, *args): self.sock.shutdown(socket.SHUT_RDWR) @@ -227,26 +232,26 @@ class Wrapper(object): def recv_msg(self): while self.alive: - pos = sys.stdin.buffer.readinto1(self.readbuf) + pos = self.sock.recv_into(self.readbuf) if pos <= 0: self.alive = False return try: while pos < REAL_HEADER_SZ: - read = sys.stdin.buffer.readinto1(self.readview[pos:]) + read = self.sock.recv_into(self.readview[pos:]) if read <= 0: self.alive = False return pos += read self.read_header(self.readview) while pos < self.sz and len(self.readbuf) - pos > 0: - read = sys.stdin.buffer.readinto1(self.readview[pos:]) + read = self.sock.recv_into(self.readview[pos:]) if read <= 0: self.alive = False return pos += read while pos < self.sz: - vszchunk = sys.stdin.buffer.read1(FRAMESZ) + vszchunk = self.sock.recv(4096) if len(vszchunk) == 0: self.alive = False return @@ -258,8 +263,8 @@ class Wrapper(object): self.unpacked_msg = list(self.unpacker) self.msg_type = MessageType(self.unpacked_msg[0]) self.type_handler[self.msg_type](self) - except BaseException: - self.handle_error(traceback.format_exc()) + except BaseException as e: + self.handle_error(''.join(traceback.format_exc())) def send_msg(self): self.sock.sendall(self.resp) @@ -273,6 +278,6 @@ class Wrapper(object): wrap = Wrapper() -wrap.connect_sock(addr, port) +wrap.connect_sock() signal.signal(signal.SIGTERM, wrap.disconnect_sock) wrap.recv_loop() diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index 9fa2fc1d0e..dcd0978c38 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -931,7 +931,7 @@ 
<module>asterix-test-framework</module> <module>asterix-maven-plugins</module> <module>asterix-server</module> - <module>asterix-docker</module> + <module>asterix-podman</module> <module>asterix-doc</module> <module>asterix-fuzzyjoin</module> <module>asterix-replication</module> diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index 01cb9bfc9d..bb40e2b728 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -99,6 +99,7 @@ public class NCConfig extends ControllerConfig { PYTHON_USE_BUNDLED_MSGPACK(BOOLEAN, true), PYTHON_ARGS(STRING_ARRAY, (String[]) null), PYTHON_ENV(STRING_ARRAY, (String[]) null), + PYTHON_DS_PATH(STRING, (String) null), CREDENTIAL_FILE( OptionTypes.STRING, (Function<IApplicationConfig, String>) appConfig -> FileUtil @@ -248,6 +249,8 @@ public class NCConfig extends ControllerConfig { return "Whether or not to attempt to automatically set PYTHON_CMD to a usable interpreter"; case PYTHON_ENV: return "List of environment variables to set when invoking the Python interpreter for Python UDFs. E.g. FOO=1"; + case PYTHON_DS_PATH: + return "Path to systemd socket for fenced Python UDFs. Requires JDK17+, *nix OS, and systemd socket activation"; case CREDENTIAL_FILE: return "Path to HTTP basic credentials"; default:
