AMBARI-18862. KAFKA broker start failed during restart stale config services 
after updating log directory.(vbrodetskyi)

(cherry picked from commit 494795607c52a373db68099b472e5ede707d1794)

Change-Id: Ide509bc7ad95a76b680e999607b7b33a31a985c8


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/092c762f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/092c762f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/092c762f

Branch: refs/heads/AMBARI-2.4.2.16
Commit: 092c762f8a14dbc8249a904f0b2a7354e4aee8d4
Parents: 8243b9a
Author: Vitaly Brodetskyi <[email protected]>
Authored: Fri Nov 11 06:48:43 2016 +0200
Committer: Vitaly Brodetskyi <[email protected]>
Committed: Fri Nov 11 21:00:16 2016 +0000

----------------------------------------------------------------------
 .../KAFKA/0.8.1/package/scripts/kafka.py        | 26 ++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/092c762f/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
index ac7b0ae..6b4579c 100644
--- 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
@@ -65,15 +65,15 @@ def kafka(upgrade_type=None):
         kafka_server_config['listeners'] = listeners.replace("6667", port)
         Logger.info(format("Kafka listeners after the port update: 
{listeners}"))
         del kafka_server_config['port']
-      
-      
+
+
     if effective_version is not None and effective_version != "" and \
       check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, 
effective_version):
       if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
         brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
         kafka_server_config['broker.id'] = brokerid
         Logger.info(format("Calculating broker.id as {brokerid}"))
-      
+
     # listeners and advertised.listeners are only added in 2.3.0.0 onwards.
     if effective_version is not None and effective_version != "" and \
        check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version):
@@ -215,9 +215,15 @@ def setup_symlink(kafka_managed_dir, 
kafka_ambari_managed_dir):
   if backup_folder_path:
     # Restore backed up files to current relevant dirs if needed - will be 
triggered only when changing to/from default path;
     for file in os.listdir(backup_folder_path):
-      File(os.path.join(kafka_managed_dir,file),
-           owner=params.kafka_user,
-           content = StaticFile(os.path.join(backup_folder_path,file)))
+      if os.path.isdir(os.path.join(backup_folder_path, file)):
+        Execute(('cp', '-r', os.path.join(backup_folder_path, file), 
kafka_managed_dir),
+                sudo=True)
+        Execute(("chown", "-R", format("{kafka_user}:{user_group}"), 
os.path.join(kafka_managed_dir, file)),
+                sudo=True)
+      else:
+        File(os.path.join(kafka_managed_dir,file),
+             owner=params.kafka_user,
+             content = StaticFile(os.path.join(backup_folder_path,file)))
 
     # Clean up backed up folder
     Directory(backup_folder_path,
@@ -239,7 +245,13 @@ def backup_dir_contents(dir_path, backup_folder_suffix):
   )
   # Safely copy top-level contents to backup folder
   for file in os.listdir(dir_path):
-    File(os.path.join(backup_destination_path, file),
+    if os.path.isdir(os.path.join(dir_path, file)):
+      Execute(('cp', '-r', os.path.join(dir_path, file), 
backup_destination_path),
+              sudo=True)
+      Execute(("chown", "-R", format("{kafka_user}:{user_group}"), 
os.path.join(backup_destination_path, file)),
+              sudo=True)
+    else:
+      File(os.path.join(backup_destination_path, file),
          owner=params.kafka_user,
          content = StaticFile(os.path.join(dir_path,file)))
 

Reply via email to