This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 5332e6d [New Scheduler] Run scheduler (#5194)
5332e6d is described below
commit 5332e6de625c4df9b717e067a5369ee446e2fda1
Author: Dominic Kim <[email protected]>
AuthorDate: Fri Feb 11 10:46:56 2022 +0900
[New Scheduler] Run scheduler (#5194)
* Add Akka-cluster dependency
* Update configurations to run the new scheduler.
* Add gRPC handlers for activations.
* Update Ansible scripts to run the new scheduler.
* Increase the queue creation request timeout.
* Add the scheduler ansible role.
* Fix typo.
* Change the loglevel config to logback's one.
* Change the topic name
* Remove unnecessary configs
* Add a guide how to deploy the new scheduler.
* Make ActorSystem for each test bind to a free port.
---
ansible/README.md | 52 ++++
ansible/environments/local/hosts.j2.ini | 6 +
ansible/group_vars/all | 62 +++-
ansible/openwhisk.yml | 5 +
ansible/roles/controller/tasks/deploy.yml | 15 +
ansible/roles/invoker/tasks/deploy.yml | 15 +
ansible/roles/schedulers/tasks/clean.yml | 24 ++
ansible/roles/schedulers/tasks/deploy.yml | 339 +++++++++++++++++++++
.../roles/schedulers/tasks/join_akka_cluster.yml | 38 +++
ansible/roles/schedulers/tasks/main.yml | 10 +
ansible/roles/schedulers/templates/jmx.yml.j2 | 25 ++
ansible/scheduler.yml | 37 +++
ansible/tasks/initdb.yml | 1 +
ansible/tasks/wipeDatabase.yml | 2 +
ansible/templates/db_local.ini.j2 | 4 +
common/scala/build.gradle | 4 +
common/scala/src/main/resources/application.conf | 17 ++
.../org/apache/openwhisk/core/WhiskConfig.scala | 2 +-
.../core/loadBalancer/FPCPoolBalancer.scala | 3 +-
core/scheduler/src/main/resources/application.conf | 10 +-
.../openwhisk/core/scheduler/Scheduler.scala | 42 ++-
tests/src/test/resources/application.conf.j2 | 18 +-
22 files changed, 698 insertions(+), 33 deletions(-)
diff --git a/ansible/README.md b/ansible/README.md
index ef458ae..6865c6b 100644
--- a/ansible/README.md
+++ b/ansible/README.md
@@ -148,6 +148,58 @@ ansible-playbook -i environments/$ENVIRONMENT prereq.yml
**Hint:** During playbook execution the `TASK [prereq : check for pip]` can
show as failed. This is normal if no pip is installed. The playbook will then
move on and install pip on the target machines.
+### [Optional] Enable the new scheduler
+
+You can optionally enable OpenWhisk's new scheduler.
+Doing so deploys two additional components: the "scheduler" itself and ETCD.
+
+#### Configure service providers for the scheduler
+You can update service providers for the scheduler as follows.
+
+**common/scala/src/main/resources/application.conf**
+```
+whisk.spi {
+ ArtifactStoreProvider =
org.apache.openwhisk.core.database.CouchDbStoreProvider
+ ActivationStoreProvider =
org.apache.openwhisk.core.database.ArtifactActivationStoreProvider
+ MessagingProvider =
org.apache.openwhisk.connector.kafka.KafkaMessagingProvider
+ ContainerFactoryProvider =
org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider
+ LogStoreProvider =
org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider
+ LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.FPCPoolBalancer
+ EntitlementSpiProvider =
org.apache.openwhisk.core.entitlement.FPCEntitlementProvider
+ AuthenticationDirectiveProvider =
org.apache.openwhisk.core.controller.BasicAuthenticationDirective
+ InvokerProvider = org.apache.openwhisk.core.invoker.FPCInvokerReactive
+ InvokerServerProvider = org.apache.openwhisk.core.invoker.FPCInvokerServer
+ DurationCheckerProvider =
org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationCheckerProvider
+}
+.
+.
+.
+```
+
+#### Enable the scheduler
+- Make sure you enable the scheduler by configuring `scheduler_enable`.
+
+**ansible/environments/local/group_vars**
+```yaml
+scheduler_enable: true
+```
+
+#### [Optional] Enable ElasticSearch Activation Store
+When you use the new scheduler, it is recommended to use ElasticSearch as an
activation store.
+
+**ansible/environments/local/group_vars**
+```yaml
+db_activation_backend: ElasticSearch
+elastic_cluster_name: <your elasticsearch cluster name>
+elastic_protocol: <your elasticsearch protocol>
+elastic_index_pattern: <your elasticsearch index pattern>
+elastic_base_volume: <your elasticsearch volume directory>
+elastic_username: <your elasticsearch username>
+elastic_password: <your elasticsearch password>
+```
+
+You can also refer to this guide to [deploy OpenWhisk using
ElasticSearch](https://github.com/apache/openwhisk/blob/master/ansible/README.md#using-elasticsearch-to-store-activations).
+
### Deploying Using CouchDB
- Make sure your `db_local.ini` file is [setup for](#setup) CouchDB then
execute:
diff --git a/ansible/environments/local/hosts.j2.ini
b/ansible/environments/local/hosts.j2.ini
index cceb706..26fce32 100644
--- a/ansible/environments/local/hosts.j2.ini
+++ b/ansible/environments/local/hosts.j2.ini
@@ -27,6 +27,12 @@ invoker0 ansible_host=172.17.0.1
ansible_connection=local
invoker1 ansible_host=172.17.0.1 ansible_connection=local
{% endif %}
+[schedulers]
+scheduler0 ansible_host=172.17.0.1 ansible_connection=local
+{% if mode is defined and 'HA' in mode %}
+scheduler1 ansible_host=172.17.0.1 ansible_connection=local
+{% endif %}
+
; db group is only used if db.provider is CouchDB
[db]
172.17.0.1 ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 0f9b107..79d7eaf 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -127,6 +127,8 @@ jmx:
rmiBasePortController: 16000
basePortInvoker: 17000
rmiBasePortInvoker: 18000
+ basePortScheduler: 21000
+ rmiBasePortScheduler: 22000
user: "{{ jmxuser | default('jmxuser') }}"
pass: "{{ jmxuser | default('jmxpass') }}"
jvmCommonArgs: "-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.authenticate=true
-Dcom.sun.management.jmxremote.password.file=/home/owuser/jmxremote.password
-Dcom.sun.management.jmxremote.access.file=/home/owuser/jmxremote.access"
@@ -221,6 +223,8 @@ invoker:
keystore:
password: "{{ invoker_keystore_password | default('openwhisk') }}"
name: "{{ __invoker_ssl_keyPrefix }}openwhisk-keystore.p12"
+ container:
+ creationMaxPeek: "{{ container_creation_max_peek | default(500) }}"
reactiveSpi: "{{ invokerReactive_spi | default('') }}"
serverSpi: "{{ invokerServer_spi | default('') }}"
@@ -278,6 +282,9 @@ db:
invoker:
user: "{{ db_invoker_user | default(lookup('ini', 'db_username
section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
pass: "{{ db_invoker_pass | default(lookup('ini', 'db_password
section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
+ scheduler:
+ user: "{{ db_scheduler_user | default(lookup('ini', 'db_username
section=scheduler file={{ playbook_dir }}/db_local.ini')) }}"
+ pass: "{{ db_scheduler_pass | default(lookup('ini', 'db_password
section=scheduler file={{ playbook_dir }}/db_local.ini')) }}"
artifact_store:
backend: "{{ db_artifact_backend | default('CouchDB') }}"
activation_store:
@@ -435,8 +442,9 @@ metrics:
user_events: "{{ user_events_enabled | default(false) | lower }}"
-durationChecker:
- timeWindow: "{{ duration_checker_time_window | default('1 d') }}"
+zeroDowntimeDeployment:
+ enabled: "{{ zerodowntime_deployment_switch | default(true) }}"
+ solution: "{{ zerodowntime_deployment_solution | default('apicall') }}"
etcd:
version: "{{ etcd_version | default('v3.4.0') }}"
@@ -463,13 +471,63 @@ etcd_connect_string: "{% set ret = [] %}\
{% endfor %}\
{{ ret | join(',') }}"
+
+__scheduler_blackbox_fraction: 0.10
+
+watcher:
+ eventNotificationDelayMs: "{{ watcher_notification_delay | default('5000
ms') }}"
+
+durationChecker:
+ timeWindow: "{{ duration_checker_time_window | default('1 d') }}"
+
+enable_scheduler: "{{ scheduler_enable | default(false) }}"
+
scheduler:
protocol: "{{ scheduler_protocol | default('http') }}"
+ dir:
+ become: "{{ scheduler_dir_become | default(false) }}"
+ confdir: "{{ config_root_dir }}/scheduler"
+ basePort: 14001
grpc:
+ basePort: 13001
tls: "{{ scheduler_grpc_tls | default(false) }}"
maxPeek: "{{ scheduler_max_peek | default(128) }}"
+ heap: "{{ scheduler_heap | default('2g') }}"
+ arguments: "{{ scheduler_arguments | default('') }}"
+ instances: "{{ groups['schedulers'] | length }}"
+ username: "{{ scheduler_username | default('scheduler.user') }}"
+ password: "{{ scheduler_password | default('scheduler.pass') }}"
+ akka:
+ provider: cluster
+ cluster:
+ basePort: 25520
+ host: "{{ groups['schedulers'] | map('extract', hostvars,
'ansible_host') | list }}"
+ bindPort: 3551
+ # at this moment all schedulers are seed nodes
+ seedNodes: "{{ groups['schedulers'] | map('extract', hostvars,
'ansible_host') | list }}"
+ loglevel: "{{ scheduler_loglevel | default(whisk_loglevel) | default('INFO')
}}"
+ extraEnv: "{{ scheduler_extraEnv | default({}) }}"
+ dataManagementService:
+ retryInterval: "{{ scheduler_dataManagementService_retryInterval |
default('1 second') }}"
+ inProgressJobRetentionSecond: "{{ scheduler_inProgressJobRetentionSecond |
default('20 seconds') }}"
+ managedFraction: "{{ scheduler_managed_fraction | default(1.0 -
(scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
+ blackboxFraction: "{{ scheduler_blackbox_fraction |
default(__scheduler_blackbox_fraction) }}"
queueManager:
maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second')
}}"
maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
+ queue:
+ # Timeout for the queue's Running state: if no activation arrives while the
queue is Running, the queue state changes from Running to Idle and the
decision algorithm actor is deleted
+ idleGrace: "{{ scheduler_queue_idleGrace | default('20 seconds') }}"
+ # Timeout for the queue's Idle state: if no activation arrives while the
queue is Idle, the queue state changes from Idle to Removed
+ stopGrace: "{{ scheduler_queue_stopGrace | default('20 seconds') }}"
+ # Timeout for the queue's Paused state: if no activation arrives while the
queue is Paused, the queue state changes from Paused to Removed
+ flushGrace: "{{ scheduler_queue_flushGrace | default('60 seconds') }}"
+ gracefulShutdownTimeout: "{{ scheduler_queue_gracefulShutdownTimeout |
default('5 seconds') }}"
+ maxRetentionSize: "{{ scheduler_queue_maxRetentionSize | default(10000) }}"
+ maxRetentionMs: "{{ scheduler_queue_maxRetentionMs | default(60000) }}"
+ maxBlackboxRetentionMs: "{{ scheduler_queue_maxBlackboxRetentionMs |
default(300000) }}"
+ throttlingFraction: "{{ scheduler_queue_throttlingFraction | default(0.9)
}}"
+ durationBufferSize: "{{ scheduler_queue_durationBufferSize | default(10)
}}"
+ deployment_ignore_error: "{{ scheduler_deployment_ignore_error |
default('False') }}"
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval |
default('1 second') }}"
diff --git a/ansible/openwhisk.yml b/ansible/openwhisk.yml
index 79d0b4d..f832b62 100644
--- a/ansible/openwhisk.yml
+++ b/ansible/openwhisk.yml
@@ -20,12 +20,17 @@
# playbook (currently cloudant.yml or couchdb.yml).
# It assumes that wipe.yml have being deployed at least once.
+- import_playbook: etcd.yml
+ when: enable_scheduler
- import_playbook: kafka.yml
when: not lean
- import_playbook: controller.yml
+- import_playbook: scheduler.yml
+ when: enable_scheduler
+
- import_playbook: invoker.yml
when: not lean
diff --git a/ansible/roles/controller/tasks/deploy.yml
b/ansible/roles/controller/tasks/deploy.yml
index 931d657..8c99dc9 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -309,6 +309,21 @@
env: "{{ env | combine(mongodb_env) }}"
when: db.artifact_store.backend == "MongoDB"
+- name: setup scheduler env
+ set_fact:
+ scheduler_env:
+ "CONFIG_whisk_etcd_hosts": "{{ etcd_connect_string }}"
+ "CONFIG_whisk_etcd_lease_timeout": "{{ etcd.lease.timeout }}"
+ "CONFIG_whisk_etcd_pool_threads": "{{ etcd.pool_threads }}"
+ "CONFIG_whisk_scheduler_grpc_tls": "{{ scheduler.grpc.tls |
default('false') | lower }}"
+ "CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
+ when: enable_scheduler
+
+- name: merge scheduler env
+ set_fact:
+ env: "{{ env | combine(scheduler_env) }}"
+ when: enable_scheduler
+
- name: populate volumes for controller
set_fact:
controller_volumes:
diff --git a/ansible/roles/invoker/tasks/deploy.yml
b/ansible/roles/invoker/tasks/deploy.yml
index 28d8ead..fd837ce 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -328,6 +328,21 @@
env: "{{ env | combine(mongodb_env) }}"
when: db.artifact_store.backend == "MongoDB"
+- name: setup scheduler env
+ set_fact:
+ scheduler_env:
+ "CONFIG_whisk_etcd_hosts": "{{ etcd_connect_string }}"
+ "CONFIG_whisk_etcd_lease_timeout": "{{ etcd.lease.timeout }}"
+ "CONFIG_whisk_etcd_pool_threads": "{{ etcd.pool_threads }}"
+ "CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{
scheduler.dataManagementService.retryInterval }}"
+ "CONFIG_whisk_invoker_containerCreation_maxPeek": "{{
invoker.container.creationMaxPeek }}"
+ when: enable_scheduler
+
+- name: merge scheduler env
+ set_fact:
+ env: "{{ env | combine(scheduler_env) }}"
+ when: enable_scheduler
+
- name: include plugins
include_tasks: "{{ inv_item }}.yml"
with_items: "{{ invoker_plugins | default([]) }}"
diff --git a/ansible/roles/schedulers/tasks/clean.yml
b/ansible/roles/schedulers/tasks/clean.yml
new file mode 100644
index 0000000..20bb996
--- /dev/null
+++ b/ansible/roles/schedulers/tasks/clean.yml
@@ -0,0 +1,24 @@
+---
+# Remove scheduler containers.
+
+- name: get scheduler name
+ set_fact:
+ scheduler_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}"
+
+- name: remove scheduler
+ docker_container:
+ name: "{{ scheduler_name }}"
+ state: absent
+ ignore_errors: "True"
+
+- name: remove scheduler log directory
+ file:
+ path: "{{ whisk_logs_dir }}/{{ scheduler_name }}"
+ state: absent
+ become: "{{ logs.dir.become }}"
+
+- name: remove scheduler conf directory
+ file:
+ path: "{{ scheduler.confdir }}/{{ scheduler_name }}"
+ state: absent
+ become: "{{ scheduler.dir.become }}"
diff --git a/ansible/roles/schedulers/tasks/deploy.yml
b/ansible/roles/schedulers/tasks/deploy.yml
new file mode 100644
index 0000000..7a97665
--- /dev/null
+++ b/ansible/roles/schedulers/tasks/deploy.yml
@@ -0,0 +1,339 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+---
+# This role will install Scheduler in group 'schedulers' in the environment
+# inventory
+
+- import_tasks: docker_login.yml
+
+- name: get scheduler name and index
+ set_fact:
+ scheduler_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}"
+ scheduler_index:
+ "{{ (scheduler_index_base|int) + host_group.index(inventory_hostname) }}"
+
+- name: "pull the {{ docker.image.tag }} image of scheduler"
+ shell: "docker pull {{docker_registry}}{{ docker.image.prefix
}}/scheduler:{{docker.image.tag}}"
+ when: docker_registry != ""
+ register: result
+ until: (result.rc == 0)
+ retries: "{{ docker.pull.retries }}"
+ delay: "{{ docker.pull.delay }}"
+
+- name: ensure scheduler log directory is created with permissions
+ file:
+ path: "{{ whisk_logs_dir }}/{{ scheduler_name }}"
+ state: directory
+ mode: 0777
+ become: "{{ logs.dir.become }}"
+
+# We need to create the file with proper permissions because the dir creation
above
+# does not result in a dir with full permissions in docker machine especially
with macos mounts
+- name: ensure scheduler log file is created with permissions
+ file:
+ path: "{{ whisk_logs_dir }}/{{ scheduler_name }}/{{ scheduler_name
}}_logs.log"
+ state: touch
+ mode: 0777
+ when: environment_type is defined and environment_type == "docker-machine"
+
+- name: ensure scheduler config directory is created with permissions
+ file:
+ path: "{{ scheduler.confdir }}/{{ scheduler_name }}"
+ state: directory
+ mode: 0777
+ become: "{{ scheduler.dir.become }}"
+
+- name: check, that required databases exist
+ include_tasks: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
+ vars:
+ dbName: "{{ item }}"
+ dbUser: "{{ db.credentials.scheduler.user }}"
+ dbPass: "{{ db.credentials.scheduler.pass }}"
+ with_items:
+ - "{{ db.whisk.auth }}"
+
+- name: copy jmxremote password file
+ when: jmx.enabled
+ template:
+ src: "jmxremote.password.j2"
+ dest: "{{ scheduler.confdir }}/{{ scheduler_name }}/jmxremote.password"
+ mode: 0777
+
+- name: copy jmxremote access file
+ when: jmx.enabled
+ template:
+ src: "jmxremote.access.j2"
+ dest: "{{ scheduler.confdir }}/{{ scheduler_name }}/jmxremote.access"
+ mode: 0777
+
+- name: prepare scheduler port
+ set_fact:
+ scheduler_port: "{{ scheduler.basePort + (scheduler_index | int) }}"
+ ports_to_expose:
+ - "{{ scheduler.grpc.basePort + (scheduler_index | int) }}:{{
scheduler.grpc.basePort + (scheduler_index | int) }}"
+ - "{{ scheduler.basePort + (scheduler_index | int) }}:8080"
+
+- name: expose additional ports if jmxremote is enabled
+ when: jmx.enabled
+ vars:
+ jmx_remote_port: "{{ jmx.basePortScheduler + (scheduler_index|int) }}"
+ jmx_remote_rmi_port:
+ "{{ jmx.rmiBasePortScheduler + (scheduler_index|int) }}"
+ set_fact:
+ ports_to_expose: >-
+ {{ ports_to_expose }} +
+ [ '{{ jmx_remote_port }}:{{ jmx_remote_port }}' ] +
+ [ '{{ jmx_remote_rmi_port }}:{{ jmx_remote_rmi_port }}' ]
+ scheduler_args: >-
+ {{ scheduler.arguments }}
+ {{ jmx.jvmCommonArgs }}
+ -Djava.rmi.server.hostname={{ ansible_host }}
+ -Dcom.sun.management.jmxremote.rmi.port={{ jmx_remote_rmi_port }}
+ -Dcom.sun.management.jmxremote.port={{ jmx_remote_port }}
+
+- name: populate environment variables for scheduler
+ set_fact:
+ env:
+ "JAVA_OPTS":
+ -Xmx{{ scheduler.heap }}
+ -XX:+CrashOnOutOfMemoryError
+ -XX:+UseGCOverheadLimit
+ -XX:ErrorFile=/logs/java_error.log
+ -XX:+HeapDumpOnOutOfMemoryError
+ -XX:HeapDumpPath=/logs
+ "SCHEDULER_OPTS": "{{ scheduler_args | default(scheduler.arguments) }}"
+ "SCHEDULER_INSTANCES": "{{ scheduler.instances }}"
+ "JMX_REMOTE": "{{ jmx.enabled }}"
+ "PORT": "8080"
+
+ "WHISK_SCHEDULER_ENDPOINTS_HOST": "{{ ansible_host }}"
+ "WHISK_SCHEDULER_ENDPOINTS_RPCPORT": "{{ scheduler.grpc.basePort +
(scheduler_index | int)}}"
+ "WHISK_SCHEDULER_ENDPOINTS_AKKAPORT": "{{
scheduler.akka.cluster.basePort + (scheduler_index | int) }}"
+ "CONFIG_whisk_scheduler_protocol": "{{ scheduler.protocol }}"
+ "CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
+ "CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{
scheduler.dataManagementService.retryInterval }}"
+ "CONFIG_whisk_scheduler_inProgressJobRetention": "{{
scheduler.inProgressJobRetentionSecond }}"
+ "CONFIG_whisk_scheduler_queueManager_maxSchedulingTime": "{{
scheduler.queueManager.maxSchedulingTime }}"
+ "CONFIG_whisk_scheduler_queueManager_maxRetriesToGetQueue": "{{
scheduler.queueManager.maxRetriesToGetQueue }}"
+ "CONFIG_whisk_scheduler_queue_idleGrace": "{{ scheduler.queue.idleGrace
}}"
+ "CONFIG_whisk_scheduler_queue_stopGrace": "{{ scheduler.queue.stopGrace
}}"
+ "CONFIG_whisk_scheduler_queue_flushGrace": "{{
scheduler.queue.flushGrace }}"
+ "CONFIG_whisk_scheduler_queue_gracefulShutdownTimeout": "{{
scheduler.queue.gracefulShutdownTimeout }}"
+ "CONFIG_whisk_scheduler_queue_maxRetentionSize": "{{
scheduler.queue.maxRetentionSize }}"
+ "CONFIG_whisk_scheduler_queue_maxRetentionMs": "{{
scheduler.queue.maxRetentionMs }}"
+ "CONFIG_whisk_scheduler_queue_throttlingFraction": "{{
scheduler.queue.throttlingFraction }}"
+ "CONFIG_whisk_scheduler_queue_durationBufferSize": "{{
scheduler.queue.durationBufferSize }}"
+ "CONFIG_whisk_durationChecker_timeWindow": "{{
durationChecker.timeWindow }}"
+
+ "TZ": "{{ docker.timezone }}"
+
+ "KAFKA_HOSTS": "{{ kafka_connect_string }}"
+ "CONFIG_whisk_kafka_replicationFactor":
+ "{{ kafka.replicationFactor | default() }}"
+ "CONFIG_whisk_kafka_topics_scheduler_retentionBytes":
+ "{{ kafka_topics_scheduler_retentionBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_scheduler_retentionMs":
+ "{{ kafka_topics_scheduler_retentionMS | default() }}"
+ "CONFIG_whisk_kafka_topics_scheduler_segmentBytes":
+ "{{ kafka_topics_scheduler_segmentBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_creationAck_retentionBytes":
+ "{{ kafka_topics_creationAck_retentionBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_creationAck_retentionMs":
+ "{{ kafka_topics_creationAck_retentionMS | default() }}"
+ "CONFIG_whisk_kafka_topics_creationAck_segmentBytes":
+ "{{ kafka_topics_creationAck_segmentBytes | default() }}"
+ "CONFIG_whisk_kafka_topics_prefix":
+ "{{ kafka.topicsPrefix }}"
+ "CONFIG_whisk_kafka_topics_userEvent_prefix":
+ "{{ kafka.topicsUserEventPrefix }}"
+ "CONFIG_whisk_kafka_common_securityProtocol":
+ "{{ kafka.protocol }}"
+ "CONFIG_whisk_kafka_common_sslTruststoreLocation":
+ "/conf/{{ kafka.ssl.keystore.name }}"
+ "CONFIG_whisk_kafka_common_sslTruststorePassword":
+ "{{ kafka.ssl.keystore.password }}"
+ "CONFIG_whisk_kafka_common_sslKeystoreLocation":
+ "/conf/{{ kafka.ssl.keystore.name }}"
+ "CONFIG_whisk_kafka_common_sslKeystorePassword":
+ "{{ kafka.ssl.keystore.password }}"
+ "ZOOKEEPER_HOSTS": "{{ zookeeper_connect_string }}"
+
+ "LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
+
+ "CONFIG_whisk_couchdb_protocol": "{{ db.protocol }}"
+ "CONFIG_whisk_couchdb_host": "{{ db.host }}"
+ "CONFIG_whisk_couchdb_port": "{{ db.port }}"
+ "CONFIG_whisk_couchdb_username": "{{ db.credentials.scheduler.user }}"
+ "CONFIG_whisk_couchdb_password": "{{ db.credentials.scheduler.pass }}"
+ "CONFIG_whisk_couchdb_provider": "{{ db.provider }}"
+ "CONFIG_whisk_couchdb_databases_WhiskAuth": "{{ db.whisk.auth }}"
+ "CONFIG_whisk_couchdb_databases_WhiskEntity": "{{ db.whisk.actions }}"
+ "CONFIG_whisk_couchdb_databases_WhiskActivation": "{{
db.whisk.activations }}"
+ "CONFIG_whisk_db_actionsDdoc": "{{ db_whisk_actions_ddoc | default() }}"
+ "CONFIG_whisk_db_activationsDdoc": "{{ db_whisk_activations_ddoc |
default() }}"
+ "CONFIG_whisk_db_activationsFilterDdoc": "{{
db_whisk_activations_filter_ddoc | default() }}"
+ "CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) |
lower }}"
+
+ "CONFIG_whisk_memory_min": "{{ limit_action_memory_min | default() }}"
+ "CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}"
+ "CONFIG_whisk_memory_std": "{{ limit_action_memory_std | default() }}"
+
+ "CONFIG_whisk_timeLimit_min": "{{ limit_action_time_min | default() }}"
+ "CONFIG_whisk_timeLimit_max": "{{ limit_action_time_max | default() }}"
+ "CONFIG_whisk_timeLimit_std": "{{ limit_action_time_std | default() }}"
+
+ "RUNTIMES_MANIFEST": "{{ runtimesManifest | to_json }}"
+ "CONFIG_whisk_runtimes_defaultImagePrefix":
+ "{{ runtimes_default_image_prefix | default() }}"
+ "CONFIG_whisk_runtimes_defaultImageTag":
+ "{{ runtimes_default_image_tag | default() }}"
+ "CONFIG_whisk_runtimes_bypassPullForLocalImages":
+ "{{ runtimes_bypass_pull_for_local_images | default() | lower }}"
+ "CONFIG_whisk_runtimes_localImagePrefix":
+ "{{ runtimes_local_image_prefix | default() }}"
+
+ "METRICS_KAMON": "{{ metrics.kamon.enabled | default(false) | lower }}"
+ "METRICS_KAMON_TAGS": "{{ metrics.kamon.tags | default() | lower }}"
+ "METRICS_LOG": "{{ metrics.log.enabled | default(false) | lower }}"
+
+ "CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
+ "CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
+
+ "CONFIG_whisk_fraction_managedFraction":
+ "{{ scheduler.managedFraction }}"
+ "CONFIG_whisk_fraction_blackboxFraction":
+ "{{ scheduler.blackboxFraction }}"
+
+ "CONFIG_logback_log_level": "{{ scheduler.loglevel }}"
+
+ "CONFIG_whisk_transactions_header": "{{ transactions.header }}"
+
+ "CONFIG_whisk_etcd_hosts": "{{ etcd_connect_string }}"
+ "CONFIG_whisk_etcd_lease_timeout": "{{ etcd.lease.timeout }}"
+ "CONFIG_whisk_etcd_pool_threads": "{{ etcd.pool_threads }}"
+ "CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}"
+
+ "CONFIG_whisk_scheduler_username": "{{ scheduler.username }}"
+ "CONFIG_whisk_scheduler_password": "{{ scheduler.password }}"
+
+
+- name: merge extra env variables
+ set_fact:
+ env: "{{ env | combine(scheduler.extraEnv) }}"
+
+- name: populate volumes for scheduler
+ set_fact:
+ scheduler_volumes:
+ - "{{ whisk_logs_dir }}/{{ scheduler_name }}:/logs"
+ - "{{ scheduler.confdir }}/{{ scheduler_name }}:/conf"
+
+- name: setup elasticsearch activation store env
+ set_fact:
+ elastic_env:
+ "CONFIG_whisk_activationStore_elasticsearch_protocol": "{{
db.elasticsearch.protocol}}"
+ "CONFIG_whisk_activationStore_elasticsearch_hosts": "{{
elasticsearch_connect_string }}"
+ "CONFIG_whisk_activationStore_elasticsearch_indexPattern": "{{
db.elasticsearch.index_pattern }}"
+ "CONFIG_whisk_activationStore_elasticsearch_username": "{{
db.elasticsearch.auth.admin.username }}"
+ "CONFIG_whisk_activationStore_elasticsearch_password": "{{
db.elasticsearch.auth.admin.password }}"
+ "CONFIG_whisk_spi_ActivationStoreProvider":
"org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStoreProvider"
+ when: db.activation_store.backend == "ElasticSearch"
+
+- name: merge elasticsearch activation store env
+ set_fact:
+ env: "{{ env | combine(elastic_env) }}"
+ when: db.activation_store.backend == "ElasticSearch"
+
+- name: check if coverage collection is enabled
+ set_fact:
+ coverage_enabled: false
+ when: coverage_enabled is undefined
+
+- name: ensure scheduler coverage directory is created with permissions
+ file:
+ path: "{{ coverage_logs_dir }}/scheduler/{{ item }}"
+ state: directory
+ mode: 0777
+ with_items:
+ - scheduler
+ - common
+ become: "{{ logs.dir.become }}"
+ when: coverage_enabled
+
+- name: extend scheduler volume for coverage
+ set_fact:
+ scheduler_volumes: "{{ scheduler_volumes|default({}) +
[coverage_logs_dir+'/scheduler:/coverage'] }}"
+ when: coverage_enabled
+
+- name: include plugins
+ include_tasks: "{{ item }}.yml"
+ with_items: "{{ scheduler_plugins | default([]) }}"
+
+- name: Judge current scheduler whether deployed
+ shell: echo $(docker ps | grep {{ scheduler_name }} | wc -l)
+ register: schedulerDeployed
+ when: zeroDowntimeDeployment.enabled == true
+
+- name: disable scheduler{{ groups['schedulers'].index(inventory_hostname) }}
before redeploy scheduler
+ uri:
+ url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port
}}/disable"
+ validate_certs: no
+ method: POST
+ status_code: 200
+ user: "{{ scheduler.username }}"
+ password: "{{ scheduler.password }}"
+ force_basic_auth: yes
+ ignore_errors: "{{ scheduler.deployment_ignore_error }}"
+ when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout !=
"0"
+
+- name: wait until all queue and create queue task is finished before redeploy
scheduler when using apicall solution or half solution
+ uri:
+ url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port
}}/queue/total"
+ validate_certs: no
+ return_content: yes
+ user: "{{ scheduler.username }}"
+ password: "{{ scheduler.password }}"
+ force_basic_auth: yes
+ register: totalQueue
+ until: totalQueue.content == "0"
+ retries: 180
+ delay: 5
+ when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout !=
"0"
+ ignore_errors: "{{ scheduler.deployment_ignore_error }}"
+
+- name: wait until all queue and create queue task is finished before redeploy
scheduler using sleep solution
+ shell: sleep 120s
+ when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout !=
"0" and zeroDowntimeDeployment.solution == 'sleep'
+
+- name: (re)start scheduler
+ docker_container:
+ name: "{{ scheduler_name }}"
+ image:
+ "{{docker_registry~docker.image.prefix}}/scheduler:{{ 'cov' if
(coverage_enabled) else docker.image.tag }}"
+ state: started
+ recreate: true
+ restart_policy: "{{ docker.restart.policy }}"
+ hostname: "{{ scheduler_name }}"
+ env: "{{ env }}"
+ volumes: "{{ scheduler_volumes }}"
+ ports: "{{ ports_to_expose }}"
+ command:
+ /bin/sh -c
+ "exec /init.sh {{ scheduler_index }}
+ >> /logs/{{ scheduler_name }}_logs.log 2>&1"
+
+- name: wait until the Scheduler in this host is up and running
+ uri:
+ url:
+ "{{scheduler.protocol}}://{{ansible_host}}:{{scheduler_port}}/ping"
+ validate_certs: "no"
+ register: result
+ until: result.status == 200
+ retries: 12
+ delay: 5
+
+- name: create scheduler jmx.yml
+ template:
+ src: "{{ openwhisk_home }}/ansible/roles/schedulers/templates/jmx.yml.j2"
+ dest: "{{ scheduler.confdir }}/jmx.yml"
+ ignore_errors: True
+ when: scheduler_index | int + 1 == groups['schedulers'] | length or
ansible_host != hostvars[groups['schedulers'][scheduler_index | int + 1
]]['ansible_host']
diff --git a/ansible/roles/schedulers/tasks/join_akka_cluster.yml
b/ansible/roles/schedulers/tasks/join_akka_cluster.yml
new file mode 100644
index 0000000..375a549
--- /dev/null
+++ b/ansible/roles/schedulers/tasks/join_akka_cluster.yml
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+---
+#
+# Scheduler 'plugin' that will add the items necessary to the scheduler
+# environment to cause the scheduler to join a specified akka cluster
+#
+
+- name: add akka port to ports_to_expose
+ set_fact:
+ ports_to_expose: >-
+ {{ ports_to_expose }} +
+ [ "{{ (scheduler.akka.cluster.basePort + (scheduler_index | int)) }}:"
+ + "{{ scheduler.akka.cluster.bindPort }}" ]
+
+- name: add seed nodes to scheduler environment
+ set_fact:
+ env: >-
+ {{ env | combine({
+ 'CONFIG_akka_cluster_seedNodes_' ~ seedNode.0:
+
'akka://scheduler-actor-system@'~seedNode.1~':'~(scheduler.akka.cluster.basePort+seedNode.0)
+ }) }}
+ with_indexed_items: "{{ scheduler.akka.cluster.seedNodes }}"
+ loop_control:
+ loop_var: seedNode
+
+- name: Add akka environment to scheduler environment
+ vars:
+ akka_env:
+ "CONFIG_akka_actor_provider": "{{ scheduler.akka.provider }}"
+ "CONFIG_akka_remote_artery_canonical_hostname":
+ "{{ scheduler.akka.cluster.host[(scheduler_index | int)] }}"
+ "CONFIG_akka_remote_artery_canonical_port":
+ "{{ scheduler.akka.cluster.basePort + (scheduler_index | int) }}"
+ "CONFIG_akka_remote_artery_bind_port":
+ "{{ scheduler.akka.cluster.bindPort }}"
+ set_fact:
+ env: "{{ env | combine(akka_env) }}"
diff --git a/ansible/roles/schedulers/tasks/main.yml
b/ansible/roles/schedulers/tasks/main.yml
new file mode 100644
index 0000000..26f7f35
--- /dev/null
+++ b/ansible/roles/schedulers/tasks/main.yml
@@ -0,0 +1,10 @@
+---
+# This role will install scheduler in group 'schedulers' in the environment
inventory
+# In deploy mode it will deploy schedulers.
+# In clean mode it will remove the scheduler containers.
+
+- import_tasks: deploy.yml
+ when: mode == "deploy"
+
+- import_tasks: clean.yml
+ when: mode == "clean"
diff --git a/ansible/roles/schedulers/templates/jmx.yml.j2
b/ansible/roles/schedulers/templates/jmx.yml.j2
new file mode 100644
index 0000000..c66802e
--- /dev/null
+++ b/ansible/roles/schedulers/templates/jmx.yml.j2
@@ -0,0 +1,25 @@
+collects:
+{% set index = groups['schedulers'].index(inventory_hostname) %}
+{% set ip =
hostvars[groups['schedulers'][groups['schedulers'].index(inventory_hostname) |
int]]['ansible_host'] %}
+{% for i in range(0,index+1)|reverse if
hostvars[groups['schedulers'][i]]['ansible_host'] == ip %}
+ - {{ hostvars[groups['schedulers'][i]]['inventory_hostname'] }}
+{% endfor %}
+
+rules:
+{% for i in range(0,index+1)|reverse if
hostvars[groups['schedulers'][i]]['ansible_host'] == ip %}
+ - name: {{ hostvars[groups['schedulers'][i]]['inventory_hostname'] }}
+ metrics:
+ - kafka.producer:type=producer-metrics,client-id=*
request-latency-avg,request-latency-max,request-rate,response-rate,incoming-byte-rate,outgoing-byte-rate,connection-count,connection-creation-rate,connection-close-rate,io-ratio,io-time-ns-avg,io-wait-ratio,select-rate,io-wait-time-ns-avg
client-id
+ - kafka.producer:type=producer-node-metrics,client-id=*,node-id=*
request-rate,response-rate,request-latency-max,request-latency-avg,incoming-byte-rate,request-size-avg,outgoing-byte-rate,request-size-max
client-id
+ - kafka.producer:type=producer-topic-metrics,client-id=*,topic=*
record-retry-rate,record-send-rate,compression-rate,byte-rate,record-error-rate
client-id
+ - kafka.consumer:type=consumer-metrics,client-id=*
connection-creation-rate,response-rate,select-rate,connection-count,network-io-rate,io-ratio,io-wait-time-ns-avg,io-wait-ratio,outgoing-byte-rate,request-size-max,io-time-ns-avg,request-rate,incoming-byte-rate,connection-close-rate,request-size-avg
client-id
+ - kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*
bytes-consumed-rate,records-consumed-rate,fetch-size-max,fetch-size-avg,records-per-request-avg
client-id
+ - kafka.consumer:type=consumer-node-metrics,client-id=*,node-id=*
request-rate,response-rate,request-latency-max,request-latency-avg,incoming-byte-rate,request-size-avg,outgoing-byte-rate,request-size-max
client-id
+ - kafka.consumer:type=consumer-coordinator-metrics,client-id=*
join-time-max,commit-latency-avg,sync-time-avg,join-rate,assigned-partitions,sync-rate,commit-rate,last-heartbeat-seconds-ago,heartbeat-rate,commit-latency-max,join-time-avg,sync-time-max,heartbeat-response-time-max
client-id
+ jvmPrefix: kafka.jvm
+ jmxUrl: "service:jmx:rmi:///jndi/rmi://{{ ip }}:{{ jmx.basePortScheduler +
i }}/jmxrmi"
+ jmxUsername: "{{ jmx.user }}"
+ jmxPassword: "{{ jmx.pass }}"
+
+{% endfor %}
+intervalSec: 10
diff --git a/ansible/scheduler.yml b/ansible/scheduler.yml
new file mode 100644
index 0000000..cb88c4f
--- /dev/null
+++ b/ansible/scheduler.yml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+---
+# This playbook deploys OpenWhisk Schedulers.
+
+- hosts: schedulers
+ vars:
+ #
+ # host_group - usually "{{ groups['...'] }}" where '...' is what was used
+ # for 'hosts' above. The hostname of each host will be looked up in this
+ # group to assign a zero-based index. That index will be used in concert
+ # with 'name_prefix' below to assign a host/container name.
+ host_group: "{{ groups['schedulers'] }}"
+ #
+ # name_prefix - a unique prefix for this set of schedulers. The prefix
+ # will be used in combination with an index (determined using
+ # 'host_group' above) to name host/schedulers.
+ name_prefix: "scheduler"
+ #
+ # scheduler_index_base - the deployment process allocates host docker
+ # ports to individual schedulers based on their indices. This is an
+ # additional offset to prevent collisions between different scheduler
+ # groups. Usually 0 if only one group is being deployed, otherwise
+ # something like "{{ groups['firstschedulergroup']|length }}"
+ scheduler_index_base: 0
+ #
+ # select which additional capabilities (from the schedulers role) need
+ # to be added to the scheduler. Plugin will override default
+ # configuration settings. (Plugins are found in the
+ # 'roles/schedulers/tasks' directory for now.)
+ scheduler_plugins:
+ # Join an akka cluster rather than running standalone akka
+ - "join_akka_cluster"
+
+ serial: '1'
+ roles:
+ - schedulers
diff --git a/ansible/tasks/initdb.yml b/ansible/tasks/initdb.yml
index 6f57019..6071fe3 100644
--- a/ansible/tasks/initdb.yml
+++ b/ansible/tasks/initdb.yml
@@ -29,6 +29,7 @@
readers:
- "{{ db.credentials.controller.user }}"
- "{{ db.credentials.invoker.user }}"
+ - "{{ db.credentials.scheduler.user }}"
- include_tasks: db/recreateDoc.yml
vars:
diff --git a/ansible/tasks/wipeDatabase.yml b/ansible/tasks/wipeDatabase.yml
index da4c9e9..a468894 100644
--- a/ansible/tasks/wipeDatabase.yml
+++ b/ansible/tasks/wipeDatabase.yml
@@ -30,6 +30,7 @@
readers:
- "{{ db.credentials.controller.user }}"
- "{{ db.credentials.invoker.user }}"
+ - "{{ db.credentials.scheduler.user }}"
writers:
- "{{ db.credentials.controller.user }}"
@@ -46,6 +47,7 @@
writers:
- "{{ db.credentials.controller.user }}"
- "{{ db.credentials.invoker.user }}"
+ - "{{ db.credentials.scheduler.user }}"
- include_tasks: recreateViews.yml
when: withViews == True
diff --git a/ansible/templates/db_local.ini.j2
b/ansible/templates/db_local.ini.j2
index c94ab63..857f58a 100644
--- a/ansible/templates/db_local.ini.j2
+++ b/ansible/templates/db_local.ini.j2
@@ -13,3 +13,7 @@ db_password={{ lookup('env', 'OW_DB_CONTROLLER_PASSWORD') |
default('some_contro
[invoker]
db_username={{ lookup('env', 'OW_DB_INVOKER_USERNAME') | default(db_prefix +
'invoker0', true) }}
db_password={{ lookup('env', 'OW_DB_INVOKER_PASSWORD') |
default('some_invoker_passw0rd', true) }}
+
+[scheduler]
+db_username={{ lookup('env', 'OW_DB_SCHEDULER_USERNAME') | default(db_prefix +
'scheduler0', true) }}
+db_password={{ lookup('env', 'OW_DB_SCHEDULER_PASSWORD') |
default('some_scheduler_passw0rd', true) }}
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 988a236..ccdfc8a 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -40,6 +40,10 @@ dependencies {
compile
"com.typesafe.akka:akka-actor_${gradle.scala.depVersion}:${gradle.akka.version}"
compile
"com.typesafe.akka:akka-stream_${gradle.scala.depVersion}:${gradle.akka.version}"
compile
"com.typesafe.akka:akka-slf4j_${gradle.scala.depVersion}:${gradle.akka.version}"
+ compile
"com.typesafe.akka:akka-cluster_${gradle.scala.depVersion}:${gradle.akka.version}"
+ compile
"com.typesafe.akka:akka-cluster-metrics_${gradle.scala.depVersion}:${gradle.akka.version}"
+ compile
"com.typesafe.akka:akka-cluster-tools_${gradle.scala.depVersion}:${gradle.akka.version}"
+ compile
"com.typesafe.akka:akka-distributed-data_${gradle.scala.depVersion}:${gradle.akka.version}"
compile
"com.typesafe.akka:akka-http-core_${gradle.scala.depVersion}:${gradle.akka_http.version}"
compile
"com.typesafe.akka:akka-http-spray-json_${gradle.scala.depVersion}:${gradle.akka_http.version}"
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index 6c69e11..5352526 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -36,6 +36,11 @@ akka.http {
max-connections = 128
max-open-requests = 1024
}
+
+ server {
+ preview.enable-http2 = on
+ parsing.illegal-header-warnings = off
+ }
}
#kamon related configuration
@@ -180,6 +185,13 @@ whisk {
# max-message-bytes is defined programatically as
${whisk.activation.kafka.payload.max} +
# ${whisk.activation.kafka.serdes-overhead}.
}
+ creationAck {
+ segment-bytes = 536870912
+ retention-bytes = 1073741824
+ retention-ms = 3600000
+ # max-message-bytes is defined programmatically as
${whisk.activation.kafka.payload.max} +
+ # ${whisk.activation.kafka.serdes-overhead}.
+ }
health {
segment-bytes = 536870912
retention-bytes = 1073741824
@@ -197,6 +209,11 @@ whisk {
retention-bytes = 1073741824
retention-ms = 3600000
}
+ scheduler {
+ segment-bytes = 536870912
+ retention-bytes = 1073741824
+ retention-ms = 86400000
+ }
prefix = ""
user-event {
prefix = ""
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 2a6490b..cd33ffe 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -306,5 +306,5 @@ object ConfigKeys {
val whiskClusterName = "whisk.cluster.name"
- val dataManagementServiceRetryInterval =
"whisk.scheduler.data-management-service.retryInterval"
+ val dataManagementServiceRetryInterval =
"whisk.scheduler.data-management-service.retry-interval"
}
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index 2280ce7..f606f64 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -46,7 +46,8 @@ class FPCPoolBalancer(config: WhiskConfig,
extends LoadBalancer {
private implicit val executionContext: ExecutionContext =
actorSystem.dispatcher
- private implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ // This value is given according to the total waiting time at QueueManager
for a new queue to be created.
+ private implicit val requestTimeout: Timeout = Timeout(8.seconds)
private val entityStore = WhiskEntityStore.datastore()
diff --git a/core/scheduler/src/main/resources/application.conf
b/core/scheduler/src/main/resources/application.conf
index f1a2ed6..66c07fc 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -18,6 +18,7 @@
akka {
actor {
+ provider = cluster
allow-java-serialization = off
serializers {
kryo = "io.altoo.akka.serialization.kryo.KryoSerializer"
@@ -37,10 +38,11 @@ akka {
}
}
- remote.netty.tcp {
- send-buffer-size = 3151796b
- receive-buffer-size = 3151796b
- maximum-frame-size = 3151796b
+ remote {
+ artery {
+ enabled = on
+ transport = tcp
+ }
}
}
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index b6eaed5..f93a766 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -19,14 +19,14 @@ package org.apache.openwhisk.core.scheduler
import akka.Done
import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem,
CoordinatedShutdown, Props}
+import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
-import akka.util.Timeout
import akka.pattern.ask
+import akka.util.Timeout
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
-import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
import org.apache.openwhisk.core.connector._
@@ -37,28 +37,22 @@ import
org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
import org.apache.openwhisk.core.scheduler.container.{ContainerManager,
CreationJobManager}
import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
-import org.apache.openwhisk.core.scheduler.queue.{
- DurationCheckerProvider,
- MemoryQueue,
- QueueManager,
- QueueSize,
- SchedulingDecisionMaker
-}
+import org.apache.openwhisk.core.scheduler.queue._
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker,
LeaseKeepAliveService, WatcherService}
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.grpc.ActivationServiceHandler
import org.apache.openwhisk.http.BasicHttpService
import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.generic.auto._
import pureconfig.loadConfigOrThrow
import spray.json.{DefaultJsonProtocol, _}
+import scala.collection.JavaConverters
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
-import pureconfig.generic.auto._
-
-import scala.collection.JavaConverters
class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints:
SchedulerEndpoints)(implicit config: WhiskConfig,
actorSystem: ActorSystem,
@@ -274,12 +268,9 @@ object Scheduler {
schedulerHost -> null,
schedulerAkkaPort -> null,
schedulerRpcPort -> null,
- WhiskConfig.actionInvokePerMinuteLimit -> null,
- WhiskConfig.actionInvokeConcurrentLimit -> null,
- WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+ WhiskConfig.actionInvokeConcurrentLimit -> null) ++
kafkaHosts ++
zookeeperHosts ++
- wskApiHost ++
ExecManifest.requiredProperties
def initKamon(instance: SchedulerInstanceId): Unit = {
@@ -329,7 +320,7 @@ object Scheduler {
val msgProvider = SpiLoader.get[MessagingProvider]
Seq(
- (topicPrefix + "scheduler" + instanceId.asString, "actions",
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+ (topicPrefix + "scheduler" + instanceId.asString, "scheduler",
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
(
topicPrefix + "creationAck" + instanceId.asString,
"creationAck",
@@ -347,12 +338,17 @@ object Scheduler {
// Create scheduler
val scheduler = new Scheduler(instanceId, schedulerEndpoints)
- // TODO: Add Akka-grpc handler
- val httpsConfig =
- if (Scheduler.protocol == "https")
Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None
-
-
BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route,
port, httpsConfig)(actorSystem)
+ Http()
+ .newServerAt("0.0.0.0", port = rpcPort)
+ .bind(scheduler.serviceHandlers)
+ .foreach { _ =>
+ val httpsConfig =
+ if (Scheduler.protocol == "https")
Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https"))
+ else None
+
BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route,
port, httpsConfig)(
+ actorSystem)
+ }
case Failure(t) =>
abort(s"Invalid runtimes manifest: $t")
}
@@ -384,7 +380,7 @@ case class SchedulerStates(sid: SchedulerInstanceId,
queueSize: Int, endpoints:
def getRemoteRef(name: String)(implicit context: ActorRefFactory):
ActorSelection = {
implicit val ec = context.dispatcher
- val path =
s"akka//scheduler-actor-system@${endpoints.asAkkaEndpoint}/user/${name}"
+ val path =
s"akka://scheduler-actor-system@${endpoints.asAkkaEndpoint}/user/${name}"
context.actorSelection(path)
}
diff --git a/tests/src/test/resources/application.conf.j2
b/tests/src/test/resources/application.conf.j2
index 352fa16..4f10144 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -15,6 +15,9 @@ akka.http.host-connection-pool.client.idle-timeout = 90 s
# Avoid system exit for test runs
akka.jvm-exit-on-fatal-error = off
+# Each ActorSystem binds to a free port
+akka.remote.artery.canonical.port=0
+
whisk {
feature-flags {
require-api-key-annotation = {{
whisk.feature_flags.require_api_key_annotation | default(true) }}
@@ -143,11 +146,22 @@ whisk {
grpc {
tls = "{{ scheduler.grpc.tls | default('false') | lower }}"
}
+ queue { # every key below is read from scheduler.queue.* with a fallback default
+ idle-grace = "{{ scheduler.queue.idleGrace | default('20 seconds')
}}"
+ stop-grace = "{{ scheduler.queue.stopGrace | default('20 seconds')
}}"
+ flush-grace = "{{ scheduler.queue.flushGrace | default('60
seconds') }}"
+ graceful-shutdown-timeout = "{{
scheduler.queue.gracefulShutdownTimeout | default('5 seconds') }}"
+ max-retention-size = "{{ scheduler.queue.maxRetentionSize |
default(10000) }}"
+ max-retention-ms = "{{ scheduler.queue.maxRetentionMs |
default(60000) }}"
+ throttling-fraction = "{{ scheduler.queue.throttlingFraction |
default(0.9) }}"
+ duration-buffer-size = "{{ scheduler.queue.durationBufferSize |
default(10) }}"
+ }
queue-manager {
- max-scheduling-time = "{{
scheduler.queueManager.maxSchedulingTime }}"
- max-retries-to-get-queue = "{{
scheduler.queueManager.maxRetriesToGetQueue }}"
+ max-scheduling-time = "{{ scheduler.queueManager.maxSchedulingTime
}}"
+ max-retries-to-get-queue = "{{
scheduler.queueManager.maxRetriesToGetQueue }}"
}
max-peek = "{{ scheduler.maxPeek }}"
+ in-progress-job-retention = "{{ scheduler.inProgressJobRetentionSecond
| default('20 seconds') }}"
}
}