This is an automated email from the ASF dual-hosted git repository.

kwmonroe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bigtop.git


The following commit(s) were added to refs/heads/master by this push:
     new 036c085  BIGTOP-3092: kafka charm: support juju storage
036c085 is described below

commit 036c08542117667488636b050b8efea5cc48c79b
Author: Casey Marshall <[email protected]>
AuthorDate: Sat Sep 22 13:30:37 2018 +0200

    BIGTOP-3092: kafka charm: support juju storage
    
    Closes #400
    
    Signed-off-by: Kevin W Monroe <[email protected]>
---
 .../layer-kafka/lib/charms/layer/bigtop_kafka.py   | 12 ++++-
 .../src/charm/kafka/layer-kafka/metadata.yaml      |  8 ++++
 .../src/charm/kafka/layer-kafka/reactive/kafka.py  | 56 +++++++++++++++++++---
 3 files changed, 68 insertions(+), 8 deletions(-)

diff --git a/bigtop-packages/src/charm/kafka/layer-kafka/lib/charms/layer/bigtop_kafka.py b/bigtop-packages/src/charm/kafka/layer-kafka/lib/charms/layer/bigtop_kafka.py
index c2f1fc4..9b02a34 100755
--- a/bigtop-packages/src/charm/kafka/layer-kafka/lib/charms/layer/bigtop_kafka.py
+++ b/bigtop-packages/src/charm/kafka/layer-kafka/lib/charms/layer/bigtop_kafka.py
@@ -14,12 +14,14 @@
 # limitations under the License.
 
 import os
+import shutil
+from subprocess import check_output
+
 from charmhelpers.core import hookenv
 from charmhelpers.core import host
 from jujubigdata import utils
 from charms.layer.apache_bigtop_base import Bigtop
 from charms import layer
-from subprocess import check_output
 
 
 class Kafka(object):
@@ -38,7 +40,7 @@ class Kafka(object):
         for port in self.dist_config.exposed_ports('kafka'):
             hookenv.close_port(port)
 
-    def configure_kafka(self, zk_units, network_interface=None):
+    def configure_kafka(self, zk_units, network_interface=None, log_dir=None):
         # Get ip:port data from our connected zookeepers
         zks = []
         for unit in zk_units:
@@ -54,6 +56,7 @@ class Kafka(object):
             'kafka::server::broker_id': unit_num,
             'kafka::server::port': kafka_port,
             'kafka::server::zookeeper_connection_string': zk_connect,
+            'kafka::server::log_dirs': log_dir,
         }
         if network_interface:
             ip = Bigtop().get_ip_for_interface(network_interface)
@@ -62,6 +65,11 @@ class Kafka(object):
         bigtop = Bigtop()
         bigtop.render_site_yaml(roles=roles, overrides=override)
         bigtop.trigger_puppet()
+
+        if log_dir:
+            os.makedirs(log_dir, mode=0o700, exist_ok=True)
+            shutil.chown(log_dir, user='kafka')
+
         self.set_advertise()
         self.restart()
 
diff --git a/bigtop-packages/src/charm/kafka/layer-kafka/metadata.yaml b/bigtop-packages/src/charm/kafka/layer-kafka/metadata.yaml
index d581b35..5094ee2 100644
--- a/bigtop-packages/src/charm/kafka/layer-kafka/metadata.yaml
+++ b/bigtop-packages/src/charm/kafka/layer-kafka/metadata.yaml
@@ -13,3 +13,11 @@ provides:
 requires:
   zookeeper:
     interface: zookeeper
