This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d8e82b4 Support starting standalone server in Docker image (#6102)
d8e82b4 is described below
commit d8e82b4ae68cde48e1118995d58d4b5967995ebc
Author: kezhenxu94 <[email protected]>
AuthorDate: Sun Sep 5 18:23:27 2021 +0800
Support starting standalone server in Docker image (#6102)
Also remove unused class
---
.../dolphinscheduler/supervisor/supervisor.ini | 15 ++
docker/build/startup.sh | 17 +-
.../server/master/future/TaskFuture.java | 175 ---------------------
3 files changed, 26 insertions(+), 181 deletions(-)
diff --git a/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini
b/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini
index c8c4e12..19166f4 100644
--- a/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini
+++ b/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini
@@ -90,3 +90,18 @@ killasgroup=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
+
+[program:standalone]
+command=%(ENV_DOLPHINSCHEDULER_BIN)s/dolphinscheduler-daemon.sh start
standalone-server
+directory=%(ENV_DOLPHINSCHEDULER_HOME)s
+priority=999
+autostart=%(ENV_STANDALONE_START_ENABLED)s
+autorestart=true
+startsecs=5
+stopwaitsecs=3
+exitcodes=0
+stopasgroup=true
+killasgroup=true
+redirect_stderr=true
+stdout_logfile=/dev/fd/1
+stdout_logfile_maxbytes=0
diff --git a/docker/build/startup.sh b/docker/build/startup.sh
index ae1ed36..7f3b7d0 100755
--- a/docker/build/startup.sh
+++ b/docker/build/startup.sh
@@ -24,6 +24,7 @@ export WORKER_START_ENABLED=false
export API_START_ENABLED=false
export ALERT_START_ENABLED=false
export LOGGER_START_ENABLED=false
+export STANDALONE_START_ENABLED=false
# wait database
waitDatabase() {
@@ -67,12 +68,13 @@ waitZK() {
printUsage() {
echo -e "Dolphin Scheduler is a distributed and easy-to-expand visual DAG
workflow scheduling system,"
echo -e "dedicated to solving the complex dependencies in data processing,
making the scheduling system out of the box for data processing.\n"
- echo -e "Usage: [ all | master-server | worker-server | api-server |
alert-server ]\n"
- printf "%-13s: %s\n" "all" "Run master-server, worker-server,
api-server and alert-server"
- printf "%-13s: %s\n" "master-server" "MasterServer is mainly responsible
for DAG task split, task submission monitoring."
- printf "%-13s: %s\n" "worker-server" "WorkerServer is mainly responsible
for task execution and providing log services."
- printf "%-13s: %s\n" "api-server" "ApiServer is mainly responsible for
processing requests and providing the front-end UI layer."
- printf "%-13s: %s\n" "alert-server" "AlertServer mainly include Alarms."
+ echo -e "Usage: [ all | master-server | worker-server | api-server |
alert-server | standalone-server ]\n"
+ printf "%-13s: %s\n" "all" "Run master-server,
worker-server, api-server and alert-server"
+ printf "%-13s: %s\n" "master-server" "MasterServer is mainly
responsible for DAG task split, task submission monitoring."
+ printf "%-13s: %s\n" "worker-server" "WorkerServer is mainly
responsible for task execution and providing log services."
+ printf "%-13s: %s\n" "api-server" "ApiServer is mainly
responsible for processing requests and providing the front-end UI layer."
+ printf "%-13s: %s\n" "alert-server" "AlertServer mainly include
Alarms."
+ printf "%-13s: %s\n" "standalone-server" "Standalone server that uses
embedded zookeeper and database, only for testing and demostration."
}
# init config file
@@ -110,6 +112,9 @@ case "$1" in
waitDatabase
export ALERT_START_ENABLED=true
;;
+ (standalone-server)
+ export STANDALONE_START_ENABLED=true
+ ;;
(help)
printUsage
exit 1
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
deleted file mode 100644
index bab4acc..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.master.future;
-
-
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * task future
- */
-public class TaskFuture {
-
- private final static Logger LOGGER =
LoggerFactory.getLogger(TaskFuture.class);
-
- private final static ConcurrentHashMap<Long,TaskFuture> FUTURE_TABLE = new
ConcurrentHashMap<>(256);
-
- /**
- * request unique identification
- */
- private final long opaque;
-
- /**
- * timeout
- */
- private final long timeoutMillis;
-
- private final CountDownLatch latch = new CountDownLatch(1);
-
- private final long beginTimestamp = System.currentTimeMillis();
-
- /**
- * response command
- */
- private AtomicReference<Command> responseCommandReference = new
AtomicReference<>();
-
- private volatile boolean sendOk = true;
-
- private AtomicReference<Throwable> causeReference;
-
- public TaskFuture(long opaque, long timeoutMillis) {
- this.opaque = opaque;
- this.timeoutMillis = timeoutMillis;
- FUTURE_TABLE.put(opaque, this);
- }
-
- /**
- * wait for response
- * @return command
- * @throws InterruptedException if error throws InterruptedException
- */
- public Command waitResponse() throws InterruptedException {
- this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
- return this.responseCommandReference.get();
- }
-
- /**
- * put response
- *
- * @param responseCommand responseCommand
- */
- public void putResponse(final Command responseCommand) {
- responseCommandReference.set(responseCommand);
- this.latch.countDown();
- FUTURE_TABLE.remove(opaque);
- }
-
- /**
- * whether timeout
- * @return timeout
- */
- public boolean isTimeout() {
- long diff = System.currentTimeMillis() - this.beginTimestamp;
- return diff > this.timeoutMillis;
- }
-
- public static void notify(final Command responseCommand){
- TaskFuture taskFuture =
FUTURE_TABLE.remove(responseCommand.getOpaque());
- if(taskFuture != null){
- taskFuture.putResponse(responseCommand);
- }
- }
-
-
- public boolean isSendOK() {
- return sendOk;
- }
-
- public void setSendOk(boolean sendOk) {
- this.sendOk = sendOk;
- }
-
- public void setCause(Throwable cause) {
- causeReference.set(cause);
- }
-
- public Throwable getCause() {
- return causeReference.get();
- }
-
- public long getOpaque() {
- return opaque;
- }
-
- public long getTimeoutMillis() {
- return timeoutMillis;
- }
-
- public long getBeginTimestamp() {
- return beginTimestamp;
- }
-
- public Command getResponseCommand() {
- return responseCommandReference.get();
- }
-
- public void setResponseCommand(Command responseCommand) {
- responseCommandReference.set(responseCommand);
- }
-
-
- /**
- * scan future table
- */
- public static void scanFutureTable(){
- final List<TaskFuture> futureList = new LinkedList<>();
- Iterator<Map.Entry<Long, TaskFuture>> it =
FUTURE_TABLE.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<Long, TaskFuture> next = it.next();
- TaskFuture future = next.getValue();
- if ((future.getBeginTimestamp() + future.getTimeoutMillis() +
1000) <= System.currentTimeMillis()) {
- futureList.add(future);
- it.remove();
- LOGGER.warn("remove timeout request : {}", future);
- }
- }
- }
-
- @Override
- public String toString() {
- return "TaskFuture{" +
- "opaque=" + opaque +
- ", timeoutMillis=" + timeoutMillis +
- ", latch=" + latch +
- ", beginTimestamp=" + beginTimestamp +
- ", responseCommand=" + responseCommandReference.get() +
- ", sendOk=" + sendOk +
- ", cause=" + causeReference.get() +
- '}';
- }
-}