This is an automated email from the ASF dual-hosted git repository.
kwmonroe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bigtop.git
The following commit(s) were added to refs/heads/master by this push:
new 036c085 BIGTOP-3092: kafka charm: support juju storage
036c085 is described below
commit 036c08542117667488636b050b8efea5cc48c79b
Author: Casey Marshall <[email protected]>
AuthorDate: Sat Sep 22 13:30:37 2018 +0200
BIGTOP-3092: kafka charm: support juju storage
Closes #400
Signed-off-by: Kevin W Monroe <[email protected]>
---
.../layer-kafka/lib/charms/layer/bigtop_kafka.py | 12 ++++-
.../src/charm/kafka/layer-kafka/metadata.yaml | 8 ++++
.../src/charm/kafka/layer-kafka/reactive/kafka.py | 56 +++++++++++++++++++---
3 files changed, 68 insertions(+), 8 deletions(-)
diff --git a/bigtop-packages/src/charm/kafka/layer-kafka/lib/charms/layer/bigtop_kafka.py b/bigtop-packages/src/charm/kafka/layer-kafka/lib/charms/layer/bigtop_kafka.py
index c2f1fc4..9b02a34 100755
--- a/bigtop-packages/src/charm/kafka/layer-kafka/lib/charms/layer/bigtop_kafka.py
+++ b/bigtop-packages/src/charm/kafka/layer-kafka/lib/charms/layer/bigtop_kafka.py
@@ -14,12 +14,14 @@
# limitations under the License.
import os
+import shutil
+from subprocess import check_output
+
from charmhelpers.core import hookenv
from charmhelpers.core import host
from jujubigdata import utils
from charms.layer.apache_bigtop_base import Bigtop
from charms import layer
-from subprocess import check_output
class Kafka(object):
@@ -38,7 +40,7 @@ class Kafka(object):
for port in self.dist_config.exposed_ports('kafka'):
hookenv.close_port(port)
- def configure_kafka(self, zk_units, network_interface=None):
+ def configure_kafka(self, zk_units, network_interface=None, log_dir=None):
# Get ip:port data from our connected zookeepers
zks = []
for unit in zk_units:
@@ -54,6 +56,7 @@ class Kafka(object):
'kafka::server::broker_id': unit_num,
'kafka::server::port': kafka_port,
'kafka::server::zookeeper_connection_string': zk_connect,
+ 'kafka::server::log_dirs': log_dir,
}
if network_interface:
ip = Bigtop().get_ip_for_interface(network_interface)
@@ -62,6 +65,11 @@ class Kafka(object):
bigtop = Bigtop()
bigtop.render_site_yaml(roles=roles, overrides=override)
bigtop.trigger_puppet()
+
+ if log_dir:
+ os.makedirs(log_dir, mode=0o700, exist_ok=True)
+ shutil.chown(log_dir, user='kafka')
+
self.set_advertise()
self.restart()
diff --git a/bigtop-packages/src/charm/kafka/layer-kafka/metadata.yaml b/bigtop-packages/src/charm/kafka/layer-kafka/metadata.yaml
index d581b35..5094ee2 100644
--- a/bigtop-packages/src/charm/kafka/layer-kafka/metadata.yaml
+++ b/bigtop-packages/src/charm/kafka/layer-kafka/metadata.yaml
@@ -13,3 +13,11 @@ provides:
requires:
zookeeper:
interface: zookeeper
+storage:
+ logs:
+ type: filesystem
+ description: Directory where log files will be stored
+ minimum-size: 20M
+ location: /srv/kafka
+ multiple:
+ range: "0-1"
diff --git a/bigtop-packages/src/charm/kafka/layer-kafka/reactive/kafka.py b/bigtop-packages/src/charm/kafka/layer-kafka/reactive/kafka.py
index 97f96d6..92cd58b 100644
--- a/bigtop-packages/src/charm/kafka/layer-kafka/reactive/kafka.py
+++ b/bigtop-packages/src/charm/kafka/layer-kafka/reactive/kafka.py
@@ -13,10 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from charmhelpers.core import hookenv
+import os
+
+from charmhelpers.core import hookenv, unitdata
from charms.layer.apache_bigtop_base import get_layer_opts, get_package_version
from charms.layer.bigtop_kafka import Kafka
-from charms.reactive import set_state, remove_state, when, when_not
+from charms.reactive import set_state, remove_state, when, when_not, hook
from charms.reactive.helpers import data_changed
@@ -38,9 +40,11 @@ def configure_kafka(zk):
hookenv.status_set('maintenance', 'setting up kafka')
data_changed( # Prime data changed for network interface
'kafka.network_interface', hookenv.config().get('network_interface'))
+ log_dir = unitdata.kv().get('kafka.storage.log_dir')
+ data_changed('kafka.storage.log_dir', log_dir)
kafka = Kafka()
zks = zk.zookeepers()
- kafka.configure_kafka(zks)
+ kafka.configure_kafka(zks, log_dir=log_dir)
kafka.open_ports()
set_state('kafka.started')
hookenv.status_set('active', 'ready')
@@ -62,14 +66,18 @@ def configure_kafka_zookeepers(zk):
"""
zks = zk.zookeepers()
network_interface = hookenv.config().get('network_interface')
- if not data_changed('zookeepers', zks) and not data_changed(
- 'kafka.network_interface', network_interface):
+ log_dir = unitdata.kv().get('kafka.storage.log_dir')
+ if not(any((
+ data_changed('zookeepers', zks),
+ data_changed('kafka.network_interface', network_interface),
+ data_changed('kafka.storage.log_dir', log_dir)))):
return
hookenv.log('Checking Zookeeper configuration')
hookenv.status_set('maintenance', 'updating zookeeper instances')
kafka = Kafka()
- kafka.configure_kafka(zks, network_interface)
+ kafka.configure_kafka(zks, network_interface=network_interface,
+ log_dir=log_dir)
hookenv.status_set('active', 'ready')
@@ -90,3 +98,39 @@ def serve_client(client, zookeeper):
client.send_port(kafka_port)
client.send_zookeepers(zookeeper.zookeepers())
hookenv.log('Sent Kafka configuration to client')
+
+
+@hook('logs-storage-attached')
+def storage_attach():
+ storageids = hookenv.storage_list('logs')
+ if not storageids:
+ hookenv.status_set('blocked', 'cannot locate attached storage')
+ return
+ storageid = storageids[0]
+
+ mount = hookenv.storage_get('location', storageid)
+ if not mount:
+ hookenv.status_set('blocked', 'cannot locate attached storage mount')
+ return
+
+ log_dir = os.path.join(mount, "logs")
+ unitdata.kv().set('kafka.storage.log_dir', log_dir)
+ hookenv.log('Kafka logs storage attached at {}'.format(log_dir))
+ # Stop Kafka; removing the kafka.started state will trigger a reconfigure if/when it's ready
+ kafka = Kafka()
+ kafka.close_ports()
+ kafka.stop()
+ remove_state('kafka.started')
+ hookenv.status_set('waiting', 'reconfiguring to use attached storage')
+ set_state('kafka.storage.logs.attached')
+
+
+@hook('logs-storage-detaching')
+def storage_detaching():
+ unitdata.kv().unset('kafka.storage.log_dir')
+ kafka = Kafka()
+ kafka.close_ports()
+ kafka.stop()
+ remove_state('kafka.started')
+ hookenv.status_set('waiting', 'reconfiguring to use temporary storage')
+ remove_state('kafka.storage.logs.attached')