+storage:
+  logs:
+    type: filesystem
+    description: Directory where log files will be stored
+    minimum-size: 20M
+    location: /srv/kafka
+    multiple:
+      range: "0-1"
diff --git a/bigtop-packages/src/charm/kafka/layer-kafka/reactive/kafka.py b/bigtop-packages/src/charm/kafka/layer-kafka/reactive/kafka.py
index 97f96d6..92cd58b 100644
--- a/bigtop-packages/src/charm/kafka/layer-kafka/reactive/kafka.py
+++ b/bigtop-packages/src/charm/kafka/layer-kafka/reactive/kafka.py
@@ -13,10 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from charmhelpers.core import hookenv
+import os
+
+from charmhelpers.core import hookenv, unitdata
 from charms.layer.apache_bigtop_base import get_layer_opts, get_package_version
 from charms.layer.bigtop_kafka import Kafka
-from charms.reactive import set_state, remove_state, when, when_not
+from charms.reactive import set_state, remove_state, when, when_not, hook
 from charms.reactive.helpers import data_changed
 
 
@@ -38,9 +40,11 @@ def configure_kafka(zk):
     hookenv.status_set('maintenance', 'setting up kafka')
     data_changed(  # Prime data changed for network interface
         'kafka.network_interface', hookenv.config().get('network_interface'))
+    log_dir = unitdata.kv().get('kafka.storage.log_dir')
+    data_changed('kafka.storage.log_dir', log_dir)
     kafka = Kafka()
     zks = zk.zookeepers()
-    kafka.configure_kafka(zks)
+    kafka.configure_kafka(zks, log_dir=log_dir)
     kafka.open_ports()
     set_state('kafka.started')
     hookenv.status_set('active', 'ready')
@@ -62,14 +66,18 @@ def configure_kafka_zookeepers(zk):
     """
     zks = zk.zookeepers()
     network_interface = hookenv.config().get('network_interface')
-    if not data_changed('zookeepers', zks) and not data_changed(
-            'kafka.network_interface', network_interface):
+    log_dir = unitdata.kv().get('kafka.storage.log_dir')
+    if not(any((
+            data_changed('zookeepers', zks),
+            data_changed('kafka.network_interface', network_interface),
+            data_changed('kafka.storage.log_dir', log_dir)))):
         return
 
     hookenv.log('Checking Zookeeper configuration')
     hookenv.status_set('maintenance', 'updating zookeeper instances')
     kafka = Kafka()
-    kafka.configure_kafka(zks, network_interface)
+    kafka.configure_kafka(zks, network_interface=network_interface,
+                          log_dir=log_dir)
     hookenv.status_set('active', 'ready')
 
 
@@ -90,3 +98,39 @@ def serve_client(client, zookeeper):
     client.send_port(kafka_port)
     client.send_zookeepers(zookeeper.zookeepers())
     hookenv.log('Sent Kafka configuration to client')
+
+
+@hook('logs-storage-attached')
+def storage_attach():
+    storageids = hookenv.storage_list('logs')
+    if not storageids:
+        hookenv.status_set('blocked', 'cannot locate attached storage')
+        return
+    storageid = storageids[0]
+
+    mount = hookenv.storage_get('location', storageid)
+    if not mount:
+        hookenv.status_set('blocked', 'cannot locate attached storage mount')
+        return
+
+    log_dir = os.path.join(mount, "logs")
+    unitdata.kv().set('kafka.storage.log_dir', log_dir)
+    hookenv.log('Kafka logs storage attached at {}'.format(log_dir))
+    # Stop Kafka; removing the kafka.started state will trigger a reconfigure if/when it's ready
+    kafka = Kafka()
+    kafka.close_ports()
+    kafka.stop()
+    remove_state('kafka.started')
+    hookenv.status_set('waiting', 'reconfiguring to use attached storage')
+    set_state('kafka.storage.logs.attached')
+
+
+@hook('logs-storage-detaching')
+def storage_detaching():
+    unitdata.kv().unset('kafka.storage.log_dir')
+    kafka = Kafka()
+    kafka.close_ports()
+    kafka.stop()
+    remove_state('kafka.started')
+    hookenv.status_set('waiting', 'reconfiguring to use temporary storage')
+    remove_state('kafka.storage.logs.attached')

Reply via email to